From 94407def0f4cd4356982ce5144a6108aec9a1ff9 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 19 Aug 2023 18:54:33 +0200 Subject: [PATCH] feat: untangle worker choice strategies tasks distribution and dynamic worker creation usage MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ src/pools/abstract-pool.ts | 61 ++++++++++++------- .../round-robin-worker-choice-strategy.ts | 2 +- tests/pools/abstract/abstract-pool.test.js | 4 +- tests/pools/cluster/dynamic.test.js | 4 +- tests/pools/cluster/fixed.test.js | 6 +- .../selection-strategies.test.js | 10 ++- tests/pools/thread/dynamic.test.js | 4 +- tests/pools/thread/fixed.test.js | 6 +- 9 files changed, 65 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3128d3fd..8adad1e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix worker choice strategy retries mechanism on some edge cases. +### Changed + +- Make orthogonal worker choice strategies tasks distribution and dynamic worker creation usage. + ## [2.6.30] - 2023-08-19 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 59466274..428dd912 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -709,7 +709,7 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo if ( name != null && Array.isArray(workerInfo.taskFunctions) && @@ -805,15 +805,24 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage - ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + if (taskFunctionWorkerUsage != null) { + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + } } } @@ -828,11 +837,18 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) - if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME + ) != null + ) { const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey ].getTaskFunctionWorkerUsage( @@ -853,6 +869,7 @@ export abstract class AbstractPool< private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( + workerInfo != null && Array.isArray(workerInfo.taskFunctions) && workerInfo.taskFunctions.length > 2 ) @@ -1002,7 +1019,7 @@ export abstract class AbstractPool< worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) @@ -1056,7 +1073,7 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number @@ -1121,7 +1138,7 @@ export abstract class AbstractPool< elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number }) } @@ -1131,7 +1148,7 @@ export abstract class AbstractPool< let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) + const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo if ( workerNodeId !== workerNodeKey && workerInfo.ready && @@ -1185,8 +1202,10 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (message.taskFunctions != null) { // Task functions message received from worker - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) + ( + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ) as WorkerInfo ).taskFunctions = message.taskFunctions } } @@ -1198,7 +1217,7 @@ export abstract class AbstractPool< } const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ) + ) as WorkerInfo workerInfo.ready = message.ready as boolean workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { @@ -1260,8 +1279,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /** diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 10bdf871..41bc4085 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -24,7 +24,7 @@ export class RoundRobinWorkerChoiceStrategy< implements IWorkerChoiceStrategy { /** @inheritDoc */ public readonly strategyPolicy: StrategyPolicy = { - dynamicWorkerUsage: true, + dynamicWorkerUsage: false, dynamicWorkerReady: true } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 03bcce05..64e8606b 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -756,7 +756,9 @@ describe('Abstract pool test suite', () => { } }) expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) - expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.runTime.history.length).toBe(0) expect(workerNode.usage.waitTime.history.length).toBe(0) expect(workerNode.usage.elu.idle.history.length).toBe(0) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index c91447ee..166b546d 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -34,9 +34,7 @@ describe('Dynamic cluster pool test suite', () => { } expect(pool.workerNodes.length).toBeLessThanOrEqual(max) expect(pool.workerNodes.length).toBeGreaterThan(min) - // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. - // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. - expect(poolBusy).toBe(max + 1) + expect(poolBusy).toBe(1) const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min) expect(numberOfExitEvents).toBe(max - min) }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 3baaa98a..a5790db0 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -92,12 +92,14 @@ describe('Fixed cluster pool test suite', () => { expect(poolReady).toBe(1) }) - it("Verify that 'busy' event is emitted", () => { + it("Verify that 'busy' event is emitted", async () => { + const promises = new Set() let poolBusy = 0 pool.emitter.on(PoolEvents.busy, () => ++poolBusy) for (let i = 0; i < numberOfWorkers * 2; i++) { - pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool. expect(poolBusy).toBe(numberOfWorkers + 1) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 8f09c502..0e47e18e 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -123,7 +123,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ - dynamicWorkerUsage: true, + dynamicWorkerUsage: false, dynamicWorkerReady: true }) await pool.destroy() @@ -134,7 +134,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ - dynamicWorkerUsage: true, + dynamicWorkerUsage: false, dynamicWorkerReady: true }) // We need to clean up the resources after our test @@ -261,7 +261,7 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, @@ -282,6 +282,10 @@ describe('Selection strategies test suite', () => { } } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index f85d44ff..ea9964ca 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -34,9 +34,7 @@ describe('Dynamic thread pool test suite', () => { } expect(pool.workerNodes.length).toBeLessThanOrEqual(max) expect(pool.workerNodes.length).toBeGreaterThan(min) - // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. - // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. - expect(poolBusy).toBe(max + 1) + expect(poolBusy).toBe(1) const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min) expect(numberOfExitEvents).toBe(max - min) }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index aeebc4a7..e3ee0f65 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -92,12 +92,14 @@ describe('Fixed thread pool test suite', () => { expect(poolReady).toBe(1) }) - it("Verify that 'busy' event is emitted", () => { + it("Verify that 'busy' event is emitted", async () => { + const promises = new Set() let poolBusy = 0 pool.emitter.on(PoolEvents.busy, () => ++poolBusy) for (let i = 0; i < numberOfThreads * 2; i++) { - pool.execute() + promises.add(pool.execute()) } + await Promise.all(promises) // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers. // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool. expect(poolBusy).toBe(numberOfThreads + 1) -- 2.34.1