fix: fix fair share algorithm implementation
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 6 May 2023 18:25:18 +0000 (20:25 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 6 May 2023 18:25:18 +0000 (20:25 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
src/utils.ts
tests/pools/selection-strategies/selection-strategies.test.js

index b778fe17e4c42bea82c6e39c3edf36149a2f3b2c..ab93fc007b698f56b029aa4043182c87b330ab3b 100644 (file)
@@ -322,7 +322,7 @@ export abstract class AbstractPool<
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
-        return workerNode.tasksUsage?.running === 0
+        return workerNode.tasksUsage.running === 0
       }) === -1
     )
   }
@@ -412,7 +412,8 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerTasksUsage = this.getWorkerTasksUsage(worker)
+    const workerNodeKey = this.getWorkerNodeKey(worker)
+    const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
@@ -432,7 +433,7 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
-    this.workerChoiceStrategyContext.update()
+    this.workerChoiceStrategyContext.update(workerNodeKey)
   }
 
   /**
@@ -447,13 +448,14 @@ export abstract class AbstractPool<
     if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
       const workerCreated = this.createAndSetupWorker()
       this.registerWorkerMessageListener(workerCreated, message => {
+        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
         if (
           isKillBehavior(KillBehaviors.HARD, message.kill) ||
           (message.kill != null &&
-            this.getWorkerTasksUsage(workerCreated)?.running === 0)
+            this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
         ) {
           // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          this.flushTasksQueueByWorker(workerCreated)
+          this.flushTasksQueue(currentWorkerNodeKey)
           void (this.destroyWorker(workerCreated) as Promise<void>)
         }
       })
@@ -579,21 +581,6 @@ export abstract class AbstractPool<
     workerNode.tasksUsage = tasksUsage
   }
 
-  /**
-   * Gets the given worker its tasks usage in the pool.
-   *
-   * @param worker - The worker.
-   * @throws Error if the worker is not found in the pool worker nodes.
-   * @returns The worker tasks usage.
-   */
-  private getWorkerTasksUsage (worker: Worker): TasksUsage {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    if (workerNodeKey !== -1) {
-      return this.workerNodes[workerNodeKey].tasksUsage
-    }
-    throw new Error('Worker could not be found in the pool worker nodes')
-  }
-
   /**
    * Pushes the given worker in the pool worker nodes.
    *
@@ -676,11 +663,6 @@ export abstract class AbstractPool<
     }
   }
 
-  private flushTasksQueueByWorker (worker: Worker): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    this.flushTasksQueue(workerNodeKey)
-  }
-
   private flushTasksQueues (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
       this.flushTasksQueue(workerNodeKey)
index 9aabdfd29d3a512df229b7b28bcfe631eaae2804..e2dd626de76106991b11d1567d70c9e6f3d21ab7 100644 (file)
@@ -66,7 +66,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   public abstract reset (): boolean
 
   /** @inheritDoc */
-  public abstract update (): boolean
+  public abstract update (workerNodeKey: number): boolean
 
   /** @inheritDoc */
   public abstract choose (): number
@@ -106,7 +106,7 @@ export abstract class AbstractWorkerChoiceStrategy<
    */
   private findFirstFreeWorkerNodeKey (): number {
     return this.pool.workerNodes.findIndex(workerNode => {
-      return workerNode.tasksUsage?.running === 0
+      return workerNode.tasksUsage.running === 0
     })
   }
 
