Rx Study Materials

I recently taught myself Rx and found the following materials helpful for understanding the concepts and APIs of Rx.

  • The introduction to Reactive Programming you’ve been missing
    This article explains how to think reactively. It covers the core concepts of reactive programming with many examples. It is not specific to Rx, but it is the most helpful resource for understanding what reactive programming is.

  • Introduction to Rx
    This is a freely available book on Rx. Part 3 is the most helpful, as it shows how to use each Rx operator.

  • ReactiveX
    The best way to understand an Rx operator is to draw a marble diagram. ReactiveX explains each operator with a marble diagram. For example, take a look at Repeat.

  • 101 Rx Samples
    This site hosts a number of Rx operator examples.

  • Rx Workshop
    Video tutorials on Rx provided by Microsoft Channel 9.

Rx RetryWithDelay extension method

Rx provides Observable.Retry, which resubscribes to the source observable sequence until it terminates successfully.

Observable.Retry is a useful combinator, but it resubscribes immediately, which makes it a poor fit for retrying an I/O request. For example, when retrying a network request, we should wait a few seconds before sending a new request.

Here is an implementation of Observable.RetryWithDelay, which retries the source observable with a delay before each retry.

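    using System;
    using System.Reactive.Linq;

    public static class ObservableExtensions
    {
        // Retries source until it terminates successfully, waiting timeSpan before each retry.
        public static IObservable<T> RetryWithDelay<T>(this IObservable<T> source, TimeSpan timeSpan)
        {
            if (source == null)
                throw new ArgumentNullException(nameof(source));
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(timeSpan));
            if (timeSpan == TimeSpan.Zero)
                return source.Retry();

            // The first attempt runs immediately. If it fails, wait timeSpan and
            // resubscribe to source, repeating until an attempt succeeds.
            return source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry());
        }
    }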
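
To see the behaviour end to end, here is a minimal sketch. It assumes a hypothetical flaky source (the names RetryWithDelayDemo, Run, flaky and attempts are made up for illustration) that fails on its first two subscriptions and succeeds on the third. The retry expression is inlined so the sketch compiles on its own; with the extension method above in scope it would simply be flaky.RetryWithDelay(delay).

```csharp
using System;
using System.Reactive.Linq;

public static class RetryWithDelayDemo
{
    // Returns the value produced and the number of subscription attempts it took.
    public static Tuple<string, int> Run(TimeSpan delay)
    {
        var attempts = 0;

        // Hypothetical flaky source: fails on its first two subscriptions, then succeeds.
        IObservable<string> flaky = Observable.Defer(() =>
        {
            attempts++;
            return attempts < 3
                ? Observable.Throw<string>(new InvalidOperationException("attempt " + attempts + " failed"))
                : Observable.Return("ok");
        });

        // Same expression that RetryWithDelay returns, inlined for a standalone sketch.
        var retried = flaky.Catch(Observable.Timer(delay).SelectMany(_ => flaky).Retry());

        // Wait blocks until the sequence completes and returns its last value.
        var result = retried.Wait();
        return Tuple.Create(result, attempts);
    }

    public static void Main()
    {
        var outcome = Run(TimeSpan.FromMilliseconds(100));
        Console.WriteLine(outcome.Item1 + " after " + outcome.Item2 + " attempts");
        // prints "ok after 3 attempts"
    }
}
```

Observable.Defer is used here only so that each subscription runs the factory again, which makes the number of attempts observable.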

After validating its arguments, Observable.RetryWithDelay creates a new observable sequence by concatenating the following two observables:

  • Observable.Timer(timeSpan)
  • source

SelectMany is used to concatenate these two observable sequences. Observable.Timer(timeSpan) emits a single value once timeSpan has elapsed. The selector passed to SelectMany ignores this value and returns source, so source is subscribed to only after the delay. Retry then resubscribes to this delayed sequence each time it fails.

  • Observable.Timer(timeSpan).SelectMany(_ => source).Retry()
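
The delaying effect of Timer followed by SelectMany can be checked in isolation. The following is a rough sketch, not part of the original implementation: DelayedSubscriptionDemo, Run and the Stopwatch bookkeeping are illustrative names, and the measured time is approximate because timers are not exact.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;

public static class DelayedSubscriptionDemo
{
    // Returns the value produced and the elapsed milliseconds at which source was subscribed to.
    public static Tuple<int, long> Run(TimeSpan delay)
    {
        var clock = Stopwatch.StartNew();
        long subscribedAtMs = -1;

        // Defer runs its factory on every subscription, so we can record when that happens.
        IObservable<int> source = Observable.Defer(() =>
        {
            subscribedAtMs = clock.ElapsedMilliseconds;
            return Observable.Return(42);
        });

        // The timer's single value is discarded; it only decides when source is subscribed to.
        var delayed = Observable.Timer(delay).SelectMany(_ => source);

        var value = delayed.Wait();
        return Tuple.Create(value, subscribedAtMs);
    }

    public static void Main()
    {
        var outcome = Run(TimeSpan.FromMilliseconds(200));
        Console.WriteLine("value " + outcome.Item1 + ", subscribed after about " + outcome.Item2 + " ms");
    }
}
```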

The expression Observable.Timer(timeSpan).SelectMany(_ => source).Retry() is the sequence we want to continue with when source terminates with an exception. Catch swallows the exception thrown by source and continues with that sequence.

  • source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry())
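
One consequence of using Catch is worth illustrating: values that source emitted before failing are kept, and the continuation's values are appended after them. A small sketch under that reading, with CatchDemo, Run, failing and fallback as illustrative names:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CatchDemo
{
    public static IList<int> Run()
    {
        // Emits 1 and then terminates with an exception.
        IObservable<int> failing =
            Observable.Return(1).Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

        // Stands in for the delayed, retried sequence used by RetryWithDelay.
        IObservable<int> fallback = Observable.Return(2);

        // Catch swallows the exception and continues with fallback.
        // ToList gathers every emitted value into a single list; Wait blocks until it arrives.
        return failing.Catch(fallback).ToList().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
        // prints "1, 2"
    }
}
```

For RetryWithDelay this means a subscriber can see values from a failed attempt followed by values from a later attempt, which matters if source emits more than one item per request.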