Nick Polyak


Using Rx Observable Streams and Dynamic Data Patterns to build Applications Clean and Fast (Introduction)

What this article is about

This is an introductory article on the fascinating subject of mastering two very powerful but underused, misused and often not well understood frameworks: Reactive Extensions and Dynamic-Data. More in-depth articles dedicated to these two frameworks will be coming soon.

Using both packages properly can greatly simplify and speed up designing and building real world applications, both backend (or middle tier) and the client side.

Reactive Extensions provide a way to easily manipulate streams of data by applying LINQ-like operators to them.

Dynamic Data allows observing collections' changes as streams and changing/combining streams using LINQ-like operators to create and update collections. This is very important, especially on the front end to create displayable collections. For example, using Dynamic Data one can create a data-grid with almost all the Virtualization, Filtering, Sorting and even Grouping capability provided on the non-visual collections (view models) with the visual DataGrid passively reflecting the non-visual objects manipulated by the Dynamic Data.

The reason the Reactive Extensions and Dynamic Data frameworks are not used as much as they could be is the lack of well-written documentation on how to use them. This is precisely the purpose of this and the coming articles - to describe the correct usage patterns for Reactive Extensions and Dynamic Data.

All code samples for this series are written in C#/.NET 9, so some familiarity with C#, and especially with its LINQ operators, is assumed.

Introduction to Reactive Extensions (Rx)

Reactive Extensions provide a way of manipulating data coming in streams. Each stream usually emits data of the same type. Reactive Extensions provide a large set of operators that allow manipulating the streaming data in various ways, including

  • Transforming

  • Filtering

  • Sorting

  • Combining multiple streams into one

  • Grouping

and many others.
[Figures: Single Rx Stream Transform; Multiple Rx Stream Transform]
In fact a stream can be considered a dual of a collection and all the LINQ Collection operators have their dual operators for Rx streams. I will not get much more into duality here - duality was important for discovering the concept but not so important for using and understanding it.

Unlike collections, streams also have a time dimension. Therefore, many operators that work on Rx streams are built to work in time and do not have corresponding duals in LINQ. As a result, Rx streams have more operators than their collection counterparts.

Rx was first created by Microsoft around 2010-2011 for the .NET platform and was called Rx.NET. Since then, Reactive Extensions have been ported to many other languages, including JavaScript (RxJS), Java (RxJava), Python (RxPY) and many others.

Reactive Extensions, especially in combination with Dynamic Data, have great potential to make software development considerably easier and faster (similar to what LINQ operators did for collections, but more so, since streams can be used in many more scenarios than collections; I'll expand on this further down). Also, note that streams are the best way of communicating between the backend (or middle tier) software and the front end (Web, Mobile or Desktop).

Despite these unique advantages, Reactive Extensions have not lived up to their potential. Many people have encountered Rx and done a bit of coding with it, but very few of them can be called Reactive Extensions experts. As mentioned above, the main reason for that is the lack of good documentation explaining how to use Reactive Extensions.

You can find a lot of Rx documentation on the web, for example ReactiveX or Introduction to Rx, but these on-line books are rather handbooks - detailing each Rx.NET operator and describing how they work in detail. They are very valuable when one needs to figure out how to use a specific Rx operator, but a bit boring to read (as any handbook would be).

There is also a wealth of samples e.g. reactive 101 samples. While these samples also provide a good reference guide, they do not highlight the main Rx concepts.

Instead of writing another handbook, my primary purpose is to show how to use Reactive Extensions correctly to achieve your goal of building real life applications clean and fast.

Introduction to Dynamic Data

Dynamic Data was built by Roland Pheasant (apparently around 2014) by applying Rx.NET to monitoring and updating collections.

Using Dynamic Data one can do some really crazy things with collections. For example, one can create a collection B that monitors some other collection A for updates, removes and inserts, so that each item within B corresponds to an item within A as long as that item satisfies a certain condition (filtering).
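As a rough illustration of this filtering scenario, here is a minimal sketch, assuming the DynamicData NuGet package is referenced. The class name FilteredCollectionSketch and the variable names are mine, chosen for illustration only; SourceList<T>, Connect(), Filter(...) and Bind(...) come from DynamicData. Collection B (collectionB) tracks the even items of collection A (collectionA):

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

// Hypothetical helper class, for illustration only.
public static class FilteredCollectionSketch
{
    public static (int[] Initial, int[] AfterEdits) Run()
    {
        // collection A: an editable source list
        using var collectionA = new SourceList<int>();

        // collection B: a read-only view kept in sync with A,
        // containing only the even items of A
        using IDisposable subscription =
            collectionA
                .Connect()                // observe A's changes as a stream
                .Filter(i => i % 2 == 0)  // let through only even items
                .Bind(out ReadOnlyObservableCollection<int> collectionB)
                .Subscribe();             // activate the pipeline

        collectionA.AddRange(new[] { 1, 2, 3, 4 });

        // snapshot of B, sorted so that we do not depend on B's item order
        int[] initial = collectionB.OrderBy(i => i).ToArray();

        collectionA.Remove(2); // an even item leaves A, so it leaves B
        collectionA.Add(6);    // an even item enters A, so it enters B
        collectionA.Add(7);    // an odd item never reaches B

        int[] afterEdits = collectionB.OrderBy(i => i).ToArray();

        return (initial, afterEdits);
    }
}
```

Run() should return 2, 4 for the first snapshot and 4, 6 for the second. The snapshots are sorted before being returned, so the sketch makes no assumption about the order in which B stores its items.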

Using Dynamic Data you can also apply many other operations to collections (e.g. transformations, sorting, grouping, combining elements from different collections into one collection).

Dynamic Data is fascinating and gives you a lot of power, especially when it comes to displaying collection items in various ways.

Documentation for Dynamic Data is also somewhat lacking. There are some blog entries by Roland Pheasant at Dynamic Data, some good explanations at reactivemarbles/dynamic-data, many responses to questions on Stack Overflow (c# and dynamic-data tags) and some samples on GitHub, e.g. at DynamicData.Snippets and Dynamic.Trader.

Rx.NET Streams

A stream is an object that emits data. Usually a single stream emits data of the same type, within the same thread or async context, multiple times with various time intervals between the emissions - the time intervals do not have to be the same. Some streams can also be empty or have only a single data emission.

Rx Streams can be observed (subscribed to). If a stream is observed, each data emission will result in an action (event) fired with the emitted data being passed as the single parameter to that action.

In Rx.NET each stream is represented by the IObservable<T> type (from System.Runtime.dll, which means that this interface is part of the .NET base class library rather than of a separate Rx package). Type argument T is the type of data being emitted. Here is the definition of the IObservable<T> interface:

// Summary:
//     Defines a provider for push-based notification.
// Type parameters:
//   T:
//     The object that provides notification information.
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

The interface is very simple - it has only one method - Subscribe(...) that takes an object IObserver<T> observer as its only argument and returns an IDisposable.

IObserver<T> interface is also defined in System.Runtime.dll. It has three methods:

// Summary:
//     Provides a mechanism for receiving push-based notifications.
// Type parameters:
//   T:
//     The object that provides notification information.
public interface IObserver<in T>
{
    void OnNext(T value);
    void OnCompleted();
    void OnError(Exception error);
}

Each of these 3 methods will be called on an IObserver<T> which is subscribed to (observes) the IObservable<T> stream under the following conditions:

  1. OnNext(T value) will be called each time the observed stream emits data (the stream might be empty, in which case this method is never called).
  2. OnCompleted() will be called when the stream signals that it has completed properly - no more data will be coming through it. Some streams never signal completion, so this method might never be called either.
  3. OnError(Exception error) will be called only if something went wrong with the stream and the stream signals that it is broken.

No data will be coming through the stream after it signaled completion or error.

Without Subscribe(...) being called on it, an IObservable<T> stream is doing nothing - compare it to a closed water faucet. When one or several Subscribe(...) calls are made on an observable stream, the data might start flowing (dripping) through the stream.

Disposing the returned IDisposable object will unsubscribe.
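To make the subscribe/unsubscribe contract concrete, here is a minimal hand-written sketch that uses only the two interfaces shown above (no Rx package). ManualStream, RecordingObserver and ObserverContractSketch are hypothetical names invented for this illustration, and the code is deliberately naive (not thread-safe). In real code you would rarely implement these interfaces yourself; the sketch only shows who calls what:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stream that emits whatever is passed to Emit(...)
// to all observers subscribed at that moment.
public sealed class ManualStream : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);

        // disposing the returned object removes the observer (unsubscribes)
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Emit(int value)
    {
        // iterate over a copy so that observers may unsubscribe while notified
        foreach (IObserver<int> observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _onDispose;
        public Unsubscriber(Action onDispose) => _onDispose = onDispose;
        public void Dispose() => _onDispose();
    }
}

// Hypothetical observer that records every notification it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new();

    public void OnNext(int value) => Log.Add($"OnNext({value})");
    public void OnCompleted() => Log.Add("OnCompleted");
    public void OnError(Exception error) => Log.Add("OnError");
}

public static class ObserverContractSketch
{
    public static List<string> Run()
    {
        var stream = new ManualStream();
        var observer = new RecordingObserver();

        stream.Emit(0); // nobody is subscribed yet, so nothing is recorded

        IDisposable subscription = stream.Subscribe(observer);
        stream.Emit(1); // recorded
        stream.Emit(2); // recorded

        subscription.Dispose(); // unsubscribe
        stream.Emit(3);         // not recorded any more

        return observer.Log;
    }
}
```

Run() returns a log containing only OnNext(1) and OnNext(2): the value emitted before subscribing and the value emitted after disposing the subscription never reach the observer.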

For those interested in duality - IObservable<T> is dual to IEnumerable<T> and IObserver<T> is dual to IEnumerator<T> - a very rarely used interface serving as an iterator through an IEnumerable<T> collection. Instead of using and implementing IEnumerator<T>, we use extension methods defined in the static class System.Linq.Enumerable to sort, filter, transform or combine several IEnumerable<T> collection objects into the IEnumerable<T> collection we want.

In a similar fashion, IObserver<T> is almost never used or implemented outside of the Rx packages. Instead, various observable streams are sorted, filtered, transformed or combined into other stream(s) in many different ways by various extension methods, many of which are defined in the static class System.Reactive.Linq.Observable.

Reactive Stream Sources

Using Streams of Remote Data

The most natural and fastest way of obtaining streams is receiving remote data as stream(s) (as opposed to the older request/reply paradigm).

Many modern communication packages support streaming, for example:

  • gRPC (Google RPC) and its simplified version grpc-web - one of the most popular and fastest middleware protocols.
  • SignalR - one of the greatest and most powerful middleware packages - unfortunately, it requires .NET backend.
  • Web Sockets
  • Akka/Akka.NET
  • Kafka

Here is a communication schema of a multi-tier application:
[Figure: communication schema of a multi-tier application]
The backend consists of databases and live streams (containing and transporting, e.g. financial or other data).

IObservable<T> Stream Adapter is part of the application that translates the streaming data into IObservable<T> streams (or streams that are easy to convert to IObservable<T>). Note that it can be factored out into e.g. a middle tier - as in the figure above - or it can be part of each client application.

This way of producing streams requires several tiers (parts of the application) communicating with each other. I will write a detailed article about it in the future as part of this series, but for now I shall provide other, simpler examples of creating Rx Streams.

Using System.Reactive and DynamicData Functionality to Create Streams

All the samples for this section are provided as XUnit [Fact]s in the CreatingAndManipulatingRxStreams project, which is part of the NP.Samples public GitHub repository.

To run or debug the samples, download or clone the repository onto your local machine, cd to the RxStream/CreatingAndManipulatingRxStreams folder and open the CreatingAndManipulatingRxStreams.sln solution in Visual Studio 2022.

For more on running and debugging XUnit projects, please take a look at Everything you need to know to create XUnit C# Tests.

Note that the project depends on the DynamicData package, which in turn depends on the System.Reactive package, so that we get full access to both Rx.NET and DynamicData functionality.

There is also a dependency on the System.Linq.Async package, but only one test (demonstrating the conversion of IAsyncEnumerable<T> to IObservable<T>) depends on it.

Converting Collection into IObservable<T>

Here we demo turning an IEnumerable<T> collection into an IObservable<T> stream by using the Observable.ToObservable(...) extension method.

The corresponding sample is located within the file StreamFromCollection.cs. Take a look at the static [Fact] method TestStreamFromCollection():

[Fact]
public static void TestStreamFromCollection()
{
    // source collection consists of int numbers 1 to 10
    int[] sourceCollection1_10 = 
                  Enumerable.Range(1, 10).ToArray();

    IObservable<int> sourceObservable = 
                        sourceCollection1_10.ToObservable();

    // observable emitting squares for only even numbers
    // out of the source
    IObservable<int> evenSquaresObservable =
            sourceObservable
            .Where(i => i % 2 == 0) // filter only even numbers
            .Select(i => i * i);    // transform i to square(i)


    // create result collection
    var resultEvenSquaresCollection = new List<int>();

    // populate result collection by subscribing
    // to the observable.
    // Note that before subscription, the observables,
    // however complex they are, do no work and hold
    // no subscriptions, so they cannot cause a memory leak.
    // Only the subscription needs to be disposed of
    // (which we do automatically by using
    //  the 'using' declaration).
    using var subscriptionDisposable =
        evenSquaresObservable
            .Subscribe(i => resultEvenSquaresCollection.Add(i));

    // create collection of squares of even positive ints
    // smaller or equal to 10 to compare the
    // result with
    int[] expectedResultsCollection =
        [2 * 2, 4 * 4, 6 * 6, 8 * 8, 10 * 10];

    // assert that the result and expectedResults collections
    // are the same.
    Assert.True
    (
         resultEvenSquaresCollection
              .SequenceEqual(expectedResultsCollection)
    );
}

Since this is our first Rx Stream example, I'll explain it almost line by line.

The following line creates an array containing int numbers from 1 to 10 as the sourceCollection to be turned into an observable.

int[] sourceCollection1_10 = Enumerable.Range(1, 10).ToArray();

We create an observable that emits every number within the collection by calling the static extension method Observable.ToObservable(...) on it:

IObservable<int> sourceObservable = 
                     sourceCollection1_10.ToObservable();

We use the powerful composition LINQ-like operators for Rx Streams to filter in only even numbers of the Stream and transform the filtered even numbers into their squares:

IObservable<int> evenSquaresObservable =
       sourceObservable
        .Where(i => i % 2 == 0) // filter only even numbers
        .Select(i => i * i);    // transform i to square(i)

Note that since there has been no subscription made so far, nothing is happening, the stream is not operating and no resources need to be freed or disposed.

Next we create the collection of results that we want to populate within our subscription callback, and we subscribe to the observable to populate this collection:

List<int> resultEvenSquaresCollection = new List<int>();
using var subscriptionDisposable =
    evenSquaresObservable
        .Subscribe(i => resultEvenSquaresCollection.Add(i));

Within our Observable.Subscribe(...) extension method, we add each observed result to the resultEvenSquaresCollection. Subscription returns an IDisposable object which has to be disposed in order to prevent memory leaks (this is why we employ the using clause here). In more complex examples, disposing a subscription can also serve for unsubscribing from the stream.

The rest of the code merely tests that we indeed got a collection of squares of even positive integers less than or equal to 10.

The Observable.Subscribe(...) extension method has several overloads. We used the one that takes an Action<T> onNext argument that fires when new data is emitted. We can also use other overloads that take Action onCompleted and Action<Exception> onError arguments as well.

For example, we can define a bool completed = false variable before calling Subscribe(...) and then call:

bool completed = false;
using var subscriptionDisposable =
    evenSquaresObservable
        .Subscribe
         (
              i => resultEvenSquaresCollection.Add(i), 
              () => completed = true
         );

Assert.True(completed);

After the observable stream has run, the completed flag will be flipped to true.
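The onError overload can be exercised in the same way. Below is a minimal sketch, assuming the System.Reactive package; the class name SubscribeOverloadsSketch and the input data are made up for illustration. One of the source items makes the Select(...) lambda throw, so the stream terminates with an error instead of completing:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class, for illustration only.
public static class SubscribeOverloadsSketch
{
    public static (List<int> Values, bool Completed, Exception? Error) Run()
    {
        var values = new List<int>();
        bool completed = false;
        Exception? error = null;

        using IDisposable subscription =
            new[] { 1, 2, 0, 4 }
                .ToObservable()
                // integer division throws DivideByZeroException for the item 0;
                // Rx turns the exception into an OnError notification
                .Select(i => 10 / i)
                .Subscribe
                (
                    onNext: v => values.Add(v),
                    onError: e => error = e,
                    onCompleted: () => completed = true
                );

        return (values, completed, error);
    }
}
```

Here onNext receives 10 and 5, then onError receives the DivideByZeroException. The item 4 is never processed and onCompleted is never called, because no data flows through a stream after it has signaled an error.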

Another thing that could have been done differently: we did not need to create sourceObservable and evenSquaresObservable - I created them purely for clarity. In fact, we could have created the observable, performed the filtering and transformation on it and subscribed, all within the same long C# statement:

using var subscriptionDisposable =
    sourceCollection1_10
        .ToObservable()
        .Where(i => i % 2 == 0)
        .Select(i => i * i)
        .Subscribe(i => resultEvenSquaresCollection.Add(i));

Another test TestTimeSpannedStreamFromCollection() from the same file shows how to create a 'real' (spaced in time) stream from the same collection by combining it with a Time Spanned stream using Zip(...) operator:

[Fact]
public static async Task TestTimeSpannedStreamFromCollection()
{
    // source collection consists of int numbers 1 to 10
    int[] sourceCollection1_10 = 
            Enumerable.Range(1, 10).ToArray();

    IObservable<int> evenSquaresObservable =
                        sourceCollection1_10
                            .ToObservable()
                            .Where(i => i % 2 == 0)
                            .Select(i => i * i);

    // create an infinite stream emitting 
    // empty data (Unit.Default stands for empty data)
    // every second
    IObservable<Unit> eachSecondObservable =
        Observable.Interval(TimeSpan.FromSeconds(1))
                    .Select(_ => Unit.Default);

    // Zip operator matches the data emitted from
    // two or more observable streams - 1st to 1st, 2nd to 2nd
    // etc.
    // Emission number N happens after EACH one of its
    // observable streams produces N-th emission. 
    //
    // The Zipped stream completes when ANY of its 
    // observable streams complete.
    IObservable<int> timeSpannedEvenSquaresObservable =
        evenSquaresObservable
            .Zip
            (
                eachSecondObservable,

                // we only need values from 
                // the first sequence - evenSquaresObservable
                // From the second sequence 
                // (eachSecondObservable) we only 
                // need time of emission
                (i, unit) => i
            );

    var resultEvenSquaresCollection = new List<int>();

    bool completed = false;
    using var subscriptionDisposable =
                    timeSpannedEvenSquaresObservable
                    .Subscribe
                    (
                        i => resultEvenSquaresCollection.Add(i),
                        () => completed = true
                    );

    // delay the testing until completed flag is 
    // switched to true
    while (! completed)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

    int[] expectedResultsCollection =
        [2 * 2, 4 * 4, 6 * 6, 8 * 8, 10 * 10];

    Assert.True
    (
        resultEvenSquaresCollection
           .SequenceEqual(expectedResultsCollection)
    );
}

Note that TestTimeSpannedStreamFromCollection() is an async method and we wait for the completed flag to become true before we check the assertion. Running this test should take more than 5 seconds.

And this is another example highlighting the great powers of Rx composition. In order to span out the emissions of the values of the collection we use Zip(...) operator to combine the evenSquaresObservable with another stream spanned in time. This (other) stream is created by using Observable.Interval(...) method:

IObservable<Unit> eachSecondObservable =
    Observable.Interval(TimeSpan.FromSeconds(1))
              .Select(_ => Unit.Default);

The eachSecondObservable emits the value Unit.Default every second and never sends the completion signal (it is potentially an infinite stream). Type Unit and its only value Unit.Default are used in Rx when the emitted data is not important (only the emission timing is). In a sense it matches a C# event that takes no data (of type Action).

Then we combine these two streams (evenSquaresObservable and eachSecondObservable) into one by using Observable.Zip(...) operator:

IObservable<int> timeSpannedEvenSquaresObservable =
    evenSquaresObservable
        .Zip
        (
            eachSecondObservable,

            // we only need values from 
            // the first sequence - evenSquaresObservable
            (i, unit) => i
        );

Observable.Zip(...) operator combines two or more streams into one stream, by combining the corresponding data items (1st to 1st, 2nd to 2nd, ... , N-th to N-th, and so on).

The N-th item of the resulting stream is emitted only after N-th items are emitted in every stream it combines.

The resulting stream is completed when any of its source streams is completed.

In a sense we marry our evenSquaresObservable stream obtained from the collection and eachSecondObservable obtained from Observable.Interval(...) method taking emitted data from the first stream and emission time from the second.
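The pairing rule of Zip(...) can also be observed without any timers. The sketch below is an illustration under my own assumptions: it uses two Subject<T> instances (subjects are introduced later in this article) so that we control exactly when each source emits, and the names ZipPairingSketch, numbers and letters are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class, for illustration only.
public static class ZipPairingSketch
{
    public static (int CountBeforeLetters, List<string> Pairs) Run()
    {
        var numbers = new Subject<int>();
        var letters = new Subject<string>();
        var pairs = new List<string>();

        using IDisposable subscription =
            numbers
                .Zip(letters, (n, l) => $"{n}{l}") // N-th number with N-th letter
                .Subscribe(p => pairs.Add(p));

        // the first stream runs ahead: Zip has nothing to pair the numbers with yet
        numbers.OnNext(1);
        numbers.OnNext(2);
        numbers.OnNext(3);
        int countBeforeLetters = pairs.Count;

        // each letter is matched with the oldest unmatched number
        letters.OnNext("a");
        letters.OnNext("b");

        // the number 3 stays unmatched because no third letter was emitted
        return (countBeforeLetters, pairs);
    }
}
```

Before any letter arrives the result list is empty; afterwards it contains 1a and 2b. The slower stream sets the pace, which is exactly how the one-second ticks pace the collection values in the test above.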

IAsyncEnumerable to Stream

IAsyncEnumerable<T> is a different (lower level) way of presenting a stream, built into the C# language. IAsyncEnumerable<T> is especially important because gRPC and grpc-web streams are translated into IAsyncEnumerable<T> in C#, and converting it into an IObservable<T> allows employing the powerful set of Rx operators on it.

The demo of IAsyncEnumerable<T> converted into an IObservable<T> stream is located within StreamFromAsyncEnumerable.cs file.

The static method GenerateAsyncEnumerable() generates IAsyncEnumerable<T> stream:

// returns an IAsyncEnumerable stream
// of numbers from 1 to 6; each number
// takes one second to emit.
public static async IAsyncEnumerable<int> 
    GenerateAsyncEnumerable()
{
    for (int i = 1; i <= 6; i++)
    {
        // one second delay
        await Task.Delay(TimeSpan.FromSeconds(1));

        yield return i;
    }
}

It emits the integers from 1 to 6, one every second.

Method TestStreamFromAsyncEnumerable() converts IAsyncEnumerable<T> into IObservable<T>, filters in only even numbers and transforms them into their squares (same as in the previous sample) and then tests the result using an Assert statement:

[Fact]
public static async Task TestStreamFromAsyncEnumerable()
{
    // Get asyncEnumerable
    IAsyncEnumerable<int> asyncEnumerable = 
        GenerateAsyncEnumerable();

    // convert to an observable
    // stream
    IObservable<int> observable = 
        asyncEnumerable.ToObservable();

    // create a result list to be populated
    // within the observable's Subscription
    var resultEvenSquaresCollection = new List<int>();

    // define completed flag
    bool completed = false;

    // subscribe to receive
    // squares of even numbers
    using var subscribeDisposable =
        observable

            // filter in only even numbers
            .Where(i => i % 2 == 0) 

            // transform i to square(i)
            .Select(i => i * i)
            .Subscribe
            (
                onNext:
                    i => resultEvenSquaresCollection.Add(i),

                onCompleted:
                    () => completed = true
            );

    // delay the testing until completed flag is 
    // switched to true
    while (!completed)
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
    }

    // assert that the result is as expected
    Assert.True
    (
        resultEvenSquaresCollection
            .SequenceEqual([2 * 2, 4 * 4, 6 * 6])
    );
}

The only method unique to this test is the AsyncEnumerable.ToObservable(...) static extension method, defined within System.Linq.Async.dll, which converts IAsyncEnumerable<T> into an IObservable<T> stream.

Using Subjects for Creating Observable Streams

Subjects are special Rx observables that also implement three methods that allow pushing data into them:

  1. OnNext(T data) - for pushing emitted data
  2. OnCompleted() - for signaling the end of the stream
  3. OnError(Exception e) for signaling a stream error

In fact ISubject<T> implements both IObservable<T> (for using it as an observable stream source) and IObserver<T> (for populating it with data).

Subjects are great:

  1. For tests (since you have full control over the data you want to populate it with and completion/error behavior)
  2. For creating a stream from some non Rx Stream data.
  3. For using them to initialize auxiliary observable streams to facilitate stream composition.
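The second use case (creating a stream from non-Rx data) might look roughly like the sketch below. It assumes the System.Reactive package; TemperatureFeed, Report(...) and SubjectAdapterSketch are hypothetical names standing in for whatever non-Rx component produces the data:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical non-Rx component: readings arrive through plain method calls.
public sealed class TemperatureFeed : IDisposable
{
    private readonly Subject<int> _readings = new();

    // expose only the observable side, so that consumers
    // cannot push data or complete the stream themselves
    public IObservable<int> Readings => _readings.AsObservable();

    // called by the non-Rx code whenever a new reading is available
    public void Report(int celsius) => _readings.OnNext(celsius);

    public void Dispose()
    {
        _readings.OnCompleted(); // tell subscribers that no more data will come
        _readings.Dispose();
    }
}

public static class SubjectAdapterSketch
{
    public static (List<int> Hot, bool Completed) Run()
    {
        var feed = new TemperatureFeed();
        var hot = new List<int>();
        bool completed = false;

        using IDisposable subscription =
            feed.Readings
                .Where(t => t > 30) // ordinary Rx operators now apply
                .Subscribe
                (
                    onNext: t => hot.Add(t),
                    onCompleted: () => completed = true
                );

        feed.Report(25);
        feed.Report(31);
        feed.Report(40);
        feed.Dispose(); // completes the stream

        return (hot, completed);
    }
}
```

Run() collects the readings 31 and 40 and reports completion once the feed is disposed. Keeping the subject private and exposing it through AsObservable() is a design choice of this sketch, not something the samples in the repository necessarily do.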

Subject tests are located within StreamFromSubject.cs file.

Take a look at TestStreamFromSubject() method:

[Fact]
public static void TestStreamFromSubject()
{
    ISubject<int> subject = new Subject<int>();

    var resultEvenSquaresCollection = new List<int>();
    bool completed = false;
    string? exceptionMessage = null;

    using var subscriptionDisposable =
        subject
            .Where(i => i % 2 == 0) // filter only even
            .Select(i => i * i) // get squares
            .Subscribe
            (
                onNext: i => resultEvenSquaresCollection.Add(i), 
                onCompleted: () => completed = true,
                onError: exception => 
                            exceptionMessage = exception.Message
            );

    // push data into the stream
    // by using OnNext() method
    subject.OnNext(1);
    subject.OnNext(2);
    subject.OnNext(3);
    subject.OnNext(4);

    // completion signal has not been sent yet
    Assert.False(completed);    

    // no exception occurred yet
    Assert.Null(exceptionMessage);

    Assert.True
    (
        resultEvenSquaresCollection
            .SequenceEqual([2 * 2, 4 * 4])
    );

    // add another even number
    subject.OnNext(10);

    // verify the output of the stream
    Assert.True
    (
        resultEvenSquaresCollection
            .SequenceEqual([2 * 2, 4 * 4, 10 * 10])
    );

    // no completion yet
    Assert.False(completed);

    // signal source stream completion
    subject.OnCompleted();

    // the completed flag should
    // be set to true
    Assert.True(completed);
}

Note that our resulting observable stream does the same filtering and transformation as in the previous sample - it filters in only even numbers and then uses their squares for the output.

The source stream (ISubject<T> subject variable) is populated by calling its OnNext(int i) method and passing some integer to it:

// push data into the stream
// by using OnNext() method
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);

...

// add another even number
subject.OnNext(10);

Then the source stream is completed by calling

// signal source stream completion
subject.OnCompleted();

which also causes the result stream to complete.

The other test, TestStreamFromSubjectWithException(), from the same file does everything the same as the first test, except that it signals an error at the end instead of completion:

// signal error
subject.OnError(new Exception("Error Occurred"));

// verify that our onError action worked
Assert.True(errorMessage == "Error Occurred");

The last Assert call verifies that the error message was really passed to Subscribe(...) method's onError argument.

There are several useful variations of Subjects but since my purpose is to avoid writing a handbook, I'll skip them at this point.

C# Events to Streams

There is a close affinity between C# events and Observable Streams. Both implement the observer pattern.

In one (not very important) way C# events are more powerful than streams - they can take an arbitrary number of arguments, while an Rx stream emits only a single value at a time (a no-argument event can be imitated by IObservable<Unit>). This shortcoming can be easily circumvented by packing the arguments into C# tuples and transforming them with the Select(...) method.
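A minimal sketch of this tuple workaround, assuming the System.Reactive package, is given below. OrderSource, its OrderPlaced event and TupleEventSketch are hypothetical names. The sketch uses the Observable.FromEvent(...) overload that takes a conversion function in addition to the add/remove handler actions; the conversion packs the two event arguments into a single tuple:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical class with a two-argument event.
public sealed class OrderSource
{
    public event Action<string, int>? OrderPlaced;

    public void Place(string symbol, int quantity) =>
        OrderPlaced?.Invoke(symbol, quantity);
}

public static class TupleEventSketch
{
    public static List<string> Run()
    {
        var source = new OrderSource();

        IObservable<(string Symbol, int Quantity)> orders =
            Observable.FromEvent<Action<string, int>, (string Symbol, int Quantity)>
            (
                // conversion: wrap the Rx callback into a two-argument handler
                // that packs both event arguments into one tuple
                onNext => (symbol, quantity) => onNext((symbol, quantity)),

                // add handler
                handler => source.OrderPlaced += handler,

                // remove handler
                handler => source.OrderPlaced -= handler
            );

        var lines = new List<string>();

        using IDisposable subscription =
            orders
                // transform the tuple with Select(...)
                .Select(order => $"{order.Symbol}:{order.Quantity}")
                .Subscribe(line => lines.Add(line));

        source.Place("MSFT", 10);
        source.Place("AAPL", 5);

        return lines;
    }
}
```

Run() returns the lines MSFT:10 and AAPL:5, one per event invocation.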

There is one very important way in which Rx Observable Streams are much more powerful than C# events - streams can be transformed, manipulated and combined using many LINQ-like operators, while events cannot. Composing events can be a big pain, as you know if you have ever tried it. Complex event compositions are practically impossible.

StreamFromEvent.cs file has tests demonstrating how to convert an event into an Rx Observable stream.

There is a tiny EventContainingClass class defined. Its only purpose is to contain an Action<bool> event and provide a method for firing it:

class EventContainingClass
{
    // boolean event
    public event Action<bool>? TestEvent;

    // fire event
    public void FireTestEvent(bool arg)
    {
        TestEvent?.Invoke(arg);
    }
}

There is also an extension method GetObservableFromEventObj(this EventContainingClass eventObj) that given an object of that type, returns an Rx observable stream observing TestEvent:

// extension method that 
// gets the observable for the TestEvent
// of the EventContainingClass passed as
// the argument.
private static IObservable<bool> 
    GetObservableFromEventObj(this EventContainingClass eventObj)
{
    // method Observable.FromEvent(...)
    // is used to create an observable 
    // from an event. 
    // The first two arguments to it
    // specify how to add and remove
    // an event handler. 
    IObservable<bool> observable =
        Observable
            .FromEvent<Action<bool>, bool>
            (
            // add handler
            eventHandler =>
                eventObj.TestEvent += eventHandler,

            // remove handler
            eventHandler =>
                eventObj.TestEvent -= eventHandler
            );

    return observable;
}

This method is provided for the sake of the code reuse and it is called multiple times from the tests within the file. It also demonstrates turning an event into an observable stream by using Observable.FromEvent(...) method and providing two arguments to it - first argument specifying how to add a handler to the event and the second how to remove the handler from the event.
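The roles of the two arguments can be made visible by counting the handlers attached to the event. The sketch below is only an illustration: Toggle, its HandlerCount property and FromEventLifetimeSketch are hypothetical and are not part of the sample project. It assumes the System.Reactive package and a plain console or test environment:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event owner that can report how many handlers are attached.
public sealed class Toggle
{
    public event Action<bool>? Changed;

    public void Fire(bool value) => Changed?.Invoke(value);

    // number of handlers currently attached to the Changed event
    public int HandlerCount => Changed?.GetInvocationList().Length ?? 0;
}

public static class FromEventLifetimeSketch
{
    public static (int Before, int During, int After, bool LastSeen) Run()
    {
        var toggle = new Toggle();

        IObservable<bool> observable =
            Observable.FromEvent<Action<bool>, bool>
            (
                handler => toggle.Changed += handler, // add handler
                handler => toggle.Changed -= handler  // remove handler
            );

        // creating the observable does not attach anything to the event
        int before = toggle.HandlerCount;

        bool lastSeen = false;
        IDisposable subscription = observable.Subscribe(b => lastSeen = b);

        // subscribing invoked the "add handler" argument
        int during = toggle.HandlerCount;
        toggle.Fire(true);

        // disposing the subscription invokes the "remove handler" argument
        subscription.Dispose();
        int after = toggle.HandlerCount;

        toggle.Fire(false); // nobody listens any more, lastSeen stays true

        return (before, during, after, lastSeen);
    }
}
```

In this sketch the handler count goes from 0 to 1 on subscription and back to 0 on disposal, and the value fired after disposal is not observed.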

The first test method TestStreamFromEvent() simply tests the observable obtained from an event:

[Fact]
public static void TestStreamFromEvent()
{
    // create an object containing an
    // Action<bool> event
    EventContainingClass eventContainingObj = 
        new EventContainingClass();

    // create an observable from 
    // eventContainingObj.TestEvent
    IObservable<bool> observable =
        eventContainingObj.GetObservableFromEventObj();

    // boolean to specify 
    // the last argument to the event
    bool lastFired = false;

    // subscribe to change the lastFired
    // to the last argument passed to 
    // fire the event
    using var subscriptionDisposable =
        observable.Subscribe(b => lastFired = b);

    // false originally
    Assert.False(lastFired);

    // after true is fired, lastFired 
    // should become true
    eventContainingObj.FireTestEvent(true);
    Assert.True(lastFired);

    // after another true is fired, lastFired 
    // should still be true
    eventContainingObj.FireTestEvent(true);
    Assert.True(lastFired);

    // after false is fired, lastFired 
    // should change to false
    eventContainingObj.FireTestEvent(false);
    Assert.False(lastFired);
}

The second test method TestStreamFromEventComposition() demonstrates combining the latest values of two different streams obtained from events by using the Observable.CombineLatest(...) method:

[Fact]
public static void TestStreamFromEventComposition()
{
    // first object containing an event
    EventContainingClass eventContainingObj1 =
        new EventContainingClass();

    // observable obtained from the
    // first object
    IObservable<bool> observable1 =
        eventContainingObj1.GetObservableFromEventObj();

    // second object containing the event
    EventContainingClass eventContainingObj2 =
        new EventContainingClass();

    // observable obtained from the
    // second object
    IObservable<bool> observable2 =
        eventContainingObj2.GetObservableFromEventObj();

    // combine the two observables by using 
    // Observable.CombineLatest(...) extension 
    // method. This method fires every time any 
    // observable changes. 
    IObservable<bool> combinedLatestObservable =
        observable1
            .CombineLatest
            (
                observable2,

                // obtain the combined
                // observable value by 
                // doing boolean AND 
                // operator on the two
                // latest values from the
                // two observables
                resultSelector: (b1, b2) => b1 && b2
            );

    // get an observable to fire only if the
    // result changes from the previous value
    // (do not emit data if it stays the same)
    IObservable<bool> combinedLatestDistinctObservable =
        combinedLatestObservable.DistinctUntilChanged();

    // latest value from 
    // the resulting observable
    bool latestCompoundValue = false;

    // subscribe to set the
    // latestCompoundValue when 
    // the result changes
    using var subscriptionDisposable =
        combinedLatestDistinctObservable
            .Subscribe
            (
                b => latestCompoundValue = b
            );

    Assert.False(latestCompoundValue);

    eventContainingObj1.FireTestEvent(true);
    eventContainingObj2.FireTestEvent(true);

    // since both observables fired true
    // the result should change to true
    Assert.True(latestCompoundValue);

    // one fires false - result
    // changes to false
    eventContainingObj1.FireTestEvent(false);
    Assert.False(latestCompoundValue);

    // both are false, result 
    // stays false
    eventContainingObj2.FireTestEvent(false);
    Assert.False(latestCompoundValue);

    // only one changed back to true
    // the result still stays false
    eventContainingObj1.FireTestEvent(true);
    Assert.False(latestCompoundValue);

    // both changed to true
    // the result changes to true
    eventContainingObj2.FireTestEvent(true);
    Assert.True(latestCompoundValue);
}

The new and most interesting Rx operators demonstrated here are CombineLatest(...) and DistinctUntilChanged(...).

Observable.CombineLatest(...) combines two or more observable streams: once every source has emitted at least one value, it emits a new result whenever any of its sources emits (unlike the Zip(...) operator, which pairs values by position and so emits only after every source has produced its next value). Its resultSelector argument specifies how to compute the result from the latest values of the two observables:

// combine the two observables by using 
// Observable.CombineLatest(...) extension 
// method. This method fires every time any 
// observable changes. 
IObservable<bool> combinedLatestObservable =
    observable1
        .CombineLatest
        (
            observable2,

            // obtain the combined
            // observable value by 
            // doing boolean AND 
            // operator on the two
            // latest values from the
            // two observables
            resultSelector: (b1, b2) => b1 && b2
        );

In our case, we use boolean AND operator (&&) to combine the two bool values into a bool result.
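The difference between CombineLatest(...) and Zip(...) is easiest to see side by side. The following is a minimal sketch, assuming the System.Reactive package is referenced; the subjects and the string-building selectors are made up for illustration and are not part of the article's test project:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestVsZipSketch
{
    public static (List<string> Combined, List<string> Zipped) Run()
    {
        // two manually driven source streams
        var numbers = new Subject<int>();
        var letters = new Subject<string>();

        var combined = new List<string>();
        var zipped = new List<string>();

        // emits on every source emission once both sources have a value
        using var combinedSubscription =
            numbers
                .CombineLatest(letters, (n, s) => $"{n}{s}")
                .Subscribe(v => combined.Add(v));

        // pairs the 1st with the 1st, the 2nd with the 2nd, and so on
        using var zippedSubscription =
            numbers
                .Zip(letters, (n, s) => $"{n}{s}")
                .Subscribe(v => zipped.Add(v));

        numbers.OnNext(1);   // nothing yet: letters has no value
        letters.OnNext("a"); // combined: 1a   zipped: 1a
        numbers.OnNext(2);   // combined: 2a   zipped: waits for a letter
        numbers.OnNext(3);   // combined: 3a   zipped: still waiting
        letters.OnNext("b"); // combined: 3b   zipped: 2b

        return (combined, zipped);
    }
}
```

CombineLatest produces four results here while Zip produces only two, because Zip waits for a matching partner from each source before emitting.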

The other very interesting method is Observable.DistinctUntilChanged(...). It suppresses a value that is equal to the one emitted immediately before it, so subscribers are notified only when the data actually changes.
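As a small illustration, the sketch below (assuming the System.Reactive package; the input array is made up) runs the same sequence through DistinctUntilChanged() and through Distinct(). The first drops only consecutive repeats, while the second drops every value that has been seen before:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctUntilChangedSketch
{
    public static (List<bool> UntilChanged, List<bool> Distinct) Run()
    {
        // made-up input with consecutive repeats
        bool[] raw = { false, true, true, false, false, true };

        var untilChanged = new List<bool>();
        var distinct = new List<bool>();

        // drops a value only if it equals the previous one
        using var s1 =
            raw.ToObservable()
               .DistinctUntilChanged()
               .Subscribe(b => untilChanged.Add(b));

        // drops a value if it has ever been emitted before
        using var s2 =
            raw.ToObservable()
               .Distinct()
               .Subscribe(b => distinct.Add(b));

        // untilChanged: false, true, false, true
        // distinct:     false, true
        return (untilChanged, distinct);
    }
}
```

For a boolean flag such as the combined value in the test above, DistinctUntilChanged() is the one that fits, since the flag is expected to flip back and forth over time.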

C# Notifiable Properties to Streams

A property which fires an event every time it changes is very similar to an observable stream. Each property change corresponds to data emission and the data emitted will be just the property value.

Again, as in the case of events, converting properties to Rx observable streams enables the powerful stream manipulation, transformation and composition operators.

In Avalonia, WPF and other XAML-based frameworks, for a change to a view model (non-visual) property to be picked up by the visuals (the view), the class containing the property has to implement the System.ComponentModel.INotifyPropertyChanged interface and the property has to fire the PropertyChanged event when its value changes.
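Since PropertyChanged is an ordinary event, plain Rx can already turn it into a stream. The following is a rough sketch of that idea, assuming only the System.Reactive package; the Flag class, its IsOn property and the ObserveIsOn helper are hypothetical names introduced for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive.Linq;

// Hypothetical view-model-like class with one notifiable property.
public sealed class Flag : INotifyPropertyChanged
{
    public event PropertyChangedEventHandler? PropertyChanged;

    private bool _isOn;
    public bool IsOn
    {
        get => _isOn;
        set
        {
            if (_isOn == value) return;
            _isOn = value;
            PropertyChanged?.Invoke(
                this, new PropertyChangedEventArgs(nameof(IsOn)));
        }
    }
}

public static class PropertyToStreamSketch
{
    // emits the new IsOn value each time PropertyChanged fires for IsOn
    public static IObservable<bool> ObserveIsOn(Flag flag) =>
        Observable
            .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => flag.PropertyChanged += h,
                h => flag.PropertyChanged -= h)
            .Where(e => e.EventArgs.PropertyName == nameof(Flag.IsOn))
            .Select(_ => flag.IsOn);

    public static List<bool> Run()
    {
        var flag = new Flag();
        var seen = new List<bool>();

        using var subscription =
            ObserveIsOn(flag).Subscribe(v => seen.Add(v));

        flag.IsOn = true;  // emits true
        flag.IsOn = true;  // unchanged, the setter fires no event
        flag.IsOn = false; // emits false

        return seen;
    }
}
```

Unlike the library helpers discussed next, this hand-rolled version does not emit the initial property value on subscription and relies on a string comparison of the property name.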

The best library for converting notifiable properties to streams and vice versa is ReactiveUI - a platform independent library that references both System.Reactive and DynamicData libraries. ReactiveUI is beyond the scope of this article (though I plan to write about it later as part of this series).

DynamicData also has a means of converting a notifiable property to an Rx stream - the static extension method NotifyPropertyChangedEx.WhenValueChanged(...) - and we shall demo it in this subsection.

The test is located within the StreamFromNotifiableProperty.cs file. The file also contains a PropContainer class with a NotifiableProperty of type bool that fires the PropertyChanged event when the property changes:

// class with single notifiable property 
class PropContainer : INotifyPropertyChanged
{
    public event PropertyChangedEventHandler? PropertyChanged;

    bool _notifiableProperty = false;
    public bool NotifiableProperty
    {
        get => _notifiableProperty;

        set
        {
            if (_notifiableProperty == value)
            {
                return;
            }

            _notifiableProperty = value;

            // fire PropertyChanged event 
            // if the property changes.
            PropertyChanged?.Invoke
            (
                this, 
                new PropertyChangedEventArgs(nameof(NotifiableProperty))
            );
        }
    }
}

And here is the code of the test:

[Fact]
public static void TestNotifyablePropertyValueChange ()
{
    // create an object containing a notifiable property
    var propContainer = new PropContainer ();

    // use DynamicData's 
    // NotifyPropertyChangedEx.WhenValueChanged(...)
    // method to convert the property to IObservable<bool>
    IObservable<bool> whenPropChangedObservable = 
        propContainer.WhenValueChanged
        (
            pContainer => pContainer.NotifiableProperty
        );

    var currentPropValue = propContainer.NotifiableProperty;

    // subscribe to change currentPropValue
    // every time the property changes via the observable
    using var subscriptionDisposable = 
        whenPropChangedObservable
            .Subscribe(newValue => currentPropValue = newValue);

    // test that the currentPropValue (obtained through 
    // the observable) really matches the current property.
    Assert.Equal
    (
        currentPropValue, 
        propContainer.NotifiableProperty
    );

    propContainer.NotifiableProperty = true;
    Assert.Equal
    (
        currentPropValue, 
        propContainer.NotifiableProperty
    );

    propContainer.NotifiableProperty = false;
    Assert.Equal
    (
        currentPropValue, 
        propContainer.NotifiableProperty
    );
}

We get the property observable by calling the WhenValueChanged(...) operator on the object containing the property and passing it a C# expression that selects that property:

// use DynamicData's 
// NotifyPropertyChangedEx.WhenValueChanged(...)
// method to convert the property to IObservable<bool>
IObservable<bool> whenPropChangedObservable = 
    propContainer.WhenValueChanged
    (
        pContainer => pContainer.NotifiableProperty
    );

Note that DynamicData also contains some other useful property notification converter operators, including:

  1. WhenPropertyChanged(...) - creates an observable that emits the object owning the property in addition to the current property value. This can be important e.g. when dealing with collections.
  2. WhenChanged(...) - methods that allow monitoring several properties with a single observable.
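Here is a short sketch of the first of these operators, assuming the DynamicData package is referenced. The Sensor class is hypothetical, and the notifyOnInitialValue argument is used to skip the emission that would otherwise happen at subscription time:

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel;
using DynamicData.Binding;

// Hypothetical class used only for this illustration.
public sealed class Sensor : INotifyPropertyChanged
{
    public Sensor(string name) => Name = name;

    public string Name { get; }

    public event PropertyChangedEventHandler? PropertyChanged;

    private bool _isActive;
    public bool IsActive
    {
        get => _isActive;
        set
        {
            if (_isActive == value) return;
            _isActive = value;
            PropertyChanged?.Invoke(
                this, new PropertyChangedEventArgs(nameof(IsActive)));
        }
    }
}

public static class WhenPropertyChangedSketch
{
    public static List<string> Run()
    {
        var sensor = new Sensor("s1");
        var log = new List<string>();

        // each emission carries both the sender and the new value
        using var subscription =
            sensor
                .WhenPropertyChanged(
                    s => s.IsActive,
                    notifyOnInitialValue: false)
                .Subscribe(change =>
                    log.Add($"{change.Sender.Name}:{change.Value}"));

        sensor.IsActive = true;

        return log;
    }
}
```

Having the sender available becomes handy when many objects feed a single merged stream and the subscriber needs to know which of them changed.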

Dynamic Data Collections To Rx Observable Streams (Preview)

I plan to start discussing DynamicData in depth in the next article of the series, so here I'll just give a bit of a pep talk about the main ideas behind Dynamic Data.

.NET has a built-in ObservableCollection<T> class, which implements the INotifyCollectionChanged interface and fires the CollectionChanged event each time items are added, removed or rearranged within the collection.

The arguments of the CollectionChanged event carry full information about the changed items, sufficient to reconstruct the changed collection.

Potentially, the CollectionChanged arguments can be exposed as an IObservable<T> stream, which allows using the collection changes to create and update other collections that depend on the original one.
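A minimal sketch of that idea, using only System.Reactive (no Dynamic Data yet), could look like the code below. It exposes the CollectionChanged event of an ObservableCollection<int> as a stream and records the kind of each change; the class and variable names are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Reactive.Linq;

public static class CollectionChangedStreamSketch
{
    public static List<NotifyCollectionChangedAction> Run()
    {
        var collection = new ObservableCollection<int>();
        var actions = new List<NotifyCollectionChangedAction>();

        // stream of CollectionChanged event arguments
        IObservable<NotifyCollectionChangedEventArgs> changes =
            Observable
                .FromEventPattern<NotifyCollectionChangedEventHandler,
                                  NotifyCollectionChangedEventArgs>(
                    h => collection.CollectionChanged += h,
                    h => collection.CollectionChanged -= h)
                .Select(e => e.EventArgs);

        using var subscription =
            changes.Subscribe(args => actions.Add(args.Action));

        collection.Add(1);    // Add
        collection.Add(2);    // Add
        collection.Remove(1); // Remove

        return actions;
    }
}
```

Rebuilding a dependent collection from these raw arguments by hand is tedious, which is the gap Dynamic Data is meant to fill.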

Instead of ObservableCollection<T>, Dynamic Data uses ISourceList<T> and ISourceCache<T, TKey> as collection containers.

ISourceCache<T, TKey> assumes that the collection items have a primary key: based on the primary key of the incoming data, we either insert a new item or update the existing item with the same primary key.
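The insert-or-update behavior can be sketched as follows, assuming the DynamicData package is referenced. The Quote record and its symbols and prices are made up; the key selector passed to the SourceCache constructor tells the cache which member acts as the primary key:

```csharp
using System;
using DynamicData;

// Hypothetical item type; Symbol plays the role of the primary key.
public sealed record Quote(string Symbol, decimal Price);

public static class SourceCacheUpsertSketch
{
    public static (int Count, decimal AbcPrice) Run()
    {
        using var cache =
            new SourceCache<Quote, string>(q => q.Symbol);

        // two new keys - two inserts
        cache.AddOrUpdate(new Quote("ABC", 10m));
        cache.AddOrUpdate(new Quote("XYZ", 20m));

        // existing key - the item is replaced, not added
        cache.AddOrUpdate(new Quote("ABC", 11m));

        // still two items; ABC now carries the latest price
        return (cache.Count, cache.Lookup("ABC").Value.Price);
    }
}
```

Lookup(...) returns an optional value, so production code would normally check HasValue before reading Value; the sketch skips that check because the key is known to exist.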

Both ISourceList<T> and ISourceCache<T, TKey> have a Connect() method that returns IObservable<IChangeSet<T>> (in case of ISourceList) or IObservable<IChangeSet<T, TKey>> (in case of ISourceCache). These are simply observables of the changes made to the corresponding collections. Note that IChangeSet is not identical to the CollectionChanged event arguments; it is optimized for better performance and collection reconstruction.

Using IObservable<IChangeSet<T>> and IObservable<IChangeSet<T, TKey>>, we can create other collections that mirror the original one, with differences determined by the operators applied to the observable. For example, we can create a collection that keeps only the even numbers from an original collection of integers (or something much, much more complex).

The main trick is that such dependencies between the collections are maintained automagically by Dynamic Data whenever the source collection(s) change.

More about Dynamic Data is coming in future articles.
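As a preview, the even-number example can be sketched roughly like this, assuming the DynamicData package is referenced (the class and variable names are made up). The derived evens collection is never touched directly; it follows the source list through the Connect() / Filter(...) / Bind(...) pipeline:

```csharp
using System;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;

public static class EvenFilterSketch
{
    public static (int[] Before, int[] After) Run()
    {
        using var source = new SourceList<int>();

        // derived read-only collection holding only even numbers
        IDisposable subscription =
            source
                .Connect()
                .Filter(n => n % 2 == 0)
                .Bind(out ReadOnlyObservableCollection<int> evens)
                .Subscribe();

        source.AddRange(new[] { 1, 2, 3, 4 });

        // at this point evens contains 2 and 4
        int[] before = evens.ToArray();

        // change the source only - evens follows along
        source.Add(6);
        source.Remove(2);

        // now evens contains 4 and 6
        int[] after = evens.ToArray();

        subscription.Dispose();

        return (before, after);
    }
}
```

Disposing the subscription stops the propagation, so the derived collection stays in sync only while the pipeline is subscribed.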
