From 9a55fa8c3756e2e3628a85adba8b5c13b542de84 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 6 May 2024 15:07:14 +0200 Subject: [PATCH] fix: sync worker task performance measurement requirements MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ src/pools/abstract-pool.ts | 35 +++++++++-------- tests/pools/abstract-pool.test.mjs | 62 +++++++++++++++++++++++------- 3 files changed, 71 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53632f35..a8d80251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure poolifier worker task performance measurement requirements are synchronized with task function objects' worker choice strategies. + ## [4.0.1] - 2024-05-02 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 677d33b6..07fec75f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -868,6 +868,9 @@ export abstract class AbstractPool< this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -885,11 +888,16 @@ export abstract class AbstractPool< this.taskFunctions.get(name) ) }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -974,12 +982,6 @@ export abstract class AbstractPool< }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -1236,7 +1238,7 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - let needWorkerChoiceStrategyUpdate = false + let needWorkerChoiceStrategiesUpdate = false // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage @@ -1251,7 +1253,7 @@ export abstract class AbstractPool< workerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1276,9 +1278,9 @@ export abstract class AbstractPool< taskFunctionWorkerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } - if (needWorkerChoiceStrategyUpdate) { + if (needWorkerChoiceStrategiesUpdate) { this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1883,11 +1885,11 @@ export abstract class AbstractPool< this.handleWorkerReadyResponse(message) } else if (taskFunctionsProperties != null) { // Task function properties message received from worker - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1907,10 +1909,11 @@ export abstract class AbstractPool< if (ready == null || !ready) { throw new Error(`Worker ${workerId} failed to initialize`) } - const workerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) this.checkAndEmitReadyEvent() } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 42e2959b..3158166b 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1453,21 +1453,55 @@ describe('Abstract pool test suite', () => { waitTime: { history: new CircularArray() }, - elu: { - idle: { - aggregate: 0, - maximum: 0, - minimum: 0, - history: new CircularArray() - }, - active: { - aggregate: 0, - maximum: 0, - minimum: 0, - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() }) -- 2.34.1