One of the most exciting features of .NET Core 3.0 and C# 8.0 has been the addition of IAsyncEnumerable<T>
(aka async streams). But what's so special about it? What can we do now that wasn't possible before?
In this article, we'll look at what challenges IAsyncEnumerable<T>
is intended to solve, how to implement it in our own applications, and why IAsyncEnumerable<T>
will replace Task<IEnumerable<T>>
in many situations.
Check out all the new features in .NET Core 3
Life before IAsyncEnumerable<T>
Perhaps the best way to illustrate why IAsyncEnumerable<T>
is useful is to take a look at what challenges exist without it.
Imagine we're building a data access library, and we need a method that queries a data store or API for some data. It's pretty common for that method to return Task<IEnumerable<T>>
, like this:
public async Task<IEnumerable<Product>> GetAllProducts()
To implement the method, we typically perform some data access asynchronously, then return all the data when it's finished. The problem with this becomes more evident when we need to make multiple asynchronous calls to obtain the data. For example, our database or API could be returning data in pages, like this implementation that uses Azure Cosmos DB:
public async Task<IEnumerable<Product>> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
var products = new List<Product>();
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
products.Add(product);
}
}
return products;
}
Notice we are paging through all the results in a while loop, instantiating all the product objects, placing them into a List<Product>
, and finally we return the whole thing. This is quite inefficient, especially for larger datasets.
Maybe we can create a more efficient implementation by changing our method to return results one page at a time:
public IEnumerable<Task<IEnumerable<Product>>> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
yield return iterator.ReadNextAsync().ContinueWith(t =>
{
return (IEnumerable<Product>)t.Result;
});
}
}
The caller would consume the method like this:
foreach (var productsTask in productsRepository.GetAllProducts())
{
foreach (var product in await productsTask)
{
Console.WriteLine(product.Name);
}
}
This implementation is more efficient, but the method now returns IEnumerable<Task<IEnumerable<Product>>>
. As we can see in the calling code, it's not intuitive to understand how to invoke the method and process the data. More importantly, paging is an implementation detail of the data access method that the caller should know nothing about.
IAsyncEnumerable<T>
to the rescue
What we really want to do is to retrieve data asynchronously from our database and stream results back to the caller as they become available.
In synchronous code, a method that returns IEnumerable<T>
can use the yield return
statement to return each piece of data to the caller as it is returned from the database.
public IEnumerable<Product> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in iterator.ReadNextAsync().Result)
{
yield return product;
}
}
}
However, DO NOT DO THIS! The above code turns the async database call into a blocking call and will not scale.
If only we could use yield return
with asynchronous methods! That hasn't been possible... until now.
IAsyncEnumerable<T>
was introduced in .NET Core 3 (.NET Standard 2.1). It exposes an enumerator that has a MoveNextAsync()
method that can awaited. This means the producer can make asynchronous calls in between yielding results.
Instead of returning a Task<IEnumerable<T>>
, our method can now return IAsyncEnumerable<T>
and use yield return
to emit data.
public async IAsyncEnumerable<Product> GetAllProducts()
{
Container container = cosmosClient.GetContainer(DatabaseId, ContainerId);
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
yield return product;
}
}
}
To consume the results, we need to use the new await foreach()
syntax available in C# 8:
await foreach (var product in productsRepository.GetAllProducts())
{
Console.WriteLine(product);
}
This is much nicer. The method produces data as they are available. The calling code consumes the data at its own pace.
IAsyncEnumerable<T>
and ASP.NET Core
Starting with .NET Core 3 Preview 7, ASP.NET is able to return IAsyncEnumerable<T>
from an API controller action. That means we can return our method's results directly -- effectively streaming data from the database to the HTTP response.
[HttpGet]
public IAsyncEnumerable<Product> Get()
=> productsRepository.GetAllProducts();
Replacing Task<IEnumerable<T>>
with IAsyncEnumerable<T>
As times goes by and the adoption .NET Core 3 and .NET Standard 2.1 grows, expect to see IAsyncEnumerable<T>
to be used in places where we've typically used Task<IEnumerable<T>>
.
I look forward to seeing libraries support IAsyncEnumerable<T>
. Throughout this article, we've seen code like this for querying data using the Azure Cosmos DB 3.0 SDK:
var iterator = container.GetItemQueryIterator<Product>("SELECT * FROM c");
while (iterator.HasMoreResults)
{
foreach (var product in await iterator.ReadNextAsync())
{
Console.WriteLine(product.Name);
}
}
Like our earlier examples, Cosmos DB's own SDK also leaks its paging implementation detail and that makes it awkward to process query results.
To see what it could look like if GetItemQueryIterator<Product>()
returned IAsyncEnumerable<T>
instead, we can create an extension method on FeedIterator
:
public static class FeedIteratorExtensions
{
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this FeedIterator<T> iterator)
{
while (iterator.HasMoreResults)
{
foreach(var item in await iterator.ReadNextAsync())
{
yield return item;
}
}
}
}
Now we can process our query results in a much cleaner way:
var products = container
.GetItemQueryIterator<Product>("SELECT * FROM c")
.ToAsyncEnumerable();
await foreach (var product in products)
{
Console.WriteLine(product.Name);
}
Summary
IAsyncEnumerable<T>
is a welcomed addition to .NET and will make for much cleaner and more efficient code in many cases. Learn more about it with these resources:
Top comments (18)
This is cool. Can you write something about SignalR? Thank you.
Sure. Anything in particular that you want to read about?
I would love to read about a current state of SignalR / SSE (Server Sent Events) / gRPC / anything else? - features, gaps, areas to apply/consider using for. What is implemented / supported in .NET Core 3 for "realtime" like apps.
The use of IAsyncEnumerable in a SignalR application. Especially regarding clients communicating to each other. Thank you
It's great. I love SignalR!!
I would love to read about singalR
Is there any way that u can show with stopwatch or with memory consumption matching, showing old vs new way is much efficient?
I don't think it will be more efficient in that sense - it blocks less threads so your server can scale better (await more IO bound operations like the DB calls here)
If you use this for algorithms (CPU bound operations) the runtime will probably be worse (there surely is even more overhead than the one produced by async/awaits state machines)
Do you know how does it work under the hood? I mean, in your example you pass query like
select * from...
but, since it's lazy loaded, what's the actual query executed?How I see it is it retrieves records one by one, which is something similar to looping through
IQueryable
.Am I wrong? If so, how does it work exactly?
I don't know exactly how this works with other DB servers but MSSQL uses so called TDS (Tabular Data Stream) protocol to deliver results which is a stream of data rows. This allows asynchronous stream processing within .Net Core applications leveraging IAsyncEnumerable<>.
Can you provide an F# example? I've been looking for a way to return an asynSeq.
That's awesome! Thanks for sharing!
I'm new to this whole progg. stuff but I know there isn't no one out there better than Microsoft. Go .NET !!! hope I get to use this new feature one day
again GO GO .NET
Nice article, thanks. A thing I wonder when you are using IAsyncEnumerable in controller as you return type of a route. How do you handle the cas where you want to return a NotFoundResult when no items are in the IAsyncEnumerable.
Does the new interface support on demand pull?
I.e. what happens if the table contains 100 records but the client uses Take(15)?
The
MoveNextAsync
method is called 15 times. Wether or not it does something is an implementation detail.This article very nice. Useful and easy for understand ! Thank Chu !
That's great! It's very helpful. Thanks for sharing.