From a5ed75b7c39de907a0047f4c30f2ea219ca4f917 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Fri, 21 Jul 2023 17:21:06 +0200 Subject: [PATCH] fix: ensure the task concurrency is respected at queued task redistribution MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 11 +++++++++-- tests/pools/abstract/abstract-pool.test.js | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1222d686..0d3d2183 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -173,7 +173,11 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { - if (!Number.isSafeInteger(max)) { + if (max == null) { + throw new Error( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + } else if (!Number.isSafeInteger(max)) { throw new TypeError( 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size' ) @@ -1008,7 +1012,10 @@ export abstract class AbstractPool< workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { - if (workerNode.usage.tasks.executing === 0) { + if ( + this.workerNodes[workerNodeId].usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { executeTask = true } targetWorkerNodeKey = workerNodeId diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 10736b6b..2c904362 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -86,6 +86,18 @@ describe('Abstract pool test suite', () => { }) it('Verify that dynamic pool sizing is checked', () => { + expect( + () => + new DynamicClusterPool( + 1, + undefined, + './tests/worker-files/cluster/testWorker.js' + ) + ).toThrowError( + new TypeError( + 'Cannot instantiate a dynamic pool without specifying the maximum pool size' + ) + ) expect( () => new DynamicThreadPool( @@ -103,7 +115,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 0, 0.5, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/cluster/testWorker.js' ) ).toThrowError( new TypeError( @@ -123,7 +135,7 @@ describe('Abstract pool test suite', () => { new DynamicClusterPool( 1, 1, - './tests/worker-files/thread/testWorker.js' + './tests/worker-files/cluster/testWorker.js' ) ).toThrowError( new RangeError( -- 2.34.1