Monday 19 September 2022

Python AsyncExecutor

After writing my JavaScript AsyncExecutor it became clear that at some point I would have to do a Python AsyncExecutor :-)

I already wrote a very basic post some months ago about the differences between JavaScript and Python async/await. The way async/await, Native Coroutines, Tasks and Futures work in Python seems more complex to me than async/await and Promises in JavaScript, though maybe it's just that I'm more familiar with async code in JavaScript.

Anyway, implementing an AsyncExecutor is not much different. In Python the awaitable objects are not a single Promise-like type: we have Native Coroutine objects, Futures and Tasks (which derive from Future). So we submit to the AsyncExecutor "awaitable functions" (functions that return any of those awaitable objects), along with their arguments. The AsyncExecutor returns a Future that will be resolved (will have its result set) when the corresponding asynchronous function completes.

The _run_action() method takes care of running the "awaitable function" when a slot is available. _run_action is a Native Coroutine, and we invoke it from 2 functions that are not coroutines themselves: submit() (for the first submitted functions, before the available slots are filled) and _process_result() (as slots are released). Since those functions can't await it, we have to use asyncio.create_task(self._run_action(action)). This is the main difference with the JavaScript version, where a function marked as async starts executing immediately when it is called, with no need to await it (or to create a task). In Python, calling a coroutine function only creates a coroutine object, and nothing runs until that object is awaited or wrapped in a task. I copy the code here (and I have also uploaded it to a gist)


import asyncio
from dataclasses import dataclass
import random
from typing import Any
import json

@dataclass
class AsyncAction:
    future: asyncio.Future
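    awaitable_fn: Any  # function that returns an awaitable (a coroutine, a task...)
    args: tuple[Any, ...]  # positional arguments, as collected by *args in submit()
    kwargs: dict[str, Any]  # keyword arguments, as collected by **kwargs in submit()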


class AsyncExecutor:
    def __init__(self, event_loop: asyncio.AbstractEventLoop, max_running_actions: int):
        self.event_loop = event_loop
        self.max_running_actions = max_running_actions
        self.running_counter = 0
        self.not_launched_actions = []

    def submit(self, awaitable_fn, *args, **kwargs) -> asyncio.Future:
        """
        Receives a function to be executed when a slot becomes available. That function returns an awaitable.
        """
        future = self.event_loop.create_future()
        action = AsyncAction(future, awaitable_fn, args, kwargs)
        if self.running_counter < self.max_running_actions:
            self.running_counter += 1
            # _run_action is a coroutine function: calling it only creates a coroutine object,
            # so it has to be wrapped in a task to be scheduled without awaiting it here
            asyncio.create_task(self._run_action(action))
        else:
            self.not_launched_actions.append(action)
        return future

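    async def _run_action(self, action: AsyncAction):
        try:
            result = await action.awaitable_fn(*action.args, **action.kwargs)
        except Exception as ex:
            # a failing action must still resolve its future and free its slot
            self._process_result(action, None, ex)
        else:
            self._process_result(action, result)

    def _process_result(self, action: AsyncAction, result: Any, error: Any = None):
        self.running_counter -= 1
        if error is not None:
            action.future.set_exception(error)
        else:
            action.future.set_result(result)
        if self.not_launched_actions:
            self.running_counter += 1
            asyncio.create_task(self._run_action(self.not_launched_actions.pop(0)))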


async def mock_download(url: str, delay: int):
    print("starting mock download")
    await asyncio.sleep(delay)
    return url.upper()

def create_download_task(url: str, delay: int):
    print(create_download_task.__name__)
    return asyncio.get_running_loop().create_task(mock_download(url, delay))
 

async def main():
    async_executor = AsyncExecutor(asyncio.get_running_loop(), 4)
    futures = []
    for i in range(0,10):
        delay = random.randint(1, 4)
        if i % 2 == 0:
            future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
        else:
            future = async_executor.submit(create_download_task, f"www.jesoutienslapolice.fr/post_{i}", delay)

        future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
        futures.append(future)
    future = async_executor.submit(mock_download, f"www.jesoutienslapolice.fr/post_{i}", delay)
    future.add_done_callback(lambda fut: print(f"{fut.result()} done"))
    futures.append(future)
    print(f"{len(futures)} submitted")

    results = await asyncio.gather(*futures)
    print(f"all finished: {json.dumps(results, indent=4)}")


asyncio.run(main())

As we've seen, asyncio.create_task is really important in Python's asynchronous code. This discussion helps to understand what it does:

It submits the coroutine to run "in the background", i.e. concurrently with the current task and all other tasks, switching between them at await points. It returns an awaitable handle called a "task" which you can also use to cancel the execution of the coroutine.

It's one of the central primitives of asyncio, the asyncio equivalent of starting a thread. (In the same analogy, awaiting the task with await is the equivalent of joining a thread.)
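To see that behaviour in isolation, here is a minimal sketch (the names worker, demo and log are made up for this illustration, they are not part of the executor). It assumes nothing beyond the standard asyncio module, and it shows three things: calling a coroutine function runs nothing, create_task only schedules the coroutine, and awaiting the task is what "joins" it.

```python
import asyncio

log = []  # hypothetical trace list, only used to observe the execution order


async def worker(name: str, delay: float) -> str:
    log.append(f"{name} started")
    await asyncio.sleep(delay)
    log.append(f"{name} finished")
    return name.upper()


async def demo() -> str:
    # Calling a coroutine function only builds a coroutine object: the body has not run.
    coro = worker("bare", 0)
    assert log == []
    coro.close()  # discard it explicitly to avoid a "never awaited" warning

    # create_task schedules the coroutine on the running loop and returns immediately.
    task = asyncio.create_task(worker("background", 0.01))
    assert log == []  # the task only starts once the current task yields control
    log.append("main continues")

    # Awaiting the task is the equivalent of joining a thread.
    return await task


result = asyncio.run(demo())
print(result)
print(log)
```

In the AsyncExecutor above, submit() and _process_result() are in the position of demo() right after the create_task call: they schedule _run_action and carry on without waiting for it.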
