Friday 26 January 2024

Kotlin Coroutines Basic Example Explained

As I explained in this previous post, Kotlin's approach to asynchronous programming (and "generators") is a bit different from what I'm used to in JavaScript and Python (and even C#). Now that I've mostly understood how suspend, continuation passing style and compiler magic work together, it's time to fiddle with some real code. What should be clear is that asynchronous programming in Kotlin is more customizable than in JavaScript and Python. I'm thinking mainly about the different dispatchers, with which your code can run in an event loop or a thread pool. Indeed we have at least 2 different thread pools. We use Dispatchers.Default for a pool for "CPU bound tasks" (with as many threads as CPU cores in your machine), and Dispatchers.IO for a pool for "IO bound tasks". Read this excellent article if you want to know more.

So I've started with one of the most common tasks in asynchronous programming that I can think of: launching several non-blocking IO operations (a simulated HTTP request to get a blog post), waiting for all of them to complete and doing something with their results.

The Python code for that looks like this:


import asyncio

delays = {
    "A1": 3,
    "B1": 1,
    "C1": 0.1,
}

async def get_post(id: str) -> str:
    print(f"getting post: {id}")
    if id not in delays:
        await asyncio.sleep(0.5)
        raise Exception(f"Missing post: {id}")   
     
    await asyncio.sleep(delays[id])
    return f"POST: [[{id}]]"

# async def retrieve_posts_sequentially():
#     print("started")
#     post_ids = ["A1", "B1", "C1"]
#     retrieved_posts = []
#     for id in post_ids:
#         retrieved_posts.append(await get_post(id))
#     print("all posts retrieved")
#     for post in retrieved_posts:
#         print(f"post: {post}")

async def retrieve_posts():
    print("started")
    post_ids = ["A1", "B1", "C1", "D1"]
    requested_posts = [asyncio.create_task(get_post(id)) for id in post_ids]
    retrieved_posts = await asyncio.gather(*requested_posts, return_exceptions= True)
    print("all posts retrieved")
    for post in retrieved_posts:
        print(f"post: {post}")


asyncio.run(retrieve_posts())	


First of all we have to create an event loop (with asyncio.run) to run our asyncio code in it. Then we have an async function (called a coroutine function in Python). As we know, invoking a coroutine function creates a coroutine object, but does not start running the code. To run it we have to either await it, or launch it through create_task. Awaiting the coroutine (await get_post()) would launch it and suspend the calling function, so we would not do the second call to get_post() until the first one finishes and the event loop resumes the calling function. So the requests would run sequentially, one after another. That's what would happen in the commented retrieve_posts_sequentially(). On the other hand, starting the coroutine through create_task schedules it on the event loop without forcing us to await it. So we can launch all our requests and then wait for all of them to complete (with await asyncio.gather()). This way the requests run concurrently in the event loop thread. That's what happens in retrieve_posts().
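To make the difference between the two approaches measurable, here is a minimal sketch that times both. The fake_request helper and the short delays are assumptions made up for this illustration (a stand-in for get_post), not part of the original example.

```python
import asyncio
import time


async def fake_request(delay: float) -> float:
    # Hypothetical stand-in for get_post: it only sleeps and returns its delay
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays: list[float]) -> float:
    start = time.perf_counter()
    for d in delays:
        # each await completes before the next request is even created
        await fake_request(d)
    return time.perf_counter() - start


async def run_concurrently(delays: list[float]) -> float:
    start = time.perf_counter()
    # create_task schedules every request on the event loop right away
    tasks = [asyncio.create_task(fake_request(d)) for d in delays]
    await asyncio.gather(*tasks)
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    delays = [0.3, 0.2, 0.1]
    sequential = await run_sequentially(delays)
    concurrent = await run_concurrently(delays)
    return sequential, concurrent


sequential, concurrent = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run should take roughly the sum of the delays, while the concurrent one should take roughly the longest delay.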

Let's see now the Kotlin equivalent to the above:


import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext

// delays in milliseconds; they are Long because delay() takes a Long
val idsToDelays = mapOf(
    "A1" to 1000L,
    "B1" to 2000L,
    "C1" to 500L,
)

suspend fun getPost(id: String): String {
    println("getPost $id start, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    delay(idsToDelays[id] ?: 500L)
    println("getPost $id end, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    return if (idsToDelays[id] != null) "POST: [[${id}]]"
        else throw Exception("missing ID")
}

// "D1" has no entry in idsToDelays, so it exercises the exception path
val postIds = listOf("A1", "B1", "C1", "D1")

//runBlocking {
//	println("context: $coroutineContext")
//	postIds.forEach { getPost(it) }
//}
	
runBlocking {
	// same as the previous test, but now causing an exception and handling it
	val futures = postIds.map { async {
		//this is a try-catch expression, so we don't need to write "return"
		try {
			getPost(it)
		}
		catch (ex: Exception) {
			ex.message
		}
	}}

	val posts = futures.awaitAll()
	posts.forEach(::println)
}
println("Finished, current thread: ${Thread.currentThread().name}")

/*
getPost A1 start, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 start, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 start, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 start, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 end, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 end, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost A1 end, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 end, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
POST: [[A1]]
POST: [[B1]]
POST: [[C1]]
missing ID
Finished, current thread: main
*/

So, as expected, we use the suspend keyword (there's no async keyword in Kotlin) to designate our suspendable (asynchronous) function. Suspend functions have to be run from a coroutine (which is quite different from a Python coroutine), and we create that coroutine through runBlocking(), a coroutine builder. Calling runBlocking without providing a CoroutineContext (as in this case) runs the code in an event loop, everything in the same thread. From the documentation:

Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.

If we call the suspend function directly (see the commented block) we get the same behaviour as in the commented Python block. The calling function is suspended and does not resume until getPost completes, which means that we would be gathering the posts sequentially. To run the suspend functions concurrently we use the async() function (another coroutine builder). This way a new coroutine is created to run each getPost(), with these coroutines running concurrently. Calling async() without providing a specific dispatcher means that the new coroutines will use the same dispatcher as the parent coroutine (the coroutine that we created with runBlocking), that is, the event loop dispatcher. async() returns a Deferred object (similar to a Python Task/Future) that will complete when the corresponding coroutine completes. We can wait for all these Deferred objects to complete with the awaitAll() function. So this code works almost exactly the same as the Python code: there are no additional threads, and everything runs in an event loop in its single thread. As I mentioned in the first paragraph, asynchronous programming in Kotlin is amazingly powerful, and we can "easily" write coroutines that use a thread pool rather than an event loop, but that will be for another post.

To summarize:
- asyncio.run() function == runBlocking() function == start an event loop
- asyncio.create_task() function == async() function
- asyncio.gather(*tasks) function == List<Deferred>.awaitAll() function

Well, as my other main language is JavaScript I think I should also write the JavaScript version:



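// getPost is the async function defined in the Promise.race post further down this page
let postIds = ["A1", "B1", "C1"];
let postPromises = postIds.map(getPost);
let retrievedPosts = await Promise.all(postPromises);
// passing console.log directly would also print the index and the whole array
retrievedPosts.forEach(post => console.log(post));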


As we know, in JavaScript invoking an async function returns a Promise, and there's no need to use await to launch the execution of the function. If we awaited each Promise right away, the calling function would get suspended each time, and hence we would be in the sequential case. So we first do all the getPost calls and gather their returned Promises, and then we wait for the completion of all of them with an await Promise.all(). We don't need to start an event loop on our own, as that's the basic driver of any JavaScript runtime.

Saturday 20 January 2024

JavaScript async generators oddity

Recently I've found a Stack Overflow discussion where a weird feature of JavaScript async generators is mentioned. If the async generator yields a Promise, the generator itself (well, the next() method in the corresponding generator object) will wait for the resolution of the Promise, returning its value.

I mean, this statement for example:
yield Promise.resolve("AAA");
behaves as if it had been written like this:
yield await Promise.resolve("AAA");

This means that in this code below, the try-catch inside the generator will catch the exception:


function asleep(timeout) {
    return new Promise(res => setTimeout(res, timeout));
}

async function* asyncCities(){
    await asleep(1000);
    try {
        yield Promise.reject("rejected City");
    }
    catch (ex) {
        console.log(`Exception: ${ex} caught in the async generator`)
        yield "fixed";
    }
}

async function main() {
    let citiesGenerator = asyncCities();
    try {
        let city = await citiesGenerator.next();
        console.log(`city: ${city.value}`);
    }
    catch (ex){
        console.log(`Exception: ${ex} caught in the main function`)
    }
    //Exception: rejected City caught in the async generator
    //city: fixed
}

main();


This is slightly different from returns in asynchronous functions, where (as we saw in my previous post) if we return a Promise, the wrapping Promise will resolve to the resolution of that inner Promise, but all this is managed by the caller, not by an "invisible await" inside the async function. This means that in the below code the exception will be caught in the main function:


async function getCapital() {
    await asleep(1000);
    // this try-catch is useless here: returning a rejected promise is fine, it's the caller that will get the exception when awaiting it and will have to handle it
    try {
        return Promise.reject("rejected City");
    }
    catch (ex) {
        console.log(`Exception: ${ex} caught in the async function`)
        return "fixed";
    }    
}

async function main() {
    // an exception happens here, as obviously in an async function the "return Promise" is not replaced by a "return await Promise" as happens with the async generator
    // so the internal try-catch was useless
    try {
        let capital = await getCapital();
        console.log(`capital: ${capital}`);

    }
    catch (ex) {
        console.log(`Exception: ${ex} caught in main function`)
    }
    //Exception: rejected City caught in main function

}


I can hardly think of any situation where this behaviour would cause a gotcha, but it seemed interesting enough to mention it here.

Wednesday 17 January 2024

Promise.race and Access to the Resolved Promise

With the JavaScript Promise.race method we obtain the result/exception of the first resolved/rejected Promise. Normally that's all we need, but in some cases we would like to know which Promise caused that resolution or rejection (notice that with Python's asyncio.wait we obtain the Futures, not their values/exceptions), so what could we do? The solution for me is creating a "wrapper" Promise that resolves when the original Promise resolves or rejects. This wrapper Promise will resolve to the original Promise (not to its result). Initially I was thinking of resolving to a tuple with the result and the original Promise, but that's not necessary, as we can get the result by awaiting the already resolved original Promise again.

What we have to bear in mind is that a Promise A does not resolve to another Promise B, it will wait for that Promise B to be resolved to a "normal value", and then Promise A will resolve to that "normal value". I mean:


    let result = await Promise.resolve(Promise.resolve("AA"));
    console.log(result);
	//AA

    async function getMsg() {
        return Promise.resolve("AA");
    }
    result = await getMsg();
    console.log(result);
	//AA

    result = await Promise.resolve("").then(result => Promise.resolve("AA"));
    console.log(result);
	//AA
    
	result = await new Promise(resFn => resFn(Promise.resolve("AA")));
    console.log(result);
	//AA

Because of that, we will resolve the wrapper Promise to an array containing the original Promise, rather than directly to the original Promise. Otherwise, when awaiting the wrapper Promise we would end up getting the result of the original Promise, rather than the original Promise itself.
So let's say we have this async getPost function.


async function asleep(timeout) {
    return new Promise(res => setTimeout(res, timeout));
}

const delays = {
    "A1": 1000,
    "B1": 2000,
    "C1": 50,
}

async function getPost(id) {
    console.log(`getting post: ${id}`)
    if (delays[id] === undefined) {
        await asleep(500);
        throw new Error(`Missing post: ${id}`);
    }

    await asleep(delays[id]);
    return `POST: [[${id}]]`;
}


We will launch a bunch of getPost actions, and we want to perform an action each time one of the posts is retrieved. We'll use Promise.race() for that, but we need to know the Promise that got resolved, so that then we can filter it out and invoke Promise.race again with the remaining ones (it's what we typically do in Python with asyncio.wait).


async function runPromises1(postPromises) {
    while (postPromises.length) {
        // Notice how we wrap the Promise in an array, so that we get a Promise that resolves to an Array containing a Promise.
        // If a Promise p1 resolved directly to a Promise p2, p1 would not really resolve until p2 resolves, and it would resolve to p2's result
        
        // this simple syntax works fine:
        let [pr] = await Promise.race(postPromises.map(p => p.then(result => [p]).catch(ex => [p])));
        try {
			// the "internal" pr Promise is already resolved/rejected at this point
            let result = await pr;
            console.log(`resolved index: ${pr._index}, result: ${result}`);
        }
        catch (ex) {
            console.log(`Error: resolved index: ${pr._index}, exception: ${ex}`);
        }        
        postPromises = postPromises.filter(p => p !== pr);
    }
}

async function main() {
    let postPromises = ["A1", "B1", "C1", "D1"].map(getPost); // (id => getPost(id));
    postPromises.forEach((pr, index) => pr._index = index);
    await runPromises1(postPromises);
}

main();


As you can see, the important thing is this line:
await Promise.race(postPromises.map(p => p.then(result => [p]).catch(ex => [p])))
where the .then and .catch create the new Promise that will resolve/reject to an array containing the original Promise.

Rather than using then-catch we could write the above leveraging async, using an Immediately Invoked Async Arrow Function. An async function returns a new Promise that gets resolved/rejected when the function completes. As in the previous case we have to use the trick of returning the original Promise wrapped in an Array.


async function runPromises2(postPromises) {
    while (postPromises.length) {
        // Notice how we wrap the Promise in an array, so that we get a Promise that resolves to an Array containing a Promise.
        // If a Promise p1 resolved directly to a Promise p2, p1 would not really resolve until p2 resolves
        
        // this more complex syntax also works fine, it's the same idea as the above
        // we have an Immediately Invoked Async Function Expression, it creates a Promise that resolves when the inner Promise settles, returning the Promise itself (wrapped in an array)
        let [pr] = await Promise.race(postPromises.map(p => (async () => {
            try {
                await p;
            }
            catch {}
            return [p];
        })()));
        try {
            let result = await pr;
            console.log(`resolved index: ${pr._index}, result: ${result}`);
        }
        catch (ex) {
            console.log(`Error: resolved index: ${pr._index}, exception: ${ex}`);
        }        
        postPromises = postPromises.filter(p => p !== pr);
    }
}

This is one of those few cases where using .then().catch() looks cleaner than using async-await. Also, this need to know which Promise has been resolved, rather than just its value, is not particularly realistic. In most cases we would just pass to .race/.wait... not the bare getPost Promise, but a Promise for a function that both invokes getPost and then performs the ensuing "print" action.

Tuesday 16 January 2024

Asyncio as_completed

When looking into a colleague's code recently I realised that I was missing the right/simple way to deal with one common asyncio situation. I have a list of Awaitables (Futures/Tasks) and I want to run some code as soon as any of them completes, and continue to do so until all of them have completed. Given an async function like this one, which I will be calling concurrently:



import asyncio

delays = {
    "A1": 2,
    "B1": 1,
    "C1": 0.1,
}

async def get_post(id: str) -> str:
    print(f"getting post: {id}")
    if not id in delays:
        await asyncio.sleep(0.5)
        raise Exception(f"Missing post: {id}")   
     
    await asyncio.sleep(delays[id])
    return f"POST: [[{id}]]"


I was using asyncio.wait in a loop, like this:



async def retrieve_posts():
    post_ids = ["A1", "B1", "C1", "D1"]
    pending_tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
    while len(pending_tasks):
        done_tasks, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
        for done_task in done_tasks:
            try:
                # at this point, where done_task is a resolved Future, these 2 statements are equivalent:
                #block_result = done_task.result()
                post = await done_task
                print(f"post obtained: {post}")
            except Exception as ex:
                print(f"Error retrieving post: {ex}")
                continue

asyncio.run(retrieve_posts())

That works, but my colleague was using a more elegant approach, asyncio.as_completed. as_completed returns an iterator that in each iteration returns an awaitable (a new one, not one of those that you've passed to it). That new awaitable resolves as soon as the next of the provided Futures completes. This means that you can rewrite the above like this:



async def retrieve_posts():
    post_ids = ["A1", "B1", "C1", "D1"]
    tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
    for task in asyncio.as_completed(tasks):
        try:
            post = await task
            print(f"post obtained: {post}")
        except BaseException as ex:
            print(f"Exception getting post: {ex}")
            

asyncio.run(retrieve_posts())	

So asyncio.wait is a better fit when you have some awaitables and as they resolve you'll be launching additional async tasks. asyncio.as_completed should be used when you have beforehand all the awaitables that you're going to run.
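As a rough illustration of the first case, here is a sketch where the set of pending tasks grows while we are still waiting on it. The LINKS and DELAYS tables and the crawl function are hypothetical, invented for this example: each retrieved post may point to a follow-up post that gets requested on the spot.

```python
import asyncio

# Hypothetical data: each post optionally links to a follow-up post
LINKS = {"A1": "A2", "B1": None, "A2": None}
DELAYS = {"A1": 0.05, "B1": 0.1, "A2": 0.02}


async def get_post(post_id: str) -> str:
    await asyncio.sleep(DELAYS[post_id])
    return post_id


async def crawl(start_ids: list[str]) -> list[str]:
    pending = {asyncio.create_task(get_post(i)) for i in start_ids}
    retrieved = []
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            post_id = task.result()  # the task is done, so result() does not fail with a pending state
            retrieved.append(post_id)
            follow_up = LINKS[post_id]
            if follow_up is not None:
                # new work is added to the pending set before the next wait
                pending.add(asyncio.create_task(get_post(follow_up)))
    return retrieved


retrieved = asyncio.run(crawl(["A1", "B1"]))
print(retrieved)
```

With as_completed this would be awkward, because the collection of awaitables is fixed at the moment the iterator is created.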

Looping over awaitables brings to my mind that Python feature, the async for construct (equivalent to JavaScript for-await). I wonder why as_completed returns an iterable-iterator rather than an asynchronous iterable-iterator, but well, we can easily leverage as_completed to create an asynchronous generator, like this:



    from typing import Awaitable

    async def as_completed_generator(awaitables: list[Awaitable]):
        for aw in asyncio.as_completed(awaitables):
            try:
                res = await aw
                yield res
            except Exception as ex:
                yield ex

    async def retrieve_posts():
        post_ids = ["A1", "B1", "C1", "D1"]
        tasks = [asyncio.create_task(get_post(id)) for id in post_ids]
        async for post in as_completed_generator(tasks):
            if not isinstance(post, Exception):
                print(f"post retrieved: {post}")
            else:
                print(f"Exception: {post}")	
				

Notice how, in order to allow us to manage rejected awaitables, our async generator yields either values or exceptions.

I'll take advantage of this post about asyncio to mention something that I did not include in this previous post about Futures vs Futures. The result() method in concurrent.futures.Future is a blocking method that blocks the current thread until a result is available, while the result() method in asyncio.Future is not. It immediately returns the value or raises the stored exception if the Future has been resolved/rejected, and raises an InvalidStateError if the Future is still pending.
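A small sketch of that difference, assuming nothing beyond the standard library (slow_square and peek_at_pending_future are names made up for the illustration):

```python
import asyncio
import concurrent.futures
import time


def slow_square(x: int) -> int:
    time.sleep(0.1)
    return x * x


# concurrent.futures: result() blocks the calling thread until the value is ready
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    thread_future = pool.submit(slow_square, 4)
    blocking_result = thread_future.result()


async def peek_at_pending_future() -> str:
    loop = asyncio.get_running_loop()
    aio_future = loop.create_future()
    try:
        aio_future.result()  # still pending: raises instead of blocking
    except asyncio.InvalidStateError:
        outcome = "InvalidStateError"
    else:
        outcome = "no error"
    aio_future.set_result(16)
    # once the Future is resolved, result() returns the value immediately
    return f"{outcome}, then {aio_future.result()}"


non_blocking_result = asyncio.run(peek_at_pending_future())
print(blocking_result, non_blocking_result)
```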

Wednesday 3 January 2024

Tail Call Optimization

Some days ago a colleague told me that his main complaint about Python was the lack of Tail Call Optimization. Indeed Guido was very clear about that: in order to avoid messing up the stack traces there will never be TCO in Python (in the CPython runtime, I guess), though now that he's no longer the BDFL maybe that could change. Though over time I have written several posts about how to adapt your code to prevent stack overflows (by using async-await in JavaScript and Python, or by using trampolines), I'd never paid much attention to this Tail Call Optimization thing, so I suddenly got intrigued.

First, I immediately associate Tail Call Optimization with tail-recursive code, but this is not necessarily so. A Tail Call happens when the last instruction in a function is a call to another function. Callbacks and Continuations come easily to mind. Normally Callbacks and Continuations are not used massively, so optimizing them won't be particularly important, neither in performance terms (improved memory locality) nor in terms of risking a stack overflow, save for functional style programming fully adhering to CPS (Continuation Passing Style). That's why Tail Call Optimization is mainly important for recursive code (though it can be applied to any general Tail Call).

Another important point is distinguishing between direct recursion and mutual or indirect recursion. In direct recursion a function calls itself; in mutual/indirect recursion f1 calls f2, which calls f1 again. Indirect recursion and non-recursive tail calls are more complex to optimize than direct recursive tail calls. From the Wikipedia article:
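To make the two shapes concrete, here is a minimal sketch of a directly tail-recursive function and a mutually tail-recursive pair. The functions are toy examples chosen for the illustration, and the overflow check assumes CPython, where no tail call elimination happens and a deep enough chain of calls ends in a RecursionError.

```python
import sys


def count_down(n: int) -> int:
    # direct tail recursion: the last thing the function does is call itself
    if n == 0:
        return 0
    return count_down(n - 1)


def is_even(n: int) -> bool:
    # mutual tail recursion: is_even tail-calls is_odd, which tail-calls is_even
    if n == 0:
        return True
    return is_odd(n - 1)


def is_odd(n: int) -> bool:
    if n == 0:
        return False
    return is_even(n - 1)


def overflows(fn, n: int) -> bool:
    # True if calling fn(n) exhausts the interpreter's recursion limit
    try:
        fn(n)
        return False
    except RecursionError:
        return True


small_ok = is_even(10)
deep = sys.getrecursionlimit() * 2
direct_overflows = overflows(count_down, deep)
mutual_overflows = overflows(is_even, deep)
print(small_ok, direct_overflows, mutual_overflows)
```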

The special case of tail-recursive calls, when a function calls itself, may be more amenable to call elimination than general tail calls. When the language semantics do not explicitly support general tail calls, a compiler can often still optimize sibling calls, or tail calls to functions which take and return the same types as the caller.

However, for language implementations which store function arguments and local variables on a call stack (which is the default implementation for many languages, at least on systems with a hardware stack, such as the x86), implementing generalized tail-call optimization (including mutual tail recursion) presents an issue: if the size of the callee's activation record is different from that of the caller, then additional cleanup or resizing of the stack frame may be required. For these cases, optimizing tail recursion remains trivial, but general tail-call optimization may be harder to implement efficiently.

For example, in the Java virtual machine (JVM), tail-recursive calls can be eliminated (as this reuses the existing call stack), but general tail calls cannot be (as this changes the call stack). As a result, functional languages such as Scala that target the JVM can efficiently implement direct tail recursion, but not mutual tail recursion.

Curious about the last statement, I've found this that explains it pretty well.

Have you noticed that the method address starts with 0? That all methods offsets start with 0? JVM doesn't allow one to jump outside a method.

As mentioned above, Python does not provide any sort of TCO. We can modify our code a bit by writing trampolines ourselves, which will work fine with tail and non-tail recursion, direct and mutual recursion. We could also use the async/await trick, but I assume it'll be pretty awful in performance terms. And then, several smart people have come up with some general solutions. There is this decorator that throws (and catches) an exception every 2 recursive calls, so the stack never grows (but I guess it'll hit performance pretty hard). Then this module that seems more elaborate, and then this one that modifies a function's bytecode to use trampolines.
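As a reminder of what a hand-written trampoline looks like, here is a minimal sketch. It only covers the tail-call case (the non-tail case needs a more involved design), and it assumes that the real results are never callable themselves, since the trampoline treats any callable as "one more step to run".

```python
from typing import Any, Callable


def trampoline(fn: Callable[..., Any], *args: Any) -> Any:
    # Keep calling the returned thunks in a loop, so the stack never grows
    result = fn(*args)
    while callable(result):
        result = result()
    return result


def is_even(n: int):
    if n == 0:
        return True
    # return a thunk describing the tail call, rather than making the call
    return lambda: is_odd(n - 1)


def is_odd(n: int):
    if n == 0:
        return False
    return lambda: is_even(n - 1)


def factorial(n: int, acc: int = 1):
    if n <= 1:
        return acc
    return lambda: factorial(n - 1, acc * n)


# far deeper than the default recursion limit, both direct and mutual
even_result = trampoline(is_even, 100_001)
factorial_result = trampoline(factorial, 10)
print(even_result, factorial_result)
```

The price is one extra function object and one extra call per step, which is why this is a workaround rather than an optimization.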

Kotlin has the tailrec modifier that you can use in a function definition and will only work for direct tail recursion. In this article you can see how the Java bytecodes generated by the Kotlin compiler for a tailrec function implement the tail recursion by updating values in the current stack frame and performing a jump to the start of the function (so transforming the recursion into a loop), rather than doing a call.
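The transformation itself can be sketched by hand in Python. This is only an illustration of the idea (overwrite the parameters, jump back to the top), not what the Kotlin compiler literally emits, and both functions are made-up examples.

```python
def sum_to_recursive(n: int, acc: int = 0) -> int:
    # tail-recursive form: the recursive call is the last thing that happens
    if n == 0:
        return acc
    return sum_to_recursive(n - 1, acc + n)


def sum_to_loop(n: int, acc: int = 0) -> int:
    # the same function after a manual "tailrec" rewrite
    while True:
        if n == 0:
            return acc
        # overwrite the parameters in the current frame, then "jump" to the top
        n, acc = n - 1, acc + n


small = sum_to_recursive(100)
large = sum_to_loop(1_000_000)  # this depth would overflow the recursive version in CPython
print(small, large)
```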

I've learned from this article that .NET comes with support for tail calls at the bytecode level by means of the tail. prefix instruction. I guess at the CLR bytecode level you are not allowed to write a jump to outside of the current function (same as in the JVM), so this tail. prefix instructs the JIT compiler to compile the ensuing call bytecode into a native jump instruction, rather than creating a new stack frame and doing a native call. It seems the F# compiler uses that instruction for non-recursive tail calls, while transforming recursion into loops for recursive tail calls. The C# compiler does neither of them.

The above brings up 2 general points for me to reflect on.
First, when providing Tail Call Optimization in a VM environment, should the "high level language to bytecode" compiler perform the optimization, or should that be delegated to the "bytecode to native code" JIT compiler? Well, for VMs in which we use an interpreter and 1 or n JITs for "hot code", it's clear that it should be done by the "language to bytecode" compiler. If there's no interpreter involved, just a JIT, I'm not sure which one of the two compilers should perform the optimization.
Second point, should we instruct the compiler to perform the optimization (as we do in Kotlin with tailrec), or should the compiler decide on its own and perform the optimization when it can and sees fit (as the F# compiler seems to do)? I would say I prefer the former approach.

There's this interesting discussion about the above topics applied to C# and .NET.