Sunday, 6 July 2025

Kotlin Coroutines Revisited. Part 1: Cancellation

Last year I wrote several posts about Kotlin coroutines as I was learning how they worked. Lately I've had some exciting discussions with GPT (Claude) to dive deeper into different aspects of their implementation. I plan to write a few posts about this, and today I'll start with how cancellation works. The text in green in this post is GPT (Claude) wisdom.

In a previous post I explained that cancellation is cooperative. The Kotlin documentation makes it clear:

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

Based on the above, my understanding when I wrote that post was that those kotlinx.coroutines suspend functions would check for cancellation in the different state transitions of each state machine, when they were resumed. Well, there's much more to it than that. Let's see:

First of all, let's say we have a function that is suspended waiting for some I/O operation, and the job associated with its coroutine is cancelled (we call job.cancel() and it transitions into the cancelled state). Does this "cooperative nature" mean that the coroutine won't be cancelled until the I/O operation completes and the suspend function gets resumed and can check for cancellation? No, it's not like that. Normally the coroutine gets cancelled immediately, as long as the suspend function was suspended by calling suspendCancellableCoroutine rather than suspendCoroutine. Notice that these 2 functions (both built on top of the low-level suspendCoroutineUninterceptedOrReturn intrinsic) are the usual way for a suspend function to perform a suspension (returning the COROUTINE_SUSPENDED value; I'll explain this further in a separate post).
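Here is a minimal sketch of that difference, assuming kotlinx.coroutines is on the classpath. FakeIo, awaitCancellable, awaitNonCancellable and compareCancellation are hypothetical names made up for this illustration. The fake operation never completes on its own, so a job can only finish early because of cancellation.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback API whose operations never complete on their own;
// completeAll() fires the stored callbacks by hand
class FakeIo {
    private val callbacks = mutableListOf<(String) -> Unit>()
    fun start(onDone: (String) -> Unit) { callbacks += onDone }
    fun completeAll() = callbacks.forEach { it("done") }
}

// Suspends in a way that cancellation can interrupt
suspend fun FakeIo.awaitCancellable(): String =
    suspendCancellableCoroutine { cont -> start { cont.resume(it) } }

// Suspends in a way that cancellation cannot interrupt
suspend fun FakeIo.awaitNonCancellable(): String =
    suspendCoroutine { cont -> start { cont.resume(it) } }

// Returns whether each job has completed shortly after being cancelled
fun compareCancellation(): Pair<Boolean, Boolean> = runBlocking {
    val io = FakeIo()
    val cancellable = launch { io.awaitCancellable() }
    val nonCancellable = launch { io.awaitNonCancellable() }
    yield()                  // let both coroutines start and suspend
    cancellable.cancel()
    nonCancellable.cancel()
    yield()                  // let the cancellable one resume with CancellationException and finish
    val completed = cancellable.isCompleted to nonCancellable.isCompleted
    io.completeAll()         // the non-cancellable one only finishes once its callback fires
    nonCancellable.join()
    completed
}

fun main() {
    println(compareCancellation()) // (true, false)
}
```

Right after cancel(), the coroutine suspended in suspendCancellableCoroutine has already finished, while the one suspended in suspendCoroutine keeps waiting until its callback resumes it.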

Functions using suspendCancellableCoroutine can register a cancellation handler (via invokeOnCancellation) that immediately interrupts the underlying operation (cancelling a timer, closing a socket, etc.).

For example, delay() is implemented roughly like this:


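import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlinx.coroutines.suspendCancellableCoroutine

// Stand-in timer service; the real delay() asks the dispatcher's Delay support to schedule the resume
private val timers = Executors.newSingleThreadScheduledExecutor { r -> Thread(r).apply { isDaemon = true } }

suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return
    return suspendCancellableCoroutine { cont ->
        // Schedule a timer that resumes the coroutine when it fires
        val timer = timers.schedule(Runnable { cont.resume(Unit) }, timeMillis, TimeUnit.MILLISECONDS)
        // Register cancellation handler to cancel the timer
        cont.invokeOnCancellation { timer.cancel(false) }
    }
}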

That cancellation handler will take care of finishing/cleaning up the underlying asynchronous operation. So my understanding is that when Job.cancel is invoked, it checks whether the coroutine associated with that job is suspended. If so, it follows the chain of continuations associated with the chain of suspend functions for that coroutine, and if the last continuation has a cancellation handler, it invokes it. Then it resumes that continuation with a CancellationException (we'll look at this in more depth at the end of the post). In practice the job doesn't even need to walk the chain: when suspendCancellableCoroutine is called, its CancellableContinuation registers itself with the coroutine's Job, so cancelling the job notifies it directly. We can say that using suspendCancellableCoroutine is the first kind of cooperative behaviour, as it allows for immediate cancellation during suspension.
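The sketch below, again assuming kotlinx.coroutines, records the order of events when a job suspended in suspendCancellableCoroutine is cancelled. Operation and cancellationEvents are hypothetical names; Operation stands in for a socket or timer handle. In this single-threaded runBlocking setup the invokeOnCancellation handler runs inside the cancel() call itself, and the coroutine is resumed with a CancellationException afterwards.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.yield

// Hypothetical handle for an in-flight low-level operation (think of a socket read or a timer)
class Operation {
    var cancelled = false
        private set
    fun cancel() { cancelled = true }
}

// Cancels a coroutine suspended in suspendCancellableCoroutine and records what happens, in order
fun cancellationEvents(): Pair<List<String>, Boolean> = runBlocking {
    val events = mutableListOf<String>()
    val op = Operation()
    val job = launch {
        try {
            suspendCancellableCoroutine<Unit> { cont ->
                // The operation never completes by itself here; only cancellation ends the wait
                cont.invokeOnCancellation {
                    events += "cancellation handler ran"
                    op.cancel() // clean up the underlying operation
                }
            }
        } catch (e: CancellationException) {
            events += "resumed with CancellationException"
            throw e // rethrow so the coroutine still completes as cancelled
        }
    }
    yield()      // let the child start and suspend
    job.cancel()
    events += "cancel() returned"
    job.join()
    events.toList() to op.cancelled
}

fun main() {
    val (events, cleanedUp) = cancellationEvents()
    events.forEach(::println)
    println("underlying operation cancelled: $cleanedUp")
}
```

Rethrowing the CancellationException in the catch block matters: swallowing it would let the coroutine body carry on running after it has been cancelled.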

A second kind of cooperative behaviour is checking for cancellation before performing suspension (before invoking suspendCoroutine/suspendCancellableCoroutine). Indeed, most I/O-bound kotlinx.coroutines functions check for cancellation (using ensureActive()) before performing suspension.
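To see why a check before suspending helps, here is a sketch (kotlinx.coroutines assumed; fetch, operationsStarted and startedWhenAlreadyCancelled are hypothetical) in which a coroutine that has already been cancelled calls an I/O-style suspend function. Both variants end with a CancellationException, but without the ensureActive() pre-check the block passed to suspendCancellableCoroutine still runs, so the underlying operation is started for nothing.

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// How many times the hypothetical low-level operation was actually started
var operationsStarted = 0

// Hypothetical I/O-bound suspend function, optionally checking for cancellation before suspending
suspend fun fetch(checkFirst: Boolean): String {
    if (checkFirst) currentCoroutineContext().ensureActive() // fail fast, before starting any work
    return suspendCancellableCoroutine { cont ->
        operationsStarted++ // "start" the operation...
        cont.resume("data") // ...which completes immediately in this demo
    }
}

// Calls fetch() from a coroutine that is already cancelled; returns how many operations were started
fun startedWhenAlreadyCancelled(checkFirst: Boolean): Int = runBlocking {
    operationsStarted = 0
    launch {
        coroutineContext.job.cancel() // cancelling ourselves does not throw by itself
        fetch(checkFirst)             // throws CancellationException in both variants
    }.join()
    operationsStarted
}

fun main() {
    println(startedWhenAlreadyCancelled(checkFirst = true))  // 0
    println(startedWhenAlreadyCancelled(checkFirst = false)) // 1
}
```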

If the suspend function has no way to cancel the underlying low-level operation, it will use suspendCoroutine rather than suspendCancellableCoroutine to suspend. That means it won't be cancelled while it's suspended, so it must check for cancellation once it's resumed, using ensureActive() (for example currentCoroutineContext().ensureActive()). That's the third kind of cooperative behaviour.
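A sketch of this third kind, assuming kotlinx.coroutines and a hypothetical legacyRead callback API that offers no way to cancel its operation (readLegacy and cancelDuringLegacyRead are hypothetical too). cancel() leaves the job in the cancelling state (isCancelled is true, isCompleted is false) until the callback resumes the coroutine, and only then does ensureActive() throw.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical legacy callback API: once started, the operation can't be cancelled
fun legacyRead(onDone: (String) -> Unit) {
    thread { Thread.sleep(200); onDone("payload") }
}

suspend fun readLegacy(): String {
    // suspendCoroutine: cancellation can't interrupt this wait
    val data = suspendCoroutine<String> { cont -> legacyRead { cont.resume(it) } }
    // Third kind of cooperation: check for cancellation once we have been resumed
    currentCoroutineContext().ensureActive()
    return data
}

fun cancelDuringLegacyRead(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            events += "got ${readLegacy()}"
        } catch (e: CancellationException) {
            events += "CancellationException after resumption"
            throw e
        }
    }
    yield()      // let the child start and suspend inside suspendCoroutine
    job.cancel()
    events += "after cancel(): isCancelled=${job.isCancelled}, isCompleted=${job.isCompleted}"
    job.join()   // only returns once the legacy callback has fired (about 200 ms later)
    events.toList()
}

fun main() {
    cancelDuringLegacyRead().forEach(::println)
}
```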

There's a fourth kind of cooperative behaviour. When a suspend function also performs some CPU-bound work, it should check for cancellation before starting that work, in between (if possible), and at the end of it.
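A sketch of the fourth kind, assuming kotlinx.coroutines; processedChunks is a hypothetical helper and Thread.sleep stands in for a chunk of real CPU work. Without checks the loop ignores cancellation and processes every chunk; with ensureActive() before, between and after the chunks it stops at the next check.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs up to `chunks` chunks of "CPU work", cancels the job
// after the first chunk, and returns how many chunks were processed in total
fun processedChunks(chunks: Int, cooperative: Boolean): Int = runBlocking {
    val processed = AtomicInteger(0)
    val firstChunkDone = CompletableDeferred<Unit>()
    val job = launch(Dispatchers.Default) {
        if (cooperative) ensureActive()      // before starting
        repeat(chunks) {
            if (cooperative) ensureActive()  // in between chunks
            Thread.sleep(5)                  // stand-in for a chunk of CPU-bound work
            processed.incrementAndGet()
            firstChunkDone.complete(Unit)
        }
        if (cooperative) ensureActive()      // at the end, before publishing any result
    }
    firstChunkDone.await()
    job.cancelAndJoin()
    processed.get()
}

fun main() {
    println(processedChunks(chunks = 20, cooperative = false))   // 20: cancellation is ignored
    println(processedChunks(chunks = 1_000, cooperative = true)) // stops soon after cancellation
}
```

yield() would also work as the in-between check; besides checking for cancellation, it lets other coroutines run on the same dispatcher.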

I said a few paragraphs above that I would talk further about CancellationException, or rather, about how exceptions are propagated inside coroutines. Once one function in a chain of suspend function calls in a coroutine gets suspended, we no longer have a call stack, but a resumption mechanism based on continuations. So, how can an exception propagate from one function to its caller? The same way as a returned value: by means of a Result object. In other asynchronous systems, JavaScript async functions return Promises (which can be resolved or rejected), and Python asyncio works with Futures (which are set as done with either a result or an exception). In Kotlin we have continuations (callbacks on steroids) that are resumed with a Result object.

So continuations don't just pass values - they pass Result objects that can encapsulate either success or failure. When an exception occurs, it's wrapped in a failed Result.
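This can be observed with nothing but the standard library's kotlin.coroutines package: startCoroutine runs a suspend block and hands its outcome, as a Result, to a completion continuation built with the Continuation(context) { ... } factory. runAndCapture, answer and fail are hypothetical helpers for the illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: runs a suspend block and returns the Result that reached its
// completion continuation (non-null here because these blocks never actually suspend)
fun <T> runAndCapture(block: suspend () -> T): Result<T>? {
    var captured: Result<T>? = null
    // The completion continuation: a "callback on steroids" that receives a Result
    val completion = Continuation<T>(EmptyCoroutineContext) { result -> captured = result }
    block.startCoroutine(completion)
    return captured
}

suspend fun answer(): Int = 42
suspend fun fail(): Int = throw IllegalStateException("boom")

fun main() {
    println(runAndCapture { answer() }) // Success(42)
    println(runAndCapture { fail() })   // Failure(java.lang.IllegalStateException: boom)
}
```

The exception thrown by fail() never escapes startCoroutine; it arrives at the completion continuation as a failed Result.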

Let's say we have these 2 suspend functions: getReportInfo, which invokes getPersonInfo, which in turn invokes 3 other suspend functions: getName, getLastName and getCity. getLastName throws an exception.


suspend fun getReportInfo() {
    try {
        val personInfo = getPersonInfo() // Exception propagates here
    } catch (e: Exception) {
        // Catches the exception from getLastName()
        println("Caught: ${e.message}")
    }
}

suspend fun getPersonInfo(): PersonInfo {
    val name = getName()        // State 0 → 1
    val lastName = getLastName() // State 1 → 2 (exception thrown here)
    val city = getCity()        // State 2 → 3 (never reached)
    return PersonInfo(name, lastName, city)
}
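A runnable version of this example, using only kotlin.coroutines from the standard library. To make the functions really suspend, a hypothetical later() helper suspends with suspendCoroutine and queues the resumption on a tiny hand-rolled event loop (pending). The values returned by getName, getLastName and getCity are made up, and getLastName resumes its caller with a failed Result.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

data class PersonInfo(val name: String, val lastName: String, val city: String)

// Hypothetical tiny "event loop": resumptions queued here run after the caller has suspended
val pending = ArrayDeque<() -> Unit>()
val log = mutableListOf<String>()

// Hypothetical helper: really suspends, then resumes the caller later with a value or an exception
suspend fun <T> later(result: Result<T>): T = suspendCoroutine { cont ->
    pending.addLast { cont.resumeWith(result) }
}

suspend fun getName(): String = later(Result.success("Ada"))
suspend fun getLastName(): String = later(Result.failure<String>(IllegalStateException("last name service down")))
suspend fun getCity(): String {
    log += "getCity called" // never logged: getPersonInfo fails before reaching getCity()
    return later(Result.success("London"))
}

suspend fun getPersonInfo(): PersonInfo {
    val name = getName()
    val lastName = getLastName() // resumed with a failed Result, so the exception is rethrown here
    val city = getCity()
    return PersonInfo(name, lastName, city)
}

suspend fun getReportInfo() {
    try {
        val personInfo = getPersonInfo()
        log += "Got: $personInfo"
    } catch (e: Exception) {
        log += "Caught: ${e.message}"
    }
}

// Starts getReportInfo(), drains the event loop and returns what happened
fun runReport(): List<String> {
    log.clear()
    val block: suspend () -> Unit = { getReportInfo() }
    block.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { result ->
        log += "getReportInfo completed: $result"
    })
    while (pending.isNotEmpty()) pending.removeFirst()()
    return log.toList()
}

fun main() {
    runReport().forEach(::println)
}
```

The try-catch in getReportInfo catches the exception even though, by the time getLastName fails, getReportInfo's original call stack is long gone; the failure travels through the continuations as a Result.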

This is how the Kotlin compiler magic would transform them (thanks GPT!!!)


// Conceptual representation of compiled suspend function
class GetPersonInfoStateMachine : Continuation<Any?> {
    override fun resumeWith(result: Result<Any?>) {
        try {
            when (state) {
                0 -> { /* initial state */ }
                1 -> { 
                    // After getName() completes
                    if (result.isFailure) {
                        // Exception from getName - propagate up
                        completion.resumeWithException(result.exceptionOrNull()!!)
                        return
                    }
                    // Continue to getLastName()
                }
                2 -> {
                    // After getLastName() completes  
                    if (result.isFailure) {
                        // This is where the exception thrown by getLastName() arrives
                        completion.resumeWithException(result.exceptionOrNull()!!)
                        return
                    }
                    // Would continue to getCity()
                }
                // ... more states
            }
        } catch (e: Exception) {
            // Any synchronous exception in state machine
            completion.resumeWithException(e)
        }
    }
}

// getReportInfo() with try-catch - compiler-generated logic
class GetReportInfoStateMachine : Continuation<Any?> {
    override fun resumeWith(result: Result<Any?>) {
        try {
            when (state) {
                0 -> {
                    // About to call getPersonInfo()
                    state = 1
                    getPersonInfo(this) // pass this as continuation
                }
                1 -> {
                    // getPersonInfo() completed
                    if (result.isFailure) {
                        // getReportInfo() has a try-catch, so rethrow the exception to trigger it
                        throw result.exceptionOrNull()!!
                    }
                    // Success case - continue with result
                    val personInfo = result.getOrThrow()
                    // ... rest of function
                }
            }
        } catch (e: Exception) {
            if (isInTryBlock) {
                // Execute your catch block logic here
                handleException(e)
            } else {
                // No try-catch, propagate up
                completion.resumeWithException(e)
            }
        }
    }
}

The magic happens in that if (result.isFailure) throw exception check, which gets inserted wherever the compiler detects that you want traditional exception-handling semantics.
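To see the rethrow-on-failure idea actually run, here is a hand-written approximation of a compiled suspend function with one suspension point, using only kotlin.coroutines from the standard library. It is a simplified sketch, not real compiler output (the real generated class extends an internal ContinuationImpl and is driven by the runtime). getLastNameCps, ReportStateMachine, reportCps, queue and runReport are hypothetical names; getLastNameCps plays the role of a compiled getLastName() that suspends and later fails.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical tiny "event loop": resumptions queued here run after the callee returned COROUTINE_SUSPENDED
val queue = ArrayDeque<() -> Unit>()

// Hypothetical CPS form of getLastName(): suspends now, fails later
fun getLastNameCps(cont: Continuation<String>): Any? {
    queue.addLast { cont.resumeWithException(IllegalStateException("no last name")) }
    return COROUTINE_SUSPENDED
}

// Hand-written state machine for:
//   suspend fun report(): String =
//       try { "Hello " + getLastName() } catch (e: IllegalStateException) { "Caught: ${e.message}" }
class ReportStateMachine(private val completion: Continuation<String>) : Continuation<String> {
    override val context: CoroutineContext get() = completion.context
    private var label = 0 // which suspension point to resume from

    // Runs the body from the current label; result is null on the initial call
    fun invokeSuspend(result: Result<String>?): Any? {
        try {
            val lastName: String
            if (label == 0) {
                label = 1
                val r = getLastNameCps(this)
                if (r === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
                lastName = r as String // the callee finished without suspending
            } else {
                // Label 1: resumed after getLastName(). A failed Result is rethrown here,
                // inside the try block, so the original catch clause handles it.
                lastName = result!!.getOrThrow()
            }
            return "Hello $lastName"
        } catch (e: IllegalStateException) {
            return "Caught: ${e.message}"
        }
    }

    // Called by the callee when it finishes
    override fun resumeWith(result: Result<String>) {
        val outcome = try {
            invokeSuspend(result)
        } catch (e: Throwable) {
            completion.resumeWithException(e) // not handled by the body: propagate to the caller
            return
        }
        if (outcome !== COROUTINE_SUSPENDED) completion.resume(outcome as String)
    }
}

// Equivalent of calling report(completion) from compiled code
fun reportCps(completion: Continuation<String>): Any? = ReportStateMachine(completion).invokeSuspend(null)

fun runReport(): Result<String>? {
    var captured: Result<String>? = null
    val first = reportCps(Continuation<String>(EmptyCoroutineContext) { captured = it })
    check(first === COROUTINE_SUSPENDED) // getLastNameCps always suspends
    while (queue.isNotEmpty()) queue.removeFirst()()
    return captured
}

fun main() {
    println(runReport()) // Success(Caught: no last name)
}
```

Removing the try-catch from invokeSuspend would make the same failure escape to resumeWith, which then forwards it to the caller with resumeWithException.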

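Notice that at each "entry point" of the state machine corresponding to the suspend function there's a check to see whether an exception has happened. If the original code (before the compiler magic transformed the function into a state machine) was wrapped in a try-catch, the exception is thrown; otherwise it's passed to the caller by invoking completion.resumeWithException. (In the code the compiler actually generates, the check is uniform: each resumption point calls throwOnFailure on the incoming Result, and an exception that no try-catch handles escapes invokeSuspend, where the runtime's BaseContinuationImpl.resumeWith catches it and resumes the caller's continuation with a failed Result. The effect is the same.) Notice that resumeWithException just creates a Result object with failure set to the exception and invokes resumeWith(result). Its source code is just: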


public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))
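A quick check of that equivalence with the standard library alone. bothWays is a hypothetical helper, and the continuation built with the Continuation(context) { ... } factory simply records every Result it receives.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resumeWithException

// Hypothetical helper: resumes a recording continuation both ways and returns the two Results it received
fun bothWays(e: Throwable): List<Result<Int>> {
    val received = mutableListOf<Result<Int>>()
    val cont = Continuation<Int>(EmptyCoroutineContext) { received += it }
    cont.resumeWithException(e)        // the stdlib extension function
    cont.resumeWith(Result.failure(e)) // what it expands to
    return received
}

fun main() {
    val boom = IllegalStateException("boom")
    val (viaExtension, direct) = bothWays(boom)
    println(viaExtension == direct)                  // true
    println(viaExtension.exceptionOrNull() === boom) // true
}
```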
