From 930dcf12982091cb5658b5a44acd47676fbae631 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 11 Jun 2023 21:09:48 +0200 Subject: [PATCH] refactor: factor out dynamic worker creation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 52 +++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index db7c83ee..c3565446 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -614,28 +614,14 @@ export abstract class AbstractPool< * @returns The worker node key */ protected chooseWorkerNode (): number { - let workerNodeKey: number - if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) { - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && - this.workerNodes[currentWorkerNodeKey].workerUsage.tasks - .executing === 0) - ) { - // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueue(currentWorkerNodeKey) - // FIXME: wait for tasks to be finished - void (this.destroyWorker(workerCreated) as Promise) - } - }) - workerNodeKey = this.getWorkerNodeKey(workerCreated) - } else { - workerNodeKey = this.workerChoiceStrategyContext.execute() + if (this.shallCreateDynamicWorker()) { + return this.getWorkerNodeKey(this.createAndSetupDynamicWorker()) } - return workerNodeKey + return this.workerChoiceStrategyContext.execute() + } + + protected shallCreateDynamicWorker (): boolean { + return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } /** @@ -706,6 +692,30 @@ export abstract class AbstractPool< return worker } + /** + * Creates a new dynamic worker and sets it up completely in the pool worker nodes. + * + * @returns New, completely set up dynamic worker. + */ + protected createAndSetupDynamicWorker (): Worker { + const worker = this.createAndSetupWorker() + this.registerWorkerMessageListener(worker, message => { + const currentWorkerNodeKey = this.getWorkerNodeKey(worker) + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + (message.kill != null && + this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing === + 0) + ) { + // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) + this.flushTasksQueue(currentWorkerNodeKey) + // FIXME: wait for tasks to be finished + void (this.destroyWorker(worker) as Promise) + } + }) + return worker + } + /** * This function is the listener registered for each worker message. * -- 2.34.1