While using some code at work pretty similar to the code from my post last December, I came across an exception that confused me quite a lot. I have some code more or less like this:
async def parse_blocks(blocks):
    parser = BlockParser()
    pending_blocks = []
    for block in blocks:
        # wrap the concurrent.futures.Future returned by submit() in an
        # asyncio.Future so that asyncio.wait can await it
        future_result = asyncio.wrap_future(executor.submit(parser.parse, block))
        pending_blocks.append(future_result)
    while len(pending_blocks):
        done_blocks, pending_blocks = await asyncio.wait(pending_blocks, return_when=asyncio.FIRST_COMPLETED)
        for done_block in done_blocks:
            # this is equivalent to: parsed_block = await done_block
            parsed_block = done_block.result()
            # do whatever
    print("all blocks parsed")
The parser.parse method that I submit to the pool has a try-except wrapping its whole body, I mean, something like this:
class BlockParser:
    def parse(self, block):
        try:
            # whatever
            result = NormalResult()
        except Exception as ex:
            result = FailedResult()
        return result
So there's no reason for a process in the ProcessPoolExecutor's pool to crash. But to my surprise the main process was crashing in parse_blocks(), with a BrokenProcessPool exception raised at parsed_block = done_block.result(). When you access the result of a "rejected" Future (OK, yes, resolved-rejected is JavaScript terminology; I mean a Future that does not complete with a result, but with an exception), the exception that was set with set_exception() is thrown. But how can that Future finish with an exception if, as I've said, the code (running in the process pool) that will "resolve-reject" that Future is wrapped in a try-except?
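The "rejected" Future mechanics can be shown with a minimal, self-contained sketch (not the code from my application): once set_exception() has been called on a Future, calling result() re-raises that exception.

```python
from concurrent.futures import Future

# "reject" the Future: complete it with an exception instead of a result
fut = Future()
fut.set_exception(RuntimeError("worker died"))

try:
    fut.result()  # re-raises the exception set above
except RuntimeError as ex:
    print(f"future failed with: {ex}")
```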
Well, that can happen if someone kills that child process. I guess the ProcessPoolExecutor detects that one of its processes has died and sets an exception in the corresponding Future (which is then propagated to the asyncio.Future; remember from my December post that I'm wrapping the concurrent.futures.Future in an asyncio.Future). That way, when you try to access the Future's result you get an exception.
As for who was killing that pool process, another previous post comes into play. On some occasions some of the tasks that I submit to the pool involve massive processing, and we end up using all the RAM and swap. At that point the OOM killer comes into play and kills the process that is using the most memory, which is one of the processes of the pool. Checking the kernel ring buffer with dmesg I could find an "Out of memory: Kill process" entry that corresponded to one of the processes of the pool (I was writing the ids of these processes to my application log).
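The behaviour is easy to reproduce without waiting for the OOM killer. In this sketch (assuming a POSIX system, since it uses signal.SIGKILL) the worker kills its own process, standing in for the OOM killer, and the executor marks the pending Future with BrokenProcessPool:

```python
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool


def kill_self():
    # stand-in for the OOM killer: kill this worker process abruptly
    os.kill(os.getpid(), signal.SIGKILL)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(kill_self)
        try:
            future.result()
        except BrokenProcessPool as ex:
            print(f"pool broken: {ex}")
```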
While writing this post I've been thinking again about why we have 2 kinds of Futures: concurrent.futures.Future and asyncio.Future. Given that asyncio.Future is intended for use in an event loop, you cannot block on it to get a result. If you call result() on it before the Future has completed, you'll get an exception (an InvalidStateError). This is different from concurrent.futures.Future, where the result() method will block waiting for the Future to complete.
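A minimal sketch contrasting the two flavours: asyncio.Future.result() raises InvalidStateError at once when the future is still pending, while concurrent.futures.Future.result() blocks until some other thread completes it.

```python
import asyncio
import concurrent.futures
import threading

# asyncio flavour: result() on a not-yet-completed future raises immediately
loop = asyncio.new_event_loop()
aio_fut = loop.create_future()
try:
    aio_fut.result()
except asyncio.InvalidStateError:
    print("asyncio.Future: result() raised InvalidStateError")
loop.close()

# concurrent.futures flavour: result() blocks until the future completes;
# here a timer thread completes it after 0.1 seconds
cf_fut = concurrent.futures.Future()
threading.Timer(0.1, cf_fut.set_result, args=("done",)).start()
print(cf_fut.result())  # blocks ~0.1s, then prints "done"
```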