Friday, 19 August 2022

AsyncExecutor

It's interesting how, depending on what you have been working on lately, you can come up with different solutions to the same problem. Three years ago I posted a solution for limiting the number of asynchronous calls running in parallel in JavaScript. Basically, I was asking a TicketManager for permission, something similar to C#'s SemaphoreSlim.

In recent months I've been working mainly with Python, and I've discovered the really nice functionality provided by ThreadPoolExecutor and ProcessPoolExecutor. So, faced with the prospect of limiting async calls in JavaScript again (e.g. limiting the number of HTTP requests in progress to the same server at a given time), I naturally came up with the idea of implementing an AsyncExecutor.

So we have a class with a maximum number of slots (actions), and we submit asynchronous functions and their parameters to it. If it has a free slot, it runs the asynchronous function right away and returns a Promise that settles with the outcome of that call. If there are no free slots, it puts the function in a queue and returns a Promise that will be settled once the function has eventually been invoked and has completed. That's all:


/**
 * Record describing one submitted unit of work: the function to call, the
 * arguments to call it with, and the callbacks used to settle the Promise
 * that represents its outcome.
 */
class AsyncAction{
    /**
     * @param {Function} fn - Function to invoke.
     * @param {Array} args - Arguments to pass to `fn`.
     * @param {Function} resolveFn - Callback used to report a successful result.
     * @param {Function} rejectFn - Callback used to report a failure.
     */
    constructor(fn, args, resolveFn, rejectFn){
        Object.assign(this, {fn, args, resolveFn, rejectFn});
    }
}

/**
 * Runs asynchronous functions while capping how many of them are in flight
 * at the same time. Work submitted while every slot is busy is queued and
 * started, in submission order, as running actions complete.
 */
export class AsyncExecutor{
    /**
     * @param {number} maxRunningActions - Maximum number of actions allowed
     *     to run at the same time. It should be at least 1: with a lower
     *     value nothing is ever started and every submission stays queued.
     */
    constructor(maxRunningActions){
        this.maxRunningActions = maxRunningActions;
        this.runningCounter = 0;
        this.pendingActions = [];
    }

    /**
     * Schedules `fn(...args)`. It is invoked immediately when a slot is
     * free; otherwise it is queued until a running action completes.
     *
     * @param {Function} fn - Function to run; it may return a Promise or a plain value.
     * @param {...*} args - Arguments forwarded to `fn`.
     * @returns {Promise<*>} Promise fulfilled with the value produced by
     *     `fn`, or rejected with whatever `fn` throws or rejects with.
     */
    submit(fn, ...args){
        let resolveFn;
        let rejectFn;
        const pr = new Promise((resFn, rejFn) => {
            resolveFn = resFn;
            rejectFn = rejFn;
        });
        const action = new AsyncAction(fn, args, resolveFn, rejectFn);

        if (this.runningCounter < this.maxRunningActions){
            // Not awaited on purpose: the outcome is reported through `pr`.
            this._runAction(action);
        }
        else{
            this.pendingActions.push(action);
        }

        return pr;
    }

    /**
     * Takes a slot, invokes the action and hands its outcome over to
     * `_processActionResult`. This is the only place where a slot is taken.
     *
     * @param {AsyncAction} action - Action to execute.
     */
    async _runAction(action){
        this.runningCounter++;
        let result;
        try{
            // A synchronous throw from `fn` is caught here as well.
            result = await action.fn(...action.args);
        }
        catch (ex){
            this._processActionResult(action, false, ex);
            return;
        }
        this._processActionResult(action, true, result);
    }

    /**
     * Releases the slot held by `action`, settles its Promise and, when
     * work is waiting, starts the oldest queued action.
     *
     * @param {AsyncAction} action - Action that has just completed.
     * @param {boolean} successful - Whether the action completed without error.
     * @param {*} result - Value produced by the action, or the error when
     *     `successful` is false.
     */
    _processActionResult(action, successful, result){
        this.runningCounter--;
        if (successful){
            action.resolveFn(result);
        }
        else{
            action.rejectFn(result);
        }
        if (this.pendingActions.length > 0){
            // `_runAction` takes the slot itself. Incrementing the counter
            // here too would count the action twice, and the executor would
            // end up believing slots are busy when nothing is running.
            this._runAction(this.pendingActions.shift());
        }
    }
}

You can test it with this:


import {AsyncExecutor} from "./asyncExecutor.js"


/**
 * Returns a Promise that is fulfilled (with no value) once `interval`
 * milliseconds have elapsed.
 *
 * @param {number} interval - Delay in milliseconds, as passed to `setTimeout`.
 * @returns {Promise<undefined>}
 */
function asyncSleep(interval){
    const wakeUpLater = (wakeUp) => {
        setTimeout(() => wakeUp(), interval);
    };
    return new Promise(wakeUpLater);
}

/**
 * Simulates a download: logs that it is starting, waits `delay`
 * milliseconds and then yields the upper-cased url as the "content".
 *
 * @param {string} url - Address being "downloaded".
 * @param {number} delay - Simulated duration in milliseconds.
 * @returns {Promise<string>} The url converted to upper case.
 */
async function mockDownload(url, delay){
    console.log(`starting download: ${url}`);
    await asyncSleep(delay);
    const content = url.toUpperCase();
    return content;
}

// Demo: submit seven mock downloads while allowing at most three at a time.
const executor = new AsyncExecutor(3);
const asyncDownloads = Array.from({length: 7}, (_, i) => {
    const url = `www.resistence.fr/post_${i}`;
    const delay = Math.floor(Math.random() * 4000);
    const download = executor.submit(mockDownload, url, delay).then(result => {
        console.log(`download ${i} finished`);
        return result;
    });
    // Printed right after each submission, before any download has finished.
    console.log("after submit");
    return download;
});

// Results come back in submission order, whatever order the downloads finish in.
const results = await Promise.all(asyncDownloads);
console.log(`results: ${JSON.stringify(results, null, "\t")}`);

/*
starting download: www.resistence.fr/post_0
after submit
starting download: www.resistence.fr/post_1
after submit
starting download: www.resistence.fr/post_2
after submit
after submit
after submit
after submit
after submit
starting download: www.resistence.fr/post_3
download 2 finished
starting download: www.resistence.fr/post_4
download 3 finished
starting download: www.resistence.fr/post_5
download 0 finished
starting download: www.resistence.fr/post_6
download 1 finished
download 5 finished
download 6 finished
download 4 finished
results: [
        "WWW.RESISTENCE.FR/POST_0",
        "WWW.RESISTENCE.FR/POST_1",
        "WWW.RESISTENCE.FR/POST_2",
        "WWW.RESISTENCE.FR/POST_3",
        "WWW.RESISTENCE.FR/POST_4",
        "WWW.RESISTENCE.FR/POST_5",
        "WWW.RESISTENCE.FR/POST_6"
]

*/

As usual, I've uploaded it to a gist.

No comments:

Post a Comment