From d6ca14169b6d2e9ad12d438a7acca797ae683e7e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 17 Sep 2023 15:06:22 +0200 Subject: [PATCH] fix: fix task stealing related options handling at runtime MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .c8rc.json | 2 +- src/pools/abstract-pool.ts | 38 ++++++++++++++++++ tests/pools/abstract/abstract-pool.test.js | 45 +++++++++++++++++++++- 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/.c8rc.json b/.c8rc.json index 29100bb2..aba3e9b0 100644 --- a/.c8rc.json +++ b/.c8rc.json @@ -3,5 +3,5 @@ "lines": 94, "statements": 94, "functions": 95, - "branches": 92 + "branches": 93 } diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9aa6ec3d..eb4c415b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -630,6 +630,8 @@ export abstract class AbstractPool< tasksQueueOptions?: TasksQueueOptions ): void { if (this.opts.enableTasksQueue === true && !enable) { + this.unsetTaskStealing() + this.unsetTasksStealingOnBackPressure() this.flushTasksQueues() } this.opts.enableTasksQueue = enable @@ -643,6 +645,16 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + if (this.opts.tasksQueueOptions.taskStealing === true) { + this.setTaskStealing() + } else { + this.unsetTaskStealing() + } + if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.setTasksStealingOnBackPressure() + } else { + this.unsetTasksStealingOnBackPressure() + } } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } @@ -654,6 +666,32 @@ export abstract class AbstractPool< } } + private setTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + } + } + + private unsetTaskStealing (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onEmptyQueue + } + } + + private setTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } + } + + private unsetTasksStealingOnBackPressure (): void { + for (const [workerNodeKey] of this.workerNodes.entries()) { + delete this.workerNodes[workerNodeKey].onBackPressure + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 719acd76..aeede25d 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -630,6 +630,10 @@ describe('Abstract pool test suite', () => { ) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } pool.enableTasksQueue(true) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -638,6 +642,10 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(true, { concurrency: 2 }) expect(pool.opts.enableTasksQueue).toBe(true) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -646,9 +654,17 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } pool.enableTasksQueue(false) expect(pool.opts.enableTasksQueue).toBe(false) expect(pool.opts.tasksQueueOptions).toBeUndefined() + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } await pool.destroy() }) @@ -664,13 +680,40 @@ describe('Abstract pool test suite', () => { taskStealing: true, tasksStealingOnBackPressure: true }) - pool.setTasksQueueOptions({ concurrency: 2 }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } + pool.setTasksQueueOptions({ + concurrency: 2, + taskStealing: false, + tasksStealingOnBackPressure: false + }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 2, size: 4, + taskStealing: false, + tasksStealingOnBackPressure: false + }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeUndefined() + expect(workerNode.onBackPressure).toBeUndefined() + } + pool.setTasksQueueOptions({ + concurrency: 1, + taskStealing: true, + tasksStealingOnBackPressure: true + }) + expect(pool.opts.tasksQueueOptions).toStrictEqual({ + concurrency: 1, + size: 4, taskStealing: true, tasksStealingOnBackPressure: true }) + for (const workerNode of pool.workerNodes) { + expect(workerNode.onEmptyQueue).toBeInstanceOf(Function) + expect(workerNode.onBackPressure).toBeInstanceOf(Function) + } expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions') ).toThrowError( -- 2.34.1