Wednesday 7 August 2019

Throttling Async Calls

The TPL and Parallel.ForEach make it easy to run multiple instances of a same operation in parallel, taking advantage of the number of processing units (processor cores) in your machine. This is great for CPU bound operations, but what about async I/O bound operations (async http calls for example)?. If the operation returns a Task, it makes no sense to start several threads and keep them idle waiting for the asynchronous result. We should use a single thread to launch each operation one after another (without awaiting for the Task to be completed) and let a continuation to be invoked in Thread Pool thread once the result is available (or maybe push the Tasks in a collection and do a WaitAll). Something like this (consider it pseudocode)


var postTasks = List<Task<string>>();
foreach (var url in urls)
{
 contents.Add(this.getPost(url));
}
var posts = postTasks.WaitAll();

That looks good, but in many occasions launching a huge number of async operations in parallel is not a good idea. Let's say we are sending those http requests to the same server, we don't want to overload it (or make it think that we're doing a DOS attack!). Thinking about this, I found this discussion in SO explaining the options to throttle these async operations. The usage of SemaphoreSlim is pretty interesting.

A questions comes to mind, how to deal with this same problem in that different universe called JavaScript, where awaitables are called Promises rather than Tasks and where the runtime is single-threaded? I've been playing a bit with this, and here it goes what I've came up with.

I've created a simple TicketManager class that is like a sort of SemaphoreSlim. It has a number of available tickets (as many as parallel async calls we want to do). Before invoking the async function we'll request a ticket, receiving a Promise that will be resolved as soon as an execution slot is available for us, and after completing the async function we'll release it. If there are no tickets available we'll await for one (getTicket returns a Promise).

class TicketProvider{
    constructor(maxActiveTickets){
        this.activeTickets = 0;
        this.maxActiveTickets = maxActiveTickets;
        this.waitingTickets = []; //array of resolve function, to get invoked once we have a free slot, so the code waiting for the ticket can move on
    }

    //returns Promise
    getTicket(){
        if (this.activeTickets {
                //save the resolve function so that we can invoke it when another ticket gets released
    this.waitingTickets.push(res);
            });
        }
    }

    releaseTicket(){
        this.activeTickets--;
        let nextTicketToResolve = this.waitingTickets.shift();
        if (nextTicketToResolve) {
            nextTicketToResolve();
        }
    }
}

So when a ticket is requested, if we still have some slot available we return a resolved promise, so the client will immediatelly continue. If not, we return a Promise. This Promise will get resolved when a slot gets realeased. Promises can not be resolved from outside (contrary to .Net, where a Task can be resolved by an associated TaskCompletionSource), it's from "inside" the Promise that we can resolve it, by means of the function received as parameter in the Promise constructor. So we have to create a Promise that checks at intervals if a slot has been released, and in that case resolves itself store that resolve function in the TicketManager.

Given an async function like this:

function getPost(postId:number):Promise{
    console.log("getPost " + postId + " started");
    return new Promise(res => {
        setTimeout(() => {
            console.log("---> getPost " + postId + " finishing");
            res(`${postId} - content`)
        }, 2000 * (1 + getRandomInt(2)));
    });
}

We can leverage this TicketManager like this (notice that I've wrapped in an additional function the call to the async call, in order to wait for it and then call to releaseTicket) in order to run no more than 3 post requests at the same time.

async function getPostAndReleaseTicket(postId:number, ticketProvider:TicketProvider):Promise{
    let post = await getPost(postId);
    ticketProvider.releaseTicket();
    return post;
}

async function runWithThrottling1(){
    let ticketProvider = new TicketProvider(3);
    let curPos = 0;
    let requests = [];
    while (curPos < 10){
        let ticket = await ticketProvider.getTicket();
        requests.push(getPostAndReleaseTicket(curPos, ticketProvider));
        curPos++;
    }
    await Promise.all(requests);
    console.log("all posts retrieved");
}

Or like this (now the wrapper funtion takes care of both getting the ticket and releasing it)

async function getTicketAndGetPostAndReleaseTicket(postId:number, ticketProvider:TicketProvider):Promise{
    await ticketProvider.getTicket();
    let post = await getPost(postId);
    ticketProvider.releaseTicket();
    return post;
}

async function runWithThrottling2(){
    let ticketProvider = new TicketProvider(3);
    let curPos = 0;
    let requests = [];
    while (curPos < 10){
        requests.push(getTicketAndGetPostAndReleaseTicket(curPos, ticketProvider));
        curPos++;
    }
    await Promise.all(requests);
    console.log("all posts retrieved");
}

I've uploaded the code to a gist

No comments:

Post a Comment