From 37cc14e81ea8e83ea2b78cf27e0e74f3c4694e21 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 14 Aug 2024 13:55:01 +0200 Subject: [PATCH] fix: ensure task stealing can't start twice on the same worker node MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 82 +++++++++++++++--------------- src/pools/pool.ts | 4 +- src/pools/worker-node.ts | 1 + src/pools/worker.ts | 7 ++- tests/pools/abstract-pool.test.mjs | 5 +- tests/pools/worker-node.test.mjs | 2 + 6 files changed, 53 insertions(+), 48 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index a45eef05..61f01a6e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -321,14 +321,7 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { stealingWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.info.stealing ? accumulator + 1 : accumulator, - 0 - ), - }), - ...(this.opts.enableTasksQueue === true && { - stolenWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.info.stolen ? accumulator + 1 : accumulator, + workerNode.info.continuousStealing ? accumulator + 1 : accumulator, 0 ), }), @@ -1675,7 +1668,7 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0 && workerInfo != null && - !workerInfo.stealing))) + !workerInfo.continuousStealing))) ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) @@ -1862,15 +1855,16 @@ export abstract class AbstractPool< private updateTaskSequentiallyStolenStatisticsWorkerUsage ( workerNodeKey: number, - taskName: string, + taskName?: string, previousTaskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (workerNode?.usage != null) { + if (workerNode?.usage != null && taskName != null) { ++workerNode.usage.tasks.sequentiallyStolen } if ( + taskName != null && this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { @@ -1892,7 +1886,7 @@ export abstract class AbstractPool< private resetTaskSequentiallyStolenStatisticsWorkerUsage ( workerNodeKey: number, - taskName: string + taskName?: string ): void { const workerNode = this.workerNodes[workerNodeKey] // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition @@ -1900,6 +1894,7 @@ export abstract class AbstractPool< workerNode.usage.tasks.sequentiallyStolen = 0 } if ( + taskName != null && this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { @@ -1934,16 +1929,16 @@ export abstract class AbstractPool< destinationWorkerInfo.stealing = true sourceWorkerNode.info.stolen = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueLastPrioritizedTask()! + const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()! sourceWorkerNode.info.stolen = false destinationWorkerInfo.stealing = false - this.handleTask(destinationWorkerNodeKey, task) + this.handleTask(destinationWorkerNodeKey, stolenTask) this.updateTaskStolenStatisticsWorkerUsage( destinationWorkerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - task.name! + stolenTask.name! ) - return task + return stolenTask } private readonly handleWorkerNodeIdleEvent = ( @@ -1956,6 +1951,13 @@ export abstract class AbstractPool< "WorkerNode event detail 'workerNodeKey' property must be defined" ) } + const workerNodeInfo = this.getWorkerInfo(workerNodeKey) + if (workerNodeInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey.toString()}' not found in pool` + ) + } + const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > @@ -1965,37 +1967,36 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions!.tasksStealingRatio! ) ) { - if (previousStolenTask != null) { + workerNodeInfo.continuousStealing = false + if (workerNodeTasksUsage.sequentiallyStolen > 0) { this.resetTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - previousStolenTask.name! + previousStolenTask?.name ) } return } - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( - previousStolenTask != null && - (workerNodeTasksUsage.executing > 0 || - this.tasksQueueSize(workerNodeKey) > 0) + workerNodeInfo.continuousStealing || + workerNodeTasksUsage.executing > 0 || + this.tasksQueueSize(workerNodeKey) > 0 ) { - this.resetTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - previousStolenTask.name! - ) + workerNodeInfo.continuousStealing = false + if (workerNodeTasksUsage.sequentiallyStolen > 0) { + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + previousStolenTask?.name + ) + } return } + workerNodeInfo.continuousStealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) - if (stolenTask != null) { - this.updateTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - stolenTask.name!, - previousStolenTask?.name - ) - } + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + stolenTask?.name, + previousStolenTask?.name + ) sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) @@ -2123,7 +2124,7 @@ export abstract class AbstractPool< } private handleTaskExecutionResponse (message: MessageValue): void { - const { workerId, taskId, workerError, data } = message + const { taskId, workerError, data } = message // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { @@ -2147,8 +2148,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.promiseResponseMap.delete(taskId!) - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - workerNode?.emit('taskFinished', taskId) + workerNode.emit('taskFinished', taskId) if ( this.opts.enableTasksQueue === true && !this.destroying && @@ -2167,11 +2167,9 @@ export abstract class AbstractPool< } if ( workerNodeTasksUsage.executing === 0 && - this.tasksQueueSize(workerNodeKey) === 0 && - workerNodeTasksUsage.sequentiallyStolen === 0 + this.tasksQueueSize(workerNodeKey) === 0 ) { workerNode.emit('idle', { - workerId, workerNodeKey, }) } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 416d6961..02a0c795 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -88,10 +88,8 @@ export interface PoolInfo { readonly utilization?: number /** Pool total worker nodes. */ readonly workerNodes: number - /** Pool stealing worker nodes. */ + /** Pool continuous stealing worker nodes. */ readonly stealingWorkerNodes?: number - /** Pool stolen worker nodes. */ - readonly stolenWorkerNodes?: number /** Pool idle worker nodes. */ readonly idleWorkerNodes: number /** Pool busy worker nodes. */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index c79fa389..dbc27fa5 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -221,6 +221,7 @@ export class WorkerNode ready: false, stealing: false, stolen: false, + continuousStealing: false, backPressure: false, } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 956e1f0b..a0742c28 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -165,7 +165,7 @@ export interface WorkerInfo { ready: boolean /** * Stealing flag. - * This flag is set to `true` when worker node is stealing tasks from another worker node. + * This flag is set to `true` when worker node is stealing one task from another worker node. */ stealing: boolean /** @@ -173,6 +173,11 @@ export interface WorkerInfo { * This flag is set to `true` when worker node has one task stolen from another worker node. */ stolen: boolean + /** + * Continuous stealing flag. + * This flag is set to `true` when worker node continuously steal tasks from other worker nodes. + */ + continuousStealing: boolean /** * Back pressure flag. * This flag is set to `true` when worker node tasks queue has back pressure. diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 04c87246..2f5e6437 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -873,6 +873,7 @@ describe('Abstract pool test suite', () => { ready: true, stealing: false, stolen: false, + continuousStealing: false, backPressure: false, }) } @@ -891,6 +892,7 @@ describe('Abstract pool test suite', () => { ready: true, stealing: false, stolen: false, + continuousStealing: false, backPressure: false, }) } @@ -1259,9 +1261,8 @@ describe('Abstract pool test suite', () => { minSize: expect.any(Number), maxSize: expect.any(Number), workerNodes: expect.any(Number), - idleWorkerNodes: expect.any(Number), stealingWorkerNodes: expect.any(Number), - stolenWorkerNodes: expect.any(Number), + idleWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 1ebe0caf..c27339ac 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -242,6 +242,7 @@ describe('Worker node test suite', () => { ready: false, stealing: false, stolen: false, + continuousStealing: false, backPressure: false, }) expect(threadWorkerNode.usage).toStrictEqual({ @@ -302,6 +303,7 @@ describe('Worker node test suite', () => { ready: false, stealing: false, stolen: false, + continuousStealing: false, backPressure: false, }) expect(clusterWorkerNode.usage).toStrictEqual({ -- 2.34.1