The source code repro for this article can be found here.
As I am getting myself up to date with the modern C# language features, I'd like to share what feels like an interesting and natural use case for IAsyncEnumerable
: iterating through a sequence of arbitrary events.
A (not so) brief introduction (TLDR, skip)
Support for asynchronous iterators (or "async streams") has been added to C# 8.0, and includes the following language and runtime features:
-
IAsyncEnumerable
,IAsyncEnumerator
,IAsyncDisposable
-
await foreach
await using
EnumeratorCancellation
Asynchronous streams may come handy when implementing various producer/consumer scenarios in C#. IAsyncEnumerable
and await foreach
are just async counterparts for IEnumerable
and foreach
. Same as with IEnumerable<T> EnumSomething()
or async Task<T> DoSomethingAsync()
, when the C# compiler encounters async IAsyncEnumerable<T> EnumSomethingAsync()
, it generates a special state machine class. The compiler breaks the logical execution flow within EnumSomethingAsync
into multiple asynchronous continuations, separated by await
or yield return
operators. Prior to C# 8.0, it wasn't possible to combine these two within the same method. Now it is, and the whole set of the familiar Linq
extension is now available as part of System.Linq.Async, to operate on the asynchronous stream of data generated via yield return
or by other means, like below.
There is an abundance of great material on the web to help familiarize yourself with the concept of asynchronous streams. I could highly recommend "Iterating with Async Enumerables in C# 8", by Stephen Toub.
What I'd like to show here is how to turn a sequence of events of any kind of origin into an iterable async stream. While there are many ways of doing this, I'd like to focus on the following two: with Reactive Extensions (System.Reactive
) or by using an unbound Channel
from the relatively new System.Threading.Channels
.
Producing async streams with Reactive Extensions
To illustrate this, I wrote a simple WinForms app (source gist) that has two independent event sources: a timer and a button. It's a contrived example but it's easy to play with and step through, to show the concept.
We turn timer ticks and button clicks into Reactive's IObservable
observables with Observable.FromEventPattern
. Then we combine two observables into one using Observable.Merge
:
// observe Click events
var clickObservable = Observable
.FromEventPattern(
handler => button.Click += handler,
handler => button.Click -= handler)
.Select(_ => (button as Component, $"Clicked on {DateTime.Now}"));
// observe Tick events
var tickObservable = Observable
.FromEventPattern(
handler => timer.Tick += handler,
handler => timer.Tick -= handler)
.Select(_ => (timer as Component, $"Ticked on {DateTime.Now}"));
// merge two observables
var mergedObservable = Observable.Merge(clickObservable, tickObservable);
Now we simply turn the combined observable into an instance of IAsyncEnumerable
with ToAsyncEnumerable()
, and we can asynchronously iterate though all events with await foreach
as they occur:
// process events as async stream via ToAsyncEnumerable(),
// that's when the actual subscriptions happen, i.e.,
// the event handlers get connected to their corresponding events
await ReadEventStreamAsync(mergedObservable.ToAsyncEnumerable(), cts.Token);
static async Task ReadEventStreamAsync(
IAsyncEnumerable<(Component, string)> source,
CancellationToken token)
{
await foreach (var (component, text) in source.WithCancellation(token))
{
// e.g., delay processing
await Task.Delay(100, token);
Console.WriteLine($"{component.GetType().Name}: {text}");
}
}
Various LINQ operators can now be applied to the source
stream above, like projection, filtering, etc.
Running it and clicking the button:
Producing async streams with System.Threading.Channels
What if we don't want to involve Reactive Extensions here? They do seem a bit like an overkill for a simple producer/consumer workflow like above.
No worries, we can use an unbound Channel
to act as buffer for our event stream. A Channel
is like a pipe, we can push event data objects into one side of the pipe, and fetch them asynchronously from the other. In case with an unbound channel, its internal buffer size is limited by the available memory. In real-life scenarios, we'd almost always want to limit that. Channels are just one way of implementing the Asynchronous Queue data structure in .NET, there are some others, notably Dataflow BufferBlock<T>
. For more details on Channels, visit "An Introduction to System.Threading.Channels" by Stephen Toub.
So, we introduce a helper class EventChannel
(source) to expose Channel.Writer.TryWrite
, Channel.Reader.ReadAllAsync
and manage the scope of event handlers as IDisposable
:
public class EventChannel<T> : IDisposable
{
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
public Task Completion => _channel.Reader.Completion;
/// <summary>Queue an event item to the write side of the channel</summary>
public bool Post(T item)
{
return _channel.Writer.TryWrite(item);
}
/// <summary>Read queued events as an async stream</summary>
public IAsyncEnumerable<T> ToAsyncEnumerable(CancellationToken token)
{
return _channel.Reader.ReadAllAsync(token);
}
/// <summary>A simple helper to wrap even handler scope as IDisposable</summary>
internal struct EventSubscription<TEventHandler> : IDisposable
where TEventHandler : Delegate
{
private readonly Action _unsubscribe;
public EventSubscription(
TEventHandler handler,
Action<TEventHandler> subscribe,
Action<TEventHandler> unsubscribe)
{
subscribe(handler);
_unsubscribe = () => unsubscribe(handler);
}
public void Dispose()
{
_unsubscribe();
}
}
/// <summary>
/// Subscribe to an event
/// </summary>
public IDisposable Subscribe<TEventHandler>(
TEventHandler handler,
Action<TEventHandler> subscribe,
Action<TEventHandler> unsubscribe) where TEventHandler : Delegate
{
return new EventSubscription<TEventHandler>(handler, subscribe, unsubscribe);
}
public void Dispose()
{
_channel.Writer.Complete();
}
}
The event subscription code now looks like this:
using var eventChannel = new EventChannel<(Component, string)>();
// push Click events to the channel
using var clickHandler = eventChannel.Subscribe<EventHandler>(
(s, e) => eventChannel.Post((button as Component, $"Cicked on {DateTime.Now}")),
handler => button!.Click += handler,
handler => button!.Click -= handler);
// push Tick events to the channel
using var tickHandler = eventChannel.Subscribe<EventHandler>(
(s, e) => eventChannel.Post((timer as Component, $"Ticked on {DateTime.Now}")),
handler => timer!.Tick += handler,
handler => timer!.Tick -= handler);
// process events as async stream via ToAsyncEnumerable(),
await ReadEventStreamAsync(eventChannel.ToAsyncEnumerable(cts.Token), cts.Token);
The consumer part almost hasn't changed, we only removed source.WithCancellation(token)
which is now redundant:
static async Task ReadEventStreamAsync(
IAsyncEnumerable<(Component, string)> source,
CancellationToken token)
{
await foreach (var (component, text) in source)
{
// e.g., delay processing
await Task.Delay(100, token);
Console.WriteLine($"{component.GetType().Name}: {text}");
}
}
It produces exactly the same result as with ReactiveX (provided we can manage to click the button with the same intervals 🙂). The full source code (a .NET Core 3.1 project) can be found in this repo.
Conclusion
The domain of the problems that C# asynchronous streams can help solving certainly overlaps with that of the Reactive Extensions (aka ReactiveX/Rx.NET/Rx). E.g., in the first example above I could have just subscribed to mergedObservable
notifications and used the powerful toolbox of System.Reactive.Linq
extensions to process them.
That said, I personally find it is easier to understand the pseudo-liner code flow of async
/await
, than the fluent syntax of ReactiveX. In my opinion, it may also be easier to structure the exception handling (particularly, cancellations) while using the familiar language constructs like foreach
, yield return
, try
/catch
/finally
. I myself only recently came across IAsyncEnumerable
, and I decided to try it out by implementing coroutines in C#. I certainly didn't need ReactiveX for that.
However, it should be quite possible to combine ReactiveX and C# asynchronous streams to work together for complex asynchronous workflows.
I hope this has been useful. I'll be blogging more on this topic as I make progress with my side project.
Updated, here's a follow-up blog post:
Asynchronous coroutines with C# 8.0 and IAsyncEnumerable.
Follow me on twitter for further updates, if interested.
Top comments (4)
There is some work in this area to combine Rx.NET with Asynchronous streams.
Hopefully it will be inside Rx.NET one day github.com/dotnet/reactive/pull/435.
I think you mean
AsyncRx.NET
, in which case it's already there in the master. However, it hasn't been officially published as a Nuget package, and sadly, it doesn't look like it will be any time soon:Note that
AsyncRx.NET
is built around IAsyncObservable<T>, which actually is dual toIAsyncEnumerable<T>
(similar to how Rx'sIObservable<T>
is dual toIEnumerable<T>
). So, even though Rx maintainers recommend usingAsyncEnumerables
above, it isn't a direct substitute for what can be done withIAsyncEnumerable
(e.g., here's how they play with it).OTOH, it appears that Project Orleans uses
IAsyncEnumerable
for their async streams implementations, so maybe it will have the future there.Do you have any practical examples where someone would want to do this?
It's a good question. In theory, I imagine it could be any code that needs to process a sequence of events, e.g., collecting data from an IoT device or processing stock market updates. For use cases like these, we would probably want to use a bounded channel or something like
BufferBlock<T>
as event buffer on the producer side, to prevent the queue from growing indefinitely.In practice though, I myself so far only have used that for automated coded UI testing of my side project
DevComrade
with asynchronous coroutines approach. In that case, the consumer ofIAsyncEnumerable
stream is essentially a pseudo-linear script (thanks to async/await) that just expects UI events in a particular order. I.e., it doesn't even need anawait foreach
loop. I'll go into more details about that with another blog post.