Thursday 15 February 2024

Cancelling Kotlin Coroutines

Kotlin coroutines come with a powerful cancellation mechanism. The first thing to make clear is that cancellation is cooperative. This is not like killing a process with kill -9: a coroutine has to cooperate with the cancellation mechanism by checking if a cancellation has been requested.
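To make "cooperative" concrete, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The helper names spin() and stepsCompleted() are hypothetical, and spin() just busy-waits to simulate CPU-bound work. The same loop is cancelled once with and once without an ensureActive() check: without the check, cancel() has no visible effect and cancelAndJoin() simply waits for all five steps to finish.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: simulated CPU-bound work that busy-waits for about `ms` milliseconds
// without ever suspending.
fun spin(ms: Long) {
    val end = System.nanoTime() + ms * 1_000_000
    while (System.nanoTime() < end) { /* burn CPU */ }
}

// Runs 5 steps of CPU work on Dispatchers.Default, cancels the Job as soon as the body
// has started, and returns how many steps completed.
fun stepsCompleted(checkCancellation: Boolean): Int = runBlocking {
    val steps = AtomicInteger(0)
    val started = CompletableDeferred<Unit>()
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        repeat(5) {
            // CoroutineScope.ensureActive(): throws CancellationException once cancelled
            if (checkCancellation) ensureActive()
            spin(20)
            steps.incrementAndGet()
        }
    }
    started.await()     // make sure the body is running before we cancel it
    job.cancelAndJoin() // without cooperation this just waits for every step
    steps.get()
}
```

With checkCancellation = false the result is always 5; with the check, the coroutine stops at the first ensureActive() call after cancel() lands.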

When we create a coroutine with a coroutine builder like launch() or async() we obtain a Job (Deferred derives from Job) that we can cancel by invoking its cancel() method, which moves it into the cancelling state. The CoroutineContext has a reference to the Job, and a suspend function always has access to its context through the "magical" coroutineContext property defined in the kotlin.coroutines package. I call it magical because its source code looks like this:
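A minimal sketch of that relationship, assuming kotlinx.coroutines; currentJob() and jobLifecycle() are hypothetical names. A plain top-level suspend function reads coroutineContext[Job] and gets the very same object that launch() returned. After cancel() the Job is no longer active, and once join() returns it reports itself as both cancelled and completed.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext

// A plain suspend function with no CoroutineScope receiver:
// it still reaches the Job of whatever coroutine calls it.
suspend fun currentJob(): Job? = coroutineContext[Job]

// Returns a few observations about the Job returned by launch().
fun jobLifecycle(): Map<String, Boolean> = runBlocking {
    var jobSeenInside: Job? = null
    val job = launch {
        jobSeenInside = currentJob()
        delay(10_000) // cancellable suspension point
    }
    yield() // let the launched coroutine start and record its Job
    val sameJob = jobSeenInside === job
    job.cancel() // request cancellation: the Job stops being active
    val activeAfterCancel = job.isActive
    job.join() // wait until the coroutine has really finished
    mapOf(
        "sameJob" to sameJob,
        "activeAfterCancel" to activeAfterCancel,
        "isCancelled" to job.isCancelled,
        "isCompleted" to job.isCompleted,
    )
}
```

Here jobLifecycle() reports sameJob = true, activeAfterCancel = false, isCancelled = true and isCompleted = true.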


@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }

That "implemented as intrinsic" means this:

Intrinsic basically means that the implementation is internal to the compiler.
...
In general intrinsic thus means that it is something that is built in to the “translation” system rather than provided in other ways (library) but is not a specified element of the language syntax (the language itself is always built in to the compiler).

If we compile a Kotlin suspend function that uses that property to bytecode and then decompile it to Java, the Java code looks like this: ((Continuation)$continuation).getContext(), which makes perfect sense. We know that each suspend function is associated with a continuation object, and a continuation object references the CoroutineContext of the coroutine (which, as I've said, in turn references a Job, a Dispatcher...). This is what makes cancellation so simple in the Kotlin implementation of coroutines and suspend functions: every suspend function in a chain of suspend calls in a coroutine receives as a parameter the continuation of the caller suspend function (so we end up with a chain of continuations), and that continuation gives access to the CoroutineContext, hence to the Job, and hence to checking whether the Job has been cancelled. Notice that JavaScript promises do not support cancellation (though there are alternative implementations like bluebird that do).
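We can observe this with nothing but the standard library's kotlin.coroutines package. In this sketch RequestId is a hypothetical context element (declared the same way kotlinx.coroutines declares CoroutineName), and the coroutine is started with startCoroutine() and a completion Continuation whose context holds only that element. One nested suspend function reads the id through the coroutineContext intrinsic, the other straight from the continuation it receives via suspendCoroutine(), and both see the same value.

```kotlin
import kotlin.coroutines.*

// Hypothetical context element, used only to tag the coroutine's context.
data class RequestId(val value: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// Reads the context through the coroutineContext intrinsic.
suspend fun idViaIntrinsic(): String? = coroutineContext[RequestId]?.value

// Reads the context directly from the continuation passed to this function.
suspend fun idViaContinuation(): String? =
    suspendCoroutine { cont -> cont.resume(cont.context[RequestId]?.value) }

// A function in the middle of the chain: both callees see the same context.
suspend fun readBoth(): Pair<String?, String?> = idViaIntrinsic() to idViaContinuation()

// Starts a coroutine using only stdlib APIs (no kotlinx.coroutines) and returns
// what the nested calls observed. Nothing really suspends, so it runs synchronously.
fun runWithRequestId(id: String): Pair<String?, String?> {
    var result: Pair<String?, String?>? = null
    val completion = Continuation<Pair<String?, String?>>(RequestId(id)) { r ->
        result = r.getOrThrow()
    }
    val block: suspend () -> Pair<String?, String?> = { readBoth() }
    block.startCoroutine(completion)
    return checkNotNull(result)
}
```

For example, runWithRequestId("req-42") returns ("req-42", "req-42").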

In JavaScript the chain of Promises of an "async call stack" is created from the deepest Promise outwards, while in Kotlin the chain of Continuations of an "async call stack" is created from the outer caller to the deepest callee, and I guess that this makes implementing cancellation more straightforward in Kotlin.

I mentioned above that "a coroutine has to cooperate with the cancellation mechanism by checking if a cancellation has been requested", but in fact most of the time we don't have to do anything: this is done implicitly for us, as stated in the documentation:

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.
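A small sketch of that guarantee, assuming kotlinx.coroutines; cancelDelayLoop() is a hypothetical name. The loop body never checks for cancellation itself, but delay() throws CancellationException as soon as the Job is cancelled. The exact number of iterations depends on timing (probably around four with these delays), so the sketch only reports it.

```kotlin
import kotlinx.coroutines.*

// Returns the number of iterations started and whether CancellationException was seen.
fun cancelDelayLoop(): Pair<Int, Boolean> = runBlocking {
    var iterations = 0
    var sawCancellation = false
    val job = launch {
        try {
            repeat(1_000) {
                iterations++
                delay(100) // kotlinx.coroutines suspend function: checks for cancellation
            }
        } catch (e: CancellationException) {
            sawCancellation = true
            throw e // rethrow so the coroutine still completes as cancelled
        }
    }
    delay(350)
    job.cancelAndJoin()
    iterations to sawCancellation
}
```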

So in most cases your suspend functions will just call other suspend functions that already take care of cancellation (not only the ones in kotlinx.coroutines, but also, for example, Ktor requests). Let's think of a case where that doesn't happen, for example a function performing several sequential CPU-bound operations. We'll run that function in the thread pool (in its own coroutine with the Dispatchers.Default dispatcher) and it will check after each operation whether a cancellation has been requested. For that we use coroutineContext.ensureActive():


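import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext
import kotlin.system.measureTimeMillis

class Message(
    val header: String,
    val content: String,
    val footer: String,
)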

// Example 1: these functions simulate CPU-bound work with Thread.sleep, which is NOT cancellable,
// so decryptMessage has to check for cancellation itself (ensureActive) between steps
fun decryptHeader(txt: String): String {
    println("decryptHeader started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

fun decryptContent(txt: String): String {
    println("decryptContent started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

fun decryptFooter(txt: String): String {
    println("decryptFooter started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

suspend fun decryptMessage(): String {
    val message = Message("Title", "Main", "notes")
    println("decryptMessage started")
    val header = decryptHeader(message.header)
    println("header obtained: $header")
    coroutineContext.ensureActive()

    val content = decryptContent(message.content)
    println("content obtained: $content")
    coroutineContext.ensureActive()

    val footer = decryptFooter(message.footer)
    println("footer obtained: $footer")
    coroutineContext.ensureActive()

    return "$header - $content - $footer"
}

suspend fun cancelComputation(c1: Deferred<String>) {
    val elapsed = measureTimeMillis {
        delay(1000)
        c1.cancel()
        println("after invoking cancel")
        val res = try {
            c1.await()
        } catch (ex: Exception) {
            ex.message
        }
        println("result: $res")
    }
    println("elapsed time: $elapsed")
}

fun runCancellableComputation() {
    println("started")
    runBlocking {
        // this runs in the eventloop
        val c1 = async (Dispatchers.Default) {
            // this runs in the ThreadPool
            decryptMessage()
        }
        cancelComputation(c1)

    }
    println("Finished, current thread: ${Thread.currentThread().name}")
}

/*
started
decryptMessage started
decryptHeader started
after invoking cancel
header obtained: [[Title]]
result: DeferredCoroutine was cancelled
elapsed time: 2023
Finished, current thread: main
*/

After each CPU-bound function we check in one step, with ensureActive(), whether a cancellation has been requested, and exit the coroutine if so. Pretty nice.

I've seen some articles that suggest using yield() rather than ensureActive(). yield() will also work, as it's a cancellation-aware suspend function, but it's intended for something different. With yield() we are cooperating by telling other coroutines on the same dispatcher that they can run if they need to. In cases like this one, where we have fewer coroutines (just 1) than threads in the thread pool, yielding has no effect, but in cases where many coroutines compete for threads, using yield() is normally a good thing (if you want to be cooperative). It may not be what you want, though, as on some occasions you'll want a task to finish as soon as possible at the expense of other tasks. Of course, do not confuse this yield() with the SequenceScope.yield() used for generator-like functionality.
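The difference between yield() and ensureActive() becomes visible when several coroutines share a thread. In this sketch (assuming kotlinx.coroutines; executionOrder() is a hypothetical name) two coroutines run on the single-threaded event loop of runBlocking, each doing three non-suspending steps. With yield() after each step they take turns; without it, the first one runs to completion before the second one starts.

```kotlin
import kotlinx.coroutines.*

// Two coroutines share the single thread of runBlocking's event loop. Each logs 3 steps
// of non-suspending work, optionally calling yield() after each step.
fun executionOrder(useYield: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    listOf("A", "B").map { name ->
        launch {
            repeat(3) { i ->
                log += "$name$i"
                // checks for cancellation, then lets other coroutines on this dispatcher run
                if (useYield) yield()
            }
        }
    }.joinAll()
    log
}
```

executionOrder(true) returns [A0, B0, A1, B1, A2, B2], while executionOrder(false) returns [A0, A1, A2, B0, B1, B2].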

There's a pretty nice explanation of this on Stack Overflow:

I would answer the question in the context of 4 related things:

Sequence yield(value: T) is totally unrelated to coroutine yield()

isActive is just a flag to identify if the coroutine is still active or cancelled. You can check this flag periodically and decide to stop current coroutine or continue. Of course, normally, we only continue if it's true. Otherwise don't run anything or throws exception, ex. CancellationException.

ensureActive() checks the isActive flag above and throws CancellationException if it's false.

Coroutine yield() not only calls ensureActive() first, but then also politely tells other coroutines in the same dispatcher that: "Hey, you guys could go first, then I will continue." The reason could be "My job is not so important at the moment." or "I am sorry to block you guys for so long. I am not a selfish person, so it's your turn now." You can understand here exactly like this meaning in dictionary: "yield (to somebody/something): to allow vehicles on a bigger road to go first." SYNONYM: give way.
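Those points can be put side by side in a short sketch, assuming kotlinx.coroutines (firstSquares() and flagVersusException() are hypothetical names): SequenceScope.yield(value) just produces values for a lazy sequence, a while (isActive) loop simply stops when the flag turns false, and ensureActive() on a cancelled Job throws CancellationException instead.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// 1. SequenceScope.yield(value): generator-style, unrelated to coroutine yield().
fun firstSquares(n: Int): List<Int> = sequence {
    var i = 1
    while (true) {
        yield(i * i) // hands a value to whoever consumes the sequence
        i++
    }
}.take(n).toList()

// 2. isActive is only a flag: the loop below stops normally when it turns false.
// 3. ensureActive() reads the same flag but throws CancellationException.
fun flagVersusException(): Pair<Boolean, Boolean> = runBlocking {
    val exitedNormally = AtomicBoolean(false)
    val started = CompletableDeferred<Unit>()
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        while (isActive) { /* simulated CPU work, no suspension */ }
        exitedNormally.set(true) // reached without any exception being thrown
    }
    started.await()
    job.cancelAndJoin()

    val cancelledJob = Job().also { it.cancel() }
    val threw = try {
        cancelledJob.ensureActive()
        false
    } catch (e: CancellationException) {
        true
    }
    exitedNormally.get() to threw
}
```

firstSquares(4) returns [1, 4, 9, 16] and flagVersusException() returns (true, true).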

One additional note: I guess the check for cancellation could have been performed implicitly after each call to a suspend function. When a suspend function finishes, the coroutine machinery invokes continuation.resume() (or resumeWith()) to continue, so that resume() method could perform the check each time it's invoked.
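For comparison, here is what happens today when a suspend function never actually suspends: no continuation is resumed, so nothing checks the Job, and cancellation is only observed at the next cancellable suspend function. A minimal sketch, assuming kotlinx.coroutines; addOne() and whereCancellationIsObserved() are hypothetical names.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: marked suspend but never suspends, so it performs no cancellation check.
@Suppress("RedundantSuspendModifier")
suspend fun addOne(x: Int): Int = x + 1

fun whereCancellationIsObserved(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {
        cancel() // CoroutineScope.cancel(): cancels our own Job, but the body keeps running
        log += "after cancel, isActive=$isActive"
        log += "addOne returned ${addOne(1)}" // no exception: nothing checked the Job
        try {
            yield() // cancellable suspend function: sees the cancelled Job and throws
            log += "unreachable"
        } catch (e: CancellationException) {
            log += "yield threw CancellationException"
            throw e
        }
    }
    job.join()
    log
}
```

The returned log is [after cancel, isActive=false, addOne returned 2, yield threw CancellationException].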
