From d9672350c04638ef28b10b6f480e8e7cd89256f0 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 19 Aug 2024 14:26:44 +0200 Subject: [PATCH] fix: account for all stealing worker nodes MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 12 ++++++++++-- src/pools/pool.ts | 2 +- src/pools/worker-node.ts | 1 + src/pools/worker.ts | 7 ++++++- tests/pools/abstract-pool.test.mjs | 2 ++ tests/pools/worker-node.test.mjs | 2 ++ 6 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7aae2c89..a65cbbbc 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -314,7 +314,10 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { stealingWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => - workerNode.info.continuousStealing ? accumulator + 1 : accumulator, + workerNode.info.continuousStealing || + workerNode.info.backPressureStealing + ? accumulator + 1 + : accumulator, 0 ), }), @@ -1672,7 +1675,7 @@ export abstract class AbstractPool< this.isWorkerNodeIdle(localWorkerNodeKey) && workerInfo != null && !workerInfo.continuousStealing && - !workerInfo.stealing) + !workerInfo.backPressureStealing) ) { // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) @@ -2061,7 +2064,12 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.opts.tasksQueueOptions!.size! - sizeOffset ) { + if (workerNode.info.backPressureStealing) { + continue + } + workerNode.info.backPressureStealing = true this.stealTask(sourceWorkerNode, workerNodeKey) + workerNode.info.backPressureStealing = false } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 02a0c795..27d897b2 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -88,7 +88,7 @@ export interface PoolInfo { readonly utilization?: number /** Pool total worker nodes. */ readonly workerNodes: number - /** Pool continuous stealing worker nodes. */ + /** Pool tasks stealing worker nodes. */ readonly stealingWorkerNodes?: number /** Pool idle worker nodes. */ readonly idleWorkerNodes: number diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index dbc27fa5..8748b414 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -222,6 +222,7 @@ export class WorkerNode stealing: false, stolen: false, continuousStealing: false, + backPressureStealing: false, backPressure: false, } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index a0742c28..3c4fe610 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -175,9 +175,14 @@ export interface WorkerInfo { stolen: boolean /** * Continuous stealing flag. - * This flag is set to `true` when worker node continuously steal tasks from other worker nodes. + * This flag is set to `true` when worker node is continuously stealing tasks from other worker nodes. */ continuousStealing: boolean + /** + * Back pressure stealing flag. + * This flag is set to `true` when worker node is stealing one task from another back pressured worker node. + */ + backPressureStealing: boolean /** * Back pressure flag. * This flag is set to `true` when worker node tasks queue has back pressure. diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 2f5e6437..5b4f0246 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -874,6 +874,7 @@ describe('Abstract pool test suite', () => { stealing: false, stolen: false, continuousStealing: false, + backPressureStealing: false, backPressure: false, }) } @@ -893,6 +894,7 @@ describe('Abstract pool test suite', () => { stealing: false, stolen: false, continuousStealing: false, + backPressureStealing: false, backPressure: false, }) } diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index c27339ac..69f1e15a 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -243,6 +243,7 @@ describe('Worker node test suite', () => { stealing: false, stolen: false, continuousStealing: false, + backPressureStealing: false, backPressure: false, }) expect(threadWorkerNode.usage).toStrictEqual({ @@ -304,6 +305,7 @@ describe('Worker node test suite', () => { stealing: false, stolen: false, continuousStealing: false, + backPressureStealing: false, backPressure: false, }) expect(clusterWorkerNode.usage).toStrictEqual({ -- 2.34.1