fix: remove worker task usage at task function removal
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 19 Sep 2023 09:52:43 +0000 (11:52 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 19 Sep 2023 09:52:43 +0000 (11:52 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/abstract/worker-node.test.js

index 1bd5ee2cc73152e912cc08ecc3492f8e4a9ae6b7..33585218249e599cd1998fdc19158e7ab4b22168 100644 (file)
@@ -768,8 +768,8 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Data>
   ): Promise<boolean> {
-    const workerId = this.getWorkerInfo(workerNodeKey).id as number
     return await new Promise<boolean>((resolve, reject) => {
+      const workerId = this.getWorkerInfo(workerNodeKey).id as number
       this.registerWorkerMessageListener(workerNodeKey, message => {
         if (
           message.workerId === workerId &&
@@ -794,7 +794,7 @@ export abstract class AbstractPool<
   }
 
   private async sendTaskFunctionOperationToWorkers (
-    message: Omit<MessageValue<Data>, 'workerId'>
+    message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
       const responsesReceived = new Array<MessageValue<Data | Response>>()
@@ -857,12 +857,13 @@ export abstract class AbstractPool<
     if (typeof fn !== 'function') {
       throw new TypeError('fn argument must be a function')
     }
-    this.taskFunctions.set(name, fn)
-    return await this.sendTaskFunctionOperationToWorkers({
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: fn.toString()
     })
+    this.taskFunctions.set(name, fn)
+    return opResult
   }
 
   /** @inheritDoc */
@@ -872,11 +873,13 @@ export abstract class AbstractPool<
         'Cannot remove a task function not handled on the pool side'
       )
     }
-    this.taskFunctions.delete(name)
-    return await this.sendTaskFunctionOperationToWorkers({
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
+    this.deleteTaskFunctionWorkerUsages(name)
+    this.taskFunctions.delete(name)
+    return opResult
   }
 
   /** @inheritDoc */
@@ -900,6 +903,12 @@ export abstract class AbstractPool<
     })
   }
 
+  private deleteTaskFunctionWorkerUsages (name: string): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
+  }
+
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&
index ed6677027f810474aa9aa01e768fd5f483250679..e9680261b591e26707c11c5ca868873f3ca5a6e4 100644 (file)
@@ -183,6 +183,11 @@ implements IWorkerNode<Worker, Data> {
     return this.taskFunctionsUsage.get(name)
   }
 
+  /** @inheritdoc */
+  public deleteTaskFunctionWorkerUsage (name: string): boolean {
+    return this.taskFunctionsUsage.delete(name)
+  }
+
   private async startOnEmptyQueue (): Promise<void> {
     if (
       this.onEmptyQueueCount > 0 &&
index 37d6308507669dd1f0ebcc295b790a1857c16a3e..4c877bc8b1dcc58871a55cd70c87871d918c4596 100644 (file)
@@ -315,4 +315,11 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
    */
   readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
+  /**
+   * Deletes task function worker usage statistics.
+   *
+   * @param name - The task function name.
+   * @returns `true` if the task function worker usage statistics were deleted, `false` otherwise.
+   */
+  readonly deleteTaskFunctionWorkerUsage: (name: string) => boolean
 }
index bd03d86e06eefee9871215a49aeb11343236be0d..6c6de861156e040285fc3a4ad4c50ba5e796f04f 100644 (file)
@@ -386,7 +386,7 @@ export abstract class AbstractWorker<
         new Function(`return ${taskFunction as string}`)() as TaskFunction<
         Data,
         Response
-        > /* NOSONAR */
+        >
       )
     } else if (taskFunctionOperation === 'remove') {
       response = this.removeTaskFunction(taskFunctionName as string)
@@ -396,10 +396,14 @@ export abstract class AbstractWorker<
     this.sendToMainWorker({
       taskFunctionOperation,
       taskFunctionOperationStatus: response.status,
-      workerError: {
-        name: taskFunctionName as string,
-        message: this.handleError(response.error as Error | string)
-      }
+      taskFunctionName,
+      ...(!response.status &&
+        response?.error != null && {
+        workerError: {
+          name: taskFunctionName as string,
+          message: this.handleError(response.error as Error | string)
+        }
+      })
     })
   }
 
