fix: fix faire share worker choice stategy internals update
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 6 May 2023 13:21:23 +0000 (15:21 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 6 May 2023 13:21:23 +0000 (15:21 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/selection-strategies.test.js

index 820656bb55a69e9ff7ba8f18419456b4193223b8..b778fe17e4c42bea82c6e39c3edf36149a2f3b2c 100644 (file)
@@ -432,6 +432,7 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
+    this.workerChoiceStrategyContext.update()
   }
 
   /**
index 2af13d631e2b77708729544787a575896b6c2e2e..9aabdfd29d3a512df229b7b28bcfe631eaae2804 100644 (file)
@@ -65,6 +65,9 @@ export abstract class AbstractWorkerChoiceStrategy<
   /** @inheritDoc */
   public abstract reset (): boolean
 
+  /** @inheritDoc */
+  public abstract update (): boolean
+
   /** @inheritDoc */
   public abstract choose (): number
 
index f0bc8787a6005e4d896140a220f49ae89b8c391c..17133408e276191a9f85f1abbdf21209445420ab 100644 (file)
@@ -58,12 +58,19 @@ export class FairShareWorkerChoiceStrategy<
     return true
   }
 
+  /** @inheritDoc */
+  public update (): boolean {
+    for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
+      this.computeWorkerVirtualTaskTimestamp(workerNodeKey)
+    }
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): number {
     let minWorkerVirtualTaskEndTimestamp = Infinity
     let chosenWorkerNodeKey!: number
     for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
-      this.computeWorkerVirtualTaskTimestamp(workerNodeKey)
       const workerVirtualTaskEndTimestamp =
         this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? 0
       if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
@@ -95,7 +102,7 @@ export class FairShareWorkerChoiceStrategy<
       : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
     this.workersVirtualTaskTimestamp[workerNodeKey] = {
       start: workerVirtualTaskStartTimestamp,
-      end: workerVirtualTaskStartTimestamp + (workerVirtualTaskTRunTime ?? 0)
+      end: workerVirtualTaskStartTimestamp + workerVirtualTaskTRunTime
     }
   }
 }
index 79e65c592fd09f3d1e4d506f78463978441ff8ca..8feac71c27f10a2cf1fc91ca8b559ebbc0543e89 100644 (file)
@@ -43,6 +43,11 @@ export class LessBusyWorkerChoiceStrategy<
     return true
   }
 
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): number {
     const freeWorkerNodeKey = this.findFreeWorkerNodeKey()
index 1503e057ef42fc3890eb0e8cfbc33496a1dd9f5e..078dd8fd8149747ef121205145ee707c42204e36 100644 (file)
@@ -35,6 +35,11 @@ export class LessUsedWorkerChoiceStrategy<
     return true
   }
 
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): number {
     const freeWorkerNodeKey = this.findFreeWorkerNodeKey()
index d2d6f6d6574a23b092da2323957b312deee13d4d..5e35636af62e77d5b56720344c14c274a0c0a97e 100644 (file)
@@ -41,6 +41,11 @@ export class RoundRobinWorkerChoiceStrategy<
     return true
   }
 
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): number {
     const chosenWorkerNodeKey = this.nextWorkerNodeId
index 0e549cec338b685d36e80ab1ff3d147ef6a3f83f..968d2996e0dfd05f7a21404c656010e2bdfcc92b 100644 (file)
@@ -77,9 +77,15 @@ export interface IWorkerChoiceStrategy {
    */
   readonly requiredStatistics: RequiredStatistics
   /**
-   * Resets strategy internals (counters, statistics, etc.).
+   * Resets strategy internals.
    */
   reset: () => boolean
+  /**
+   * Updates strategy internals.
+   *
+   * @returns `true` if the update is successful, `false` otherwise.
+   */
+  update: () => boolean
   /**
    * Chooses a worker node in the pool and returns its key.
    */
@@ -88,6 +94,7 @@ export interface IWorkerChoiceStrategy {
    * Removes a worker node key from strategy internals.
    *
    * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node key is removed, `false` otherwise.
    */
   remove: (workerNodeKey: number) => boolean
   /**
index ad20db423826aa46a649da5512a3358e9f119572..1817394933978fcc9a18a4052dcec1c44efe5201 100644 (file)
@@ -61,16 +61,21 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     return true
   }
 
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
   /** @inheritDoc */
   public choose (): number {
     const chosenWorkerNodeKey = this.currentWorkerNodeId
-    const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime ?? 0
-    const workerTaskWeight =
+    const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
+    const workerWeight =
       this.opts.weights?.[chosenWorkerNodeKey] ?? this.defaultWorkerWeight
-    if (workerVirtualTaskRunTime < workerTaskWeight) {
+    if (workerVirtualTaskRunTime < workerWeight) {
       this.workerVirtualTaskRunTime =
         workerVirtualTaskRunTime +
-        (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
+        this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey)
     } else {
       this.currentWorkerNodeId =
         this.currentWorkerNodeId === this.pool.workerNodes.length - 1
index 2a487acbdfde0f2d07c2f9eae9233d26da387604..0a488549f9f9d7240b8d4fd421d64b89ada5401b 100644 (file)
@@ -114,6 +114,19 @@ export class WorkerChoiceStrategyContext<
     this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset()
   }
 
+  /**
+   * Updates the worker choice strategy internals in the context.
+   *
+   * @returns `true` if the update is successful, `false` otherwise.
+   */
+  public update (): boolean {
+    return (
+      this.workerChoiceStrategies.get(
+        this.workerChoiceStrategy
+      ) as IWorkerChoiceStrategy
+    ).update()
+  }
+
   /**
    * Executes the worker choice strategy algorithm in the context.
    *
index 1edc123407cceb7f2ac79252b34b358c52e25c6d..e8d9d66b751e305b79ce2e5147eedb3eb6bcb48a 100644 (file)
@@ -438,6 +438,12 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -461,6 +467,12 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -493,7 +505,7 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBe(0)
       expect(workerNode.tasksUsage.medRunTime).toBeDefined()
-      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -627,6 +639,12 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -650,14 +668,17 @@ describe('Selection strategies test suite', () => {
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = []
-    const maxMultiplier =
-      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).defaultWorkerWeight * 50
+    const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -695,7 +716,7 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
       expect(workerNode.tasksUsage.avgRunTime).toBe(0)
       expect(workerNode.tasksUsage.medRunTime).toBeDefined()
-      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(