Microsoft .NET Reactive Extensions and .NET Framework Task Parallel Library

In making parallel computing more accessible, Microsoft has
also made asynchronous execution more accessible. Nowhere is this more
apparent than in technologies built from the ground up on these new
asynchronous capabilities. Take, for example, Reactive Extensions (Rx),
a set of libraries built on LINQ and the Task Parallel Library (TPL).

Rx’s LINQ compositional model has garnered a lot of
attention in the Rx documentation. This should be no surprise; much of what a
developer creates in Rx is LINQ expressions. As stated earlier, the Task Parallel
Library (TPL) is also an important component in the Rx architecture. In fact,
TPL underlies some of Rx’s asynchronous behavior. When dealing with issues
like Thread affinity, understanding how Rx leverages TPL is essential. Using an
Rx sample application, this article will demonstrate how TPL fits within Rx.

Rx is relatively new, so here is a quick introduction.

Rx Introduction

As stated earlier, Rx is a set of extensions to the .NET
Framework for asynchronously consuming observable collections. Rx is built
on .NET Framework components like LINQ and the Task Parallel Library.

Understanding Rx begins with understanding the IObservable,
IObserver, and IEnumerable interfaces. IEnumerable collections are consumed in
a “pull-based” fashion; a foreach loop, for example, asks the collection for
each item. IObservable collections are consumed in a “push-based”, or rather
eventing, fashion: the source hands each item to its observers. The IObservable
and IObserver interfaces appear below.

    public interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    public interface IObserver<in T>
    {
        void OnCompleted();
        void OnError(Exception error);
        void OnNext(T value);
    }

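Eventing usually follows a subscribe, receive events, and
unsubscribe sort of pattern. The two interfaces encapsulate this pattern:
Subscribe registers an observer, the observer’s OnNext, OnError, and
OnCompleted methods receive the events, and disposing the IDisposable returned
by Subscribe unsubscribes. Wrapping something like, for example, text output
operations in an IObservable would allow a developer to consume text output in
a LINQ query.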
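
The subscribe, receive, and unsubscribe pattern can be sketched without Rx at all, because the two interfaces ship in the System namespace of .NET Framework 4. The sketch below is only an illustration: NumberSource, RecordingObserver, and PushDemo are hypothetical names invented here, and a real Rx source would normally be built with the library’s own factory methods rather than by hand.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source for illustration only; it is not part of Rx.
class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    // Subscribe: register the observer and hand back a handle for unsubscribing.
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Push a value to every current subscriber.
    public void Publish(int value)
    {
        foreach (IObserver<int> observer in observers.ToArray())
            observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Disposing removes the observer, so later values are not delivered.
        public void Dispose() { observers.Remove(observer); }
    }
}

// Hypothetical observer that records what it is sent.
class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

class PushDemo
{
    static void Main()
    {
        var source = new NumberSource();
        var observer = new RecordingObserver();

        IDisposable subscription = source.Subscribe(observer); // subscribe
        source.Publish(1);                                     // receive events
        source.Publish(2);
        subscription.Dispose();                                // unsubscribe
        source.Publish(3);                                     // not delivered

        Console.WriteLine(string.Join(", ", observer.Log));    // prints "next 1, next 2"
    }
}
```

Note that the observer never asks for a value; the source decides when OnNext runs, which is the difference from a foreach loop over an IEnumerable.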

As I stated earlier, TPL underlies some of Rx’s asynchronous
behavior. Before showing where TPL fits into Rx, it’s important to know what
TPL is.

Task Parallel Library Overview

I think of TPL as a new set of classes that follow an
alternative model to traditional .NET Threads and Threading data structures.
The core part of TPL is the Task class. Tasks are a sort of wrapper for work
to be done. Tasks are assigned a workload, which is usually a delegate or
lambda expression. Tasks are a higher level of abstraction than a Thread. A
Thread executes the underlying work, but a Task allows a developer to structure
and compose the execution of the underlying work, by invoking a Task and even
linking a Task’s completion to other Tasks.
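
As a minimal sketch of that idea, the snippet below assigns a lambda to a Task and links a second Task to the first one’s completion with ContinueWith. TaskDemo and SumTo are hypothetical names used only for this illustration; the TPL calls themselves (Task.Factory.StartNew, ContinueWith, Result) are standard.

```csharp
using System;
using System.Threading.Tasks;

class TaskDemo
{
    // Hypothetical workload used only for illustration.
    public static int SumTo(int n)
    {
        int total = 0;
        for (int i = 1; i <= n; i++) total += i;
        return total;
    }

    static void Main()
    {
        // Wrap the workload in a Task; the default scheduler picks a thread for it.
        Task<int> sum = Task.Factory.StartNew(() => SumTo(100));

        // Link a second Task that runs once the first has completed.
        Task<string> report = sum.ContinueWith(t => "Sum is " + t.Result);

        // Result blocks until the continuation has finished.
        Console.WriteLine(report.Result); // prints "Sum is 5050"
    }
}
```

The code never creates or names a Thread; it only describes the work and the order in which the pieces depend on each other.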

Task execution is actually handled by a second component
called a TaskScheduler. A TaskScheduler decides where and when each queued Task
runs, typically by managing a collection of Threads for the Task workloads. The
.NET Framework includes a default TaskScheduler, but a developer may want to
create a custom TaskScheduler to handle custom workloads.
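
To give a feel for what a custom TaskScheduler involves, here is a small sketch that runs every queued Task on its own dedicated Thread. ThreadPerTaskScheduler and SchedulerDemo are hypothetical names for this illustration, and the design is deliberately simplistic (no thread reuse, no inlining); it is not how the default scheduler works.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler for illustration: one new Thread per queued Task.
sealed class ThreadPerTaskScheduler : TaskScheduler
{
    // Called by TPL when a Task is started on this scheduler.
    protected override void QueueTask(Task task)
    {
        var thread = new Thread(() => TryExecuteTask(task));
        thread.IsBackground = true;
        thread.Start();
    }

    // Never run a Task on the thread that is waiting for it.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Only used by debuggers; this sketch keeps no queue to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}

class SchedulerDemo
{
    static void Main()
    {
        var factory = new TaskFactory(new ThreadPerTaskScheduler());
        int callerThread = Thread.CurrentThread.ManagedThreadId;

        // The Task reports the id of the thread it actually ran on.
        Task<int> task = factory.StartNew(() => Thread.CurrentThread.ManagedThreadId);

        Console.WriteLine("Ran on a different thread: " + (task.Result != callerThread));
    }
}
```

The three overrides are the whole contract: TPL hands Tasks to QueueTask, and the scheduler alone decides which Thread eventually calls TryExecuteTask.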

For the most part, the TPL is hidden within Rx. In fact,
guidelines mentioned in the resources below lead a developer to believe that
concurrency features of TPL may not always be necessary. However, there are
areas where TPL surfaces. As I mentioned earlier, I’m going to demonstrate
where, using some sample code.

Rx Sample

The sample application appears below.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using System.Disposables;

namespace Test.Rx.TaskScheduler
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> ob =
            Observable.CreateWithDisposable<int>(o =>
            {
                var cancel = new CancellationDisposable();

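                // Run the producer loop on its own thread so Main stays responsive.
                Scheduler.NewThread.Schedule(() =>
                {
                    int i = 0;
                    while (true)
                    {
                        // Pause between values, then check for cancellation.
                        Thread.Sleep(200);
                        if (!cancel.Token.IsCancellationRequested) { o.OnNext(i++); }
                        else
                        {
                            Console.WriteLine("Cancel event signaled");
                            o.OnCompleted();
                            break;
                        }
                    }
                }
                );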

                return cancel;
            }
            );

            IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
            Console.WriteLine("Press any key to cancel. . . ");
            Console.ReadKey();
            subscription.Dispose();
            Console.WriteLine("Press any key to quit. . . ");
            Console.ReadKey();
        }
    }
}

I’ll cover the Scheduler class later in the article. For
now, understand that Scheduler is the point where a developer can interact with
the TPL.

CancellationDisposable is a wrapper for the TPL cancellation types: it owns a
CancellationTokenSource and exposes that source’s CancellationToken through
its Token property. As you can see in the code above, the lambda pauses for
200 milliseconds before each check of the cancellation token. A
complete discussion of CancellationTokens is beyond the scope of this article.

The sample disposes the subscription when the user presses any key. Disposing
the subscription disposes the CancellationDisposable returned from the
subscribe lambda, which signals cancellation on its Token. IsCancellationRequested
then becomes true and the lambda invokes OnCompleted. OnCompleted signals the
observer that there are no more events to observe.
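
The cancellation pattern the sample relies on can be sketched with the TPL types alone. In the snippet below, CancellationDemo and CountUntilCancelled are hypothetical names for this illustration; CancellationTokenSource plays the role that CancellationDisposable plays in the Rx sample, and calling Cancel stands in for disposing the subscription.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class CancellationDemo
{
    // Hypothetical worker: counts until cancellation is requested, then returns the count.
    public static int CountUntilCancelled(CancellationToken token)
    {
        int i = 0;
        while (!token.IsCancellationRequested)
        {
            i++;
            Thread.Sleep(10); // simulate a small unit of work between checks
        }
        return i;
    }

    static void Main()
    {
        var source = new CancellationTokenSource();

        // The worker only sees the token; it cannot cancel, only observe.
        Task<int> worker = Task.Factory.StartNew(() => CountUntilCancelled(source.Token));

        Thread.Sleep(100);
        source.Cancel(); // from here on IsCancellationRequested is true

        Console.WriteLine("Stopped after " + worker.Result + " iterations");
    }
}
```

Cancellation here is cooperative: nothing interrupts the worker, so the loop stops only because it checks the token on every pass, just as the sample’s lambda does.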

The Rx Scheduler class is the most interesting part of the
sample and, as mentioned before, the point where a developer can adjust how TPL
works with Rx.

Rx Scheduler

The Scheduler class appears below.

    // Summary:
    //     Provides a set of static methods for creating Schedulers.
    public static class Scheduler
    {
        // Summary:
        //     Gets the scheduler that schedules work as soon as possible on the current
        //     thread.
        public static CurrentThreadScheduler CurrentThread { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the current Dispatcher.
        public static DispatcherScheduler Dispatcher { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work immediately on the current thread.
        public static ImmediateScheduler Immediate { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on a new thread.
        public static NewThreadScheduler NewThread { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the default Task Factory.
        public static TaskPoolScheduler TaskPool { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the ThreadPool.
        public static ThreadPoolScheduler ThreadPool { get; }

        // Summary:
        //     Schedules action to be executed recursively.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action> action);
        //
        // Summary:
        //     Schedules action to be executed recursively at each dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action<DateTimeOffset>> action, DateTimeOffset dueTime);
        //
        // Summary:
        //     Schedules action to be executed recursively after each dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action<TimeSpan>> action, TimeSpan dueTime);
        //
        // Summary:
        //     Schedules action to be executed at dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action action, DateTimeOffset dueTime);
    }

Each property in the Scheduler class is a portal to changing
where Rx invokes the lambda expression. Deciding which Scheduler class
property to use depends on the application.

The sample’s lambda is long-running, so the sample uses the
Scheduler that, according to the documentation, creates a new Thread. Had the
Immediate or CurrentThread property been used, the lambda’s loop would have
occupied the application Thread, which would never have been available to
display output or collect input.

Dispatcher can be used with Windows Presentation Foundation
(WPF). WPF controls can only be adjusted from the
Thread they were created on. At first this may appear to be a limitation,
but consider what would happen if two separate Threads attempted to adjust a
control at the same time.

Conclusion

Rx is a set of libraries built on LINQ and the Task Parallel
Library (TPL). The Rx Scheduler class allows a developer to plug into the
underlying TPL. Scheduler includes properties for working with Windows
Presentation Foundation (WPF) and even a custom TPL TaskScheduler.

Sources

Reactive Framework
Rx Wiki

Understanding Tasks in .NET Framework 4.0 Task Parallel Library