index 99ebbfa1b9c930a2dd6182e517c71f2f23ef2d35..947c7897af7d3cc8ff97e7ffec1603ecad3a429b 100644 (file)
@@ -1316,6 +1316,31 @@ describe('Abstract pool test suite', () => {
     const taskFunctionData = { test: 'test' }
     const echoResult = await dynamicThreadPool.execute(taskFunctionData, 'echo')
     expect(echoResult).toStrictEqual(taskFunctionData)
+    for (const workerNode of dynamicThreadPool.workerNodes) {
+      expect(workerNode.getTaskFunctionWorkerUsage('echo')).toStrictEqual({
+        tasks: {
+          executed: expect.any(Number),
+          executing: 0,
+          queued: 0,
+          stolen: 0,
+          failed: 0
+        },
+        runTime: {
+          history: new CircularArray()
+        },
+        waitTime: {
+          history: new CircularArray()
+        },
+        elu: {
+          idle: {
+            history: new CircularArray()
+          },
+          active: {
+            history: new CircularArray()
+          }
+        }
+      })
+    }
     await dynamicThreadPool.destroy()
   })
 
@@ -1493,11 +1518,52 @@ describe('Abstract pool test suite', () => {
     )
     const workerNodeKey = 0
     await expect(
-      pool.sendKillMessageToWorker(
-        workerNodeKey,
-        pool.workerNodes[workerNodeKey].info.id
-      )
+      pool.sendKillMessageToWorker(workerNodeKey)
     ).resolves.toBeUndefined()
     await pool.destroy()
   })
+
+  it('Verify sendTaskFunctionOperationToWorker()', async () => {
+    const pool = new DynamicClusterPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    const workerNodeKey = 0
+    await expect(
+      pool.sendTaskFunctionOperationToWorker(workerNodeKey, {
+        taskFunctionOperation: 'add',
+        taskFunctionName: 'empty',
+        taskFunction: (() => {}).toString()
+      })
+    ).resolves.toBe(true)
+    expect(
+      pool.workerNodes[workerNodeKey].info.taskFunctionNames
+    ).toStrictEqual([DEFAULT_TASK_NAME, 'test', 'empty'])
+    await pool.destroy()
+  })
+
+  it('Verify sendTaskFunctionOperationToWorkers()', async () => {
+    const pool = new DynamicClusterPool(
+      Math.floor(numberOfWorkers / 2),
+      numberOfWorkers,
+      './tests/worker-files/cluster/testWorker.js'
+    )
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
+    await expect(
+      pool.sendTaskFunctionOperationToWorkers({
+        taskFunctionOperation: 'add',
+        taskFunctionName: 'empty',
+        taskFunction: (() => {}).toString()
+      })
+    ).resolves.toBe(true)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.info.taskFunctionNames).toStrictEqual([
+        DEFAULT_TASK_NAME,
+        'test',
+        'empty'
+      ])
+    }
+    await pool.destroy()
+  })
 })
index f6305fe7566e4262008777530ed63568e487b2e0..ee469eda7b39692f4a5a26e9ba3782453c66c981 100644 (file)
@@ -221,4 +221,19 @@ describe('Worker node test suite', () => {
     })
     expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
   })
+
+  it('Worker node deleteTaskFunctionWorkerUsage()', () => {
+    expect(threadWorkerNode.info.taskFunctionNames).toStrictEqual([
+      DEFAULT_TASK_NAME,
+      'fn1',
+      'fn2'
+    ])
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+    expect(
+      threadWorkerNode.deleteTaskFunctionWorkerUsage('invalidTaskFunction')
+    ).toBe(false)
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(2)
+    expect(threadWorkerNode.deleteTaskFunctionWorkerUsage('fn1')).toBe(true)
+    expect(threadWorkerNode.taskFunctionsUsage.size).toBe(1)
+  })
 })