Monday 19 February 2024

Yielding aka Allowing others to Run

At the end of my previous post I talked about the Kotlin yield() function, and how we use it to cooperate with other coroutines: it asks whether another coroutine wants to run and, if so, suspends the calling coroutine so that the other one gets scheduled. We don't have a specific yield function in JavaScript or Python; we just leverage another, more generic function for that purpose. Let's see.

In Python we use await asyncio.sleep(0) for that. asyncio.sleep() suspends the current coroutine in all cases, regardless of the interval provided (from the documentation: "sleep() always suspends the current task, allowing other tasks to run."). The event loop takes control and checks whether another coroutine wants to run; if not, and given that the interval is 0, it resumes the sleeping coroutine immediately. So it's the same as yield() in Kotlin.
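A minimal sketch of that behaviour (the worker and main names are just made up for the illustration): two coroutines that call await asyncio.sleep(0) after each step end up taking turns on the event loop.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}-{i}")
        # sleep(0) always suspends, so the other worker gets a chance to run
        await asyncio.sleep(0)


async def main() -> list[str]:
    log: list[str] = []
    # gather wraps both coroutines in tasks scheduled on the same event loop
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


order = asyncio.run(main())
print(order)
```

Without the await asyncio.sleep(0) line each worker would run all its steps in one go before the other one started.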

JavaScript follows the same strategy as Python for this. The setTimeout() function always returns control to the event loop, even when the interval is 0, with the callback being added to the macrotask queue (from the documentation: "If this parameter is omitted, a value of 0 is used, meaning execute "immediately", or more accurately, the next event cycle"). If there's nothing in the microtask queue or the macrotask queue, the callback we've just added will run immediately (in the next event loop iteration), as we've provided a 0 wait interval. OK, I'm talking about callbacks, which seems a bit odd, so notice that we can easily "promisify" the setTimeout function, getting an equivalent to asyncio.sleep(), like this:


async function asleep(timeout) {
    return new Promise(res => setTimeout(res, timeout));
}

Additionally, in JavaScript we can await any value, not just a Promise. We can just write await "aa" or await Promise.resolve("aa") (both are equivalent, await "aa" is indeed transformed into await Promise.resolve("aa")). Awaiting an already resolved Promise will add the "then callback" for that promise (in an async function that's the invocation of the next state of the state-machine for that function) to the microtask queue. When the event loop gets control back it will schedule the next task in the microtask queue. So if there was some previous task it will be executed first, else our "then callback" task will be executed, so this is equivalent to yielding.

So all in all in JavaScript we have 2 levels of yielding. We can yield to tasks in the microtask queue by awaiting a resolved Promise, or yield to tasks in the macrotask queue, by using setTimeout(0). You can read more about this here and here.

We know that Kotlin has a suspend delay() function, but we cannot use it as a replacement for yield(), because delay() only suspends when the interval is greater than 0 (from the documentation: "If the given timeMillis is non-positive, this function returns immediately"). So delay(0) has no effect at all: the next line runs sequentially, with no suspension and no chance for other coroutines to run.

I must say that I quite like the Kotlin approach. Having a specific yield function rather than leveraging a particular case of another function is more semantic.

Thursday 15 February 2024

Cancelling Kotlin Coroutines

Kotlin coroutines come with a powerful cancellation mechanism. The first thing to make clear is that Cancellation is Cooperative. This is not like killing a process with kill -9: a coroutine has to cooperate with the cancellation mechanism by checking if a cancellation has been requested.

When we create a coroutine with a coroutine builder like launch() or async() we obtain a Job (Deferred derives from Job) that we can cancel by invoking its cancel() method, which makes it transition into the cancelling state. The CoroutineContext has a reference to the Job, and a suspend function always has access to its context through the "magical" coroutineContext property defined in the kotlin.coroutines package. I call it magical because its source code looks like this:


@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }

That "implemented as intrinsic" means this:

Intrinsic basically means that the implementation is internal to the compiler.
...
In general intrinsic thus means that it is something that is built in to the “translation” system rather than provided in other ways (library) but is not a specified element of the language syntax (the language itself is always built in to the compiler).

If we compile to bytecode a Kotlin suspend function that uses that property and then decompile it to Java, the Java code looks like this: ((Continuation)$continuation).getContext(), which makes perfect sense. We know that each suspend function is associated with a continuation object, and a continuation object references the CoroutineContext of the coroutine (which, as I've said, in turn references a Job, a Dispatcher...). So the Kotlin implementation of coroutines and suspend makes cancellation very simple. As we know, every suspend function in a chain of suspend calls in a coroutine receives as a parameter the continuation of the caller suspend function (we end up with a chain of continuations), and that continuation gives access to the CoroutineContext, hence to the Job, and hence to checking whether the Job has been cancelled. Notice that JavaScript promises do not support cancellation (though there are alternative implementations like bluebird that do so).

In JavaScript the chain of Promises of an "async call stack" is created from the deepest Promise outwards, while in Kotlin the chain of Continuations of an "async call stack" is created from the outer caller to the deepest callee, and I guess that this makes implementing cancellation more straightforward in Kotlin.

I mentioned above that "a coroutine has to cooperate with the cancellation mechanism by checking if a cancellation has been requested" but indeed most times we don't have to do anything, this is implicitly done for us, as stated in the documentation:

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

So on most occasions your suspend functions will just call other suspend functions that already take care of cancellation (not only the ones in kotlinx.coroutines, but also, for example, Ktor requests). Let's think of a situation where that's not the case, for example a function performing several sequential CPU-bound operations. We'll run that function in the ThreadPool (in its own coroutine, with the Dispatchers.Default dispatcher) and it will check after each operation whether a cancellation has been requested. For that we use coroutineContext.ensureActive():


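import kotlinx.coroutines.*
import kotlin.coroutines.coroutineContext
import kotlin.system.measureTimeMillis

class Message(
    val header: String,
    val content: String,
    val footer: String,
    ) {}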

// The decrypt functions simulate CPU-bound work with Thread.sleep, which (unlike delay) is not cancellable,
// so decryptMessage has to check for cancellation explicitly with ensureActive()
suspend fun decryptHeader(txt: String): String {
    println("decryptHeader started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

suspend fun decryptContent(txt: String): String {
    println("decryptContent started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

suspend fun decryptFooter(txt: String): String {
    println("decryptFooter started")
    Thread.sleep(2000)
    return "[[$txt]]"
}

suspend fun decryptMessage(): String {
    val message = Message("Title", "Main", "notes")
    println("decryptMessage started")
    val header = decryptHeader(message.header)
    println("header obtained: $header")
    coroutineContext.ensureActive()

    val content = decryptContent(message.content)
    println("content obtained: $content")
    coroutineContext.ensureActive()

    val footer = decryptFooter(message.footer)
    println("footer obtained: $footer")
    coroutineContext.ensureActive()

    return "$header - $content - $footer"
}

suspend fun cancelComputation(c1: Deferred<String>) {
    val elapsed = measureTimeMillis {
        delay(1000)
        c1.cancel()
        println("after invoking cancel")
        val res = try {
            c1.await()
        } catch (ex: Exception) {
            ex.message
        }
        println("result: $res")
    }
    println("elapsed time: $elapsed")
}

fun runCancellableComputation() {
    println("started")
    runBlocking {
        // this runs in the eventloop
        val c1 = async (Dispatchers.Default) {
            // this runs in the ThreadPool
            decryptMessage()
        }
        cancelComputation(c1)

    }
    println("Finished, current thread: ${Thread.currentThread().name}")
}

/*
started
decryptMessage started
decryptHeader started
after invoking cancel
header obtained: [[Title]]
result: DeferredCoroutine was cancelled
elapsed time: 2023
Finished, current thread: main
*/

After running the first CPU-bound function we check in one step with ensureActive() whether a cancellation has been requested, and exit the coroutine in that case, pretty nice. I've seen some articles where they mention using yield() rather than ensureActive(). yield() will also work, as it's a "cancel aware" suspend function, but it's intended for something different. With yield() we are cooperating, telling other coroutines in the same dispatcher to run if they need to. In cases like this one, where we have fewer coroutines (just 1) than threads in the thread pool, that yielding would have no effect. In cases where many coroutines fight for some thread, using yield() is normally a good thing (if you are cooperative), but it could be different from what you want, as on some occasions you'll want that task to finish as soon as possible to the detriment of other tasks. Of course, do not confuse this yield() with the SequenceScope.yield used for the "generator-like functionality".
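For comparison with the Python side of the yielding post, here is a rough asyncio analogue (my own sketch with made-up names, not a translation of the Kotlin code). asyncio has no ensureActive(); there await asyncio.sleep(0) plays both roles at once: it yields to other tasks, and it is also the point where a pending cancel() surfaces as a CancelledError.

```python
import asyncio


async def decrypt_message(log: list[str]) -> str:
    parts = []
    for part in ("header", "content", "footer"):
        log.append(f"decrypting {part}")
        parts.append(f"[[{part}]]")  # stand-in for a CPU-bound step
        # checkpoint: yields, and raises CancelledError if cancel() was requested
        await asyncio.sleep(0)
    return " - ".join(parts)


async def main() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(decrypt_message(log))
    await asyncio.sleep(0)  # let the task run up to its first checkpoint
    task.cancel()           # only a request, nothing is killed here
    try:
        await task
        cancelled = False
    except asyncio.CancelledError:
        cancelled = True
    return log, cancelled


log, cancelled = asyncio.run(main())
print(log, cancelled)
```

The task only stops because it reaches an await after the request; a loop with no await in it would run to the end regardless, which is the same cooperative idea as in Kotlin.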

There's a pretty nice explanation about this in stackoverflow:

I would answer the question in the context of 4 related things:

Sequence yield(value: T) is totally unrelated to coroutine yield()

isActive is just a flag to identify if the coroutine is still active or cancelled. You can check this flag periodically and decide to stop current coroutine or continue. Of course, normally, we only continue if it's true. Otherwise don't run anything or throws exception, ex. CancellationException.

ensureActive() checks the isActive flag above and throws CancellationException if it's false.

Coroutine yield() not only calls ensureActive() first, but then also politely tells other coroutines in the same dispatcher that: "Hey, you guys could go first, then I will continue." The reason could be "My job is not so important at the moment." or "I am sorry to block you guys for so long. I am not a selfish person, so it's your turn now." You can understand here exactly like this meaning in dictionary: "yield (to somebody/something): to allow vehicles on a bigger road to go first." SYNONYM: give way.

An additional note. I guess the check for cancellation could have been implemented implicitly after each call to a suspend function. When a suspend function finishes, the coroutine invokes continuation.resume (or resumeWith) to continue on, so that resume() method could perform the check each time it's invoked.

Thursday 8 February 2024

Kotlin Coroutines and CPU and IO bound code

Time for a short follow-up to this recent post that showed my first non-theoretical fiddling with Kotlin Coroutines. In that post I was running several suspend functions concurrently, by starting several coroutines concurrently (with the async() coroutine builder), but everything was running in the same Thread, in an event-loop created by the coroutine created by the runBlocking() coroutine builder. Obviously that's great for I/O bound code, but what if we have CPU bound code that we want to run in parallel in separate threads? In particular, let's say we have functions that do both CPU crunching and IO bound tasks. For this we can use coroutines with a dispatcher that leverages the ThreadPool.

If we have a CPU-bound function and we launch 2 invocations using the async coroutine builder, just as we did in my previous post, the code will run sequentially. This is so because there are no suspension points inside the CPU-bound function: both coroutines are scheduled on the event loop, but once the first one starts it runs without ever suspending, and it's not until it finishes that the second one gets to run (everything running in the event-loop thread).


fun doCalculation(id: Int, duration: Int): Int{
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}

fun performCalculationsSequentially() {
    runBlocking {
        val time = measureTimeMillis {
            val c1 = async { doCalculation(id = 1, duration = 3) }
            val c2 = async { doCalculation(id = 2, duration = 6) }
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished, ${results.joinToString(", ")}")
        }
        println("Time taken: $time") 
    }
}

/*
Started, cpubound
calculation: 1, step: 1, thread: main
calculation: 1, step: 2, thread: main
calculation: 1, step: 3, thread: main
Finishing calculation: 1, step: 4, thread: main
calculation: 2, step: 1, thread: main
calculation: 2, step: 2, thread: main
calculation: 2, step: 3, thread: main
calculation: 2, step: 4, thread: main
calculation: 2, step: 5, thread: main
calculation: 2, step: 6, thread: main
Finishing calculation: 2, step: 7, thread: main
tasks finished, 1, 2
Time taken: 4591
*/

So running coroutines with an event-loop is great for cooperating between multiple suspend functions that really have suspension points (yes, the equivalent to JavaScript async code or Python asyncio) but is useless when the function is just doing processing without suspending.

If we invoke the async coroutine builder with Dispatchers.Default, I mean async (Dispatchers.Default), the coroutine will run in a ThreadPool, so in the code below the calculations will run in parallel, each of them in a Thread from the ThreadPool.



fun doCalculation(id: Int, duration: Int): Int{
    var count = 0
    while (count++ < duration) {
        println("calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
        Thread.sleep(500)
    }
    println("Finishing calculation: $id, step: $count, thread: ${Thread.currentThread().name}")
    return id
}

fun performCalculations3() {
    runBlocking {
        println("coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        val time = measureTimeMillis {
            val c1 = async (Dispatchers.Default){
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 1, duration = 3)
            }
            val c2 = async (Dispatchers.Default){
                println("inside async, coroutineContext $coroutineContext")
                doCalculation(id = 2, duration = 6)
            }
            val results = listOf(c1, c2).awaitAll()
            println("tasks finished: ${results.joinToString(", ")}, coroutineContext $coroutineContext, thread: ${Thread.currentThread().name}")
        }
        println("Time taken: $time") // Time taken: 3079
    }
}

/*
coroutineContext [BlockingCoroutine{Active}@46f5f779, BlockingEventLoop@1c2c22f3], thread: main
inside async, coroutineContext [DeferredCoroutine{Active}@3a286e59, Dispatchers.Default]
inside async, coroutineContext [DeferredCoroutine{Active}@5e11571d, Dispatchers.Default]
calculation: 2, step: 1, thread: DefaultDispatcher-worker-2
calculation: 1, step: 1, thread: DefaultDispatcher-worker-1
calculation: 1, step: 2, thread: DefaultDispatcher-worker-1
calculation: 2, step: 2, thread: DefaultDispatcher-worker-2
calculation: 1, step: 3, thread: DefaultDispatcher-worker-1
calculation: 2, step: 3, thread: DefaultDispatcher-worker-2
Finishing calculation: 1, step: 4, thread: DefaultDispatcher-worker-1
calculation: 2, step: 4, thread: DefaultDispatcher-worker-2
calculation: 2, step: 5, thread: DefaultDispatcher-worker-2
calculation: 2, step: 6, thread: DefaultDispatcher-worker-2
Finishing calculation: 2, step: 7, thread: DefaultDispatcher-worker-2
Time taken: 3079
*/

So the main coroutine is using the event-loop, but the 2 coroutines created with async (Dispatchers.Default) are using the ThreadPool. It's clear that the 2 coroutines are running in parallel, as the code takes 3 seconds to finish (the longest calculation: 6 steps * 500 milliseconds); otherwise it would take 4.5 seconds (9 * 500).
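The same contrast can be sketched on the Python asyncio side (just for comparison; the function names are made up, and asyncio.to_thread needs Python 3.9 or later). Blocking functions wrapped in plain coroutines run one after the other on the event loop, while handing them to a thread pool lets them overlap.

```python
import asyncio
import time


def do_calculation(task_id: int, steps: int) -> int:
    for _ in range(steps):
        time.sleep(0.05)  # blocking step, stand-in for the Thread.sleep(500) above
    return task_id


async def on_event_loop() -> float:
    async def run(task_id: int, steps: int) -> int:
        # no await inside: once started, this never gives control back
        return do_calculation(task_id, steps)

    start = time.perf_counter()
    await asyncio.gather(run(1, 2), run(2, 4))
    return time.perf_counter() - start


async def on_thread_pool() -> float:
    start = time.perf_counter()
    # each call runs in a worker thread of the default executor
    await asyncio.gather(
        asyncio.to_thread(do_calculation, 1, 2),
        asyncio.to_thread(do_calculation, 2, 4),
    )
    return time.perf_counter() - start


sequential = asyncio.run(on_event_loop())
parallel = asyncio.run(on_thread_pool())
print(f"event loop: {sequential:.2f}s, thread pool: {parallel:.2f}s")
```

Bear in mind that this only overlaps because time.sleep releases the GIL; real CPU-bound Python code would need processes rather than threads to run in parallel, so the parallel with Dispatchers.Default is only approximate.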

In this example, given that our doCalculation function is just CPU-bound and never suspends, we could wonder why we are using coroutines at all if what we want is to run our code in a ThreadPool. Rather than that we could directly use the ThreadPoolExecutor. Well, thanks to the async coroutine builder we obtain Deferred objects that we can await very easily, while I guess with the ThreadPoolExecutor this is not so straightforward. Anyway, the need for coroutines with thread-pool dispatchers is way more evident when we have a suspend function that both suspends at some IO call and runs some CPU-bound code (like doing an HTTP request to obtain a text and then encrypting it).

It is also possible to switch the kind of dispatcher that a coroutine is already using, by means of the withContext function. This does not create a new coroutine, it creates a new CoroutineContext (with that Dispatcher that we've passed to it), and the block will run with that Context and Dispatcher. Remember that when we create a coroutine it gets a CoroutineContext (containing among other things a Dispatcher). For each suspend function invoked from that coroutine a Continuation object is created, that also points to that CoroutineContext. When we invoke withContext(), the Continuation objects corresponding to the suspend functions invoked from the block passed to withContext will use that new CoroutineContext, rather than the one of the coroutine. This way we can start a coroutine running in an event-loop, and at some point invoke some suspend function that will run in the ThreadPool.

Friday 2 February 2024

Python late-bound default parameters

Last year I wrote a post about the odd behaviour of default arguments/parameters in Python (by the way, I always get confused about whether I should say arguments or parameters in this case). This behaviour comes from the fact that default parameters are bound at function definition time (early bound) rather than at call time (late bound). As explained in that post, JavaScript, Kotlin and Ruby behave differently: the value for a default parameter is evaluated each time the function is called. At that time I had not paid attention to how powerful such a feature is. Looking into the MDN documentation I've seen that parameters can use previous parameters in their definition:


function greet(name, greeting, message = `${greeting} ${name}`) {
  return [name, greeting, message];
}

The Kotlin documentation does not stress these advanced uses that much, but of course it also comes with them:


fun read(
    b: ByteArray,
    off: Int = 0,
    len: Int = b.size,
) { /*...*/ }

What has led me to review what default parameters allow in these languages is that I recently came across a draft for a PEP (671), which of course was not received with particular interest by part of the community (probably the same ones that tell us that optional chaining has no particular use case...), that proposes taking Python default arguments to the next level by making them late-bound (and allowing them access to other parameters). As interesting as the proposal is the fact that one smart guy has sort of implemented it in the late module, by means of decorators.
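To make concrete what early binding means in today's Python, here is a small sketch (next_id, early and late are made-up names for the illustration). The default expression runs once, when the def statement is executed, and the usual workaround is a None sentinel resolved inside the body.

```python
import itertools
from typing import Optional

_counter = itertools.count(1)


def next_id() -> int:
    return next(_counter)


def early(label: str, ident: int = next_id()) -> str:
    # next_id() already ran, once, when this def statement was executed
    return f"{label}#{ident}"


def late(label: str, ident: Optional[int] = None) -> str:
    # sentinel idiom: the default is computed at call time
    if ident is None:
        ident = next_id()
    return f"{label}#{ident}"


results = [early("a"), early("b"), late("c"), late("d")]
print(results)
```

Both calls to early share the value computed at definition time, while each call to late gets a fresh one. Late-bound defaults would remove the need for the sentinel dance.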

The way to implement such a thing in pure Python is not so mysterious (particularly after checking the source code :-D). Given that parameters are bound at definition time, let's bind something that will produce a value, rather than a value itself. Then, in order to make that producer run each time the function is invoked, let's wrap that function with another one that will do the calling. Add to this the powerful inspect.signature(fn) function, and we are done. What the late guy has implemented is really nice, but it does not seem as powerful as what the PEP proposes (which is the same that we have in JavaScript and Kotlin). It does not allow late-bound parameters to depend on other parameters (either late-bound or normal ones). So after having checked the source code I went ahead with implementing my own version of late-binding (or call-time binding) for default parameters. Here it is:



from dataclasses import dataclass
from typing import Callable, Any
import functools
import inspect

@dataclass
class LateBound:
    resolver: Callable

def _invoke_late_bound(callable: Callable, arg_name_to_value: dict[str, Any]) -> Any:
    """
    invokes a callable passing over to it the parameters defined in its signature
    we obtain those values from the arg_name_to_value dictionary
    """
    expected_params = inspect.signature(callable).parameters.keys()
    kwargs = {name: arg_name_to_value[name]
         for name in expected_params
    }
    return callable(**kwargs)


def _add_late_bounds(arg_name_to_value: dict[str, Any], late_bounds: list[tuple[str, Callable]]):
    """resolves late-bound values and adds them to the arg_name_to_value dictionary"""
    for name, callable in late_bounds:
        val = _invoke_late_bound(callable, arg_name_to_value)
        # this way one late-bound can depend on a previous late-bound
        arg_name_to_value[name] = val
    

def _resolve_args(target_fn: Callable, *args, **kwargs) -> dict[str, Any]:
    """returns a dictionary with the name and value of all the parameters (the ones already provided, the calculated late-bounds and the normal defaults)"""
    # dictionary of the arguments and values received by the function at runtime
    # we use it to be able to calculate late_bound values based on other parameters
    arg_name_to_value: dict[str, Any] = {}
    arg_names = list(inspect.signature(target_fn).parameters.keys())
    for index, arg in enumerate(args):
        arg_name_to_value[arg_names[index]] = arg
    arg_name_to_value = {**arg_name_to_value, **kwargs}
    
    # obtain the values for the normal (definition-time) defaults that have not been provided
    # we add them before the late-bounds so that a late-bound can depend on them
    # parameters without a default are skipped, so inspect.Parameter.empty is never passed on as if it were a value
    not_late_bounds = {name: param.default
        for name, param in inspect.signature(target_fn).parameters.items()
        if param.default is not inspect.Parameter.empty
        and not isinstance(param.default, LateBound)
        and name not in arg_name_to_value
    }
    arg_name_to_value = {**arg_name_to_value, **not_late_bounds}

    # late-bounds to calculate (they were not provided in args-kwargs)
    # list rather than dictionary as order matters (so that a late-bound can depend on a previous late-bound)
    late_bounds = [(name, param.default.resolver)
        for name, param in inspect.signature(target_fn).parameters.items()
        if isinstance(param.default, LateBound) and name not in arg_name_to_value
    ]

    _add_late_bounds(arg_name_to_value, late_bounds)
    return arg_name_to_value


#decorator function
def late_bind(target_fn: Callable | type) -> Callable | type:
    """decorates a function enabling late-binding of default parameters for it"""
    @functools.wraps(target_fn)
    def wrapper(*args, **kwargs):
        kwargs = _resolve_args(target_fn, *args, **kwargs)
        return target_fn(**kwargs)

    return wrapper

And you can use it like this:


from datetime import datetime
from dataclasses import dataclass
from late_bound_default_args import late_bind, LateBound

@late_bind
def say_hi(source: str, target: str, greet: str, 
    extra = LateBound(lambda: f"[{datetime.now():%Y-%m-%d_%H%M%S}]"),
    ):
    """"""
    return f"{greet} from {source} to {target}. {extra}"

@late_bind
def say_hi2(source: str, target: str, greet: str, 
    extra = LateBound(lambda greet: f"[{greet.upper()}!]"),
    ):
    """"""
    return f"{greet} from {source} to {target}. {extra}"

print(say_hi("Xuan", "Francois", "Bonjour"))
print(say_hi2("Xuan", "Francois", "Bonjour"))

#Bonjour from Xuan to Francois. [2024-02-02_002939]
#Bonjour from Xuan to Francois. [BONJOUR!]


# access to the "self" parameter in a late-bound method works also fine
@dataclass
class Person:
    name: str
    birth_place: str

    @late_bind
    def travel(self, by: str, 
        start_city: str = LateBound(lambda self: self.birth_place), 
        to: str = "Paris"
        ):
        """ """
        return(f"{self.name} is travelling from {start_city} to {to} by {by}")
    
p1 = Person("Xuan", "Xixon")
print(p1.travel("train"))
# Xuan is travelling from Xixon to Paris by train

I've uploaded it to a gist.