From 60ff5f050aac60f90e63638029b88e80e91e259c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 26 Aug 2024 16:17:03 +0200 Subject: [PATCH] fix: ensure no deleted dynamic worker can be used to steal task(s) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 58 +++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d572bc0f..15d7076f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -281,14 +281,13 @@ export abstract class AbstractPool< "WorkerNode event detail 'workerNodeKey' property must be defined" ) } - const workerNodeInfo = this.getWorkerInfo(workerNodeKey) - if (workerNodeInfo == null) { - throw new Error( - `Worker node with key '${workerNodeKey.toString()}' not found in pool` - ) + const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (workerNode == null) { + return } if ( - !workerNodeInfo.continuousStealing && + !workerNode.info.continuousStealing && (this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > Math.round( @@ -299,12 +298,12 @@ export abstract class AbstractPool< ) { return } - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + const workerNodeTasksUsage = workerNode.usage.tasks if ( - workerNodeInfo.continuousStealing && + workerNode.info.continuousStealing && !this.isWorkerNodeIdle(workerNodeKey) ) { - workerNodeInfo.continuousStealing = false + workerNode.info.continuousStealing = false if (workerNodeTasksUsage.sequentiallyStolen > 0) { this.resetTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, @@ -313,7 +312,7 @@ export abstract class AbstractPool< } return } - workerNodeInfo.continuousStealing = true + workerNode.info.continuousStealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) this.updateTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, @@ -359,29 +358,28 @@ export abstract class AbstractPool< sourceWorkerNode: IWorkerNode, destinationWorkerNodeKey: number ): Task | undefined => { - const destinationWorkerInfo = this.getWorkerInfo(destinationWorkerNodeKey) - if (destinationWorkerInfo == null) { - throw new Error( - `Worker node with key '${destinationWorkerNodeKey.toString()}' not found in pool` - ) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (destinationWorkerNode == null) { + return } // Avoid cross and cascade task stealing. Could be smarter by checking stealing/stolen worker ids pair. if ( !sourceWorkerNode.info.ready || sourceWorkerNode.info.stolen || sourceWorkerNode.info.stealing || - !destinationWorkerInfo.ready || - destinationWorkerInfo.stolen || - destinationWorkerInfo.stealing + !destinationWorkerNode.info.ready || + destinationWorkerNode.info.stolen || + destinationWorkerNode.info.stealing ) { return } - destinationWorkerInfo.stealing = true + destinationWorkerNode.info.stealing = true sourceWorkerNode.info.stolen = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()! sourceWorkerNode.info.stolen = false - destinationWorkerInfo.stealing = false + destinationWorkerNode.info.stealing = false this.handleTask(destinationWorkerNodeKey, stolenTask) this.updateTaskStolenStatisticsWorkerUsage( destinationWorkerNodeKey, @@ -1291,32 +1289,28 @@ export abstract class AbstractPool< } private isWorkerNodeBusy (workerNodeKey: number): boolean { + const workerNode = this.workerNodes[workerNodeKey] if (this.opts.enableTasksQueue === true) { return ( - this.workerNodes[workerNodeKey].info.ready && - this.workerNodes[workerNodeKey].usage.tasks.executing >= + workerNode.info.ready && + workerNode.usage.tasks.executing >= // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.concurrency! ) } - return ( - this.workerNodes[workerNodeKey].info.ready && - this.workerNodes[workerNodeKey].usage.tasks.executing > 0 - ) + return workerNode.info.ready && workerNode.usage.tasks.executing > 0 } private isWorkerNodeIdle (workerNodeKey: number): boolean { + const workerNode = this.workerNodes[workerNodeKey] if (this.opts.enableTasksQueue === true) { return ( - this.workerNodes[workerNodeKey].info.ready && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 && + workerNode.info.ready && + workerNode.usage.tasks.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0 ) } - return ( - this.workerNodes[workerNodeKey].info.ready && - this.workerNodes[workerNodeKey].usage.tasks.executing === 0 - ) + return workerNode.info.ready && workerNode.usage.tasks.executing === 0 } private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { -- 2.34.1