Introduction to Rx
Kindle edition (2012)
Practical Rx Training
London 6-7 October 2015
Presented by the author of IntroToRx.com
Leaving the monad
An observable sequence is a useful construct, especially when we have the power of LINQ to compose complex queries over it. Even though we recognize the benefits of the observable sequence, sometimes it is required to leave the IObservable<T> paradigm for another paradigm, maybe to enable you to integrate with an existing API (i.e. use events or Task<T>). You might leave the observable paradigm if you find it easier for testing, or it may simply be easier for you to learn Rx by moving between an observable paradigm and a more familiar one.
What is a monad
We have casually referred to the term monad earlier in the book, but to most it will be a very foreign term. I am going to try to avoid overcomplicating what a monad is, but give enough of an explanation to help us out with our next category of methods. The full definition of a monad is quite abstract. Many others have tried to provide their definition of a monad using all sorts of metaphors from astronauts to Alice in Wonderland. Many of the tutorials for monadic programming use Haskell for the code examples which can add to the confusion. For us, a monad is effectively a programming structure that represents computations. Compare this to other programming structures:
- Data structure
- Purely state e.g. a List, a Tree or a Tuple
- Contract definition or abstract functionality e.g. an interface or abstract class
- Object-Orientated structure
- State and behavior together
Generally a monadic structure allows you to chain together operators to produce a pipeline, just as we do with our extension methods.Monads are a kind of abstract data type constructor that encapsulate program logic instead of data in the domain model.
This neat definition of a monad lifted from Wikipedia allows us to start viewing sequences as monads; the abstract data type in this case is the IObservable<T> type. When we use an observable sequence, we compose functions onto the abstract data type (the IObservable<T>) to create a query. This query becomes our encapsulated programming logic.
The use of monads to define control flows is particularly useful when dealing with typically troublesome areas of programming such as IO, concurrency and exceptions. This just happens to be some of Rx's strong points!
Why leave the monad?
There is a variety of reasons you may want to consume an observable sequence in a different paradigm. Libraries that need to expose functionality externally may be required to present it as events or as Task instances. In demonstration and sample code you may prefer to use blocking methods to limit the number of asynchronous moving parts. This may help make the learning curve to Rx a little less steep!
In production code, it is rarely advised to 'break the monad', especially moving from an observable sequence to blocking methods. Switching between asynchronous and synchronous paradigms should be done with caution, as this is a common root cause for concurrency problems such as deadlock and scalability issues.
In this chapter, we will look at the methods in Rx which allow you to leave the IObservable<T> monad.
The ForEach method provides a way to process elements as they are received. The key difference between ForEach and Subscribe is that ForEach will block the current thread until the sequence completes.
Note that the completed line is last, as you would expect. To be clear, you can get similar functionality from the Subscribe extension method, but the Subscribe method will not block. So if we substitute the call to ForEach with a call to Subscribe, we will see the completed line happen first.
Unlike the Subscribe extension method, ForEach has only the one
overload; the one that take an Action<T> as its single argument.
In contrast, previous (pre-release) versions of Rx, the ForEach method
had most of the same overloads as Subscribe. Those overloads of ForEach
have been deprecated, and I think rightly so. There is no need to have an
handler in a synchronous call, it is unnecessary. You can just place the call immediately
after the ForEach call as we have done above. Also, the
handler can now be replaced with standard Structured Exception Handling like you
would use for any other synchronous code, with a
block. This also gives symmetry to the ForEach instance method on the List<T>
The ForEach method, like its other blocking friends (First and Last etc.), should be used with care. I would leave the ForEach method for spikes, tests and demo code only. We will discuss the problems with introducing blocking calls when we look at concurrency.
An alternative way to switch out of the IObservable<T> is to call the ToEnumerable extension method. As a simple example:
The source observable sequence will be subscribed to when you start to enumerate the sequence (i.e. lazily). In contrast to the ForEach extension method, using the ToEnumerable method means you are only blocked when you try to move to the next element and it is not available. Also, if the sequence produces values faster than you consume them, they will be cached for you.
To cater for errors, you can wrap your
foreach loop in a
as you do with any other enumerable sequence:
As you are moving from a push to a pull model (non-blocking to blocking), the standard warning applies.
To a single collection
To avoid having to oscillate between push and pull, you can use one of the next four methods to get the entire list back in a single notification. They all have the same semantics, but just produce the data in a different format. They are similar to their corresponding IEnumerable<T> operators, but the return values differ in order to retain asynchronous behavior.
ToArray and ToList
Both ToArray and ToList take an observable sequence and package it into an array or an instance of List<T> respectively. Once the observable sequence completes, the array or list will be pushed as the single value of the result sequence.
As these methods still return observable sequences we can use our
handler for errors. Note that the source sequence is packaged to a single notification;
you either get the whole sequence or the error. If the source produces values
and then errors, you will not receive any of those values. All four operators (ToArray,
ToList, ToDictionary and ToLookup) handle errors like
ToDictionary and ToLookup
As an alternative to arrays and lists, Rx can package an observable sequence into a dictionary or lookup with the ToDictionary and ToLookup methods. Both methods have the same semantics as the ToArray and ToList methods, as they return a sequence with a single value and have the same error handling features.
The ToDictionary extension method overloads:
The ToLookup extension method overloads:
Both ToDictionary and ToLookup require a function that can be applied each value to get its key. In addition, the ToDictionary method overloads mandate that all keys should be unique. If a duplicate key is found, it terminate the sequence with a DuplicateKeyException. On the other hand, the ILookup<TKey, TElement> is designed to have multiple values grouped by the key. If you have many values per key, then ToLookup is probably the better option.
We have compared AsyncSubject<T> to Task<T> and even showed how to transition from a task to an observable sequence. The ToTask extension method will allow you to convert an observable sequence into a Task<T>. Like an AsyncSubject<T>, this method will ignore multiple values, only returning the last value.
This is a simple example of how the ToTask operator can be used. Note,
the ToTask method is in the
If the source sequence was to manifest error then the task would follow the error-handling semantics of tasks.
Once you have your task, you can of course engage in all the features of the TPL such as continuations.
Just as you can use an event as the source for an observable sequence with FromEventPattern, you can also make your observable sequence look like a standard .NET event with the ToEvent extension methods.
The ToEvent method returns an IEventSource<T>, which will
have a single event member on it:
When we convert the observable sequence with the ToEvent method, we can just subscribe by providing an Action<T>, which we do here with a lambda.
Note that this does not follow the standard pattern of events. Normally, when you
subscribe to an event, you need to handle the
sender and EventArgs
parameters. In the example above, we just get the value. If you want to expose your
sequence as an event that follows the standard pattern, you will need to use
The ToEventPattern will take an IObservable<EventPattern<TEventArgs>> and convert that into an IEventPatternSource<TEventArgs>. The public interface for these types is quite simple.
These look quite easy to work with. So if we create an EventArgs type and then apply a simple transform using Select, we can make a standard sequence fit the pattern.
The EventArgs type:
Now that we have a sequence that is compatible, we can use the ToEventPattern, and in turn, a standard event handler.
Now that we know how to get back into .NET events, let's take a break and remember why Rx is a better model.
- In C#, events have a curious interface. Some find the
-=operators an unnatural way to register a callback
- Events are difficult to compose
- Events do not offer the ability to be easily queried over time
- Events are a common cause of accidental memory leaks
- Events do not have a standard pattern for signaling completion
- Events provide almost no help for concurrency or multithreaded applications. For instance, raising an event on a separate thread requires you to do all of the plumbing
The set of methods we have looked at in this chapter complete the circle started in the Creating a Sequence chapter. We now have the means to enter and leave the observable sequence monad. Take care when opting in and out of the IObservable<T> monad. Doing so excessively can quickly make a mess of your code base, and may indicate a design flaw.
Additional recommended reading
|<< Back to : PART 3 - Taming the sequence||Moving on to : Advanced error handling>>|