Tuesday 16 January 2024

Asyncio as_completed

While going through a colleague's code recently I realised that I was missing the right (and simple) way to deal with a common asyncio situation. I have a list of awaitables (Futures/Tasks), I want to run some code as soon as any of them completes, and I want to keep doing so until all of them have completed. Given an async function like this one, which I will be calling concurrently:



import asyncio

# simulated latency (in seconds) for each known post id
delays = {
    "A1": 2,
    "B1": 1,
    "C1": 0.1,
}

async def get_post(id: str) -> str:
    print(f"getting post: {id}")
    if id not in delays:
        # unknown ids fail after a short delay
        await asyncio.sleep(0.5)
        raise Exception(f"Missing post: {id}")

    await asyncio.sleep(delays[id])
    return f"POST: [[{id}]]"


I was using asyncio.wait in a loop, like this:



async def retrieve_posts():
    post_ids = ["A1", "B1", "C1", "D1"]
    pending_tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
    while pending_tasks:
        # wait() returns two sets: the tasks that have finished and the ones still running
        done_tasks, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
        for done_task in done_tasks:
            try:
                # done_task has already finished, so these two statements are equivalent:
                # post = done_task.result()
                post = await done_task
                print(f"post obtained: {post}")
            except Exception as ex:
                print(f"Error retrieving post: {ex}")

asyncio.run(retrieve_posts())

That works, but my colleague was using a more elegant approach: asyncio.as_completed. as_completed returns an iterator that, on each iteration, yields a new awaitable (a coroutine, not one of the objects you passed to it). Awaiting it gives you the result, or raises the exception, of the next of the provided awaitables to complete. This means that you can rewrite the above like this:



async def retrieve_posts():
    post_ids = ["A1", "B1", "C1", "D1"]
    tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
    # each item is a new awaitable that resolves to the next result to complete
    for next_completed in asyncio.as_completed(tasks):
        try:
            post = await next_completed
            print(f"post obtained: {post}")
        except Exception as ex:
            print(f"Exception getting post: {ex}")

asyncio.run(retrieve_posts())
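A minimal sketch to check the two claims above: the items that as_completed hands back are not the tasks we passed in, and they resolve in completion order rather than submission order. The helper sleep_then_return and the delays are hypothetical, chosen only to make the ordering visible.

```python
import asyncio

async def sleep_then_return(value: str, delay: float) -> str:
    # hypothetical stand-in for get_post: just sleeps and returns its value
    await asyncio.sleep(delay)
    return value

async def main() -> list[str]:
    tasks = [
        asyncio.create_task(sleep_then_return("slow", 0.05)),
        asyncio.create_task(sleep_then_return("fast", 0.01)),
        asyncio.create_task(sleep_then_return("medium", 0.03)),
    ]
    order = []
    for aw in asyncio.as_completed(tasks):
        # aw is a fresh awaitable created by as_completed, not one of our tasks
        assert all(aw is not t for t in tasks)
        order.append(await aw)
    return order

print(asyncio.run(main()))  # ['fast', 'medium', 'slow']
```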

asyncio.wait is a better fit when, as your awaitables complete, you launch additional tasks that you also want to wait for, since you can add them to the pending set on the next iteration. asyncio.as_completed fits when you know up front all the awaitables that you're going to run.
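To illustrate the first case, here is a minimal sketch in which every post that arrives triggers a follow-up fetch, and that new task is added to the set asyncio.wait is watching. The fetch coroutine and the "comments-of-..." naming are hypothetical stand-ins. as_completed could not pick up these follow-up tasks, because it only knows about the awaitables it received when it was called.

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    # hypothetical stand-in for a real I/O call
    await asyncio.sleep(delay)
    return name

async def crawl() -> list[str]:
    results = []
    pending = {
        asyncio.create_task(fetch("post-1", 0.02)),
        asyncio.create_task(fetch("post-2", 0.01)),
    }
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            name = task.result()
            results.append(name)
            # a finished post launches a follow-up task (hypothetical),
            # which is simply added to the set we keep waiting on
            if name.startswith("post"):
                pending.add(asyncio.create_task(fetch(f"comments-of-{name}", 0.01)))
    return results

print(asyncio.run(crawl()))
```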

Looping over awaitables brings to mind Python's async for construct (the equivalent of JavaScript's for await...of). I wonder why as_completed returns a regular iterator rather than an asynchronous one, but we can easily wrap as_completed in an asynchronous generator, like this:



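from collections.abc import AsyncIterator, Awaitable

async def as_completed_generator(awaitables: list[Awaitable]) -> AsyncIterator:
    for aw in asyncio.as_completed(awaitables):
        try:
            res = await aw
        except Exception as ex:
            # hand the exception to the caller as a value instead of raising it
            res = ex
        # yield outside the try block so that closing the generator early
        # (GeneratorExit raised at the yield) is not swallowed
        yield res

async def retrieve_posts():
    post_ids = ["A1", "B1", "C1", "D1"]
    tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
    async for post in as_completed_generator(tasks):
        if not isinstance(post, Exception):
            print(f"post retrieved: {post}")
        else:
            print(f"Exception: {post}")

asyncio.run(retrieve_posts())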

Notice that, so that the caller can handle failed awaitables, our async generator yields either results or exception objects.
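One limitation: since as_completed hands back new awaitables, a yielded exception does not by itself tell you which input failed (here the message happens to contain the post id, but that is not true in general). A minimal sketch of one workaround, assuming a hypothetical tagged helper that wraps each awaitable and returns a (key, result-or-exception) pair; slow is likewise a hypothetical stand-in for get_post.

```python
import asyncio
from collections.abc import Awaitable

async def tagged(key: str, aw: Awaitable) -> tuple:
    # hypothetical helper: attach a key so we know which input finished
    try:
        return key, await aw
    except Exception as ex:
        return key, ex

async def slow(value: str, delay: float, fail: bool = False) -> str:
    # hypothetical stand-in for get_post
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"Missing post: {value}")
    return value

async def main() -> dict:
    tasks = [
        asyncio.create_task(tagged("A1", slow("a", 0.02))),
        asyncio.create_task(tagged("D1", slow("d", 0.01, fail=True))),
    ]
    outcomes = {}
    for aw in asyncio.as_completed(tasks):
        key, outcome = await aw  # never raises: tagged() caught any exception
        outcomes[key] = outcome
    return outcomes

print(asyncio.run(main()))
```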

I'll use this post about asyncio to mention something that I did not include in this previous post about Futures vs Futures. The result() method of concurrent.futures.Future is blocking: it blocks the current thread until a result is available (or the optional timeout expires). The result() method of asyncio.Future is not: it immediately returns the value if the Future has been resolved, raises its exception if it has been rejected, or raises asyncio.InvalidStateError if it is still pending.
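A minimal sketch of that difference. The two helper functions are hypothetical names for this illustration, and the 0.1 s sleep is arbitrary.

```python
import asyncio
import concurrent.futures
import time

def concurrent_result_blocks() -> float:
    # concurrent.futures.Future.result() blocks the calling thread until the result is ready
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        fut = pool.submit(time.sleep, 0.1)
        start = time.perf_counter()
        fut.result()  # blocks for roughly 0.1 s
        return time.perf_counter() - start

async def asyncio_result_does_not_block() -> tuple[bool, str]:
    fut = asyncio.get_running_loop().create_future()
    try:
        fut.result()  # still pending: raises immediately instead of waiting
        raised = False
    except asyncio.InvalidStateError:
        raised = True
    fut.set_result("done")
    return raised, fut.result()  # resolved: returns the value immediately

print(f"blocked for ~{concurrent_result_blocks():.2f}s")
print(asyncio.run(asyncio_result_does_not_block()))  # (True, 'done')
```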
