From e25f86b30763ea5c2e5fc6c0ef16818b7e4efe83 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 18 Jul 2024 19:53:08 +0200 Subject: [PATCH] feat: add ratio of worker nodes in a pool allowed to perform concurrent tasks stealing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit closes #2284 Signed-off-by: Jérôme Benoit --- .../typescript/http-client-pool/package.json | 2 +- .../express-cluster/package.json | 2 +- .../express-hybrid/package.json | 2 +- .../express-worker_threads/package.json | 2 +- .../fastify-cluster/package.json | 2 +- .../fastify-hybrid/package.json | 2 +- .../fastify-worker_threads/package.json | 2 +- .../typescript/smtp-client-pool/package.json | 2 +- .../ws-cluster/package.json | 2 +- .../ws-hybrid/package.json | 2 +- .../ws-worker_threads/package.json | 2 +- package.json | 2 +- src/pools/abstract-pool.ts | 20 +++++-- src/pools/pool.ts | 5 ++ src/pools/utils.ts | 18 +++++++ tests/pools/abstract-pool.test.mjs | 53 ++++++++++++++++++- tests/pools/utils.test.mjs | 1 + typedoc.mjs | 2 +- 18 files changed, 104 insertions(+), 19 deletions(-) diff --git a/examples/typescript/http-client-pool/package.json b/examples/typescript/http-client-pool/package.json index 7e155913..17684135 100644 --- a/examples/typescript/http-client-pool/package.json +++ b/examples/typescript/http-client-pool/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/express-cluster/package.json b/examples/typescript/http-server-pool/express-cluster/package.json index e5c0bf96..7c7d79a4 100644 --- a/examples/typescript/http-server-pool/express-cluster/package.json +++ b/examples/typescript/http-server-pool/express-cluster/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/express-hybrid/package.json b/examples/typescript/http-server-pool/express-hybrid/package.json index 7425b13c..c2de18c3 100644 --- a/examples/typescript/http-server-pool/express-hybrid/package.json +++ b/examples/typescript/http-server-pool/express-hybrid/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/express-worker_threads/package.json b/examples/typescript/http-server-pool/express-worker_threads/package.json index f960c001..a37d7b49 100644 --- a/examples/typescript/http-server-pool/express-worker_threads/package.json +++ b/examples/typescript/http-server-pool/express-worker_threads/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/fastify-cluster/package.json b/examples/typescript/http-server-pool/fastify-cluster/package.json index b2504537..05328c50 100644 --- a/examples/typescript/http-server-pool/fastify-cluster/package.json +++ b/examples/typescript/http-server-pool/fastify-cluster/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/fastify-hybrid/package.json b/examples/typescript/http-server-pool/fastify-hybrid/package.json index 8690f376..fa98ba54 100644 --- a/examples/typescript/http-server-pool/fastify-hybrid/package.json +++ b/examples/typescript/http-server-pool/fastify-hybrid/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/http-server-pool/fastify-worker_threads/package.json b/examples/typescript/http-server-pool/fastify-worker_threads/package.json index 3b271745..d8dd863d 100644 --- a/examples/typescript/http-server-pool/fastify-worker_threads/package.json +++ b/examples/typescript/http-server-pool/fastify-worker_threads/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/smtp-client-pool/package.json b/examples/typescript/smtp-client-pool/package.json index 66564b61..10df87dc 100644 --- a/examples/typescript/smtp-client-pool/package.json +++ b/examples/typescript/smtp-client-pool/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/websocket-server-pool/ws-cluster/package.json b/examples/typescript/websocket-server-pool/ws-cluster/package.json index 1e5252df..29b27b5c 100644 --- a/examples/typescript/websocket-server-pool/ws-cluster/package.json +++ b/examples/typescript/websocket-server-pool/ws-cluster/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/websocket-server-pool/ws-hybrid/package.json b/examples/typescript/websocket-server-pool/ws-hybrid/package.json index 336d80a1..adac296b 100644 --- a/examples/typescript/websocket-server-pool/ws-hybrid/package.json +++ b/examples/typescript/websocket-server-pool/ws-hybrid/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/examples/typescript/websocket-server-pool/ws-worker_threads/package.json b/examples/typescript/websocket-server-pool/ws-worker_threads/package.json index 67e32a12..6d49166d 100644 --- a/examples/typescript/websocket-server-pool/ws-worker_threads/package.json +++ b/examples/typescript/websocket-server-pool/ws-worker_threads/package.json @@ -6,7 +6,7 @@ "main": "dist/main.js", "type": "module", "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/package.json b/package.json index 176f2315..dac0b9e4 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ "pnpm": ">=9.0.0" }, "volta": { - "node": "22.4.1", + "node": "22.5.0", "pnpm": "9.5.0" }, "packageManager": "pnpm@9.5.0", diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index defc41a0..08c5d0a7 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -713,7 +713,10 @@ export abstract class AbstractPool< ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) if (workerChoiceStrategyOptions != null) { - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...this.opts.workerChoiceStrategyOptions, + ...workerChoiceStrategyOptions, + } this.workerChoiceStrategiesContext?.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -777,6 +780,7 @@ export abstract class AbstractPool< ...getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers ), + ...this.opts.tasksQueueOptions, ...tasksQueueOptions, } } @@ -1402,7 +1406,7 @@ export abstract class AbstractPool< // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing updateWaitTimeWorkerUsage( @@ -1902,7 +1906,11 @@ export abstract class AbstractPool< if ( this.cannotStealTask() || (this.info.stealingWorkerNodes ?? 0) > - Math.floor(this.workerNodes.length / 2) + Math.round( + this.workerNodes.length * + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.tasksStealingRatio! + ) ) { if (previousStolenTask != null) { workerInfo.stealing = false @@ -1981,7 +1989,11 @@ export abstract class AbstractPool< this.cannotStealTask() || this.hasBackPressure() || (this.info.stealingWorkerNodes ?? 0) > - Math.floor(this.workerNodes.length / 2) + Math.round( + this.workerNodes.length * + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.tasksStealingRatio! + ) ) { return } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 0df7549a..226c2fb4 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -157,6 +157,11 @@ export interface TasksQueueOptions { * @defaultValue false */ readonly tasksStealingOnBackPressure?: boolean + /** + * Ratio of worker nodes that can steal tasks from another worker node. + * @defaultValue 0.6 + */ + readonly tasksStealingRatio?: number /** * Queued tasks finished timeout in milliseconds at worker node termination. * @defaultValue 2000 diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 24f2cfb2..b652488a 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -44,6 +44,7 @@ export const getDefaultTasksQueueOptions = ( concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, } } @@ -146,6 +147,23 @@ export const checkValidTasksQueueOptions = ( `Invalid worker node tasks queue size: ${tasksQueueOptions.size.toString()} is a negative integer or zero` ) } + if ( + tasksQueueOptions?.tasksStealingRatio != null && + typeof tasksQueueOptions.tasksStealingRatio !== 'number' + ) { + throw new TypeError( + 'Invalid worker node tasks stealing ratio: must be a number' + ) + } + if ( + tasksQueueOptions?.tasksStealingRatio != null && + (tasksQueueOptions.tasksStealingRatio < 0 || + tasksQueueOptions.tasksStealingRatio > 1) + ) { + throw new RangeError( + 'Invalid worker node tasks stealing ratio: must be between 0 and 1' + ) + } } export const checkWorkerNodeArguments = ( diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 1f9213f3..421a2498 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -277,6 +277,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, }, workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED, @@ -439,6 +440,36 @@ describe('Abstract pool test suite', () => { ).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksStealingRatio: '' }, + } + ) + ).toThrow( + new TypeError( + 'Invalid worker node tasks stealing ratio: must be a number' + ) + ) + expect( + () => + new FixedThreadPool( + numberOfWorkers, + './tests/worker-files/thread/testWorker.mjs', + { + enableTasksQueue: true, + tasksQueueOptions: { tasksStealingRatio: 1.1 }, + } + ) + ).toThrow( + new RangeError( + 'Invalid worker node tasks stealing ratio: must be between 0 and 1' + ) + ) }) it('Verify that pool worker choice strategy options can be set', async () => { @@ -593,6 +624,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, }) pool.enableTasksQueue(true, { concurrency: 2 }) @@ -602,6 +634,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, }) pool.enableTasksQueue(false) @@ -621,6 +654,7 @@ describe('Abstract pool test suite', () => { size: Math.pow(numberOfWorkers, 2), taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, }) for (const workerNode of pool.workerNodes) { @@ -633,6 +667,7 @@ describe('Abstract pool test suite', () => { size: 2, taskStealing: false, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.5, tasksFinishedTimeout: 3000, }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ @@ -640,6 +675,7 @@ describe('Abstract pool test suite', () => { size: 2, taskStealing: false, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.5, tasksFinishedTimeout: 3000, }) for (const workerNode of pool.workerNodes) { @@ -654,10 +690,11 @@ describe('Abstract pool test suite', () => { }) expect(pool.opts.tasksQueueOptions).toStrictEqual({ concurrency: 1, - size: Math.pow(numberOfWorkers, 2), + size: 2, taskStealing: true, tasksStealingOnBackPressure: true, - tasksFinishedTimeout: 2000, + tasksStealingRatio: 0.5, + tasksFinishedTimeout: 3000, }) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksQueueBackPressureSize).toBe( @@ -693,6 +730,18 @@ describe('Abstract pool test suite', () => { expect(() => pool.setTasksQueueOptions({ size: 0.2 })).toThrow( new TypeError('Invalid worker node tasks queue size: must be an integer') ) + expect(() => pool.setTasksQueueOptions({ tasksStealingRatio: '' })).toThrow( + new TypeError( + 'Invalid worker node tasks stealing ratio: must be a number' + ) + ) + expect(() => + pool.setTasksQueueOptions({ tasksStealingRatio: 1.1 }) + ).toThrow( + new TypeError( + 'Invalid worker node tasks stealing ratio: must be between 0 and 1' + ) + ) await pool.destroy() }) diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index 21ddaa77..e79294a2 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -31,6 +31,7 @@ describe('Pool utils test suite', () => { size: Math.pow(poolMaxSize, 2), taskStealing: true, tasksStealingOnBackPressure: false, + tasksStealingRatio: 0.6, tasksFinishedTimeout: 2000, }) }) diff --git a/typedoc.mjs b/typedoc.mjs index 0b0c7b46..d139d4e8 100644 --- a/typedoc.mjs +++ b/typedoc.mjs @@ -16,7 +16,7 @@ try { join(dirname(fileURLToPath(import.meta.url)), 'tmp', markdownFile) ) } - execSync('pnpm dlx typedoc', { stdio: 'inherit' }) + execSync('pnpm exec typedoc', { stdio: 'inherit' }) for (const markdownFile of markdownFiles) { copyFileSync( join(dirname(fileURLToPath(import.meta.url)), 'tmp', markdownFile), -- 2.34.1