From eebfd819241181d59144f45b51f566fca0211fc5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 13 Aug 2024 17:43:09 +0200 Subject: [PATCH] fix: protect worker node tasks queue from concurrent tasks stealing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit The locking primitive is a mutex implemented with cheap boolean checks. In a single-threaded JS runtime, that should be enough. Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 64 ++++++++++++++++++------------ src/pools/pool.ts | 2 + src/pools/worker-node.ts | 1 + src/pools/worker.ts | 5 +++ tests/pools/abstract-pool.test.mjs | 3 ++ tests/pools/worker-node.test.mjs | 2 + 6 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 095c08a6..4c387477 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -325,6 +325,13 @@ export abstract class AbstractPool< 0 ), }), + ...(this.opts.enableTasksQueue === true && { + stolenWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stolen ? accumulator + 1 : accumulator, + 0 + ), + }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? 
accumulator + 1 : accumulator, @@ -1649,10 +1656,10 @@ export abstract class AbstractPool< ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && - workerInfo != null && - !workerInfo.stealing && workerUsage.tasks.executing === 0 && - this.tasksQueueSize(localWorkerNodeKey) === 0))) + this.tasksQueueSize(localWorkerNodeKey) === 0 && + workerInfo != null && + !workerInfo.stealing))) ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) @@ -1913,7 +1920,6 @@ export abstract class AbstractPool< ) ) { if (previousStolenTask != null) { - workerInfo.stealing = false this.resetTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1928,7 +1934,6 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - workerInfo.stealing = false this.resetTaskSequentiallyStolenStatisticsWorkerUsage( workerNodeKey, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1936,7 +1941,6 @@ export abstract class AbstractPool< ) return } - workerInfo.stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if (stolenTask != null) { this.updateTaskSequentiallyStolenStatisticsWorkerUsage( @@ -1956,6 +1960,31 @@ export abstract class AbstractPool< }) } + private readonly stealTask = ( + sourceWorkerNode: IWorkerNode, + destinationWorkerNodeKey: number + ): Task | undefined => { + const destinationWorkerInfo = this.getWorkerInfo(destinationWorkerNodeKey) + if (destinationWorkerInfo == null) { + throw new Error( + `Worker node with key '${destinationWorkerNodeKey.toString()}' not found in pool` + ) + } + destinationWorkerInfo.stealing = true + sourceWorkerNode.info.stolen = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! 
+ sourceWorkerNode.info.stolen = false + destinationWorkerInfo.stealing = false + this.handleTask(destinationWorkerNodeKey, task) + this.updateTaskStolenStatisticsWorkerUsage( + destinationWorkerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + task.name! + ) + return task + } + private readonly workerNodeStealTask = ( workerNodeKey: number ): Task | undefined => { @@ -1968,17 +1997,13 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( (sourceWorkerNode, sourceWorkerNodeKey) => sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stolen && !sourceWorkerNode.info.stealing && sourceWorkerNodeKey !== workerNodeKey && sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueLastPrioritizedTask()! - this.handleTask(workerNodeKey, task) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) - return task + return this.stealTask(sourceWorkerNode, workerNodeKey) } } @@ -2015,25 +2040,14 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && + !workerNode.info.stolen && !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.size! - sizeOffset ) { - const workerInfo = this.getWorkerInfo(workerNodeKey) - if (workerInfo == null) { - throw new Error( - `Worker node with key '${workerNodeKey.toString()}' not found in pool` - ) - } - workerInfo.stealing = true - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.dequeueLastPrioritizedTask()! 
- this.handleTask(workerNodeKey, task) - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) - workerInfo.stealing = false + this.stealTask(sourceWorkerNode, workerNodeKey) } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 226c2fb4..42478ef3 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -90,6 +90,8 @@ export interface PoolInfo { readonly workerNodes: number /** Pool stealing worker nodes. */ readonly stealingWorkerNodes?: number + /** Pool stolen worker nodes. */ + readonly stolenWorkerNodes?: number /** Pool idle worker nodes. */ readonly idleWorkerNodes: number /** Pool busy worker nodes. */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index b42d8493..c79fa389 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -220,6 +220,7 @@ export class WorkerNode dynamic: false, ready: false, stealing: false, + stolen: false, backPressure: false, } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 9f5fc0c1..956e1f0b 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -168,6 +168,11 @@ export interface WorkerInfo { * This flag is set to `true` when worker node is stealing tasks from another worker node. */ stealing: boolean + /** + * Stolen flag. + * This flag is set to `true` when worker node is having one of its tasks stolen by another worker node. + */ + stolen: boolean /** * Back pressure flag. * This flag is set to `true` when worker node tasks queue has back pressure. 
diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index ed49bbca..f8b983d4 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -872,6 +872,7 @@ describe('Abstract pool test suite', () => { dynamic: false, ready: true, stealing: false, + stolen: false, backPressure: false, }) } @@ -889,6 +890,7 @@ describe('Abstract pool test suite', () => { dynamic: false, ready: true, stealing: false, + stolen: false, backPressure: false, }) } @@ -1262,6 +1264,7 @@ describe('Abstract pool test suite', () => { workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), stealingWorkerNodes: expect.any(Number), + stolenWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 4fea0e43..1ebe0caf 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -241,6 +241,7 @@ describe('Worker node test suite', () => { dynamic: false, ready: false, stealing: false, + stolen: false, backPressure: false, }) expect(threadWorkerNode.usage).toStrictEqual({ @@ -300,6 +301,7 @@ describe('Worker node test suite', () => { dynamic: false, ready: false, stealing: false, + stolen: false, backPressure: false, }) expect(clusterWorkerNode.usage).toStrictEqual({ -- 2.34.1