From 5eb72b9e26eaffb43c67147fbc6b4d2b1b959d62 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 24 Dec 2023 12:08:02 +0100 Subject: [PATCH] fix: fix worker node cross tasks stealing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 5 ++++ src/pools/abstract-pool.ts | 40 +++++++++++++++++++++++----- src/pools/pool.ts | 2 ++ src/pools/worker-node.ts | 3 ++- src/pools/worker.ts | 5 ++++ tests/pools/abstract-pool.test.mjs | 9 ++++--- tests/pools/worker-node.test.mjs | 6 +++-- tests/worker/cluster-worker.test.mjs | 25 +++++++++-------- tests/worker/thread-worker.test.mjs | 18 +++++-------- 9 files changed, 76 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 188ce7a3..0511b419 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Avoid worker node cross tasks stealing. +- Ensure only half the pool worker nodes can steal tasks. + ## [3.1.10] - 2023-12-23 ### Changed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index cfef89e3..c8d45511 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -298,6 +298,13 @@ export abstract class AbstractPool< : accumulator, 0 ), + ...(this.opts.enableTasksQueue === true && { + stealingWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stealing ? accumulator + 1 : accumulator, + 0 + ) + }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _workerNode, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, @@ -1397,6 +1404,10 @@ export abstract class AbstractPool< }) } + private cannotStealTask (): boolean { + return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 + } + private handleTask (workerNodeKey: number, task: Task): void { if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) @@ -1409,7 +1420,7 @@ export abstract class AbstractPool< if (workerNodeKey === -1) { return } - if (this.workerNodes.length <= 1) { + if (this.cannotStealTask()) { return } while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1503,15 +1514,22 @@ export abstract class AbstractPool< eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - if (this.workerNodes.length <= 1) { - return - } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey attribute must be defined' + 'WorkerNode event detail workerNodeKey property must be defined' ) } + if ( + this.cannotStealTask() || + (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { + if (previousStolenTask != null) { + this.getWorkerInfo(workerNodeKey).stealing = false + } + return + } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( previousStolenTask != null && @@ -1519,6 +1537,7 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { + this.getWorkerInfo(workerNodeKey).stealing = false for (const taskName of this.workerNodes[workerNodeKey].info .taskFunctionNames as string[]) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( @@ -1529,6 +1548,7 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } + this.getWorkerInfo(workerNodeKey).stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1575,6 +1595,7 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( (sourceWorkerNode, sourceWorkerNodeKey) => sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stealing && sourceWorkerNodeKey !== workerNodeKey && sourceWorkerNode.usage.tasks.queued > 0 ) @@ -1593,7 +1614,11 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { - if (this.workerNodes.length <= 1) { + if ( + this.cannotStealTask() || + (this.info.stealingWorkerNodes as number) > + Math.floor(this.workerNodes.length / 2) + ) { return } const { workerId } = eventDetail @@ -1613,16 +1638,19 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && + !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { + this.getWorkerInfo(workerNodeKey).stealing = true const task = sourceWorkerNode.popTask() as Task this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string ) + this.getWorkerInfo(workerNodeKey).stealing = false } } } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 6058a3de..08aad620 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -69,6 +69,8 @@ export interface PoolInfo { readonly utilization?: number /** Pool total worker nodes. */ readonly workerNodes: number + /** Pool stealing worker nodes. */ + readonly stealingWorkerNodes?: number /** Pool idle worker nodes. */ readonly idleWorkerNodes: number /** Pool busy worker nodes. */ diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index c9ff0e70..d2397676 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -214,7 +214,8 @@ export class WorkerNode id: getWorkerId(worker), type: getWorkerType(worker) as WorkerType, dynamic: false, - ready: false + ready: false, + stealing: false } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 9a27d8a5..9c8b77fc 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -154,6 +154,11 @@ export interface WorkerInfo { * Ready flag. */ ready: boolean + /** + * Stealing flag. + * This flag is set to `true` when worker node is stealing tasks from another worker node. + */ + stealing: boolean /** * Task function names. */ diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1e6bf3ad..cf4b55af 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -882,7 +882,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.cluster, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() @@ -897,7 +898,8 @@ describe('Abstract pool test suite', () => { id: expect.any(Number), type: WorkerTypes.thread, dynamic: false, - ready: true + ready: true, + stealing: false }) } await pool.destroy() @@ -1269,6 +1271,7 @@ describe('Abstract pool test suite', () => { maxSize: expect.any(Number), workerNodes: expect.any(Number), idleWorkerNodes: expect.any(Number), + stealingWorkerNodes: expect.any(Number), busyWorkerNodes: expect.any(Number), executedTasks: expect.any(Number), executingTasks: expect.any(Number), @@ -1278,7 +1281,7 @@ describe('Abstract pool test suite', () => { stolenTasks: expect.any(Number), failedTasks: expect.any(Number) }) - expect(pool.hasBackPressure.callCount).toBe(5) + expect(pool.hasBackPressure.callCount).toBeGreaterThanOrEqual(7) await pool.destroy() }) diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 3b87a848..f22ea153 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -124,7 +124,8 @@ describe('Worker node test suite', () => { id: threadWorkerNode.worker.threadId, type: WorkerTypes.thread, dynamic: false, - ready: false + ready: false, + stealing: false }) expect(threadWorkerNode.usage).toStrictEqual({ tasks: { @@ -167,7 +168,8 @@ describe('Worker node test suite', () => { id: clusterWorkerNode.worker.id, type: WorkerTypes.cluster, dynamic: false, - ready: false + ready: false, + stealing: false }) expect(clusterWorkerNode.usage).toStrictEqual({ tasks: { diff --git a/tests/worker/cluster-worker.test.mjs b/tests/worker/cluster-worker.test.mjs index 8ceb403d..b785010e 100644 --- a/tests/worker/cluster-worker.test.mjs +++ b/tests/worker/cluster-worker.test.mjs @@ -4,13 +4,6 @@ import { ClusterWorker } from '../../lib/index.js' import { DEFAULT_TASK_NAME } from '../../lib/utils.js' describe('Cluster worker test suite', () => { - const sendStub = stub().returns() - class SpyWorker extends ClusterWorker { - getMainWorker () { - return { send: sendStub } - } - } - afterEach(() => { restore() }) @@ -25,6 +18,7 @@ describe('Cluster worker test suite', () => { send: stub().returns() }) worker.handleKillMessage() + expect(worker.getMainWorker.calledTwice).toBe(true) expect(worker.getMainWorker().send.calledOnce).toBe(true) expect(worker.opts.killHandler.calledOnce).toBe(true) }) @@ -37,6 +31,10 @@ describe('Cluster worker test suite', () => { return 2 } const worker = new ClusterWorker({ fn1, fn2 }) + worker.getMainWorker = stub().returns({ + id: 1, + send: stub().returns() + }) expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({ status: false, error: new TypeError('name parameter is not a string') @@ -45,10 +43,6 @@ describe('Cluster worker test suite', () => { status: false, error: new TypeError('name parameter is an empty string') }) - worker.getMainWorker = stub().returns({ - id: 1, - send: stub().returns() - }) expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) @@ -73,6 +67,7 @@ describe('Cluster worker test suite', () => { expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn2')).toBeUndefined() expect(worker.taskFunctions.size).toBe(2) + expect(worker.getMainWorker.calledTwice).toBe(true) expect(worker.getMainWorker().send.calledOnce).toBe(true) }) @@ -85,9 +80,13 @@ describe('Cluster worker test suite', () => { expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage) }) - it('Verify worker invokes the getMainWorker() and send() methods', () => { - const worker = new SpyWorker(() => {}) + it('Verify that sendToMainWorker() method invokes the getMainWorker() and send() methods', () => { + const worker = new ClusterWorker(() => {}) + worker.getMainWorker = stub().returns({ + send: stub().returns() + }) worker.sendToMainWorker({ ok: 1 }) + expect(worker.getMainWorker.calledTwice).toBe(true) expect(worker.getMainWorker().send.calledOnce).toBe(true) }) }) diff --git a/tests/worker/thread-worker.test.mjs b/tests/worker/thread-worker.test.mjs index 26da45e1..81b096f2 100644 --- a/tests/worker/thread-worker.test.mjs +++ b/tests/worker/thread-worker.test.mjs @@ -4,13 +4,6 @@ import { ThreadWorker } from '../../lib/index.js' import { DEFAULT_TASK_NAME } from '../../lib/utils.js' describe('Thread worker test suite', () => { - class SpyWorker extends ThreadWorker { - constructor (fn) { - super(fn) - this.port = { postMessage: stub().returns() } - } - } - afterEach(() => { restore() }) @@ -40,6 +33,9 @@ describe('Thread worker test suite', () => { return 2 } const worker = new ThreadWorker({ fn1, fn2 }) + worker.port = { + postMessage: stub().returns() + } expect(worker.removeTaskFunction(0, fn1)).toStrictEqual({ status: false, error: new TypeError('name parameter is not a string') @@ -48,9 +44,6 @@ describe('Thread worker test suite', () => { status: false, error: new TypeError('name parameter is an empty string') }) - worker.port = { - postMessage: stub().returns() - } expect(worker.taskFunctions.get(DEFAULT_TASK_NAME)).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn1')).toBeInstanceOf(Function) expect(worker.taskFunctions.get('fn2')).toBeInstanceOf(Function) @@ -87,8 +80,9 @@ describe('Thread worker test suite', () => { expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage) }) - it('Verify worker invokes the postMessage() method on port property', () => { - const worker = new SpyWorker(() => {}) + it('Verify that sendToMainWorker() method invokes the port property postMessage() method', () => { + const worker = new ThreadWorker(() => {}) + worker.port = { postMessage: stub().returns() } worker.sendToMainWorker({ ok: 1 }) expect(worker.port.postMessage.calledOnce).toBe(true) }) -- 2.34.1