From a2ed505383d2c128bd616a29ffb48a1975174036 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 10 Jul 2023 13:40:41 +0200 Subject: [PATCH] feat: ensure on worker error that queued tasks are redistributed to ready workers MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 119 +++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 58 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4bf7785d..27d8af0f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -904,19 +904,6 @@ export abstract class AbstractPool< message: MessageValue ): void - /** - * Registers a listener callback on the given worker. - * - * @param worker - The worker which should register a listener. - * @param listener - The message listener callback. - */ - private registerWorkerMessageListener( - worker: Worker, - listener: (message: MessageValue) => void - ): void { - worker.on('message', listener as MessageHandler) - } - /** * Creates a new worker. * @@ -924,24 +911,6 @@ export abstract class AbstractPool< */ protected abstract createWorker (): Worker - /** - * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. - * Can be overridden. - * - * @param worker - The newly created worker. - */ - protected afterWorkerSetup (worker: Worker): void { - // Listen to worker messages. - this.registerWorkerMessageListener(worker, this.workerListener()) - // Send startup message to worker. - this.sendToWorker(worker, { - ready: false, - workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number - }) - // Setup worker task statistics computation. - this.setWorkerStatistics(worker) - } - /** * Creates a new worker and sets it up completely in the pool worker nodes. * @@ -983,33 +952,6 @@ export abstract class AbstractPool< return worker } - private redistributeQueuedTasks (workerNodeKey: number): void { - while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey - let minQueuedTasks = Infinity - for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued === 0 - ) { - targetWorkerNodeKey = workerNodeId - break - } - if ( - workerNodeId !== workerNodeKey && - workerNode.usage.tasks.queued < minQueuedTasks - ) { - minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId - } - } - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) - } - } - /** * Creates a new dynamic worker and sets it up completely in the pool worker nodes. * @@ -1041,6 +983,67 @@ export abstract class AbstractPool< return worker } + /** + * Registers a listener callback on the given worker. + * + * @param worker - The worker which should register a listener. + * @param listener - The message listener callback. + */ + private registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void { + worker.on('message', listener as MessageHandler) + } + + /** + * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. + * Can be overridden. + * + * @param worker - The newly created worker. + */ + protected afterWorkerSetup (worker: Worker): void { + // Listen to worker messages. + this.registerWorkerMessageListener(worker, this.workerListener()) + // Send startup message to worker. + this.sendToWorker(worker, { + ready: false, + workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number + }) + // Setup worker task statistics computation. + this.setWorkerStatistics(worker) + } + + private redistributeQueuedTasks (workerNodeKey: number): void { + while (this.tasksQueueSize(workerNodeKey) > 0) { + let targetWorkerNodeKey: number = workerNodeKey + let minQueuedTasks = Infinity + for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { + const workerInfo = this.getWorkerInfo(workerNodeId) + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued === 0 + ) { + targetWorkerNodeKey = workerNodeId + break + } + if ( + workerNodeId !== workerNodeKey && + workerInfo.ready && + workerNode.usage.tasks.queued < minQueuedTasks + ) { + minQueuedTasks = workerNode.usage.tasks.queued + targetWorkerNodeKey = workerNodeId + } + } + this.enqueueTask( + targetWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) + } + } + /** * This function is the listener registered for each worker message. * -- 2.34.1