I've recently been working with Python's very nice concurrent.futures.Executor (the ProcessPoolExecutor in particular). When combining it with the async/await magic, I came across an important gotcha that had me confused for a while.
So I have a function running inside an event loop. In that function I create a ProcessPoolExecutor and submit some actions to it. As soon as one action completes I want to do something with its result and then go back to waiting for the next one, until all of them are done. My code looked like this:
async def parse_blocks(blocks):
    parser = BlockParser()
    pending_blocks = []
    for block in blocks:
        # executor is a ProcessPoolExecutor created beforehand
        future_result = executor.submit(parser.parse, block, 0)
        pending_blocks.append(future_result)
    while len(pending_blocks):
        done_blocks, pending_blocks = await asyncio.wait(pending_blocks, return_when=asyncio.FIRST_COMPLETED)
        for done_block in done_blocks:
            print("block parsed!")
            result_writer.write(done_block)
    print("all blocks parsed")
The above code was giving me an odd exception in the wait call: TypeError: An asyncio.Future, a coroutine or an awaitable is required. But executor.submit returns a Future, so what's happening? Well, executor.submit returns a concurrent.futures.Future, and asyncio.wait expects an asyncio.Future. Fortunately, we can obtain an asyncio.Future from a concurrent.futures.Future by means of asyncio.wrap_future. I found out about it here.
To convert a concurrent.futures.Future into an asyncio.Future, you can call asyncio.wrap_future. The returned asyncio future is awaitable in the asyncio event loop and will complete when the underlying threading future completes. This is effectively how run_in_executor is implemented.
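To make the quoted explanation concrete, here is a minimal sketch of both routes. It assumes a hypothetical square function as the task, and it uses a ThreadPoolExecutor only so that it is quick to run anywhere; the same two calls should work with a ProcessPoolExecutor.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def square(n):
    # Hypothetical stand-in for a blocking or CPU-bound task.
    return n * n


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Route 1: submit manually, then wrap the concurrent.futures.Future.
        wrapped = asyncio.wrap_future(executor.submit(square, 3))
        # Route 2: let the event loop submit and wrap in a single call.
        direct = loop.run_in_executor(executor, square, 4)
        # Both are asyncio futures, so both can be awaited.
        return await wrapped, await direct


if __name__ == "__main__":
    print(asyncio.run(main()))  # (9, 16)
```

If you don't need the concurrent.futures.Future itself, run_in_executor saves the extra wrapping step.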
So this code now works fine:
async def parse_blocks(blocks):
    parser = BlockParser()
    pending_blocks = []
    for block in blocks:
        future_result = executor.submit(parser.parse, block, 0)
        # wrap the concurrent.futures.Future so that asyncio.wait accepts it
        pending_blocks.append(asyncio.wrap_future(future_result))
    while len(pending_blocks):
        done_blocks, pending_blocks = await asyncio.wait(pending_blocks, return_when=asyncio.FIRST_COMPLETED)
        for done_block in done_blocks:
            print("block parsed!")
            # done_block is an asyncio.Future; done_block.result() gives the parsed value
            result_writer.write(done_block)
    print("all blocks parsed")
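For anyone who wants to try the pattern without my BlockParser and result_writer, here is a self-contained sketch of the same loop. The parse function and the input strings are made up for illustration, the executor is passed in as an argument, and I use a ThreadPoolExecutor just to keep it easy to run; as far as I can tell, a ProcessPoolExecutor can be swapped in.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def parse(block):
    # Hypothetical stand-in for BlockParser.parse.
    return block.upper()


async def parse_blocks(blocks, executor):
    # Wrap each concurrent.futures.Future so asyncio.wait accepts it.
    pending = {asyncio.wrap_future(executor.submit(parse, b)) for b in blocks}
    results = []
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            # result() returns the parsed value, or re-raises the task's exception.
            results.append(fut.result())
    return results


async def main():
    with ThreadPoolExecutor(max_workers=2) as executor:
        return await parse_blocks(["a", "b", "c"], executor)


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))  # ['A', 'B', 'C']
```

The results arrive in completion order, not submission order, which is why the demo sorts them before printing.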
It seems odd to have two different Future classes that both represent the same thing: a result that will be provided in the future. One of the differences that I gather (for example from this discussion) is that asyncio.Future.add_done_callback invokes its callbacks through the event loop, which I guess is necessary for the cooperative nature of asyncio. A concurrent.futures.Future does not use the event loop for those callbacks, so it would probably break that cooperation. This discussion goes a bit more in depth.
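A small experiment makes the callback difference visible. This is only a sketch under my own assumptions: the callback names and the "worker" thread prefix are made up, and I use a ThreadPoolExecutor so that the thread running each callback is easy to name. It records which thread runs each callback.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def main():
    seen = {}  # which thread ran each callback

    def on_concurrent_done(fut):
        seen["concurrent"] = threading.current_thread().name

    def on_asyncio_done(fut):
        seen["asyncio"] = threading.current_thread().name

    loop_thread = threading.current_thread().name
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as executor:
        # The short sleep keeps the task pending while the callbacks are attached.
        cf_future = executor.submit(time.sleep, 0.2)
        cf_future.add_done_callback(on_concurrent_done)
        aio_future = asyncio.wrap_future(cf_future)
        aio_future.add_done_callback(on_asyncio_done)
        await aio_future
        await asyncio.sleep(0)  # give the loop one more turn for scheduled callbacks
    return loop_thread, seen


if __name__ == "__main__":
    loop_thread, seen = asyncio.run(main())
    print("event loop thread:", loop_thread)
    print("callbacks ran in:", seen)
```

When I reason through it, the concurrent.futures callback runs in the worker thread that finished the task, while the asyncio callback runs in the event loop's thread. One caveat: if a concurrent.futures.Future is already done when the callback is added, the callback is called immediately in the calling thread instead.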