In this second part of my series on IAsyncEnumerable<T> (part 1 is here), I want to consider what happens when we want to make asynchronous calls within a LINQ pipeline. This is actually something that's very difficult to achieve with an IEnumerable<T> but much more straightforward with IAsyncEnumerable<T>.

Asynchronous Mapping and Filtering

LINQ supports many "operators" that can be chained together into pipelines. The most commonly used are probably the LINQ Select and Where operators for mapping and filtering elements in a sequence.

These will serve as good examples of the challenges of introducing asynchronous code into a regular LINQ pipeline.

Consider this simple LINQ pipeline, where we have a list of filenames and want to find which are large files. We might do that with a simple Select and Where like this:

var largeFiles = fileNames
                    .Select(f => GetFileInfo(f))
                    .Where(f => f.Length > 1000000);

This is fine, but let's imagine that getting the file size is an asynchronous operation (for example, instead of local files, maybe these are Azure blobs). What developers will often try is something like this, where they make an asynchronous call in the Select:

// will not compile, as we now have a sequence of Task<FileInfo>
var largeFiles = fileNames
                    .Select(async f => await GetFileInfoAsync(f))
                    .Where(f => f.Length > 1000000);

Of course, that code doesn't even compile, as now we've got an IEnumerable sequence of Task<FileInfo>, rather than the FileInfo objects that our Where clause is expecting.

One ugly workaround that I see sometimes is to turn the asynchronous method back into a synchronous one by blocking (e.g. by calling Result). Whilst this "solves" the problem, blocking on tasks is an antipattern, both for performance reasons and because it can cause deadlocks.

// "works" but is an antipattern - don't block on Tasks
var largeFiles = fileNames
                    .Select(f => GetFileInfoAsync(f).Result)
                    .Where(f => f.Length > 1000000);

Likewise, if the method in the Where clause is asynchronous, we have a similar problem:

// also won't compile
var corruptFiles = fileNames
                    .Select(f => GetFileInfo(f))
                    .Where(async f => await IsCorruptAsync(f));

Our "predicate" function needs to return a bool, not a Task<bool>, and although you can use the same trick to block, this is again an antipattern to be avoided:

// don't do this
var corruptFiles = fileNames
                    .Select(f => GetFileInfo(f))
                    .Where(f => IsCorruptAsync(f).Result);

So how can we resolve this?

Well, one way is to avoid writing LINQ pipelines that need to call asynchronous methods. That's actually quite a good practice, as LINQ encourages a "functional" style of programming, where you try to mostly use "pure" functions that have no "side-effects". Since they're not allowed to perform network or disk IO, they will not be asynchronous functions and you've pushed the problem out of your LINQ pipeline into some other part of the code.

But there may be some cases where it really would be helpful to perform asynchronous transformations to a sequence of data, and it turns out that IAsyncEnumerable<T> is able to solve this problem.

LINQ Extensions for IAsyncEnumerable<T>

At first glance, IAsyncEnumerable<T> doesn't seem to help very much. If you try to chain a Select or Where onto an IAsyncEnumerable<T> you'll get a compile error.

However, if you reference the System.Linq.Async NuGet package then you'll get access to essentially all the same LINQ operators that you're familiar with using on a regular IEnumerable<T>. You can explore the code for the full list of available operators here.

In this post, we're particularly focusing on the Select and Where operators, and if we look at the code, we can see method signatures for those methods that work exactly the same as their IEnumerable<T> equivalents:

 IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> selector)
 IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)

This means that if we have an IAsyncEnumerable<T> we can use these extension methods to make a LINQ-like pipeline based on an IAsyncEnumerable<string> just like we did in our first example:

IAsyncEnumerable<string> fileNames = GetFileNames();
var longFiles = fileNames
                .Select(f => GetFileInfo(f))
                .Where(f => f.Length > 1000000);
await foreach(var f in longFiles)
{
    // ...
}
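
The GetFileNames method isn't shown in this example. As a minimal sketch, assuming the names come from some remote listing, it could be written as a C# async iterator. The delay and the generated names below are purely illustrative stand-ins, not part of any real API:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FileNameSource
{
    // Hypothetical source: yields file names one at a time,
    // awaiting between items as a real remote listing might.
    public static async IAsyncEnumerable<string> GetFileNames()
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Delay(10); // stand-in for an asynchronous call, e.g. fetching a page of blob names
            yield return $"file{i}.txt";
        }
    }
}
```

Because the method is declared as async and returns IAsyncEnumerable<string>, the compiler lets us mix await and yield return in the same body, which is what makes the sequence consumable with await foreach.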

But of course, while this is very useful for mapping and filtering an IAsyncEnumerable<T> sequence, it doesn't address the question we started with of how we can call asynchronous methods inside the LINQ operators.

Fortunately, the System.Linq.Async NuGet package can help us here as well. In addition to the Select and Where methods whose lambdas work just like their IEnumerable<T> equivalents, it also provides SelectAwait and WhereAwait for the specific scenarios where we want to call asynchronous functions. These methods still return a regular IAsyncEnumerable<T> so they can be chained together into a pipeline.

Here are the method signatures for the "await" versions of Where and Select:

IAsyncEnumerable<TSource> WhereAwait<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
IAsyncEnumerable<TResult> SelectAwait<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
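
To get a feel for what operators with these signatures do, here's a rough sketch of how you might write them yourself with async iterators. The names SelectAwaitSketch and WhereAwaitSketch are my own, chosen to avoid clashing with the real operators, and this is not the library's actual implementation (it leaves out details such as cancellation support):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helpers, NOT the System.Linq.Async implementations:
// just the same idea expressed with C# 8 async iterators.
public static class AsyncPipelineSketch
{
    public static async IAsyncEnumerable<TResult> SelectAwaitSketch<TSource, TResult>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, ValueTask<TResult>> selector)
    {
        await foreach (var item in source)
        {
            // Await the asynchronous mapping, then yield its result.
            yield return await selector(item);
        }
    }

    public static async IAsyncEnumerable<TSource> WhereAwaitSketch<TSource>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, ValueTask<bool>> predicate)
    {
        await foreach (var item in source)
        {
            // Only yield elements whose asynchronous predicate returns true.
            if (await predicate(item))
            {
                yield return item;
            }
        }
    }
}
```

Each element is awaited in turn before the next one is pulled from the source, so in this sketch the asynchronous calls run one at a time rather than in parallel.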

And here's an example showing how these operators allow us to make those asynchronous calls within a pipeline:

IAsyncEnumerable<string> fileNames = GetFileNames();
var corruptFiles = fileNames
                .SelectAwait(async f => await GetFileInfoAsync(f))
                .WhereAwait(async f => await IsCorruptAsync(f));
await foreach(var f in corruptFiles)
{
    // ...
}

By the way, if you're wondering why my lambdas are using the await syntax rather than just returning the method directly (e.g. SelectAwait(f => GetFileInfoAsync(f))), it's because the extension methods on IAsyncEnumerable<T> all expect a ValueTask<T> rather than a Task<T>, which is more likely what your regular async methods are returning. This is because of a performance optimization that you can learn more about here, but the easiest way to deal with it is just to use await in the lambda.
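
If you'd rather not use an async lambda, one alternative is to wrap the Task<T> yourself, since ValueTask<T> has a constructor that accepts a Task<T>. Here's a small sketch of that idea, where GetLengthAsync is a hypothetical stand-in for your own Task-returning method:

```csharp
using System.Threading.Tasks;

public static class ValueTaskWrapping
{
    // Hypothetical stand-in for a regular async method that returns Task<T>.
    public static Task<long> GetLengthAsync(string fileName) =>
        Task.FromResult((long)fileName.Length);

    // Wrap the Task<long> in a ValueTask<long> using the ValueTask<T> constructor,
    // so the result matches a Func<string, ValueTask<long>> without an async lambda.
    public static ValueTask<long> GetLengthAsValueTask(string fileName) =>
        new ValueTask<long>(GetLengthAsync(fileName));
}
```

In a pipeline that would look something like SelectAwait(f => new ValueTask<FileInfo>(GetFileInfoAsync(f))), although I find the async lambda easier to read.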

So far we've seen that we can construct a LINQ pipeline on our IAsyncEnumerable<T>, even if the methods we want to call within our mapping and filtering steps are asynchronous. Let's see next how we can get the same benefits with IEnumerable<T>.

Converting an IEnumerable<T> into an IAsyncEnumerable<T>

Of course, our original example didn't start with an IAsyncEnumerable<string>, but an IEnumerable<string> instead. Fortunately, there's a very easy way to get around that, and that's by calling the ToAsyncEnumerable() extension method which converts from an IEnumerable<T> into IAsyncEnumerable<T>, allowing us to use those extension methods.

var files = new[] { "file1.txt", "file2.txt", "file3.txt"};
var corruptFiles = files
    .ToAsyncEnumerable()
    .SelectAwait(async f => await GetFileInfoAsync(f))
    .WhereAwait(async f => await IsCorruptAsync(f));
await foreach(var f in corruptFiles)
{
    //...
}
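
If you want to try this end to end, here's a small console program along the same lines. It assumes you've referenced the System.Linq.Async NuGet package, and the bodies of GetFileInfoAsync and IsCorruptAsync are made-up stand-ins (they return a simple tuple and simulate latency with Task.Delay rather than touching real files or blobs):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for a real asynchronous lookup (e.g. querying blob storage).
    static async Task<(string Name, long Length)> GetFileInfoAsync(string name)
    {
        await Task.Delay(10); // simulate I/O latency
        return (name, name.Length * 1000L); // made-up size
    }

    // Hypothetical stand-in for an asynchronous integrity check.
    static async Task<bool> IsCorruptAsync((string Name, long Length) file)
    {
        await Task.Delay(10); // simulate I/O latency
        return file.Name.Contains("2"); // pretend only file2.txt is corrupt
    }

    public static async Task Main()
    {
        var files = new[] { "file1.txt", "file2.txt", "file3.txt" };

        // ToAsyncEnumerable, SelectAwait and WhereAwait come from System.Linq.Async.
        var corruptFiles = files
            .ToAsyncEnumerable()
            .SelectAwait(async f => await GetFileInfoAsync(f))
            .WhereAwait(async f => await IsCorruptAsync(f));

        await foreach (var f in corruptFiles)
        {
            Console.WriteLine($"{f.Name} ({f.Length} bytes)");
        }
    }
}
```

Nothing runs until the await foreach starts pulling items, just as a regular LINQ pipeline is only evaluated when you enumerate it.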

By the way, there is also a ToEnumerable() extension method that does the opposite, but I highly recommend you avoid using it, as it will introduce the antipattern of making blocking calls on asynchronous methods to turn them into synchronous methods.

Even more extension methods for IAsyncEnumerable<T>

On top of the operators available in System.Linq.Async there is also an additional library of operators in the System.Interactive.Async NuGet package. You can explore the available operators here.

These additional operators help with a variety of common scenarios, and are definitely worth exploring if you find yourself working regularly with IAsyncEnumerable<T>.

I won't go into detail on the methods here (maybe another day), but the Merge extension method was particularly useful for a problem I was looking at recently. I had multiple IAsyncEnumerable<T> sequences, and wanted to merge them together with the elements coming in whatever order they come out of their source sequences.
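
As a rough illustration of that scenario, here's a sketch that merges two sequences with AsyncEnumerableEx.Merge. It assumes the System.Interactive.Async package is referenced, and the Produce method is a hypothetical source I've made up to emit items at different rates:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class MergeSketch
{
    // Hypothetical source: yields three labelled items with a delay before each one.
    static async IAsyncEnumerable<string> Produce(string label, int delayMs)
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Delay(delayMs);
            yield return $"{label}{i}";
        }
    }

    public static async Task Main()
    {
        // AsyncEnumerableEx.Merge comes from System.Interactive.Async
        // and lives in the System.Linq namespace.
        var merged = AsyncEnumerableEx.Merge(Produce("a", 30), Produce("b", 50));

        // Items arrive interleaved, in whatever order the sources produce them.
        await foreach (var item in merged)
        {
            Console.WriteLine(item);
        }
    }
}
```

The exact interleaving depends on timing, so you shouldn't rely on any particular order between items from different sources.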

Summary

In this post we've seen how the System.Linq.Async extension methods make it possible to make asynchronous calls within a LINQ pipeline, whether you start with IAsyncEnumerable<T> or IEnumerable<T>. Of course, it's not always the best decision to introduce a lot of asynchronous methods into a pipeline, but there are situations where it's useful.

I hope to follow up with another article in this series soon, where we look at some additional considerations to bear in mind when mixing LINQ and asynchronous method calls.

Comments

Comment by Stuart Turner

FYI: I'm currently working on a fork of MoreLinq (SuperLinq), to include most of the additional operators from MoreLinq to IAsyncEnumerable<T> (https://github.com/viceroyp...

Comment by Mark Heath

nice. MoreLinq is actually a bit harder to work with these days due to naming clashes with new extensions they've added to .NET 6. Would be nice for it to be cleaned up a bit

Comment by Stuart Turner

Yes, I've already addressed that. :) Mainly because I submitted a PR to fix it over a year ago and they have not approved it. SuperLinq is completely compatible with .net 6.
