Thursday 8 February 2024

Kotlin Coroutines and CPU and IO bound code

Time for a short follow-up to this recent post that showed my first non-theoretical fiddling with Kotlin Coroutines. In that post I ran several suspend functions concurrently by starting several coroutines with the async() coroutine builder, but everything ran in the same thread, in the event loop set up by the runBlocking() coroutine builder. That's great for I/O-bound code, but what if we have CPU-bound code that we want to run in parallel in separate threads? Or, more interestingly, functions that do both CPU crunching and I/O-bound work? For that we can use coroutines with a dispatcher backed by a thread pool.

If we have a CPU-bound function and we launch 2 invocations of it with the async() coroutine builder, just as we did in my previous post, the code runs sequentially. That's because there are no suspension points inside the CPU-bound function: once the first coroutine starts running it never suspends, so the second coroutine doesn't get to run until the first one has finished (everything runs in the event-loop thread).


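import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Simulates a CPU-bound task: Thread.sleep() blocks the thread just like
// number crunching would, and there is no suspension point in the loop.
fun doCalculation(id: Int, duration: Int): Int {
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}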

fun performCalculationsSequentially() {
    runBlocking {
        val time = measureTimeMillis {
            val c1 = async { doCalculation(id = 1, duration = 3) }
            val c2 = async { doCalculation(id = 2, duration = 6) }
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished, ${results.joinToString(", ")}")
        }
        println("Time taken: $time") 
    }
}

/*
Started, cpubound
calculation: 1, step: 1, thread: main
calculation: 1, step: 2, thread: main
calculation: 1, step: 3, thread: main
Finishing calculation: 1, step: 4, thread: main
calculation: 2, step: 1, thread: main
calculation: 2, step: 2, thread: main
calculation: 2, step: 3, thread: main
calculation: 2, step: 4, thread: main
calculation: 2, step: 5, thread: main
calculation: 2, step: 6, thread: main
Finishing calculation: 2, step: 7, thread: main
tasks finished, 1, 2
Time taken: 4591
*/

So running coroutines on an event loop is great for cooperation between multiple suspend functions that really have suspension points (yes, the equivalent of JavaScript async code or Python asyncio), but it gives us nothing when a function just does processing without ever suspending: there is only one thread, and the coroutine holding it keeps it until it finishes.
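To see the difference between interleaving and parallelism, here is a small sketch (not from the original post; doCalculationCooperative, doSuspendingWork and runOnEventLoop are hypothetical names). The first variant keeps the blocking Thread.sleep() from doCalculation but calls yield() after each step, which adds a suspension point: the two coroutines now interleave on the event-loop thread, yet the total time should still be the sum of all the sleeps. The second variant waits with delay(), which suspends without blocking, so the total should be close to the duration of the longer task.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlin.system.measureTimeMillis

// Hypothetical variant of doCalculation: still blocks with Thread.sleep(),
// but calls yield() after each step, which is a suspension point.
suspend fun doCalculationCooperative(id: Int, duration: Int): Int {
    repeat(duration) { step ->
        println("calculation: $id, step: ${step + 1}, thread: ${Thread.currentThread().name}")
        Thread.sleep(500) // still blocks the event-loop thread
        yield()           // lets the event loop run the other coroutine
    }
    return id
}

// Hypothetical variant that waits with delay() instead of blocking.
suspend fun doSuspendingWork(id: Int, duration: Int): Int {
    repeat(duration) { step ->
        println("work: $id, step: ${step + 1}, thread: ${Thread.currentThread().name}")
        delay(500) // suspends; the thread is free while the timer runs
    }
    return id
}

// Runs two invocations of `work` with plain async() on runBlocking's event loop
// and returns the results together with the elapsed milliseconds.
fun runOnEventLoop(work: suspend (Int, Int) -> Int): Pair<List<Int>, Long> = runBlocking {
    var results = emptyList<Int>()
    val time = measureTimeMillis {
        val c1 = async { work(1, 3) }
        val c2 = async { work(2, 6) }
        results = listOf(c1, c2).awaitAll()
    }
    results to time
}

fun main() {
    val (_, cooperative) = runOnEventLoop(::doCalculationCooperative)
    println("yield() version: $cooperative ms") // interleaved, but at least (3 + 6) * 500
    val (_, suspending) = runOnEventLoop(::doSuspendingWork)
    println("delay() version: $suspending ms")  // interleaved, roughly 6 * 500
}
```

Both variants print the same thread name for every step; only the delay() one actually saves time, because while one coroutine is waiting the thread is free to run the other.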

If we pass Dispatchers.Default to the async coroutine builder, i.e. async(Dispatchers.Default), the coroutine runs in a thread pool (Dispatchers.Default is backed by a shared pool of worker threads), so in the code below the calculations run in parallel, each of them in a thread from the pool.



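import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Same blocking "CPU-bound" simulation as before: no suspension points.
fun doCalculation(id: Int, duration: Int): Int {
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}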

fun performCalculations3() {
    runBlocking {
        println("coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        val time = measureTimeMillis {
            val c1 = async(Dispatchers.Default) {
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 1, duration = 3)
            }
            val c2 = async(Dispatchers.Default) {
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 2, duration = 6)
            }
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished: ${results.joinToString(", ")}, coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        }
        println("Time taken: $time") // Time taken: 3079
    }
}

/*
coroutineContext [BlockingCoroutine{Active}@46f5f779, BlockingEventLoop@1c2c22f3], thread: main
inside async, coroutineContext [DeferredCoroutine{Active}@3a286e59, Dispatchers.Default]
inside async, coroutineContext [DeferredCoroutine{Active}@5e11571d, Dispatchers.Default]
calculation: 2, step: 1, thread: DefaultDispatcher-worker-2
calculation: 1, step: 1, thread: DefaultDispatcher-worker-1
calculation: 1, step: 2, thread: DefaultDispatcher-worker-1
calculation: 2, step: 2, thread: DefaultDispatcher-worker-2
calculation: 1, step: 3, thread: DefaultDispatcher-worker-1
calculation: 2, step: 3, thread: DefaultDispatcher-worker-2
Finishing calculation: 1, step: 4, thread: DefaultDispatcher-worker-1
calculation: 2, step: 4, thread: DefaultDispatcher-worker-2
calculation: 2, step: 5, thread: DefaultDispatcher-worker-2
calculation: 2, step: 6, thread: DefaultDispatcher-worker-2
Finishing calculation: 2, step: 7, thread: DefaultDispatcher-worker-2
Time taken: 3079
*/

So the main coroutine is using the event loop, but the 2 coroutines created with async(Dispatchers.Default) are running in the thread pool. It's clear that the 2 coroutines run in parallel, as the code takes about 3 seconds to finish (the longer calculation: 6 steps * 500 milliseconds); run sequentially, as in the first example, it would take about 4.5 seconds (9 * 500).

In this example, given that our doCalculation function is purely CPU-bound and never suspends, we could wonder why we are using coroutines at all if all we want is to run our code in a thread pool; we could use a ThreadPoolExecutor directly. Well, thanks to the async coroutine builder we get Deferred objects that we can await very easily. With a ThreadPoolExecutor, submit() gives us a Future, but Future.get() blocks the calling thread, while Deferred.await() just suspends the coroutine. Anyway, the need for coroutines with thread-pool dispatchers is much more evident when we have a suspend function that both suspends at some I/O call and runs some CPU-bound code (like doing an HTTP request to obtain some text and then encrypting it).
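Here is a minimal sketch of that mixed case, under some loud assumptions: fetchText, hashRepeatedly, fetchAndHash and processDocuments are hypothetical names, delay() stands in for a non-blocking HTTP client, and repeated SHA-256 hashing stands in for the encryption. Each document is handled by a coroutine on Dispatchers.Default; while it is suspended in the "request", its worker thread goes back to the pool, and the hashing then runs on whichever worker resumes it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.security.MessageDigest
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a non-blocking HTTP call: delay() suspends rather
// than blocks, so the worker thread returns to the pool while we "wait".
suspend fun fetchText(id: Int): String {
    delay(500)
    return "document $id"
}

// CPU-bound stand-in for "encrypting": hash the text repeatedly with SHA-256.
fun hashRepeatedly(text: String, rounds: Int): String {
    val digest = MessageDigest.getInstance("SHA-256")
    var bytes = text.toByteArray()
    repeat(rounds) { bytes = digest.digest(bytes) }
    return bytes.joinToString("") { "%02x".format(it) }
}

// Suspends at the I/O step, then crunches on whichever worker resumed it.
suspend fun fetchAndHash(id: Int): String {
    val text = fetchText(id)
    println("fetched $id, thread: ${Thread.currentThread().name}")
    val hash = hashRepeatedly(text, rounds = 50_000)
    println("hashed $id, thread: ${Thread.currentThread().name}")
    return hash
}

// Starts one coroutine per document on Dispatchers.Default and awaits them all.
fun processDocuments(count: Int): List<String> = runBlocking {
    (1..count)
        .map { id -> async(Dispatchers.Default) { fetchAndHash(id) } }
        .awaitAll()
}

fun main() {
    val time = measureTimeMillis {
        processDocuments(count = 4).forEach { println(it) }
    }
    println("Time taken: $time")
}
```

Since the four simulated requests overlap, the run should take roughly one 500 ms wait plus the hashing time rather than four waits. The "fetched" and "hashed" lines for the same id may show different worker threads, because after a suspension the coroutine can be resumed by any thread of the pool.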

It is also possible to switch the dispatcher that a coroutine is using by means of the withContext() function. This does not start a new, concurrent coroutine (the caller just suspends until the block completes); it creates a new CoroutineContext (with the Dispatcher we've passed to it), and the block runs with that context and dispatcher. Remember that when we create a coroutine it gets a CoroutineContext (containing, among other things, a Dispatcher). For each suspend function invoked from that coroutine a Continuation object is created, which also points to that CoroutineContext. When we invoke withContext(), the Continuation objects corresponding to the suspend functions invoked from the block passed to withContext() use that new CoroutineContext rather than the coroutine's one. This way we can start a coroutine running in an event loop and at some point invoke some suspend function that runs in the thread pool. Once the block completes, execution continues with the original context, back on the event loop.
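A small sketch of that hop (sumOfSquares, HopReport and runWithContextDemo are hypothetical names used only for illustration): the coroutine starts on runBlocking's event loop, withContext(Dispatchers.Default) runs the CPU-bound loop on a pool worker, and when the block returns the coroutine continues on the event-loop thread again. It records Thread objects rather than comparing names, since names can carry extra suffixes when coroutine debug mode is enabled.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical CPU-bound helper: a plain loop with no suspension points.
fun sumOfSquares(n: Long): Long {
    var total = 0L
    for (i in 1L..n) total += i * i
    return total
}

// Which thread ran each phase, plus the computed value.
data class HopReport(val before: Thread, val inside: Thread, val after: Thread, val result: Long)

fun runWithContextDemo(n: Long): HopReport = runBlocking {
    // Here we are on runBlocking's event loop (the calling thread).
    val before = Thread.currentThread()
    // withContext suspends this coroutine and runs the block on a Dispatchers.Default worker.
    val (inside, result) = withContext(Dispatchers.Default) {
        Thread.currentThread() to sumOfSquares(n)
    }
    // Once the block completes we continue with the original context: back on the event loop.
    val after = Thread.currentThread()
    HopReport(before, inside, after, result)
}

fun main() {
    val report = runWithContextDemo(n = 1_000)
    println("before: ${report.before.name}, inside: ${report.inside.name}, after: ${report.after.name}")
    println("result: ${report.result}") // result: 333833500
}
```

Run from main(), "before" and "after" should both be the main thread, while "inside" should be a DefaultDispatcher worker, which matches the thread names we saw in the async(Dispatchers.Default) example.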
