From: Jérôme Benoit Date: Tue, 19 Sep 2023 09:52:43 +0000 (+0200) Subject: fix: remove worker task usage at task function removal X-Git-Tag: v2.7.0~1^2~5 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=adee605399485348ae224e7e4c022f024373b0ef;hp=e4c07d066abc51e978a18f44a973a548f24fb7ad;p=poolifier.git fix: remove worker task usage at task function removal Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 1bd5ee2c..33585218 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -768,8 +768,8 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): Promise { - const workerId = this.getWorkerInfo(workerNodeKey).id as number return await new Promise((resolve, reject) => { + const workerId = this.getWorkerInfo(workerNodeKey).id as number this.registerWorkerMessageListener(workerNodeKey, message => { if ( message.workerId === workerId && @@ -794,7 +794,7 @@ export abstract class AbstractPool< } private async sendTaskFunctionOperationToWorkers ( - message: Omit, 'workerId'> + message: MessageValue ): Promise { return await new Promise((resolve, reject) => { const responsesReceived = new Array>() @@ -857,12 +857,13 @@ export abstract class AbstractPool< if (typeof fn !== 'function') { throw new TypeError('fn argument must be a function') } - this.taskFunctions.set(name, fn) - return await this.sendTaskFunctionOperationToWorkers({ + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionName: name, taskFunction: fn.toString() }) + this.taskFunctions.set(name, fn) + return opResult } /** @inheritDoc */ @@ -872,11 +873,13 @@ export abstract class AbstractPool< 'Cannot remove a task function not handled on the pool side' ) } - this.taskFunctions.delete(name) - return await this.sendTaskFunctionOperationToWorkers({ + const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', taskFunctionName: name }) + this.deleteTaskFunctionWorkerUsages(name) + this.taskFunctions.delete(name) + return opResult } /** @inheritDoc */ @@ -900,6 +903,12 @@ export abstract class AbstractPool< }) } + private deleteTaskFunctionWorkerUsages (name: string): void { + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } + } + private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index ed667702..e9680261 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -183,6 +183,11 @@ implements IWorkerNode { return this.taskFunctionsUsage.get(name) } + /** @inheritdoc */ + public deleteTaskFunctionWorkerUsage (name: string): boolean { + return this.taskFunctionsUsage.delete(name) + } + private async startOnEmptyQueue (): Promise { if ( this.onEmptyQueueCount > 0 && diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 37d63085..4c877bc8 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -315,4 +315,11 @@ export interface IWorkerNode { * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise. */ readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined + /** + * Deletes task function worker usage statistics. + * + * @param name - The task function name. + * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise. + */ + readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index bd03d86e..6c6de861 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -386,7 +386,7 @@ export abstract class AbstractWorker< new Function(`return ${taskFunction as string}`)() as TaskFunction< Data, Response - > /* NOSONAR */ + > ) } else if (taskFunctionOperation === 'remove') { response = this.removeTaskFunction(taskFunctionName as string) @@ -396,10 +396,14 @@ export abstract class AbstractWorker< this.sendToMainWorker({ taskFunctionOperation, taskFunctionOperationStatus: response.status, - workerError: { - name: taskFunctionName as string, - message: this.handleError(response.error as Error | string) - } + taskFunctionName, + ...(!response.status && + response?.error != null && { + workerError: { + name: taskFunctionName as string, + message: this.handleError(response.error as Error | string) + } + }) }) } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 99ebbfa1..947c7897 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -1316,6 +1316,31 @@ describe('Abstract pool test suite', () => { const taskFunctionData = { test: 'test' } const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo') expect(echoResult).toStrictEqual(taskFunctionData) + for (const workerNode of dynamicThreadPool.workerNodes) { + expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({ + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + stolen: 0, + failed: 0 + }, + runTime: { + history: new CircularArray() + }, + waitTime: { + history: new CircularArray() + }, + elu: { + idle: { + history: new CircularArray() + }, + active: { + history: new CircularArray() + } + } + }) + } await dynamicThreadPool.destroy() }) @@ -1493,11 +1518,52 @@ describe('Abstract pool test suite', () => { ) const workerNodeKey = 0 await expect( - pool.sendKillMessageToWorker( - workerNodeKey, - pool.workerNodes[workerNodeKey].info.id - ) + pool.sendKillMessageToWorker(workerNodeKey) ).resolves.toBeUndefined() await pool.destroy() }) + + it('Verify sendTaskFunctionOperationToWorker()', async () => { + const pool = new DynamicClusterPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + const workerNodeKey = 0 + await expect( + pool.sendTaskFunctionOperationToWorker(workerNodeKey, { + taskFunctionOperation: 'add', + taskFunctionName: 'empty', + taskFunction: (() => {}).toString() + }) + ).resolves.toBe(true) + expect( + pool.workerNodes[workerNodeKey].info.taskFunctionNames + ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty']) + await pool.destroy() + }) + + it('Verify sendTaskFunctionOperationToWorkers()', async () => { + const pool = new DynamicClusterPool( + Math.floor(numberOfWorkers / 2), + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + await waitPoolEvents(pool, PoolEvents.ready, 1) + await expect( + pool.sendTaskFunctionOperationToWorkers({ + taskFunctionOperation: 'add', + taskFunctionName: 'empty', + taskFunction: (() => {}).toString() + }) + ).resolves.toBe(true) + for (const workerNode of pool.workerNodes) { + expect(workerNode.info.taskFunctionNames).toStrictEqual([ + DEFAULT_TASK_NAME, + 'test', + 'empty' + ]) + } + await pool.destroy() + }) }) diff --git a/tests/pools/abstract/worker-node.test.js b/tests/pools/abstract/worker-node.test.js index f6305fe7..ee469eda 100644 --- a/tests/pools/abstract/worker-node.test.js +++ b/tests/pools/abstract/worker-node.test.js @@ -221,4 +221,19 @@ describe('Worker node test suite', () => { }) expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) }) + + it('Worker node deleteTaskFunctionWorkerUsage()', () => { + expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([ + DEFAULT_TASK_NAME, + 'fn1', + 'fn2' + ]) + expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) + expect( + threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction') + ).toBe(false) + expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2) + expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true) + expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1) + }) })