feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 19 Aug 2023 16:54:33 +0000 (18:54 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 19 Aug 2023 16:54:33 +0000 (18:54 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/dynamic.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/dynamic.test.js
tests/pools/thread/fixed.test.js

index 3128d3fd4e9a39b6f1062c620af3b735950fc51c..8adad1e4722d49eda423b972cb0d124efe28003c 100644 (file)
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Fix worker choice strategy retries mechanism on some edge cases.
 
+### Changed
+
+- Make orthogonal worker choice strategies tasks distribution and dynamic worker creation usage.
+
 ## [2.6.30] - 2023-08-19
 
 ### Fixed
index 59466274bd8c5740047b91e0c8b9936e3c4b3c8b..428dd9128ca8e26e20d5e766f6cab57b6c04ca2f 100644 (file)
@@ -709,7 +709,7 @@ export abstract class AbstractPool<
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
       if (
         name != null &&
         Array.isArray(workerInfo.taskFunctions) &&
@@ -805,15 +805,24 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     task: Task<Data>
   ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    ++workerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(workerUsage, task)
-    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      ++workerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(workerUsage, task)
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+        task.name as string
+      ) != null
+    ) {
       const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
       ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
-      ++taskFunctionWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+      if (taskFunctionWorkerUsage != null) {
+        ++taskFunctionWorkerUsage.tasks.executing
+        this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+      }
     }
   }
 
@@ -828,11 +837,18 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Response>
   ): void {
-    const workerUsage = this.workerNodes[workerNodeKey].usage
-    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
-    this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateEluWorkerUsage(workerUsage, message)
-    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+    if (this.workerNodes[workerNodeKey]?.usage != null) {
+      const workerUsage = this.workerNodes[workerNodeKey].usage
+      this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+      this.updateRunTimeWorkerUsage(workerUsage, message)
+      this.updateEluWorkerUsage(workerUsage, message)
+    }
+    if (
+      this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
+      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) != null
+    ) {
       const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
       ].getTaskFunctionWorkerUsage(
@@ -853,6 +869,7 @@ export abstract class AbstractPool<
   private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
+      workerInfo != null &&
       Array.isArray(workerInfo.taskFunctions) &&
       workerInfo.taskFunctions.length > 2
     )
@@ -1002,7 +1019,7 @@ export abstract class AbstractPool<
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', (error) => {
       const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
       workerInfo.ready = false
       this.workerNodes[workerNodeKey].closeChannel()
       this.emitter?.emit(PoolEvents.error, error)
@@ -1056,7 +1073,7 @@ export abstract class AbstractPool<
         })
       }
     })
-    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
     this.sendToWorker(workerNodeKey, {
       checkActive: true,
       workerId: workerInfo.id as number
@@ -1121,7 +1138,7 @@ export abstract class AbstractPool<
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
       },
-      workerId: this.getWorkerInfo(workerNodeKey).id as number
+      workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
     })
   }
 
@@ -1131,7 +1148,7 @@ export abstract class AbstractPool<
       let minQueuedTasks = Infinity
       let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
-        const workerInfo = this.getWorkerInfo(workerNodeId)
+        const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
         if (
           workerNodeId !== workerNodeKey &&
           workerInfo.ready &&
@@ -1185,8 +1202,10 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.getWorkerInfo(
-          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        (
+          this.getWorkerInfo(
+            this.getWorkerNodeKeyByWorkerId(message.workerId)
+          ) as WorkerInfo
         ).taskFunctions = message.taskFunctions
       }
     }
@@ -1198,7 +1217,7 @@ export abstract class AbstractPool<
     }
     const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    )
+    ) as WorkerInfo
     workerInfo.ready = message.ready as boolean
     workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
@@ -1260,8 +1279,8 @@ export abstract class AbstractPool<
    * @param workerNodeKey - The worker node key.
    * @returns The worker information.
    */
