Creating a ServerSentEventServer with ReactiveWebServer

Recently, I am working on a toy project named ReactiveWebServer. It is an event-driven web server built with Rx.

Here’s a simple web server returning “Hello World”.

using (var ws = new WebServer("http://*:8080/"))
{
    const string responseBody = "Hello World";

    ws.GET("/")
        .Subscribe(ctx => ctx.Respond(responseBody));

    Console.ReadLine();
}

The code is simple, but there is nothing new compared to other web servers such as NancyFx.

The real power of ReactiveWebServer comes from its ability to handle streaming data. For example, here’s a ServerSentEvent server implementation streaming integers every second.

using (var ws = new WebServer("http://*:8000/"))
{
    ws.GET("/events").Subscribe(ctx =>
    {
        var obs = Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(t => new ServerSentEvent(t.ToString()));

        ctx.Respond(new ServerSentEventsResponse(obs));
    });

    Console.ReadLine();
}

Data streaming is made simple and easy. You just need to create an instance of IObservable<ServerSentEvents>, wrap it with ServerSentEventsResponse, and pass the result to ctx.Respond method.

Rx Study Materials

I recently taught myself Rx and found the following materials are helpful in understanding the concepts and the APIs of Rx.

  • The introduction to Reactive Programming you’ve been missing
    This article explains how to think in reactive. It explains the core concepts of reactive programming with many examples. This is not-specific to Rx, but most helpful in understanding what reactive programming is.

  • Introduction to Rx
    This is a book on Rx freely available. Part 3 is most helpful as it shows how to use each operator of Rx.

  • ReactiveX
    The best way to understand a Rx operator is to draw a marble diagram. ReactiveX explains each operator with marble diagram. For example, take a look at Repeat.

  • 101 Rx Samples
    This site hosts a number of Rx operator examples.

  • Rx Workshop
    Video tutorials on Rx provided by Microsoft Channel9.

Rx RetryWithDelay extension method

Rx provides Observable.Retry which repeats the source observable sequence until it successfully terminates.

Observable.Retry is a useful combinator, but it is a bit limited when retrying an I/O request. For example, when retrying a network request, we should wait a few seconds before sending a new request.

Here is an implementation of Observable.RetryWithDelay which repeats the source observable with delay.

    public static class ObservableExtensions
    {
        public static IObservable<T> RetryWithDelay<T>(this IObservable<T> source, TimeSpan timeSpan)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (timeSpan < TimeSpan.Zero)
                throw new ArgumentOutOfRangeException("timeSpan");
            if (timeSpan == TimeSpan.Zero)
                return source.Retry();

            return source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry());
        }
    }

After validating arguments, Observable.RetryWithDelay create a new observable sequence by concatenating the following two observables:

  • Observable.Timer(timeSpan)
  • source

SelectMany is used to concatenate these two observable sequences. Observable.Timer(timeSpan) has a single value that fires after timeSpan. SelectMany ignores this value and returns source. Then it repeats the result sequence with Retry.

  • Observable.Timer(timeSpan).SelectMany(_ => source).Retry()

This is the next observable sequence we want to continue when source is terminated by an exception. Catch swallows the exception thrown by source and continues with the next observable.

  • source.Catch(Observable.Timer(timeSpan).SelectMany(_ => source).Retry())