Suppose in C# we have a number of tasks to perform that we're currently doing sequentially, but would like to speed up by running them in parallel. As a trivial example, imagine we're downloading a bunch of web pages like this:

var urls = new [] { 
        "https://github.com/naudio/NAudio", 
        "https://twitter.com/mark_heath", 
        "https://github.com/markheath/azure-functions-links",
        "https://pluralsight.com/authors/mark-heath",
        "https://github.com/markheath/advent-of-code-js",
        "http://stackoverflow.com/users/7532/mark-heath",
        "https://mvp.microsoft.com/en-us/mvp/Mark%20%20Heath-5002551",
        "https://github.com/markheath/func-todo-backend",
        "https://github.com/markheath/typescript-tetris",
};
var client = new HttpClient();
foreach(var url in urls)
{
    var html = await client.GetStringAsync(url);
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
}

To parallelize this, we could just turn every single download into a separate Task with Task.Run and wait for them all to complete, but what if we wanted to limit the number of concurrent downloads? Let's say we only want 4 downloads to happen at a time.
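For reference, the completely unthrottled version is only a few lines (a quick sketch; note that this starts every download at once):

var tasks = urls.Select(url => Task.Run(async () =>
{
    var html = await client.GetStringAsync(url);
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
}));
await Task.WhenAll(tasks);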

In this trivial example, the exact number might not matter too much, but it's not hard to imagine a situation in which you would want to avoid too many concurrent calls to a downstream service.

In this post I'll look at four different ways of solving this problem.

Technique 1 - ConcurrentQueue

The first technique has been my go-to approach for many years. The basic idea is to put the work onto a queue and then have multiple threads reading off that queue. This is a nice simple approach, but it does require that access to the queue is thread-safe, as it will be read from multiple threads. In this example I'm using ConcurrentQueue, which gives us thread safety without any explicit locking.

We fill the queue with all the urls to download, and then start one Task for each thread that simply sits in a loop trying to read from the queue, and exits when there are no more items left in the queue. We put each of these queue reader tasks in a list and then use Task.WhenAll to wait for them all to exit, which will happen once the final download has completed.

var maxThreads = 4;
var q = new ConcurrentQueue<string>(urls);
var tasks = new List<Task>();
for(int n = 0; n < maxThreads; n++)
{
    tasks.Add(Task.Run(async () => {
        while(q.TryDequeue(out string url)) 
        {
            var html = await client.GetStringAsync(url);
            Console.WriteLine($"retrieved {html.Length} characters from {url}");
        }
    }));
}
await Task.WhenAll(tasks);

I still like this approach as it's conceptually simple. But it can be a pain if we're still generating more work after processing has started, as the reader tasks will exit as soon as they find the queue empty, even if more items are about to be added.

Technique 2 - SemaphoreSlim

Another approach (inspired by this StackOverflow answer) is to use a SemaphoreSlim with an initialCount equal to the maximum number of concurrent tasks. Then you use WaitAsync to wait until it's OK to queue up another one. So we immediately kick off four tasks, but then have to wait for the first of those to finish before we get past WaitAsync to add the next.

var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: maxThreads);
foreach (var url in urls)
{
    await throttler.WaitAsync();
    allTasks.Add(
        Task.Run(async () =>
        {
            try
            {
                var html = await client.GetStringAsync(url);
                Console.WriteLine($"retrieved {html.Length} characters from {url}");
            }
            finally
            {
                throttler.Release();
            }
        }));
}
await Task.WhenAll(allTasks);

The code here is a bit more verbose than the ConcurrentQueue approach and also ends up with a potentially huge list containing mostly completed Tasks, but this approach does have an advantage if you are generating the tasks to be completed at the same time you are executing them.

For example, to upload a large file to Azure blob storage you might read 1MB chunks sequentially, but want to upload up to four in parallel. You don't want to read all the chunks in advance, as that uses a lot of time and memory before the first upload can even start. With this approach we can generate the work to be done just in time, as upload slots become available, which is more efficient.
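Here's a minimal sketch of that just-in-time pattern, assuming a hypothetical UploadChunkAsync method. Notice that the next chunk is only read from the file once the semaphore grants another slot:

var throttler = new SemaphoreSlim(initialCount: maxThreads);
var uploadTasks = new List<Task>();
using (var file = File.OpenRead("large-file.bin"))
{
    var chunkIndex = 0;
    while (true)
    {
        await throttler.WaitAsync(); // don't read the next chunk until a slot is free
        var chunk = new byte[1024 * 1024]; // fresh buffer for each in-flight chunk
        var bytesRead = await file.ReadAsync(chunk, 0, chunk.Length);
        if (bytesRead == 0) { throttler.Release(); break; }
        var index = chunkIndex++;
        uploadTasks.Add(Task.Run(async () =>
        {
            try { await UploadChunkAsync(index, chunk, bytesRead); } // hypothetical upload method
            finally { throttler.Release(); }
        }));
    }
}
await Task.WhenAll(uploadTasks);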

Technique 3 - Parallel.ForEach

The Parallel.ForEach method at first appears to be the perfect solution to this problem. You can simply specify the MaxDegreeOfParallelism and then provide an Action to perform on each item in your IEnumerable:

var options = new ParallelOptions() { MaxDegreeOfParallelism = maxThreads };
Parallel.ForEach(urls, options, url =>
    {
        var html = client.GetStringAsync(url).Result;
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    });

Looks nice and simple, doesn't it? However, there is a nasty gotcha here. Because Parallel.ForEach takes an Action<T>, not a Func<T, Task>, it should only be used to call synchronous methods. You might notice we've ended up putting a .Result after GetStringAsync, which blocks the calling thread and is a dangerous antipattern.

So unfortunately, this method should only be used if you have a synchronous method you want to perform in parallel. There is a NuGet package that implements an asynchronous version of Parallel.ForEach, so you could try that if you'd like to write something like this instead:

await urls.ParallelForEachAsync(
    async url =>
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    },
    maxDegreeOfParalellism: maxThreads);
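
It's also worth knowing that .NET 6 added a built-in Parallel.ForEachAsync that supports exactly this pattern, so if you're on a recent runtime you don't need the extra package. A minimal sketch:

var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
});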

Technique 4 - Polly Bulkhead Policy

The final technique is to use the "Bulkhead" isolation policy from Polly. A bulkhead policy restricts the number of concurrent calls that can be made, and optionally allows you to queue up calls that exceed that number.

Here we set up a bulkhead policy with a constrained number of concurrent executions and an unlimited number of queued tasks. Then we simply call ExecuteAsync repeatedly on the bulkhead policy, allowing it to either run the task immediately or queue it up if too many are already in flight.

var bulkhead = Policy.BulkheadAsync(maxThreads, Int32.MaxValue);
var tasks = new List<Task>();
foreach (var url in urls)
{
    var t = bulkhead.ExecuteAsync(async () =>
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    });
    tasks.Add(t);
}
await Task.WhenAll(tasks);

As with several of our other solutions, we put the tasks into a list and use Task.WhenAll to wait for them. It is worth pointing out, though, that this pattern is really designed for situations where concurrent tasks are being generated from multiple threads (for example from ASP.NET controller actions). Those threads simply share a single bulkhead policy, and each one runs its task with await bulkhead.ExecuteAsync(...). So this approach is very straightforward to use in the situations it is designed for.
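For instance, a shared policy might be declared once and used from any number of request handlers. Here's a minimal sketch of that shape (DownstreamService and GetPageAsync are hypothetical names):

public class DownstreamService
{
    // a single policy shared by all callers, limiting concurrent downstream calls
    private static readonly IAsyncPolicy Bulkhead =
        Policy.BulkheadAsync(4, Int32.MaxValue);

    private readonly HttpClient client = new HttpClient();

    // safe to call concurrently from many request threads
    public Task<string> GetPageAsync(string url) =>
        Bulkhead.ExecuteAsync(() => client.GetStringAsync(url));
}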

Summary

Parallelism can greatly speed up the overall performance of your application, but when misused can cause more problems than it solves. These patterns allow you to use a constrained number of threads to work through a batch of jobs. The one you should pick depends on the way you're generating tasks - do you know them all up front, or are they created on the fly while you're already processing earlier tasks? And are you generating these tasks sequentially on a single thread, or are multiple threads able to produce additional work items on the fly?

Of course, I'm sure there are plenty of other clever ways of approaching this problem, so do let me know in the comments what your preferred solution is.