-  protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    return this.workerNodes[workerNodeKey].info
+  protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**
index 10bdf8714b8bbd23eaf1fab96051ed11e7094bdd..41bc4085ebd5a1399975e48e873fc0612c6a611c 100644 (file)
@@ -24,7 +24,7 @@ export class RoundRobinWorkerChoiceStrategy<
   implements IWorkerChoiceStrategy {
   /** @inheritDoc */
   public readonly strategyPolicy: StrategyPolicy = {
-    dynamicWorkerUsage: true,
+    dynamicWorkerUsage: false,
     dynamicWorkerReady: true
   }
 
index 03bcce050f35a2f8907f0974ddf8c1c8c828937d..64e8606b12c1afdd36c40f605bf73f0ac2794e93 100644 (file)
@@ -756,7 +756,9 @@ describe('Abstract pool test suite', () => {
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
-      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(maxMultiplier)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        numberOfWorkers * maxMultiplier
+      )
       expect(workerNode.usage.runTime.history.length).toBe(0)
       expect(workerNode.usage.waitTime.history.length).toBe(0)
       expect(workerNode.usage.elu.idle.history.length).toBe(0)
index c91447ee31b6fe1f23c048b9d25568f0bfd8d0db..166b546db5bc147272672dff20cfc9d9600bbb8d 100644 (file)
@@ -34,9 +34,7 @@ describe('Dynamic cluster pool test suite', () => {
     }
     expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
     expect(pool.workerNodes.length).toBeGreaterThan(min)
-    // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
-    expect(poolBusy).toBe(max + 1)
+    expect(poolBusy).toBe(1)
     const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
     expect(numberOfExitEvents).toBe(max - min)
   })
index 3baaa98ad04831151e433856bdcacee5682338f9..a5790db075f400b3ff4ff93dd6c0a01a3beb62fd 100644 (file)
@@ -92,12 +92,14 @@ describe('Fixed cluster pool test suite', () => {
     expect(poolReady).toBe(1)
   })
 
-  it("Verify that 'busy' event is emitted", () => {
+  it("Verify that 'busy' event is emitted", async () => {
+    const promises = new Set()
     let poolBusy = 0
     pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
     for (let i = 0; i < numberOfWorkers * 2; i++) {
-      pool.execute()
+      promises.add(pool.execute())
     }
+    await Promise.all(promises)
     // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the fixed pool.
     expect(poolBusy).toBe(numberOfWorkers + 1)
index 8f09c5024b28e78342c4c6af029156ebff3b80e3..0e47e18efade43e35b2894ba6824d666ddb13daa 100644 (file)
@@ -123,7 +123,7 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      dynamicWorkerUsage: true,
+      dynamicWorkerUsage: false,
       dynamicWorkerReady: true
     })
     await pool.destroy()
@@ -134,7 +134,7 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      dynamicWorkerUsage: true,
+      dynamicWorkerUsage: false,
       dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
@@ -261,7 +261,7 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
@@ -282,6 +282,10 @@ describe('Selection strategies test suite', () => {
           }
         }
       })
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
index f85d44ff4c42c6343ac49efa9cb73441c06b9e5f..ea9964caab100ac04a17deba24682f287d79cc7e 100644 (file)
@@ -34,9 +34,7 @@ describe('Dynamic thread pool test suite', () => {
     }
     expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
     expect(pool.workerNodes.length).toBeGreaterThan(min)
-    // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
-    // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
-    expect(poolBusy).toBe(max + 1)
+    expect(poolBusy).toBe(1)
     const numberOfExitEvents = await waitWorkerEvents(pool, 'exit', max - min)
     expect(numberOfExitEvents).toBe(max - min)
   })
index aeebc4a7a623a1151f37f41aaf7f56ddcfcadbc1..e3ee0f652dafe8a4401b95feb82f6c80c34751e6 100644 (file)
@@ -92,12 +92,14 @@ describe('Fixed thread pool test suite', () => {
     expect(poolReady).toBe(1)
   })
 
-  it("Verify that 'busy' event is emitted", () => {
+  it("Verify that 'busy' event is emitted", async () => {
+    const promises = new Set()
     let poolBusy = 0
     pool.emitter.on(PoolEvents.busy, () => ++poolBusy)
     for (let i = 0; i < numberOfThreads * 2; i++) {
-      pool.execute()
+      promises.add(pool.execute())
     }
+    await Promise.all(promises)
     // The `busy` event is triggered when the number of submitted tasks at once reach the number of fixed pool workers.
     // So in total numberOfThreads + 1 times for a loop submitting up to numberOfThreads * 2 tasks to the fixed pool.
     expect(poolBusy).toBe(numberOfThreads + 1)