fix: sync worker task performance measurement requirements
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 May 2024 13:07:14 +0000 (15:07 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 6 May 2024 13:07:14 +0000 (15:07 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
tests/pools/abstract-pool.test.mjs

index 53632f35326494021a21fe959ef683a1dd765179..a8d8025146b4fc27430e4f74a505d40cae19700b 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure poolifier worker task performance measurement requirements are synchronized with task function objects' worker choice strategies.
+
 ## [4.0.1] - 2024-05-02
 
 ### Fixed
index 677d33b60d7c2b9a6736229004bb528008edf25d..07fec75f140a81ee10f39671274ecc4037a4fee3 100644 (file)
@@ -868,6 +868,9 @@ export abstract class AbstractPool<
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
       this.getWorkerWorkerChoiceStrategies()
     )
+    for (const workerNodeKey of this.workerNodes.keys()) {
+      this.sendStatisticsMessageToWorker(workerNodeKey)
+    }
     return opResult
   }
 
@@ -885,11 +888,16 @@ export abstract class AbstractPool<
         this.taskFunctions.get(name)
       )
     })
-    this.deleteTaskFunctionWorkerUsages(name)
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
     this.taskFunctions.delete(name)
     this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
       this.getWorkerWorkerChoiceStrategies()
     )
+    for (const workerNodeKey of this.workerNodes.keys()) {
+      this.sendStatisticsMessageToWorker(workerNodeKey)
+    }
     return opResult
   }
 
@@ -974,12 +982,6 @@ export abstract class AbstractPool<
     })
   }
 
-  private deleteTaskFunctionWorkerUsages (name: string): void {
-    for (const workerNode of this.workerNodes) {
-      workerNode.deleteTaskFunctionWorkerUsage(name)
-    }
-  }
-
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&
@@ -1236,7 +1238,7 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
-    let needWorkerChoiceStrategyUpdate = false
+    let needWorkerChoiceStrategiesUpdate = false
     // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
     if (this.workerNodes[workerNodeKey]?.usage != null) {
       const workerUsage = this.workerNodes[workerNodeKey].usage
@@ -1251,7 +1253,7 @@ export abstract class AbstractPool<
         workerUsage,
         message
       )
-      needWorkerChoiceStrategyUpdate = true
+      needWorkerChoiceStrategiesUpdate = true
     }
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1276,9 +1278,9 @@ export abstract class AbstractPool<
         taskFunctionWorkerUsage,
         message
       )
-      needWorkerChoiceStrategyUpdate = true
+      needWorkerChoiceStrategiesUpdate = true
     }
-    if (needWorkerChoiceStrategyUpdate) {
+    if (needWorkerChoiceStrategiesUpdate) {
       this.workerChoiceStrategiesContext?.update(workerNodeKey)
     }
   }
@@ -1883,11 +1885,11 @@ export abstract class AbstractPool<
       this.handleWorkerReadyResponse(message)
     } else if (taskFunctionsProperties != null) {
       // Task function properties message received from worker
-      const workerInfo = this.getWorkerInfo(
-        this.getWorkerNodeKeyByWorkerId(workerId)
-      )
+      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
+        this.sendStatisticsMessageToWorker(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -1907,10 +1909,11 @@ export abstract class AbstractPool<
     if (ready == null || !ready) {
       throw new Error(`Worker ${workerId} failed to initialize`)
     }
-    const workerNode =
-      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+    const workerNode = this.workerNodes[workerNodeKey]
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
+    this.sendStatisticsMessageToWorker(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }
 
index 42e2959be9230185635759f07d6916769f030ae3..3158166b655dec28e913536d3e56fa880acdbd7c 100644 (file)
@@ -1453,21 +1453,55 @@ describe('Abstract pool test suite', () => {
         waitTime: {
           history: new CircularArray()
         },
-        elu: {
-          idle: {
-            aggregate: 0,
-            maximum: 0,
-            minimum: 0,
-            history: new CircularArray()
-          },
-          active: {
-            aggregate: 0,
-            maximum: 0,
-            minimum: 0,
-            history: new CircularArray()
-          }
-        }
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
+            history: expect.any(CircularArray)
+          }),
+          active: expect.objectContaining({
+            history: expect.any(CircularArray)
+          })
+        })
       })
+      expect(
+        workerNode.getTaskFunctionWorkerUsage('echo').tasks.executed
+      ).toBeGreaterThan(0)
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate ==
+        null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.active.aggregate
+        ).toBeGreaterThan(0)
+      }
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate == null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.idle.aggregate
+        ).toBeGreaterThanOrEqual(0)
+      }
+      if (
+        workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization == null
+      ) {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeUndefined()
+      } else {
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeGreaterThanOrEqual(0)
+        expect(
+          workerNode.getTaskFunctionWorkerUsage('echo').elu.utilization
+        ).toBeLessThanOrEqual(1)
+      }
     }
     await dynamicThreadPool.destroy()
   })