Rx RetryWithDelay extension method

Rx provides Observable.Retry which repeats the source observable sequence until it successfully terminates.

Observable.Retry is a useful combinator, but it is a bit limited when retrying an I/O request. For example, when retrying a network request, we should wait a few seconds before sending a new request.

Here is an implementation of Observable.RetryWithDelay which repeats the source observable with delay.

    public static class ObservableExtensions
    {
        public static IObservable<T> RetryWithDelay<T>(this IObservable<T> source, TimeSpan timeSpan)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("timeSpan");
            if (timeSpan == TimeSpan.Zero)
                return source.Retry();

            return source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry());
        }
    }

After validating arguments, Observable.RetryWithDelay create a new observable sequence by concatenating the following two observables:

  • Observable.Timer(timeSpan)
  • source

SelectMany is used to concatenate these two observable sequences. Observable.Timer(timeSpan) has a single value that fires after timeSpan. SelectMany ignores this value and returns source. Then it repeats the result sequence with Retry.

  • Observable.Timer(timeSpan).SelectMany(_ => source).Retry()

This is the next observable sequence we want to continue when source is terminated by an exception. Catch swallows the exception thrown by source and continues with the next observable.

  • source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry())

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s