Friday 22 July 2022

Run Function with Timeout

I recently needed to stop a function that had not finished after a timeout period. I'm not talking about an event loop and an asynchronous function returning an awaitable in Python (or a Promise in JavaScript); indeed, years ago I already posted about expirable promises in JavaScript. What I'm talking about is a synchronous Python function (normally doing some CPU-bound calculation) that we want to stop if it has not finished after a given amount of time. And I mean really stopping its work, not just giving up waiting for it while it keeps running in the background, uselessly consuming resources.

A solution that easily comes to mind is running the function in a separate thread and joining that thread with a timeout. If the timeout expires, our calling code stops blocking and continues, but the thread doing the costly CPU operation keeps running and we have no way to kill it. This is something that really bothers me. Neither Python, nor .NET, nor Java lets you kill a thread "from outside" (I mean from another thread; what I think is still possible is ending the thread you are running in). I know the reason for discouraging and not providing this functionality: the thread you kill could be holding resources or doing something critical, and killing it could mess things up. OK, but we are grown-ups, so if you know that a given thread can be killed without causing problems, you should be allowed to... Well, anyway...
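
To make the problem concrete, here is a minimal sketch of the thread-plus-join approach. The names slow_task and run_in_thread are made up for this illustration, and the sleep loop is only a stand-in for a real CPU-bound calculation. It shows that join(timeout) merely stops waiting: the thread is still alive afterwards and runs to completion anyway.

```python
import threading
import time


def slow_task(duration: float, finished: threading.Event) -> None:
    # Hypothetical stand-in for a costly synchronous computation.
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        time.sleep(0.01)
    finished.set()


def run_in_thread(duration: float, timeout: float):
    finished = threading.Event()
    worker = threading.Thread(target=slow_task, args=(duration, finished), daemon=True)
    worker.start()
    # join() returns once the timeout expires, whether or not the thread is done
    worker.join(timeout)
    return worker, finished


worker, finished = run_in_thread(duration=0.5, timeout=0.1)
# The caller is unblocked, but the thread has not been stopped.
still_running_after_join = worker.is_alive()
# Wait for the thread for real, only so that this demo exits cleanly.
worker.join()
# The work was carried out in full, even though nobody wanted the result.
completed_anyway = finished.is_set()
print(still_running_after_join, completed_anyway)
```

The threading module offers no call to stop the worker from the outside, which is exactly the limitation described above.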

In order to overcome the limitations that the GIL imposes on parallelism, Python provides an extremely powerful library that lets you run your code in separate processes as easily as you run it in separate threads on other platforms: multiprocessing (and the higher-level ProcessPoolExecutor in concurrent.futures). Unlike threads, processes can be killed (terminated), so I've implemented the timeout functionality by running the function in a separate process.
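
As a small sketch of that difference, the snippet below starts a child process that would sleep for 30 seconds, waits briefly and then terminates it. The helper name terminate_after is invented for the example, and I'm assuming a POSIX system, where the 'fork' start method exists and terminate() sends SIGTERM.

```python
import multiprocessing
import signal
import time


def terminate_after(timeout: float) -> int:
    # Assumption: POSIX only. A local 'fork' context avoids touching the
    # global start method of the program.
    ctx = multiprocessing.get_context("fork")
    proc = ctx.Process(target=time.sleep, args=(30,))
    proc.start()
    proc.join(timeout)       # wait for at most `timeout` seconds
    if proc.is_alive():
        proc.terminate()     # unlike a thread, a process can be stopped from outside
        proc.join()          # reap the child so it does not remain as a zombie
    return proc.exitcode


exit_code = terminate_after(0.2)
# On POSIX a negative exit code -N means the child was killed by signal N.
print(exit_code == -signal.SIGTERM)
```

The call returns after roughly the timeout rather than after 30 seconds, and the child is really gone.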

So I have a create_func_with_timeout factory function that receives a function fn and a timeout value and returns a new function that takes care of running fn in a separate process, waiting for it for up to timeout seconds and raising a TimeoutException if the time expires. If fn itself raises an exception, we want to have it available in the calling process, so we wrap the execution of fn in the mp_function_wrapper function. This function returns to the calling process the normal result of running fn (or the exception, if that is the case) by writing it to a multiprocessing queue.

Reading the code should make it clear:


import multiprocessing
import queue
from time import sleep
from typing import Callable


class TimeoutException(Exception):
    pass

def create_func_with_timeout(fn: Callable, timeout: int) -> Callable:
    """
    returns a new function that runs the provided fn in a separate process that either finishes normally or times out
    """
    def mp_function_wrapper(fn: Callable, response_queue: multiprocessing.Queue, *args, **kwargs):
        """
        this function runs in a separate process
        it "returns" the result of fn to the main process by writing it to the queue
        we could take fn as a closure value, but it seems clearer to pass it as a parameter
        """
        try:
            result = fn(*args, **kwargs)
        except BaseException as ex:
            # send the exception back so that the calling process can re-raise it
            result = ex
        response_queue.put(result)
        
    def func_with_timeout(*args, **kwargs):
        """
        has the fn to run trapped as closure value
        """
        # use a 'fork' context (POSIX only) rather than set_start_method, which
        # raises RuntimeError when called more than once in the same program
        # note: 'spawn' would fail here, as the nested mp_function_wrapper can not be pickled
        ctx = multiprocessing.get_context('fork')
        print(f"func_with_timeout {fn.__name__} {timeout}")
        response_queue = ctx.Queue()
        extended_args = [fn, response_queue, *args]
        proc = ctx.Process(target=mp_function_wrapper, args=extended_args, kwargs=kwargs)
        proc.start()
        try:
            result = response_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            # the process is still running, so finish it off
            proc.terminate()
            raise TimeoutException()
        finally:
            # reap the child process in both cases so it does not linger as a zombie
            proc.join()
        if isinstance(result, BaseException):
            raise result
        return result
  
    return func_with_timeout


def format_items(items):
    #print(f"format_items")
    results = []
    for item in items:
        sleep(1)
        results.append(item.upper())
        #print(item)
    return ",".join(results)

format_with_timeout = create_func_with_timeout(format_items, 5)

def run_test(items):
    print("- test")
    try:
        res = format_with_timeout(items)
        print(f"result: {res}")
    except BaseException as ex:
        print(f"exception: {type(ex).__name__} {ex}")


print("- starting")

run_test(["a", "b", "c"])

run_test(["a", "b", "c", "d", "e", "f", "g"])

run_test(["a", None])

print("- ending")

# - starting
# - test
# format_items
# result: A,B,C
# - test
# format_items
# exception: TimeoutException
# - test
# format_items
# exception: AttributeError 'NoneType' object has no attribute 'upper'
# - ending
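
One subtlety of sending either the result or the exception through the same queue is that a function which legitimately returns an exception object would have it raised in the caller. A possible variation, sketched here under my own assumptions (the names _run_and_report and call_with_timeout are hypothetical, and 'fork' again assumes POSIX), tags each message with a status so that both cases stay apart. It is not the code from the gist, just one way the idea could be extended.

```python
import multiprocessing
import queue
import time
from typing import Any, Callable


class TimeoutException(Exception):
    pass


def _run_and_report(fn: Callable, out, args: tuple, kwargs: dict) -> None:
    # Runs in the child process: tag the outcome so the parent can tell
    # "fn returned this object" apart from "fn raised this exception".
    try:
        out.put(("ok", fn(*args, **kwargs)))
    except BaseException as ex:
        out.put(("error", ex))


def call_with_timeout(fn: Callable, timeout: float, *args: Any, **kwargs: Any) -> Any:
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX, where 'fork' exists
    out = ctx.Queue()
    proc = ctx.Process(target=_run_and_report, args=(fn, out, args, kwargs))
    proc.start()
    try:
        status, payload = out.get(timeout=timeout)
    except queue.Empty:
        proc.terminate()  # still running after the timeout, so stop it
        raise TimeoutException(f"no result after {timeout} seconds") from None
    finally:
        proc.join()       # reap the child whether it finished or was terminated
    if status == "error":
        raise payload
    return payload


# fn returns an exception instance as a plain value: it is returned, not raised.
returned = call_with_timeout(ValueError, 5, "just a value")

# fn really raises: the exception is re-raised in the calling process.
try:
    call_with_timeout(int, 5, "not a number")
    raised = None
except ValueError as ex:
    raised = ex

# fn is too slow: the child is terminated and TimeoutException is raised.
try:
    call_with_timeout(time.sleep, 0.2, 30)
    timed_out = False
except TimeoutException:
    timed_out = True
```

Both the result and the exception still have to be picklable to travel through the queue, the same as in the version above.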


I've also uploaded it to a gist
