Thursday, 17 June 2021

Playing with Ix.Net and IAsyncEnumerable

When IAsyncEnumerable was introduced along with C# 8 many of us missed missed that Linq to Objects had not been extended to work with IAsyncEnumerables as it does with IEnumerables. Well, the community took the effort in their own hands and it's called System.Linq.Async. It's also called (or it's part of, I'm a bit confused about it) Ix.NET (Interactive Extensions for .Net, one part of the Reactive Extensions for .Netproject) and is sort of the equivalent to the nice javascript library IxJS about which I talked in this post.

As expected, it includes all the main linq methods "Select, Where" so that you can filter, transform... your async pull streams. But in addition, you have methods like SelectAwait, WhereAwait... These methods use an async function (a function returning a Task) rather than a normal function. This means that you can project, filter... your asynchronous streams both with synchronous and asynchronous functions. Let's see an example where I apply first and async function, then filter with a synchronous one and finally apply a synchronous function:



private static async IAsyncEnumerable<string> GetCurrentTrendyCities()
{
    foreach (var city in new List<string> {"Paris", "Toulouse", "Berlin", /*"Bruxelles",*/ "Hamburg" })
    {
        Console.WriteLine("Obtaining Trendy City");
        await Task.Delay(1000);
        Console.WriteLine("Trendy City Obtained");

        yield return city;

    }
}

public static async Task<Country> SearchCountryAsync(string city)
{
    await Task.Delay(2000);
    return countries.FirstOrDefault(country => country.MainCities.Contains(city));
}

private static async Task RunAsyncEnumerableSample(IAsyncEnumerable<string> asyncCities)
{
    var countries = asyncCities.SelectAwait(async (city) => {
        var country = await SearchCountryAsync(city);
        return (city, country);
    })
    .Where (cityCountryTuple => cityCountryTuple.country != null)
    .Select(cityCountryTuple => $"{cityCountryTuple.city} - {cityCountryTuple.country.Name.ToUpper()}");
    
    await foreach(var cityCountry in countries)
    {
        Console.WriteLine(cityCountry);
    }

}


RunAsyncEnumerableSample(GetCurrentTrendyCities());

        

Sometimes we need to apply asynchronous functions to synchronous enumerables. The Linq to Objects extension methods are designed to work with synchronous functions, so when using them with functions returning a Task, you'll end up working all over your methods chain with IEnumerable<Task<TResult>> rather than with IEnumerable<TResult> which makes the thing a bit odd. Let's see what I mean:


IEnumerable<string> citiesToSearch = new List<string> {"Paris", "Toulouse", "Berlin", "Hamburg" };
var cityPromises = citiesToSearch.Select(async (city) => await SearchCountryAsync(city))
    .Select(async (country) => (await country).Name.ToUpper());

foreach (var cityPromise in cityPromises)
{
    var city = await cityPromise;
    Console.WriteLine(city);
}

A much more elegant solution is obtaining an IAsyncEnumerable from your IEnumerable (with ToAsyncEnumerable) and then use the Linq.Async methods, like this:


IEnumerable<string> citiesToSearch = new List<string> {"Paris", "Toulouse", "Berlin", "Hamburg" };
IAsyncEnumerable<string> asyncCities =  citiesToSearch.ToAsyncEnumerable();          
var countries = asyncCities.SelectAwait(async (city) => await SearchCountryAsync(city))
    .Where (country => country != null)
    .Select(country => country.Name.ToUpper());

await foreach(var country in countries)
{
    Console.WriteLine(country);
}

No comments:

Post a Comment