Rx provides Observable.Retry which repeats the source observable sequence until it successfully terminates.
Observable.Retry
is a useful combinator, but it is a bit limited when retrying an I/O request. For example, when retrying a network request, we should wait a few seconds before sending a new request.
Here is an implementation of Observable.RetryWithDelay
which repeats the source observable with delay.
public static class ObservableExtensions { public static IObservable<T> RetryWithDelay<T>(this IObservable<T> source, TimeSpan timeSpan) { if (source == null) throw new ArgumentNullException("source"); if (timeSpan < TimeSpan.Zero) throw new ArgumentOutOfRangeException("timeSpan"); if (timeSpan == TimeSpan.Zero) return source.Retry(); return source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry()); } }
After validating arguments, Observable.RetryWithDelay
create a new observable sequence by concatenating the following two observables:
- Observable.Timer(timeSpan)
- source
SelectMany
is used to concatenate these two observable sequences. Observable.Timer(timeSpan)
has a single value that fires after timeSpan
. SelectMany
ignores this value and returns source
. Then it repeats the result sequence with Retry
.
- Observable.Timer(timeSpan).SelectMany(_ => source).Retry()
This is the next observable sequence we want to continue when source
is terminated by an exception. Catch
swallows the exception thrown by source
and continues with the next observable.
- source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry())