Friday 26 January 2024

Kotlin Coroutines Basic Example Explained

As I explained in this previous post, Kotlin's approach to asynchronous programming (and "generators") is a bit different from what I'm used to in JavaScript and Python (and even C#). Now that I mostly understand how suspend functions, continuation passing style and compiler magic work together, it's time to fiddle with some real code. What should be clear is that asynchronous programming in Kotlin is more customizable than in JavaScript and Python. I'm thinking mainly of the different dispatchers, which let your code run in an event loop or in a thread pool. Indeed, we have at least 2 different thread pools: Dispatchers.Default, a pool for "CPU bound tasks" (by default with as many threads as CPU cores in your machine, with a minimum of two), and Dispatchers.IO, a pool for "IO bound tasks". Read this excellent article if you want to know more.

So I've started with one of the most common tasks in asynchronous programming that I can think of: launching several non-blocking IO operations (a simulated HTTP request to get a blog post), waiting for all of them to complete and doing something with their results.

The Python code for that looks like this:


import asyncio

delays = {
    "A1": 3,
    "B1": 1,
    "C1": 0.1,
}

async def get_post(id: str) -> str:
    print(f"getting post: {id}")
    if id not in delays:
        await asyncio.sleep(0.5)
        raise Exception(f"Missing post: {id}")

    await asyncio.sleep(delays[id])
    return f"POST: [[{id}]]"

# async def retrieve_posts_sequentially():
#     print("started")
#     post_ids = ["A1", "B1", "C1"]
#     retrieved_posts = []
#     for id in post_ids:
#         retrieved_posts.append(await get_post(id))
#     print("all posts retrieved")
#     for post in retrieved_posts:
#         print(f"post: {post}")

async def retrieve_posts():
    print("started")
    post_ids = ["A1", "B1", "C1", "D1"]
    requested_posts = [asyncio.create_task(get_post(id)) for id in post_ids]
    retrieved_posts = await asyncio.gather(*requested_posts, return_exceptions=True)
    print("all posts retrieved")
    for post in retrieved_posts:
        print(f"post: {post}")


asyncio.run(retrieve_posts())	


First of all we have to create an event loop (with asyncio.run) to run our asyncio code in it. Then we have an async function (called a coroutine function in Python). As we know, invoking a coroutine function creates a coroutine object, but does not start running the code. To run it we have to either await it or launch it through create_task. Awaiting the coroutine (await get_post()) would run it and suspend the calling function, so we would not make the second call to get_post() until the first one finishes and the event loop resumes the calling function. The requests would therefore run sequentially, one after another. That's what would happen in the commented-out retrieve_posts_sequentially(). On the other hand, wrapping the coroutine with create_task schedules it on the event loop without forcing us to await it right away. So we can launch all our requests and then wait for all of them to complete (with await asyncio.gather()). This way the requests run concurrently in the event loop thread. That's what happens in retrieve_posts(). Because we pass return_exceptions=True, the exception raised for the missing post "D1" is returned in the results list instead of being propagated.
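To make the difference between the two approaches measurable, here is a minimal sketch that times both. The fake_request helper and the short delays are my own stand-ins for illustration, not part of the code above:

```python
import asyncio
import time

# Hypothetical stand-in for get_post: it only sleeps, there is no real HTTP request.
async def fake_request(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def run_sequentially(delays: list[float]) -> float:
    start = time.perf_counter()
    for d in delays:
        # The next call is not made until this one has finished.
        await fake_request(d)
    return time.perf_counter() - start

async def run_concurrently(delays: list[float]) -> float:
    start = time.perf_counter()
    # create_task schedules every request before we wait for any of them.
    tasks = [asyncio.create_task(fake_request(d)) for d in delays]
    await asyncio.gather(*tasks)
    return time.perf_counter() - start

async def compare() -> tuple[float, float]:
    delays = [0.3, 0.2, 0.1]
    return await run_sequentially(delays), await run_concurrently(delays)

sequential, concurrent = asyncio.run(compare())
print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential run should take roughly the sum of the delays, while the concurrent run should take roughly as long as the slowest request.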

Let's see now the Kotlin equivalent to the above:


import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext

val idsToDelays = mapOf(
    "A1" to 1000,
    "B1" to 2000,
    "C1" to 500,
)

suspend fun getPost(id: String): String {
    println("getPost $id start, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    // delay() expects a Long number of milliseconds; unknown IDs wait 500 ms
    delay((idsToDelays[id] ?: 500).toLong())
    println("getPost $id end, context: $coroutineContext Thread: ${Thread.currentThread().name}")
    return if (idsToDelays[id] != null) "POST: [[${id}]]"
        else throw Exception("missing ID")
}

// "D1" has no entry in idsToDelays, so getPost throws for it
val postIds = listOf("A1", "B1", "C1", "D1")

//runBlocking {
//	println("context: $coroutineContext")
//	postIds.forEach { getPost(it) }
//}
	
runBlocking {
	// same as the previous test, but now causing an exception and handling it
	val futures = postIds.map { async {
		//this is a try-catch expression, so we don't need to write "return"
		try {
			getPost(it)
		}
		catch (ex: Exception) {
			ex.message
		}
	}}

	val posts = futures.awaitAll()
	posts.forEach(::println)
}
println("Finished, current thread: ${Thread.currentThread().name}")

/*
getPost A1 start, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 start, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 start, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 start, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost C1 end, context: [DeferredCoroutine{Active}@6bf256fa, BlockingEventLoop@6108b2d7] Thread: main
getPost D1 end, context: [DeferredCoroutine{Active}@6cd8737, BlockingEventLoop@6108b2d7] Thread: main
getPost A1 end, context: [DeferredCoroutine{Active}@566776ad, BlockingEventLoop@6108b2d7] Thread: main
getPost B1 end, context: [DeferredCoroutine{Active}@1554909b, BlockingEventLoop@6108b2d7] Thread: main
POST: [[A1]]
POST: [[B1]]
POST: [[C1]]
missing ID
Finished, current thread: main
*/

So, as expected, we use the suspend keyword (there's no async keyword in Kotlin) to mark our suspendable (asynchronous) function. Suspend functions have to be called from a coroutine (which is quite different from a Python coroutine), and here we create that coroutine with runBlocking(), a coroutine builder. Calling runBlocking without providing a CoroutineContext (as in this case) runs the code in an event loop, everything in the same thread. From the documentation:

Runs a new coroutine and blocks the current thread interruptibly until its completion. This function should not be used from a coroutine. It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in main functions and in tests.
The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.
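The same single-thread behaviour can be observed on the Python side, where asyncio.run also blocks the calling thread until the main coroutine completes. This is just a small sketch; the worker function and the log list are names I made up for the illustration:

```python
import asyncio
import threading

# Hypothetical worker: records which thread it runs on, before and after suspending.
async def worker(name: str, log: list[tuple[str, str]]) -> None:
    log.append((name, threading.current_thread().name))
    await asyncio.sleep(0.01)
    log.append((name, threading.current_thread().name))

async def main() -> list[tuple[str, str]]:
    log: list[tuple[str, str]] = []
    # Three workers run concurrently on the same event loop.
    await asyncio.gather(*(worker(f"w{i}", log) for i in range(3)))
    return log

caller_thread = threading.current_thread().name
log = asyncio.run(main())  # blocks the calling thread until main() completes
for name, thread_name in log:
    print(f"{name} ran on {thread_name}")
```

Every entry should report the thread that called asyncio.run, much like the Kotlin output above reports Thread: main for every getPost call.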

If we called the suspend function directly (see the commented-out block) we would get the same behaviour as in the commented-out Python block. The calling coroutine would be suspended and would not resume until getPost completes, which means that we would be retrieving the posts sequentially. To run the suspend functions concurrently we use the async() function (another coroutine builder). This way a new coroutine is created to run each getPost(), and these coroutines run concurrently. Calling async() without providing a specific dispatcher means that the new coroutines inherit the dispatcher of the parent coroutine (the one that we created with runBlocking), that is, the event loop dispatcher. async() returns a Deferred object (similar to a Python Task/Future) that completes when the corresponding coroutine completes. We can wait for all these Deferred objects to complete with the awaitAll() function. So this code works almost exactly the same as the Python code: there are no additional threads, everything runs in an event loop on a single thread. As I mentioned in the first paragraph, asynchronous programming in Kotlin is amazingly powerful, and we can "easily" write coroutines that use a thread pool rather than an event loop, but that will be for another post.
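One small difference between the two versions is where the exception is handled: the Kotlin code catches it inside each async block, while the Python code relies on return_exceptions=True. A Python version shaped like the Kotlin one could look roughly like this. The get_post_or_message helper is a name I made up, and the delays are shortened for the illustration:

```python
import asyncio

delays = {"A1": 0.03, "B1": 0.01, "C1": 0.02}  # shortened delays, illustration only

async def get_post(post_id: str) -> str:
    await asyncio.sleep(delays.get(post_id, 0.01))
    if post_id not in delays:
        raise Exception("missing ID")
    return f"POST: [[{post_id}]]"

# Hypothetical helper mirroring the try-catch inside the Kotlin async block:
# it returns the exception message instead of letting the exception escape.
async def get_post_or_message(post_id: str) -> str:
    try:
        return await get_post(post_id)
    except Exception as ex:
        return str(ex)

async def retrieve_posts() -> list[str]:
    post_ids = ["A1", "B1", "C1", "D1"]
    tasks = [asyncio.create_task(get_post_or_message(post_id)) for post_id in post_ids]
    # gather returns the results in the order of the tasks, not in completion order
    return await asyncio.gather(*tasks)

posts = asyncio.run(retrieve_posts())
for post in posts:
    print(post)
```

With this shape the results list only contains strings, as in the Kotlin version, rather than a mix of strings and exception objects.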

To summarize:
- asyncio.run() function == runBlocking() function == start an event loop
- asyncio.create_task() function == async() function
- asyncio.gather(*tasks) function == List<Deferred>.awaitAll() function

Well, as my other main language is JavaScript I think I should also write the JavaScript version:



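// getPost(id) is assumed to be an async function equivalent to the Python get_post above (not shown)
const postIds = ["A1", "B1", "C1"];
// calling getPost starts each request right away and returns a Promise
const postPromises = postIds.map(id => getPost(id));
const retrievedPosts = await Promise.all(postPromises);
// wrap console.log so forEach's extra index and array arguments are not printed
retrievedPosts.forEach(post => console.log(post));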


As we know, in JavaScript invoking an async function returns a Promise, and there's no need to use await to start the execution of the function. If we awaited each Promise right after the call, the calling function would be suspended and we would be back in the sequential case. So we first make all the getPost calls and collect the returned Promises, and then we wait for all of them to complete with await Promise.all(). We don't need to start an event loop on our own, as that's the basic driver of any JavaScript runtime.
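For contrast, this is how the lazy Python behaviour described earlier can be made visible. In JavaScript the body of the function would already have started running by the time the call returns, while in Python nothing runs until the coroutine is awaited (or wrapped in a task). The events list is just a device I'm using for the illustration:

```python
import asyncio

events: list[str] = []

async def get_post(post_id: str) -> str:
    events.append(f"body started: {post_id}")
    await asyncio.sleep(0)
    return f"POST: [[{post_id}]]"

async def main() -> None:
    coro = get_post("A1")        # only creates a coroutine object, the body has not run yet
    events.append("after call")
    result = await coro          # the body starts running here
    events.append(f"after await: {result}")

asyncio.run(main())
print(events)
# → ['after call', 'body started: A1', 'after await: POST: [[A1]]']
```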