@@ -122,10 +122,10 @@ export abstract class AbstractWorkerChoiceStrategy<
   private findLastFreeWorkerNodeKey (): number {
     // It requires node >= 18.0.0:
     // return this.workerNodes.findLastIndex(workerNode => {
-    //   return workerNode.tasksUsage?.running === 0
+    //   return workerNode.tasksUsage.running === 0
     // })
     for (let i = this.pool.workerNodes.length - 1; i >= 0; i--) {
-      if (this.pool.workerNodes[i].tasksUsage?.running === 0) {
+      if (this.pool.workerNodes[i].tasksUsage.running === 0) {
         return i
       }
     }
index 17133408e276191a9f85f1abbdf21209445420ab..e65575a1e05c805978111cbbf279c5f5a5767e6f 100644 (file)
@@ -13,7 +13,7 @@ import type {
  */
 interface WorkerVirtualTaskTimestamp {
   start: number
-  end: number
+  end?: number
 }
 
 /**
@@ -59,10 +59,8 @@ export class FairShareWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public update (): boolean {
-    for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
-      this.computeWorkerVirtualTaskTimestamp(workerNodeKey)
-    }
+  public update (workerNodeKey: number): boolean {
+    this.computeWorkerVirtualTaskTimestamp(workerNodeKey)
     return true
   }
 
index 8feac71c27f10a2cf1fc91ca8b559ebbc0543e89..40a3f649b2116eec3f25b4b07347a8c5134b2e77 100644 (file)
@@ -69,7 +69,7 @@ export class LessBusyWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public remove (workerNodeKey: number): boolean {
+  public remove (): boolean {
     return true
   }
 }
index 078dd8fd8149747ef121205145ee707c42204e36..295493a494f842da32645f563469e5ba272eea3d 100644 (file)
@@ -62,7 +62,7 @@ export class LessUsedWorkerChoiceStrategy<
   }
 
   /** @inheritDoc */
-  public remove (workerNodeKey: number): boolean {
+  public remove (): boolean {
     return true
   }
 }
index 968d2996e0dfd05f7a21404c656010e2bdfcc92b..114ae8c555141fd08bff0103fcf79d62a65b5f83 100644 (file)
@@ -78,16 +78,20 @@ export interface IWorkerChoiceStrategy {
   readonly requiredStatistics: RequiredStatistics
   /**
    * Resets strategy internals.
+   *
+   * @returns `true` if the reset is successful, `false` otherwise.
    */
   reset: () => boolean
   /**
-   * Updates strategy internals.
+   * Updates worker node strategy internals.
    *
    * @returns `true` if the update is successful, `false` otherwise.
    */
-  update: () => boolean
+  update: (workerNodeKey: number) => boolean
   /**
    * Chooses a worker node in the pool and returns its key.
+   *
+   * @returns The worker node key.
    */
   choose: () => number
   /**
index 0a488549f9f9d7240b8d4fd421d64b89ada5401b..e5fddc1b1fc5b400398b2e86bd0ac3b3fd6f97c9 100644 (file)
@@ -119,12 +119,12 @@ export class WorkerChoiceStrategyContext<
    *
    * @returns `true` if the update is successful, `false` otherwise.
    */
-  public update (): boolean {
+  public update (workerNodeKey: number): boolean {
     return (
       this.workerChoiceStrategies.get(
         this.workerChoiceStrategy
       ) as IWorkerChoiceStrategy
-    ).update()
+    ).update(workerNodeKey)
   }
 
   /**
index f30e175c05dd9b20561f8b82f36beaa93946584e..ed118da6b5e10165d23c2f6820af73e9ed7f02bb 100644 (file)
@@ -28,7 +28,7 @@ export const median = (dataSet: number[]): number => {
   const sortedDataSet = dataSet.slice().sort((a, b) => a - b)
   const middleIndex = Math.floor(sortedDataSet.length / 2)
   if (sortedDataSet.length % 2 === 0) {
-    return sortedDataSet[middleIndex / 2]
+    return sortedDataSet[middleIndex]
   }
   return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2
 }
index e8d9d66b751e305b79ce2e5147eedb3eb6bcb48a..392aafcf3a58a79f6744a37268b01415243a2b97 100644 (file)
@@ -153,12 +153,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -171,12 +169,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -290,12 +286,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
     )
     // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -308,12 +302,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
     )
     // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -361,12 +353,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
     )
     // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -379,12 +369,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
     )
     // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -432,12 +420,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
@@ -461,12 +447,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
@@ -495,12 +479,10 @@ describe('Selection strategies test suite', () => {
       }
     )
     // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBe(0)
@@ -633,12 +615,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
@@ -667,12 +647,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
@@ -706,12 +684,10 @@ describe('Selection strategies test suite', () => {
       }
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBe(0)