Friday, 25 July 2025

Python Conditional Collection Literals

I've previously mentioned that I regularly visit the discuss python forum (the ideas and PEP's sections). You frequently find there interesting ideas, and though normally they will be rejected and will never be incorporated into python, learning about them is really worth. You'll find ideas that are present in other languages and you were not aware of, and alternative ways to achieve what the idea proposes by using available Python techniques. One of those ideas that recently caught my attention is Conditional collection literals. It proposes a syntax for defining collections literals in which some elements are added or omitted based on one condition. Let's see an example to make it clear:


my_list = [
    elem_1,
    elem_2 if condition,
    elem_3,
]


OK, this is something rarely needed, but the idea looks pretty interesting. Reading the ensuing discussion I've come up with an alternative that looks pretty good to me, just using a sentinel value (OMIT) to indicate that that element must be omitted and function performing the filtering. Let's see it (it's two functions, one for list-like collections and one for dictionaries:


from typing import Any, Callable, Iterable, TypeVar

T = TypeVar('T')
K = TypeVar('K')
V = TypeVar('V')

# OMIT = object() # better use a class to have it available for typing
class OMIT: pass

def conditional_list(*items: T | type[OMIT]) -> list[T]:
    return [it for it in items if it is not OMIT]

def conditional_dict(*pairs: tuple[K, V] | type[OMIT]) -> dict[K, V]:
    return dict(pair for pair in pairs if pair is not OMIT)


def test_conditional_list():
    l1 = conditional_list(
        "a",
        "b",
        "c" if False else OMIT,
        "d",
        "e" if True else OMIT,
    )
    print(l1)

def test_conditional_dict():
    d1 = conditional_dict(
        ("a", "AA"),
        ("b", "BB"),
        ("c", "CC") if False else OMIT,
        ("d", "DD"),
        ("e", "EE") if True else OMIT,
    )        
    print(d1)

test_conditional_list()
print("------------------------")
test_conditional_dict()

# ['a', 'b', 'd', 'e']
# ------------------------
# {'a': 'AA', 'b': 'BB', 'd': 'DD', 'e': 'EE'}


For the OMIT "sentinel" value I was using initially an object instance, but in order to have something that we can use in the type annotations I've ended up using a class. There's a deferred PEP-661 (but it seems it could be close to being approved by the Steering Council) for adding a utility to define sentinel values. There's an interesting discussion about it here and here

Friday, 18 July 2025

Kotlin Coroutines Revisited. Part 3: Event Loops and More

Part III of this series of posts [1] and [2] revisiting some Kotlin coroutines concepts (that' I've managed to better understand thanks to some GPT (Claude) discussions). This time we'll see a pretty interesting low level detail about the Kotlin asynchronous system. JavaScript and Python asynchrony revolves around an event loop. This event loop is not accessible from JavaScript, but in Python we even can interact with it (though this is rarely necessary unless we are developing some low level library).

Main idea: We can say that in asynchronous systems there are 2 main actions. Waiting for IO events to happen, and managing a queue with the tasks to be invoked (the callbacks that were set in our awaitables (Promises, Futures), resuming continuations...) when those IO events (new data available) happen. JavaScript and Python asyncio manage both aspects, while kotlinx.coroutines manages the task to be invoked, but the awaiting of IO events is done by additional libraries.

There are many articles about the node.js event loop, describing it with different levels of detail (which makes it confusing some times), but the very basic idea is that we have a loop that waits for IO events (a network response, a file read, a timer...) using epoll (or kqueue or whatever the OS provides). When those events happen, the callback for the event goes to the macrotask queue, and when that callback is executed it will normally resolve or reject a Promise, and the "then-catch callbacks" for that Promise go to the microtask queue. The mechanism in Python is similar. When the last function in a chain of async calls performs a real asynchronous operation, it will obtain a file descriptor (fd) and add it to the event loop list of "events to watch" with ev_loop.add_reader(fd, callback_fn). It also creates a Future object (fut1). That callback_fn will take care of invoking fut1.set_result() sometime in the future, which in turn will invoke Future._schedule_callbacks. This _schedule_callbacks will add to the event loop queue of actions the callbacks that had been set in that Future via add_done_callback(). This is done via loop.call_soon(callback, self, context=ctx) (that instructs the event loop to run those callbacks in its next iteration).

The thing is that in both JavaScript and Python a single event loop (running in a single thread) manages, in the different phases of each of its iterations, both awaiting for the IO events that will resolve/complete Promises/Futures (awaitables) and executing the callbacks for the resolution/completion of those Promises/Futures (callbacks that will take care of resuming our paused/suspended async functions/coroutines).

In Kotlin this is a bit different. Let's say that the awaiting for the IO events and the "execution of the callbacks" (that in Kotlin means resuming the suspend functions (the coroutine) via the continuation mechanism), happen in 2 different levels. We should already know about Coroutine Dispatchers, that are an essential part of the Kotlin coroutines machinery. These Dispatchers decide where to start the coroutine and where to resume it. "Coroutine Dispatchers determine the thread or thread pool where a coroutine executes. They play a critical role in managing how and where coroutines run". It can be in a thread pool or in a thread running an event loop. Dispatcher.IO and Dispatcher.Default use a thread pool. Dispatcher.Main will use the event loop in the Main UI Thread (in Android, Java FX, Swing... applications), and a coroutine created with the runBlocking coroutine builder will also run in an event loop (an EventLoop is indeed a Coroutine Dispatcher, as you can see here):
internal abstract class EventLoop : CoroutineDispatcher()

So those dispatchers (part of the kotlinx.coroutines library) manage the start and resumption of our coroutines, but what about waiting for the IO events (IO asynchronous calls)? That depends on the different libraries that we use for those IO operations, it's not part of the kotlinx.coroutines library, but a lower level layer. If we use netty for performing http requests I think it will manage the low level IO via some kind of low level event loop based on NIO (something like this). Other libraries performing non blocking IO will use its own event loop (or a thread pool). When the data is available, their callbacks will invoke continuation.resume, and we'll move back into our Kotlin dispatchers (the upper level layer). This means that in a same process we can have multiple event loops running, a high level one for the Kotlin dispatchers layer and 1 or more low level ones (using low level mechanisms like NIO, epoll) for the different non blocking IO libraries. The same happens for thread pools. This can feel like a bit of redundant.

On one hand the JavaScript and Python async systems seem more integrated. Python IO libraries have access not just to Futures (on which to set results), but to the asyncio event loop itself where they can register file descriptors for the IO operations, as the asyncio event loop watches for IO events on those descriptors (via epoll or whatever). In Kotlin that's not like that because kotlinx.coroutines does not provide a mechanism for watching for the IO events (it stays in an upper level, it does not go down to the epoll and file descriptors underworld), it's the libraries who have to take care of that at that lower level stuff, and then jump into the upper level by invoking continuation.resume through a callback.

On the other hand, the Kotlin asynchronous model is more versatile, with its powerful dispatchers, that can use a single thread event loop and/or thread pools. Indeed, Dispatcher.IO uses a Thread pool optimized for IO operations and Dispatcher.Default a thread pool optimized for CPU bound operations.

Wednesday, 9 July 2025

Kotlin Coroutines Revisited. Part 2: Suspension

Part II of this series of posts that I started last week revisiting some Kotlin coroutines concepts (that' I've managed to better understand thanks to some GPT (Claude) discussions). We'll deal today with an essential part, the suspension mechanism. We know (and if you don't know it, probably this post is not for you) that each suspend function is transformed by the Kotlin compiler into a state machine. When we have a chain of suspend function calls (suspend functionA calls suspend functionB that calls suspend functionC...) the way these functions are linked (now that because of suspension and resumption we don't have then "linked" in a normal call stack) so that when functionC completes functionB gets resumed in the point where it got suspended/paused is by means of Continuation objects (callbacks on steroids). In JavaScript or Python async functions return a Promise or a Future, and it's those objects what indicates that a function is being "suspended". But in Kotlin where we pass continuations rather than returning "awaitables", how does a suspend function (the "leaf" function in our chain of suspend calls) ultimately communicate its caller that it's suspending? (it could suspend or it could just return a normal value sequentially depending on a conditional).

For a suspension to happen the last suspend function in the call (the one performing a low level asynchronous IO operation) has to invoke suspendCoroutine or suspendCancellableCoroutine, that in turn have will return the special marker value COROUTINE_SUSPENDED if the suspension really must take place. This COROUTINE_SUSPENDED value will be managed by the state machines created by the compiler for the different suspend functions and when a suspension must take place it will propagate through the stack (this suspension indicator is a normal return, so it propagates through the normal stack, then, when the suspend function "returns" a value asynchronously, it will propagate through the continuations chain).

suspendCoroutine and suspendCancellableCoroutine are intrinsic compiler functions. They're the only way to actually create a suspension point in the execution.

This is a very, very rough approximation (the signature is wrong and this state machine goes into a class with additional logic, it's not just a function) to how the state machine corresponding to a suspend function looks like:


// Conceptually, your userFunction becomes something like:
fun userFunction$stateMachine(continuation: Continuation, label: Int): Any? {
    when (label) {
        0 -> {
            // Initial call
            val result = networkCall(continuation.with(label = 1))
            if (result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            // Fall through to label 1
        }
        1 -> {
            // Resumed here - no "return" up the stack
            val networkResult = continuation.getResult()
            return processResult(networkResult)
        }
    }
}

And, how do these so special suspendCoroutine and suspendCancellableCoroutine functions work? Both functions receive a block (that expects as Continuation as parameter), and it's this block who will perform (or not) a low level asynchronous operation that once ready will call us back, meaning that in between our suspend functions will get suspended (to be resumed when we are called back). Both suspendCoroutine and suspendCancellableCoroutine end up calling suspendCoroutineUninterceptedOrReturn but suspendCoroutine passes over to the block a Continuation and suspendCancellableCoroutine passes over a CancellableContinuation. So in the end the magic happens in suspendCoroutineUninterceptedOrReturn. That's an intrinsic function, there's no Kotlin code for it. "Intrinsic compiler functions" are special functions that don't have normal implementations - instead, the compiler has built-in knowledge of what they do and generates specific bytecode for them.

I'll show now the code that the GPT (Claude) came up with during our discussions.

This is how a function (and the associated block) that invokes suspedCoroutine/suspendCancellableCoroutine looks. The first one is not performing anything asynchronous and hence there is no suspension. The second one is causing a suspension for real (it invokes an asynchronous function that will use as callback continuation.resume).


//Path 1: Immediate Completion (No Suspension)
suspend fun quickOperation(): String {
    return suspendCoroutine { cont ->
        // This completes immediately
        cont.resume("immediate result")
        // shouldSuspend() will return false
    }
}

//Path 2: Deferred Completion (Actual Suspension)
suspend fun networkCall(): String {
    return suspendCoroutine { cont ->
        // Register with event loop, but don't complete yet
        eventLoop.registerSocket(socket) { response ->
            cont.resume(response) // This happens later!
        }
        // Block ends without calling cont.resume()
        // shouldSuspend() will return true
    }
}


And this is the "Kotlin pseudocode" for suspendCoroutineUninterceptedOrReturn (as I've said, being an intrinsic there's no Kotlin source for it)


// Conceptual implementation of suspendCoroutine
suspend fun  suspendCoroutineUninterceptedOrReturn(block: (Continuation) -> Unit): T {
    val continuation = getCurrentContinuation()
    
    // Mark continuation as "not completed yet"
    continuation.markAsPending()
    
    block(continuation) // User code runs
    
    // Check if the block completed the continuation synchronously
    if (continuation.isCompleted) {
        // Fast path - no actual suspension needed
        return continuation.getResult()
    } else {
        // Slow path - we need to suspend
        return COROUTINE_SUSPENDED
    }
}

Notice that in the quickOperation() function that invokes suspendCoroutine with a block that returns immediatelly, that block is calling continuation.resume. That feels odd, cause as it's not suspending there's no need to resume a continuation, just set the continuation as completed and return normally. Well, Claude to the rescue, that's exactly what continuation.resume is doing in that particular case.

In practice, continuation.resume() is indeed doing exactly what you're thinking - it's setting the result and marking the continuation as completed, but without triggering any actual resumption logic.

So the same continuation.resume function manages 2 completely different situations:

- Synchronous completion:
Continuation is completed within the suspendCoroutine block
No actual suspension occurs
Result flows directly back through the call stack


- Asynchronous completion (typical case):
Continuation is completed later from a callback/event handler
Actual suspension and resumption occurs
Result triggers coroutine resumption on appropriate dispatcher

Claude can guess that its implementation should look more or less like this:


// Conceptual implementation
class ContinuationImpl {
    private var result: Any? = UNINITIALIZED
    private var isCompleted = false
    
    fun resume(value: T) {
        if (!isCompleted) {
            this.result = Result.success(value)
            this.isCompleted = true
            
            // Key point: if we're in synchronous context,
            // this doesn't trigger async resumption
            if (inSuspendCoroutineBlock) {
                // Just mark as completed, don't dispatch
            } else {
                // Actually dispatch to resume suspended coroutine
                dispatch()
            }
        }
    }
}

An additional point. The suspendCoroutine function (and suspendCancellableCoroutine and suspendCoroutineUninterceptedOrReturn) is marked with the suspend modifier, but indeed, is this necessary? suspendCoroutine is never going to get suspended (no state machine needed for it), depending on what the block does it will return a value or return the CoroutineSuspended marker value. The resuming will start in the continuation corresponding to the caller of suspendCoroutine. So, why is it marked with suspend? Again, Claude to the rescue:

The suspend modifier on suspendCoroutine is purely for access control and type safety - it ensures it's only called from suspend context where the compiler can properly handle the COROUTINE_SUSPENDED return value.

For closing up I'll add a link to a pretty good article on this topic.

Sunday, 6 July 2025

Kotlin Coroutines Revisited. Part 1: Cancellation

Last year I wrote several posts about Kotlin coroutines as I I was learning how they worked. Lately I've had some exciting GPT (Claude) discussions to dive deeper into different aspects of their implementation. I plan to write a few posts about this, and I'll start today by how cancellation works. The texts in green in this post are GPT (Claude) wisdom.

In this previous post I explained how cancellation is cooperative. The Kotlin documentation makes it clear:

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

Based on the above, my understanding when I wrote that post was that those krolinx.coroutines suspend functions would perform checks for cancellation in the different state transitions of each state machine, when they were resumed. Well, there's much more to that. Let's see:

First of all, let's say we have a function that is suspended waiting for some IO operation and the job associated to its coroutine is cancelled (we call job.cancel() and it transitions into the cancelled state). Does this "cooperative nature" mean that the coroutine won't be cancelled until the IO operation is completed and the suspend function gets resumed and can do a check for cancellation? No, it's not like that. Normally the coroutine will get cancelled immediatelly. For that, the suspend function must have been suspended by calling suspendCancellableCoroutine rather than suspendCoroutine. Notice that these 2 functions are the only way for a suspend function to perform a suspension (returning the COROUTINE_SUSPENDED value, I'll further explain this in a separate post).

Functions using suspendCancellableCoroutine register a cancellation handler that can immediately interrupt the underlying operation (like cancelling a timer, closing a socket, etc.).

For example, delay() is implemented roughly like:


suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return
    return suspendCancellableCoroutine { cont ->
        // Schedule timer
        val timer = scheduleTimer(timeMillis) { cont.resume(Unit) }
        // Register cancellation handler to cancel the timer
        cont.invokeOnCancellation { timer.cancel() }
    }
}

That cancellation handler will take care of finishing/cleaning the underlying asynchronous operation. So my understanding is that when Job.cancel is invoked, it will check if the coroutine associated to that job is suspended. If so it will follow the chain of continuations associated to the chain of suspend functions for that coroutine, and if the last continuation has a cancellation handler it will invoke it. Then, it will resume the continuation providing it a CancellationException (we'll see this a bit more in depth at the end of the post). We can say the using suspendCancellableCoroutine is the first kind of cooperative behaviour, as we allow for immediate cancellation during suspension.

A second kind of cooperative behaviour is checking for cancellation before performing suspension (before invoking suspendCoroutine/suspendCancellableCoroutine). Indeed, most I/O-bound kotlinx.coroutines functions check for cancellation (using ensureActive()) before performing suspension.

If the suspend function has no way to cancel the underlying low level operation it will use suspendCoroutine rather than suspendCancellableCoroutine to get suspended. That means that it won't be cancelled while it's in suspended state, and it must check for cancellation when it's resumed. It'll do that using ensureActive(). That's the third kind of cooperative behaviour.

There's a fourth kind of cooperative behaviour. When a suspend function performs also some kind of CPU-bound operation it should check for cancellation before starting the CPU-bound operation, in between (if possible) and at the end of the CPU-bound operation.

I said a few paragraphs above that I would farther talk about CancellationException, well, indeed about how Exceptions are propagated inside coroutines. Once one function in a chain of suspend function calls in one coroutine gets suspended, we no longer have a call stack, but a mechanism of resumption based on continuations. So, how can an exception propagate from one function to its calling function? Well, the same as a returned value, by means of a Result object. In other asynchronous systems like JavaScript or Python asyncio, async funtions return Promises (that can be resolved or rejected), and Futures (that are set as Done to a result or to an exception). In Kotlin we have continuations (a callback on steroids), that are resumed with a Result object.

So continuations don't just pass values - they pass Result objects that can encapsulate either success or failure. When an exception occurs, it's wrapped in a failed Result.

Let's say we have these 2 suspend functions: getReportInfo that invokes getPersonInfo (that in turn invokes three other 3 suspend functions: getName, getLastName and getCity )


suspend fun getReportInfo() {
    try {
        val personInfo = getPersonInfo() // Exception propagates here
    } catch (e: Exception) {
        // Catches the exception from getLastName()
        println("Caught: ${e.message}")
    }
}

suspend fun getPersonInfo(): PersonInfo {
    val name = getName()        // State 0 → 1
    val lastName = getLastName() // State 1 → 2 (exception thrown here)
    val city = getCity()        // State 2 → 3 (never reached)
    return PersonInfo(name, lastName, city)
}

This is how the Kotlin compiler magic would transform them (thanks GPT!!!)


// Conceptual representation of compiled suspend function
class GetPersonInfoStateMachine : Continuation {
    override fun resumeWith(result: Result) {
        try {
            when (state) {
                0 -> { /* initial state */ }
                1 -> { 
                    // After getName() completes
                    if (result.isFailure) {
                        // Exception from getName - propagate up
                        completion.resumeWithException(result.exceptionOrNull()!!)
                        return
                    }
                    // Continue to getLastName()
                }
                2 -> {
                    // After getLastName() completes  
                    if (result.isFailure) {
                        // This is where your getLastName exception arrives
                        completion.resumeWithException(result.exceptionOrNull()!!)
                        return
                    }
                    // Would continue to getCity()
                }
                // ... more states
            }
        } catch (e: Exception) {
            // Any synchronous exception in state machine
            completion.resumeWithException(e)
        }
    }
}

// getReportInfo() with try-catch - compiler-generated logic
class GetReportInfoStateMachine : Continuation {
    override fun resumeWith(result: Result) {
        try {
            when (state) {
                0 -> {
                    // About to call getPersonInfo()
                    state = 1
                    getPersonInfo(this) // pass this as continuation
                }
                1 -> {
                    // getPersonInfo() completed
                    if (result.isFailure) {
                        // YOUR try-catch exists, so throw to trigger it
                        throw result.exceptionOrNull()!!
                    }
                    // Success case - continue with result
                    val personInfo = result.getOrThrow()
                    // ... rest of function
                }
            }
        } catch (e: Exception) {
            if (isInTryBlock) {
                // Execute your catch block logic here
                handleException(e)
            } else {
                // No try-catch, propagate up
                completion.resumeWithException(e)
            }
        }
    }
}

the magic happens in that if (result.isFailure) throw exception check that gets inserted wherever the compiler detects you want traditional exception handling semantics.

Notice that at each "entry point" of the state machine corresponding to the suspend function there's a check to see if an exception has happened. If the original code (before the compiler magic transformed the function into a state machine) was wrapped in a try-catch, the excpetion is thrown, else we pass it to the caller by invoking continuation.resumeWithException. Notice that resumeWithException just creates a Result object with failure set to the exception and invokes resumeWith(result). Its source code is just:


public inline fun  Continuation.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

Wednesday, 2 July 2025

Builder Pattern in Kotlin

I've recently been re-reading this discussion about the Builder pattern in Java, and I've been reflecting a bit on the use of this pattern in modern languages like Kotlin. In languages supporting named and default parameters one of the use cases for the Builder pattern no longer exists. You can have a constructor that accepts a long list of parameters, most of them with default values, and you can invoke it passing named parameters. Kotlin constructors are perfectly friendly for that, same as dataclasses in Python.

That said, there are still other use cases for the Builder pattern. Let's say you want to build your object over time, as the data needed for it gets ready. OK, we could use that constructor with default parameters to start off, and then set other properties as the data for it is obtained (even using the apply scope function). If you want an immutable object that approach is not valid, and there's another main problem that applies both if immutability is or is not a concern to you. That problem is that we have an object in an incomplete/inconsistent state, that we could start to use before time just by error. Having a builder with a build() method that checks the consistency of the object and throws an exception if necessary is the way to go. Also, when the initialization logic is complex, not just setting properties with parameters, but doing some validations, calculations, conditional logic... having that logic as methods of the builder is pretty neat.

OK, so now that it's clear that the Builder pattern is still necessary for certain cases, I'm going to delve a bit into a way of using it that confused me a bit. When using the Kotlin Ktor library for doing an http request, you find code like this:


client.request {
    url("https://api.example.com/users")
    headers {
        append("Authorization", "Bearer token")
        append("Accept", "application/json")
        if (someCondition) append("X-Custom", "value")
    }
    timeout {
        requestTimeoutMillis = 5000
        connectTimeoutMillis = 3000
    }
}

That code corresponds to this signature (that I'll call the "complex" one):
inline suspend fun HttpClient.request(block: HttpRequestBuilder.() -> Unit): HttpResponse
So the request method creates a HttpRequestBuilder object and passes it over (as receiver) to the block that it has got as argument. That block configures the Builder and I guess then the request method will end up invoking the build() method on the constructed builder. That feels like a bit overcomplicated, why not just create the builder, configure it and pass it over to request()? Well, indeed there's another overload for request() that just expects that, a Builder object. This is the corresponding signature (that I'll call the "simple" one):
inline suspend fun HttpClient.request(builder: HttpRequestBuilder = HttpRequestBuilder()): HttpResponse

So what are the pros and cons of each overload?

The "simple" one is perfect when we want a builder that we will reuse on several calls. We create the builder first and pass it over to the different calls. However, if we have a one-shot builder, we'll want to create/configure it in place, and the HttpRequestBuilder is not that nice for that. It does not come with a fluid interface, mainly I guess because it's expected that part of the chaining could be conditional, and for that we use apply(). This means having a slightly more verbose code:


client.request(HttpRequestBuilder().apply {
    url("https://api.example.com/users")
    headers {
        append("Authorization", "Bearer token")
        append("Accept", "application/json")
        if (someCondition) append("X-Custom", "value")
    }
    timeout {
        requestTimeoutMillis = 5000
        connectTimeoutMillis = 3000
    }
})

Asking a GPT for his opinion, the main conclusion is that the "complex" approach is:

More idiomatic Kotlin. It follows Ktor's DSL patterns used throughout the library.
So while functionally similar to apply(), the DSL overload is specifically designed for Ktor's fluent API style and is generally preferred for inline request building. It's a great example of Kotlin's DSL capabilities making APIs more expressive and readable.

I rather agree. The "complex" approach is more complex in terms of reasoning about how it works, but it's more expressive and looks more natural.