What this article is about
This is an introductory article on the fascinating subject of mastering two very powerful but underused, misused and often not well understood frameworks: Reactive Extensions and Dynamic-Data. More in-depth articles dedicated to these two frameworks will be coming soon.
Using both packages properly can greatly simplify and speed up designing and building real world applications, both backend (or middle tier) and the client side.
Reactive Extensions provide a way to easily manipulate streams of data by applying LINQ-like operators to them.
Dynamic Data allows observing collections' changes as streams and changing/combining streams using LINQ-like operators to create and update collections. This is very important, especially on the front end to create displayable collections. For example, using Dynamic Data one can create a data-grid with almost all the Virtualization, Filtering, Sorting and even Grouping capability provided on the non-visual collections (view models) with the visual DataGrid passively reflecting the non-visual objects manipulated by the Dynamic Data.
The reason Reactive Extensions and Dynamic Data frameworks are not used as much as they could have been is because of the lack of well written documentation on how to use them. This is precisely the purpose of this and the coming articles - to describe the correct usage patterns for Reactive Extensions and Dynamic Data.
All code samples for his series are written in C#/.NET 9 so some familiarity with C# and especially with its LINQ operators is assumed.
Introduction to Reactive Extensions (Rx)
Reactive Extensions provide a way of manipulating data coming in streams. Each stream usually emits data of the same type. Reactive Extensions provide numerous set of operators that allow manipulating the streaming data in various ways, including
Transforming
Filtering
Sorting
Combining multiple streams into one
Grouping
and many others.
In fact a stream can be considered a dual of a collection and all the LINQ Collection operators have their dual operators for Rx streams. I will not get much more into duality here - duality was important for discovering the concept but not so important for using and understanding it.
Unlike collection, streams also have a time dimension. Therefore, many operators that work on Rx streams are built to work in time and do not have their corresponding duals in LINQ. As a result, Rx streams have more operators than their collection counterparts.
Rx was first created by Microsoft around 2010-2011, as part of their .NET framework and was called Rx.NET. Since then, Reactive Extensions were translated into many other languages including JavaScript (RxJs), Java (RxJava), Python (RxPY) and many others.
Reactive Extensions, especially in combination with Dynamic Data, has a great potential to make software development considerably easier and faster (similar to what LINQ operators did for collections, but more so, since streams can be used in many more scenarios than collections and I'll expand on it further down). Also, note that the streams are the best way of communications between the backend (or middle tier) software and the front end (Web, Mobile or Desktop).
Despite these unique advantages, Reactive Extensions have not lived up to its potential. Many people encountered Rx, did a bit of a coding with it, but very few of them can be called Reactive Extensions experts. As mentioned above, the main reason for that is the lack of good documentation explaining how to use Reactive Extensions.
You can find a lot of Rx documentation on the web, for example ReactiveX or Introduction to Rx, but these on-line books are rather handbooks - detailing each Rx.NET operator and describing how they work in detail. They are very valuable when one needs to figure out how to use a specific Rx operator, but a bit boring to read (as any handbook would be).
There is also a wealth of samples e.g. reactive 101 samples. While these samples also provide a good reference guide, they do not highlight the main Rx concepts.
Instead of writing another handbook, my primary purpose is to show how to use Reactive Extensions correctly to achieve your goal of building real life applications clean and fast.
Introduction to Dynamic Data
Dynamic Data has been built by Roland Pheasant (apparently around 2014) by applying Rx.NET to monitor and update collections.
Using Dynamic Data one can do some really crazy things with collections, for example Create a collection B that monitors some other collection A for updates, removes and inserts so that each item within B corresponds to an item within A as long as some condition on the item is satisfies certain condition (filtering).
Using Dynamic Data you can apply many other operations on collections also (e.g. transformations, sorting, grouping, combing elements from different collections into one collection).
Dynamic Data very fascinating and gives you a lot of power especially when it comes to displaying collection items in various ways.
Documentation for Dynamic Data is also somewhat lacking. There are some blog entries by Roland Pheasant at Dynamic Data, some good explanations at reactivemarbles/dynamic-data, many responses to questions on Stackoverflow (c# and dynamic-data tags) and there are some samples on github, e.g. at DynamicData.Snippets and Dynamic.Trader.
Rx.NET Streams
Stream is an object that emits data. Usually single stream emits data of the same type within the same thread or async context multiple times with various time intervals between the emissions - time intervals do not have to be the same. Some streams can also be empty or have only a single data emission.
Rx Streams can be observed (subscribed to). If a stream is observed, each data emission will result in an action (event) fired with the emitted data being passed as the single parameter to that action.
In Rx.NET each stream is represented by IObservable<T>
type (from System.Runtime.dll which means that this interface is basic part of C# language). Type argument T
is the type of data being emitted. Here is the definition of IObservable<T>
interface
// Summary:
// Defines a provider for push-based notification.
// Type parameters:
// T:
// The object that provides notification information.
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
The interface is very simple - it has only one method - Subscribe(...)
that takes an object IObserver<T> observer
as its only argument and returns an IDisposable
.
IObserver<T>
interface is also defined in System.Runtime.dll. It has three methods:
// Summary:
// Provides a mechanism for receiving push-based notifications.
// Type parameters:
// T:
// The object that provides notification information.
public interface IObserver<in T>
{
void OnNext(T value);
void OnCompleted();
void OnError(Exception error);
}
Each of these 3 methods will be called on an IObserver<T>
which is subscribed to (observes) the IObservable<T>
stream under the following conditions:
-
OnNext(T value)
will be called when next data is emitted by the observed stream (potentially the stream might be empty and it might never be called). -
OnCompleted()
will be called when the stream signals that it has completed properly - no more data will be coming through it. Some streams might never signal completion so this method, might not ever be called either. -
OnError(Exception error)
will be called only if something went wrong with the stream and the stream signals that it is broken.
No data will be coming through the stream after it signaled completion or error.
Without Subscribe(...)
being called on it, IObservable<T>
stream is doing nothing - compare it to a shutdown water faucet. When one or several Subscribe(...)
methods are called on an observable stream - the data might start flowing - dripping through the stream.
Disposing the returned IDisposable
object will unsubscribe.
For those interested in duality - IObservable<T>
is dual to IEnumerable<T>
and IObserver<T>
is dual to IEnumerator<T>
- a very rarely used interface serving as an iterator through an IEnumerable<T>
collection. Instead of using and implementing IEnumerator<T>
, we use extention methods defined in static class System.Linq.Enumerable
to sort, filter, transform or combine several IEnumerable<T>
collection objects into the Enumerable collection we want.
In the similar fashion IObserver<T>
is almost never used or implemented outside of the Rx packages. Instead various observable streams are sorted, filtered, transformed or combined into other stream(s) in many different ways by various extension methods many of which are defined in static class System.Reactive.Linq.Observable
.
Reactive Stream Sources
Using Streams of Remote Data
The most natural and fastest ways of receiving streams - getting remote data as stream(s) (as opposed to the older request/reply paradigm).
Many modern communications packages supports streaming, for example:
- gRPC (Google RPC) and its simplified version grpc-web - one of the most popular and fastest middleware protocols.
- SignalR - one of the greatest and most powerful middleware packages - unfortunately, it requires .NET backend.
- Web Sockets
- Akka/Akka.NET
- Kafka
Here is a communication schema of a multi-tier application:
The backend consists of databases and live streams (containing and transporting, e.g. financial or other data).
IObservable<T>
Stream Adapter is part of the application that translates the streaming data into IObservable<T>
streams (or streams that are easy to convert to IObservable<T>
). Note that it can be factored out into e.g. a middle tier - as in the figure above - or it can be part of each client application.
This way of producing streams requires several tiers (parts of the application) communicating with each other. I will write a detailed article about it in the future as part of these series, but for now I shall provide other, more simple examples of creating Rx Streams.
Using System.Reactive and DynamicData Functionality to Create Streams
All the samples for this section are provided as XUnit [Fact]
s in CreatingAndManipulatingRxStreams project, which is part of NP.Samples github public repository.
To run or debug the samples, download or clone the repository onto you local machine, cd to RxStream/CreatingAndManipulatingRxStreams folder and open CreatingAndManipulatingRxStreams.sln solution in Visual Studio 2022.
For more on running and debuggin XUnit projects, please take a look at Everything you need to know to create XUnit C# Tests.
Note that the project depends on DynamicData package which in turn depends on System.Reactive package so that we get full access to both Rx.Net and DynamicData functionality.
There is only a dependency on System.Linq.Async package, but only one test demonstrating converting IAsyncEnumerable<T>
to IObservable<T>
depends on it.
Converting Collection into IObservable<T>
Here we demo turning IEnumerable<T>
collection into IObservable<T>
streams, by using Observable.ToObservable(...)
extension method.
The corresponding sample is locate within file StreamFromCollection.cs. Take a look at the static [Fact]
method TestStreamFromCollection()
:
[Fact]
public static void TestStreamFromCollection()
{
// source collection consists of int numbers 1 to 10
int[] sourceCollection1_10 =
Enumerable.Range(1, 10).ToArray();
IObservable<int> sourceObservable =
sourceCollection1_10.ToObservable();
// observable emitting squares for only even numbers
// out of the source
IObservable<int> evenSquaresObservable =
sourceObservable
.Where(i => i % 2 == 0) // filter only even numbers
.Select(i => i * i); // transform i to square(i)
// create result collection
var resultEvenSquaresCollection = new List<int>();
// populate result collection by subscribing
// to it.
// Note that before subscription, the observables
// however complex they are do not consume any
// heap resources and cannot result in a memory leak.
// Only subscription needs to be dispose of
// (which we do automatically by providing
// the 'using' clause).
using var subscriptionDisposable =
evenSquaresObservable
.Subscribe(i => resultEvenSquaresCollection.Add(i));
// create collection of squares of even positive ints
// smaller or equal to 10 to compare the
// result with
int[] expectedResultsCollection =
[2 * 2, 4 * 4, 6 * 6, 8 * 8, 10 * 10];
// assert that the result and expectedResults collections
// are the same.
Assert.True
(
resultEvenSquaresCollection
.SequenceEqual(expectedResultsCollection)
);
}
Since this is our first Rx Stream example, I'll explain it almost line by line.
The following line creates an array containing int
numbers from 1 to 10 as the sourceCollection
to be turned into an observable.
int[] sourceCollection1_10 = Enumerable.Range(1, 10).ToArray();
We create an observable for every number within the collection by calling static extension method Observable.ToObservable(...)
on it:
IObservable<int> sourceObservable =
sourceCollection1_10.ToObservable();
We use the powerful composition LINQ-like operators for Rx Streams to filter in only even numbers of the Stream and transform the filtered even numbers into their squares:
IObservable<int> evenSquaresObservable =
sourceObservable
.Where(i => i % 2 == 0) // filter only even numbers
.Select(i => i * i); // transform i to square(i)
Note that since there has been no subscription made so far, nothing is happening, the stream is not operating and no resources need to be freed or disposed.
Next we'll create collection of results, we want to populate within our subscription method and we also subscribe to the observable to populate this collection:
List<int> resultEvenSquaresCollection = new List<int>();
using var subscriptionDisposable =
evenSquaresObservable
.Subscribe(i => resultEvenSquaresCollection.Add(i));
Within our Observable.Subscribe(...)
extension method, we add each observed result to the resultEvenSquaresCollection
. Subscription returns an IDisposable
object which has to be disposed in order to prevent memory leaks (this is why we employ the using
clause here). In more complex examples, disposing a subscription can also serve for unsubscribing from the stream.
The rest of the code merely tests that we indeed got a collection of squares of even positive integers less or equal to 10.
Observable.Subscribe(...)
extension method has several overrides. We used the one that takes Action<T> onNext
argument that fires when new data is emitted. We call also use other overrides that use Action onCompleted
and Action<Exception> onError
arguments as well.
For example we can define bool completed = false
variable before the calling Subscribe(...)
and then call:
bool completed = false;
using var subscriptionDisposable =
evenSquaresObservable
.Subscribe
(
i => resultEvenSquaresCollection.Add(i),
() => completed = true
);
Assert.True(completed);
After observable stream is run, the completed
flag will be flipped to true
.
Another thing that could have been done differently is we did not need to create sourceObservable
and evenSquaresObservable
- I created them purely for clarity. In fact we could have done creating observable, performing filtering and transformation on it and subscribing within the same long C# statement:
using var subscriptionDisposable =
sourceCollection1_10
.ToObservable()
.Where(i => i % 2 == 0)
.Select(i => i * i)
.Subscribe(i => resultEvenSquaresCollection.Add(i));
Another test TestTimeSpannedStreamFromCollection()
from the same file shows how to create a 'real' (spaced in time) stream from the same collection by combining it with a Time Spanned stream using Zip(...)
operator:
[Fact]
public static async Task TestTimeSpannedStreamFromCollection()
{
// source collection consists of int numbers 1 to 10
int[] sourceCollection1_10 =
Enumerable.Range(1, 10).ToArray();
IObservable<int> evenSquaresObservable =
sourceCollection1_10
.ToObservable()
.Where(i => i % 2 == 0)
.Select(i => i * i);
// create an infinite stream emitting
// empty data (Unit.Default stands for empty data)
// every second
IObservable<Unit> eachSecondObservable =
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Unit.Default);
// Zip operator matches the data emitted from
// two or more observable streams - 1st to 1st, 2nd to 2nd
// etc.
// Emission number N happens after EACH one of its
// observable streams produces N-th emission.
//
// The Zipped stream completes when ANY of its
// observable streams complete.
IObservable<int> timeSpannedEvenSquaresObservable =
evenSquaresObservable
.Zip
(
eachSecondObservable,
// we only need values from
// the first sequence - evenSquaresObservable
// From the second sequence
// (eachSecondObservable) we only
// need time of emission
(i, unit) => i
);
var resultEvenSquaresCollection = new List<int>();
bool completed = false;
using var subscriptionDisposable =
timeSpannedEvenSquaresObservable
.Subscribe
(
i => resultEvenSquaresCollection.Add(i),
() => completed = true
);
// delay the testing until completed flag is
// switched to true
while (! completed)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
int[] expectedResultsCollection =
[2 * 2, 4 * 4, 6 * 6, 8 * 8, 10 * 10];
Assert.True
(
resultEvenSquaresCollection
.SequenceEqual(expectedResultsCollection)
);
}
Note that TestTimeSpannedStreamFromCollection()
is async
method and we wait for the completed
flag to become true
until we check the assertion. Running of this test should take more than 5 seconds.
And this is another example highlighting the great powers of Rx composition. In order to span out the emissions of the values of the collection we use Zip(...)
operator to combine the evenSquaresObservable
with another stream spanned in time. This (other) stream is created by using Observable.Interval(...)
method:
IObservable<Unit> eachSecondObservable =
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Unit.Default);
The eachSecondObservable
emits data of type Unit.Default
every second and it never sends the completion signal (it is potentially an infinite stream). Type Unit
and its only value Unit.Default
is used in Rx, when the emission data is not important (only emission timing is). In a sense it matches a C# event
that takes no data (of type Action
).
Then we combine these two streams (evenSquaresObservable
and eachSecondObservable
) into one by using Observable.Zip(...)
operator:
IObservable<int> timeSpannedEvenSquaresObservable =
evenSquaresObservable
.Zip
(
eachSecondObservable,
// we only need values from
// the first sequence - evenSquaresObservable
(i, unit) => i
);
Observable.Zip(...)
operator combines two or more streams into one stream, by combining the corresponding data items (1st to 1st, 2nd to 2nd, ... , N-th to N-th, an so on).
The N-th item of the resulting stream is emitted only after N-th items are emitted in every stream it combines.
The resulting stream is completed when any of its source streams is completed.
In a sense we marry our evenSquaresObservable
stream obtained from the collection and eachSecondObservable
obtained from Observable.Interval(...)
method taking emitted data from the first stream and emission time from the second.
IAsyncEnumerable to Stream
IAsyncEnumerable<T>
is a different (lower level) way of presenting a stream built into C# language. IAsyncEnumerable<T>
is especially important because gRPC and grpc-web streams are translated into IAsyncEnumerable<T>
in C# and converting it into an IObservable<T>
allows employing the powerful set on Rx Operators on it.
The demo of IAsyncEnumerable<T>
converted into an IObservable<T>
stream is located within StreamFromAsyncEnumerable.cs file.
The static method GenerateAsyncEnumerable()
generates IAsyncEnumerable<T>
stream:
// returns an IAsyncEnumerable stream
// of numbers from 1 to 6; each number
// takes one second to emit.
public static async IAsyncEnumerable<int>
GenerateAsyncEnumerable()
{
for (int i = 1; i <= 6; i++)
{
// one second delay
await Task.Delay(TimeSpan.FromSeconds(1));
yield return i;
}
}
It emits an integers from 1 to 6 every second.
Method TestStreamFromAsyncEnumerable()
converts IAsyncEnumerable<T>
into IObservable<T>
, filters only even number and transforms them into their squares (same as in the previous sample) and then tests the result using Assert
statement:
[Fact]
public static async Task TestStreamFromAsyncEnumerable()
{
// Get asyncEnumerable
IAsyncEnumerable<int> asyncEnumerable =
GenerateAsyncEnumerable();
// convert to an observable
// stream
IObservable<int> observable =
asyncEnumerable.ToObservable();
// create a result list to be populated
// within the observable's Subscription
var resultEvenSquaresCollection = new List<int>();
// define completed flag
bool completed = false;
// subscribe to receive
// squares of even numbers
using var subscribeDisposable =
observable
// filter in only even numbers
.Where(i => i % 2 == 0)
// transform i to square(i)
.Select(i => i * i)
.Subscribe
(
onNext:
i => resultEvenSquaresCollection.Add(i),
onCompleted:
() => completed = true
);
// delay the testing until completed flag is
// switched to true
while (!completed)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
// assert that the result as expected
Assert.True
(
resultEvenSquaresCollection
.SequenceEqual([2 * 2, 4 * 4, 6 * 6])
);
}
The only method unique to this test, demonstrated here, is AsyncEnumerable.ToObservable(...)
static extension method defined within System.Linq.Async.dll to convert IAsyncEnumerable<T>
into IObservable<T>
stream.
Using Subjects for Creating Observable Streams.
Subjects are special Rx observables that also implement three methods allowing to push data into them:
-
OnNext(T data)
- for pushing emitted data -
OnCompleted()
- for signaling the end of the stream -
OnError(Exception e)
for signaling a stream error
In fact ISubject<T>
implements both IObservable<T>
(for using it as a an observable stream source) and IObserver<T>
(for populating it with data).
Subjects are great
- For tests (since you have full control over the data you want to populate it with and completion/error behavior)
- For creating a stream from some non Rx Stream data.
- For using them to initialize auxiliary observable streams to facilitate stream composition.
Subject tests are located within StreamFromSubject.cs file.
Take a look at TestStreamFromSubject()
method:
[Fact]
public static void TestStreamFromSubject()
{
ISubject<int> subject = new Subject<int>();
var resultEvenSquaresCollection = new List<int>();
bool completed = false;
string exceptionMessage = null;
using var subscriptionDisposable =
subject
.Where(i => i % 2 == 0) // filter only even
.Select(i => i * i) // get squares
.Subscribe
(
onNext: i => resultEvenSquaresCollection.Add(i),
onCompleted: () => completed = true,
onError: exception =>
exceptionMessage = exception.Message
);
// push data into the stream
// by using OnNext() method
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
// completion signal has not been sent yet
Assert.False(completed);
// no exception occurred yet
Assert.Null(exceptionMessage);
Assert.True
(
resultEvenSquaresCollection
.SequenceEqual([2 * 2, 4 * 4])
);
// add another even number
subject.OnNext(10);
// verify the output of the stream
Assert.True
(
resultEvenSquaresCollection
.SequenceEqual([2 * 2, 4 * 4, 10 * 10])
);
// no completion yet
Assert.False(completed);
// signal source stream completion
subject.OnCompleted();
// the completed flag should
// be reset to True
Assert.True(completed);
}
Note that our resulting observable stream does the same filtering and transformation as in the previous sample - it filters in only even number and than uses their squares for the output.
The source stream (ISubject<T> subject
variable) is populated by calling its OnNext(int i)
method and passing some integer to it:
// push data into the stream
// by using OnNext() method
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
...
// add another even number
subject.OnNext(10);
Then the source stream is completed by calling
// signal source stream completion
subject.OnCompleted();
which also causes the result stream to complete.
The other test TestStreamFromSubjectWithException()
from the same file does everything the same as the first test, only signals an error at the end instead of completion:
// signal error
subject.OnError(new Exception("Error Occurred"));
// verify that our onError action worked
Assert.True(errorMessage == "Error Occurred");
The last Assert
call verifies that the error message was really passed to Subscribe(...)
method's onError
argument.
There are several useful variations of Subjects but since my purpose is to avoid writing a handbook, I'll skip them at this point.
C# Events to Streams
There is a close affinity between C# events and Observable Streams. Both implement observable pattern.
In one (not very important) way C# events are more powerful than streams - because they can take an arbitrary number of arguments while Rx stream takes only one (the no argument stream can also be imitated by IObservable<Unit>
). This shortcoming can be easily circumvented by using C# Tuples and transforming them with Select(...)
method.
There is one very important way in which Rx Observable Streams are much more powerful than C# events - streams can be transformed, manipulated and combined using many LINQ-like operators operators, while the events cannot. Composing events can be a big pain if you ever tried it. Complex event compositions are simply impossible.
StreamFromEvent.cs file has tests demonstrating how to convert an event into an Rx Observable stream.
There is a tiny EventContainingClass
class defined. Its only purpose is to contain an Action<bool>
event and provide a method for firing it:
class EventContainingClass
{
// boolean event
public event Action<bool>? TestEvent;
// fire event
public void FireTestEvent(bool arg)
{
TestEvent?.Invoke(arg);
}
}
There is also an extension method GetObservableFromEventObj(this EventContainingClass eventObj)
that given an object of that type, returns an Rx observable stream observing TestEvent
:
// extension method that
// gets the observable for the TestEvent
// of the EventContainingClass passed as
// the argument.
private static IObservable<bool>
GetObservableFromEventObj(this EventContainingClass eventObj)
{
// method Observable.FromEvent(...)
// is used to create an observable
// from an event.
// The first two arguments to it
// specify how to add and remove
// an event handler.
IObservable<bool> observable =
Observable
.FromEvent<Action<bool>, bool>
(
// add handler
eventHandler =>
eventObj.TestEvent += eventHandler,
// remove handler
eventHandler =>
eventObj.TestEvent -= eventHandler
);
return observable;
}
This method is provided for the sake of the code reuse and it is called multiple times from the tests within the file. It also demonstrates turning an event into an observable stream by using Observable.FromEvent(...)
method and providing two arguments to it - first argument specifying how to add a handler to the event and the second how to remove the handler from the event.
The first test method TestStreamFromEvent()
simply tests the observable obtained from an event:
[Fact]
public static void TestStreamFromEvent()
{
// create an object containing an
// Action<bool> event
EventContainingClass eventContainingObj =
new EventContainingClass();
// create an observable from
// eventContainingObj.TestEvent
IObservable<bool> observable =
eventContainingObj.GetObservableFromEventObj();
// boolean to specify
// the last argument to the event
bool lastFired = false;
// subscribe to change the lastFired
// to the last argument passed to
// fire the event
using var subscriptionDisposable =
observable.Subscribe(b => lastFired = b);
// false originally
Assert.False(lastFired);
// after true is fired, lastFired
// should become true
eventContainingObj.FireTestEvent(true);
Assert.True(lastFired);
// after another true is fired, lastFired
// should still be true
eventContainingObj.FireTestEvent(true);
Assert.True(lastFired);
// after false is fired, lastFired
// should change to false
eventContainingObj.FireTestEvent(false);
Assert.False(lastFired);
}
The second test method TestStreamFromEventComposition()
demonstrates combining the latest values of two different streams obtained form events by using Observable.CombineLatest(...)
method:
[Fact]
public static void TestStreamFromEventComposition()
{
// first object containing an event
EventContainingClass eventContainingObj1 =
new EventContainingClass();
// observable obtained from the
// first objects
IObservable<bool> observable1 =
eventContainingObj1.GetObservableFromEventObj();
// second object containing the event
EventContainingClass eventContainingObj2 =
new EventContainingClass();
// observable obtained from the
// second object
IObservable<bool> observable2 =
eventContainingObj2.GetObservableFromEventObj();
// combine the two observables by using
// Observable.CombineLatest(...) extension
// method. This method fires every time any
// observable changes.
IObservable<bool> combinedLatestObservable =
observable1
.CombineLatest
(
observable2,
// obtain the combined
// observable value by
// doing boolean AND
// operator on the two
// latest values from the
// two observables
resultSelector: (b1, b2) => b1 && b2
);
// get an observable to fire only if the
// result changes from the previous value
// (do not emit data if it stays the same)
IObservable<bool> combinedLatestDistinctObservable =
combinedLatestObservable.DistinctUntilChanged();
// latest value from
// the resulting observable
bool latestCompoundValue = false;
// subscribe to set the
// latestCompoundValue when
// the result changes
using var subscriptionDisposable =
combinedLatestDistinctObservable
.Subscribe
(
b => latestCompoundValue = b
);
Assert.False(latestCompoundValue);
eventContainingObj1.FireTestEvent(true);
eventContainingObj2.FireTestEvent(true);
// since both observable fired true
// the result should change to true
Assert.True(latestCompoundValue);
// one fire false - result
// changes to false
eventContainingObj1.FireTestEvent(false);
Assert.False(latestCompoundValue);
// both are false, result
// stays false
eventContainingObj2.FireTestEvent(false);
Assert.False(latestCompoundValue);
// only one changed back to true
// the result still stays false
eventContainingObj1.FireTestEvent(true);
Assert.False(latestCompoundValue);
// both changed to true
// the result changes to true
eventContainingObj2.FireTestEvent(true);
Assert.True(latestCompoundValue);
}
The most interesting and new Rx operators demonstrated here are CombineLatest(...)
and DistinctUntilChanged(...)
.
Observable.CombineLatest(...)
combines two or more observable streams by emitting new data when any of its source streams changes (unlike Zip(...)
operator that fires when all of its source stream change). Its argument resultSelector
specifies how to combine the result from the latest values of the two observables:
// combine the two observables by using
// Observable.CombineLatest(...) extension
// method. This method fires every time any
// observable changes.
IObservable<bool> combinedLatestObservable =
observable1
.CombineLatest
(
observable2,
// obtain the combined
// observable value by
// doing boolean AND
// operator on the two
// latest values from the
// two observables
resultSelector: (b1, b2) => b1 && b2
);
In our case, we use boolean AND operator (&&
) to combine the two bool values into a bool result.
The other very interesting method is Observable.DistinctUntilChanged(...)
. It prevents emitting data unless the data changes (prevents unnecessary stream data emissions).
C# Notifiable Properties to Streams
A property which fires an event every time it changes is very similar to an observable stream. Each property change corresponds to data emission and the data emitted will be just the property value.
And again as in case of events, converting properties to Rx Observable streams enables using the powerful stream manipulation, transform and composition operators.
In Avalonia, WPF and other XAML based frameworks, in order for a view model (non-visual) property change to be picked up by the visuals (the view) the class containing the property has to implement System.ComponentModel.INotifiablePropertyChange
interface and the property has to fire PropertyChanged
event when its value changes.
The best library for converting notifiable properties to streams and vice versa is ReactiveUI - a platform independent library that references both System.Reactive and DynamicData libraries. RectiveUI is beyond the scope of this article (though later I plan to write about it as part of these series).
DynamicData also has a means of converting a notifiable property to an Rx Stream - static extension method NotifyPropertyChangedEx.WhenValueChanged(...)
and we shall demo it in this subsection.
The test is located within StreamFromNotifiableProperty.cs file. The file also contains an PropContainer
class which has NotifiableProperty
of type bool
in it that fires PropertyChanged(...)
event when the property changes;
// class with single notifiable property
class PropContainer : INotifyPropertyChanged
{
public event PropertyChangedEventHandler? PropertyChanged;
bool _notifiableProperty = false;
public bool NotifiableProperty
{
get => _notifiableProperty;
set
{
if (_notifiableProperty == value)
{
return;
}
_notifiableProperty = value;
// fire PropertyChanged event
// if the property changes.
PropertyChanged?.Invoke
(
this,
new PropertyChangedEventArgs(nameof(NotifiableProperty))
);
}
}
}
And here is the code of the test:
[Fact]
public static void TestNotifyablePropertyValueChange ()
{
// create an object containing a notifiable property
var propContainer = new PropContainer ();
// use DynamicData's
// NotifyPropertyChangedEx.WhenValueChanged(...)
// method to convert the property to IObservable<bool>
IObservable<bool> whenPropChangedObservable =
propContainer.WhenValueChanged
(
pContainer => pContainer.NotifiableProperty
);
var currentPropValue = propContainer.NotifiableProperty;
// subscribe to change currentPropValue
// every time the property changes via the observable
using var subscriptionDisposable =
whenPropChangedObservable
.Subscribe(newValue => currentPropValue = newValue);
// test that the currentPropValue (obtained through
// the observable) really matches the current property.
Assert.Equal
(
currentPropValue,
propContainer.NotifiableProperty
);
propContainer.NotifiableProperty = true;
Assert.Equal
(
currentPropValue,
propContainer.NotifiableProperty
);
propContainer.NotifiableProperty = false;
Assert.Equal
(
currentPropValue,
propContainer.NotifiableProperty
);
}
The way we get the property observable is by calling WhenValueChanged(...)
operator on the object containing the property and passing an C# expression corresponding to that property to it:
// use DynamicData's
// NotifyPropertyChangedEx.WhenValueChanged(...)
// method to convert the property to IObservable<bool>
IObservable<bool> whenPropChangedObservable =
propContainer.WhenValueChanged
(
pContainer => pContainer.NotifiablePropert
);
Note that DynamicData also contains some other useful property notification converter operators including
-
WhebPropertyChanged(...)
- creates an observable that returns the changed object also (on top of the current property value). This can be important e.g. when dealing with collections. -
WhenChanged(...)
methods allowing to monitor several properties with single observable.
Dynamic Data Collections To Rx Observable Streams (Preview)
I plan to start talking about DynamidData in-depth further in the next article of the series. So here i'll just give a bit of a pep talk about the main ideas behind Dynamic Data.
C# has a built-in ObservableCollection<T>
class which implements INotifyCollectionChanged
interface and fires CollectionChanged
event each time the items are added, removed or rearranged within the collection.
The arguments to CollectionChanged
event carry full information about the changed items sufficient to reconstruct the changed collection.
Potentially CollectionChanged
arguments can be used as an IObservable<T>
streams allowing to use the collection changes to create and update other collections dependent on the original one.
Instead of ObservableCollection<T>
, the Dynamic Data uses ISourceList<T>
and ISourceCache<T, TKey>
as collection containers.
ISourceCache<T, TKey>
assumes that the collection items have a primary key and based on the primary key of incoming data - we either insert a new item or update the current item (with the same primary key) within the collection.
Both ISourceList<T>
and ISourceCache<T, TKey>
have a Connect()
method that allows to get IObservable<IChangeSet<T>>
(in case of ISourceList
) and IObservable<IChangeSet<T, TKey>>
(in case of the ISourceCache
). These observables are nothing more than the observables of changes to the corresponding collections. Note that IChangeSet
is not identical to CollectionChanged
event arguments, but is optimized for better performance and collection reconstruction.
Using IObservable<IChangeSet<T>>
and IObservable<IChangeSet<T, TKey>>
we can create other collections that mimic the original one, with some difference provided by the operations performed on IObservable
. For example we can create a collection that filters in only even int number from the original collection of integers (or something much much more complex).
The main trick is that such dependencies between the collections will be maintained automagically by Dynamic Data even if the source collection(s) change.
More about Dynamic data is coming in the future articles.
Top comments (0)