Sunday 4 December 2022

Future vs Future (asyncio vs concurrent.futures)

I've recently been working with Python's very nice concurrent.futures.Executor classes (the ProcessPoolExecutor in particular). When combining them with the async/await magic I came across an important gotcha that had me confused for a while.

So I have a function running inside an event loop. In that function I create a ProcessPoolExecutor and submit some actions to it. As soon as one action completes I want to do something with its result and then keep waiting for the next one, until all of them are done. My code looked like this:


import asyncio

# executor (the ProcessPoolExecutor), BlockParser and result_writer are defined elsewhere
async def parse_blocks(blocks):
	parser = BlockParser()
	pending_blocks = []
	for block in blocks:
		# executor.submit returns a concurrent.futures.Future
		future_result = executor.submit(parser.parse, block, 0)
		pending_blocks.append(future_result)
	while pending_blocks:
		done_blocks, pending_blocks = await asyncio.wait(pending_blocks, return_when=asyncio.FIRST_COMPLETED)
		for done_block in done_blocks:
			print("block parsed!")
			result_writer.write(done_block.result())
	print("all blocks parsed")


The above code was giving me an odd exception in the wait call: TypeError: An asyncio.Future, a coroutine or an awaitable is required. But executor.submit returns a Future, so what's going on? Well, executor.submit returns a concurrent.futures.Future, while asyncio.wait is expecting an asyncio.Future. Fortunately, we can obtain an asyncio.Future from a concurrent.futures.Future by means of asyncio.wrap_future. I found out about it here.

To convert a concurrent.futures.Future into an asyncio.Future, you can call asyncio.wrap_future. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor is implemented.
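To see the difference in practice, here's a small sketch of my own (not from the linked answer). It uses a ThreadPoolExecutor and a made-up double function so it runs without any pickling concerns; wrap_future accepts any concurrent.futures.Future, so the same applies to futures coming from a ProcessPoolExecutor. asyncio.isfuture shows that asyncio doesn't recognise the submit() result as one of its own futures, while both the wrapped future and the one returned by loop.run_in_executor are.

```python
import asyncio
import concurrent.futures


def double(x: int) -> int:
    # Hypothetical stand-in for real work.
    return 2 * x


async def main() -> dict:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # submit() hands back a concurrent.futures.Future...
        cf_future = executor.submit(double, 21)
        # ...which asyncio does not treat as one of its own futures.
        cf_is_asyncio = asyncio.isfuture(cf_future)
        # wrap_future() bridges it into an asyncio.Future bound to the running loop.
        wrapped = asyncio.wrap_future(cf_future)
        # run_in_executor() does the submit + wrap in a single call.
        via_loop = loop.run_in_executor(executor, double, 50)
        done, _ = await asyncio.wait({wrapped, via_loop})
        return {
            "cf_is_asyncio": cf_is_asyncio,
            "wrapped_is_asyncio": asyncio.isfuture(wrapped),
            "via_loop_is_asyncio": asyncio.isfuture(via_loop),
            "results": sorted(f.result() for f in done),
        }


info = asyncio.run(main())
print(info)
```

run_in_executor is handy when you don't need the concurrent.futures.Future itself; wrap_future is the tool when something else has already handed you one, as in my case.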

So this code now works fine:


import asyncio

# executor (the ProcessPoolExecutor), BlockParser and result_writer are defined elsewhere
async def parse_blocks(blocks):
	parser = BlockParser()
	pending_blocks = []
	for block in blocks:
		future_result = executor.submit(parser.parse, block, 0)
		# wrap the concurrent.futures.Future so that asyncio.wait accepts it
		pending_blocks.append(asyncio.wrap_future(future_result))
	while pending_blocks:
		done_blocks, pending_blocks = await asyncio.wait(pending_blocks, return_when=asyncio.FIRST_COMPLETED)
		for done_block in done_blocks:
			print("block parsed!")
			result_writer.write(done_block.result())
	print("all blocks parsed")
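To check the pattern end to end, here's a self-contained sketch of the same loop. The parse function and the block strings are hypothetical stand-ins for BlockParser and real data, and I'm using a ThreadPoolExecutor so it runs anywhere; swapping in a ProcessPoolExecutor should only require that the submitted function be picklable (a module-level function) and that the entry point sit behind an if __name__ == "__main__": guard.

```python
import asyncio
import concurrent.futures


def parse(block: str) -> str:
    # Hypothetical stand-in for BlockParser.parse: "parsing" just uppercases the block.
    return block.upper()


async def parse_blocks(executor: concurrent.futures.Executor, blocks: list[str]) -> list[str]:
    # Wrap every concurrent.futures.Future so asyncio.wait() can work with it.
    pending = {asyncio.wrap_future(executor.submit(parse, block)) for block in blocks}
    results = []
    while pending:
        # Wake up as soon as at least one block is finished.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            results.append(fut.result())  # completion order, not submission order
    return results


def main() -> list[str]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        return asyncio.run(parse_blocks(executor, ["alpha", "beta", "gamma"]))


results = main()
print(sorted(results))
```

Note that the while loop also guards against calling asyncio.wait with an empty set, which raises a ValueError.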


It seems odd to have two different Future classes that both represent the same thing: a result that will be available in the future. One of the differences, as I understand it (for example from this discussion), is that asyncio.Future.add_done_callback invokes its callbacks through the event loop, which I guess is necessary for the cooperative nature of asyncio. A concurrent.futures.Future does not use the event loop for those callbacks; it runs them in whichever thread completes the future (or right away if it is already done), which would probably break that cooperative model. This discussion goes a bit more in depth.
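Here's a small sketch to check that callback difference for myself. A made-up work function is held back by a threading.Event until both callbacks are registered, so neither future can complete early; each done-callback then records which thread it ran in. I'm using a ThreadPoolExecutor to keep it self-contained; as far as I can tell, with a ProcessPoolExecutor the result is set from one of the executor's internal threads in the parent process, so the concurrent.futures callback still wouldn't run on the event loop thread.

```python
import asyncio
import concurrent.futures
import threading


async def main() -> dict:
    loop_thread = threading.get_ident()  # the thread running the event loop
    seen = {}
    gate = threading.Event()  # keeps the worker busy until both callbacks are registered

    def work() -> str:
        # Hypothetical workload: just wait for the gate to open.
        gate.wait()
        return "done"

    def record(label: str):
        # Build a done-callback that remembers which thread invoked it.
        def callback(_fut) -> None:
            seen[label] = threading.get_ident()
        return callback

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        cf_future = executor.submit(work)
        # concurrent.futures: called in whichever thread completes the future.
        cf_future.add_done_callback(record("concurrent"))
        aio_future = asyncio.wrap_future(cf_future)
        # asyncio: scheduled onto the event loop once the future is done.
        aio_future.add_done_callback(record("asyncio"))
        gate.set()
        await aio_future
        await asyncio.sleep(0)  # one extra loop iteration, just to be safe

    return {
        "concurrent_ran_in_loop_thread": seen["concurrent"] == loop_thread,
        "asyncio_ran_in_loop_thread": seen["asyncio"] == loop_thread,
    }


flags = asyncio.run(main())
print(flags)
```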
