From: Jérôme Benoit Date: Mon, 6 May 2024 13:07:14 +0000 (+0200) Subject: fix: sync worker task performance measurement requirements X-Git-Tag: v4.0.2~5 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=9a55fa8c3756e2e3628a85adba8b5c13b542de84;p=poolifier.git fix: sync worker task performance measurement requirements Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 53632f35..a8d80251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Ensure poolifier worker task performance measurement requirements are synchronized with task function objects' worker choice strategies. + ## [4.0.1] - 2024-05-02 ### Fixed diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 677d33b6..07fec75f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -868,6 +868,9 @@ export abstract class AbstractPool< this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -885,11 +888,16 @@ export abstract class AbstractPool< this.taskFunctions.get(name) ) }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( this.getWorkerWorkerChoiceStrategies() ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -974,12 +982,6 @@ export abstract class AbstractPool< }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && @@ -1236,7 +1238,7 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - let needWorkerChoiceStrategyUpdate = false + let needWorkerChoiceStrategiesUpdate = false // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage @@ -1251,7 +1253,7 @@ export abstract class AbstractPool< workerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1276,9 +1278,9 @@ export abstract class AbstractPool< taskFunctionWorkerUsage, message ) - needWorkerChoiceStrategyUpdate = true + needWorkerChoiceStrategiesUpdate = true } - if (needWorkerChoiceStrategyUpdate) { + if (needWorkerChoiceStrategiesUpdate) { this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1883,11 +1885,11 @@ export abstract class AbstractPool< this.handleWorkerReadyResponse(message) } else if (taskFunctionsProperties != null) { // Task function properties message received from worker - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) if (workerInfo != null) { workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) } } else if (taskId != null) { // Task execution response received from worker @@ -1907,10 +1909,11 @@ export abstract class AbstractPool< if (ready == null || !ready) { throw new Error(`Worker ${workerId} failed to initialize`) } - const workerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] workerNode.info.ready = ready workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) this.checkAndEmitReadyEvent() } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 42e2959b..3158166b 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -1453,21 +1453,55 @@ describe('Abstract pool test suite', () => { waitTime: { history: new CircularArray() }, - elu: { - idle: { - aggregate: 0, - maximum: 0, - minimum: 0, - history: new CircularArray() - }, - active: { - aggregate: 0, - maximum: 0, - minimum: 0, - history: new CircularArray() - } - } + elu: expect.objectContaining({ + idle: expect.objectContaining({ + history: expect.any(CircularArray) + }), + active: expect.objectContaining({ + history: expect.any(CircularArray) + }) + }) }) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed + ).toBeGreaterThan(0) + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate == + null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate + ).toBeGreaterThan(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate + ).toBeGreaterThanOrEqual(0) + } + if ( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null + ) { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeUndefined() + } else { + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeGreaterThanOrEqual(0) + expect( + workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization + ).toBeLessThanOrEqual(1) + } } await dynamicThreadPool.destroy() })