feat: improve IWRR implementation
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 30 Aug 2023 10:47:19 +0000 (12:47 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 30 Aug 2023 10:47:19 +0000 (12:47 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
docs/worker-choice-strategies.md
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
tests/pools/selection-strategies/selection-strategies.test.js

index b28d6eb61ff41020ddfafff5dbcba30a8fab52cd..286623fb64c171a29874cbf39a9fbbd2453cc3b4 100644 (file)
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Bundle typescript types declaration into one file.
 
+### Changed
+
+- Improve interleaved weighted round robin worker choice strategy implementation.
+
 ## [2.6.37] - 2023-08-28
 
 ### Fixed
index b26819a48b63ebf8a2e712e2d3fca284003097e3..49f7c65ce1f76e3c8914ed5460bca0fefd7c8883 100644 (file)
@@ -20,12 +20,12 @@ By default, the strategy uses the simple moving average task execution time for
 
 ### Weighted round robin
 
-The worker weights are maximum tasks execution time, once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers.
+The worker weights are maximum tasks execution time. Once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers.
 
-### Interleaved weighted round robin
+### Interleaved weighted round robin (experimental)
 
 The worker weights are maximum tasks execution time. The rounds are the deduplicated worker weights.  
-During a round, if the worker weight is superior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts.  
+During a round, if the worker weight is superior or equal to the current round weight and its tasks execution time is inferior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts.  
 The default worker weights is the same for each and computed given the CPU cores speed and theirs numbers. So the default 'rounds' consists of a unique worker weight.
 
 ## Statistics
index 2fba4e9fbd1a17be10fd9d6a86a6cb574a9ad03b..cd8736c25e771470871497be2ef2f6933105aaa4 100644 (file)
@@ -197,7 +197,7 @@ export abstract class AbstractWorkerChoiceStrategy<
   /**
    * Check the next worker node eligibility.
    *
-   * @param chosenNextWorkerNodeKey - The chosen worker node key.
+   * @param chosenNextWorkerNodeKey - The chosen next worker node key.
    */
   protected checkNextWorkerNodeEligibility (
     chosenNextWorkerNodeKey: number | undefined
index e37543820593a68b6860e7b597c40919c8866e9d..ca5b7db9d914d0531fade68eccac9fc7d614bf11 100644 (file)
@@ -1,9 +1,13 @@
 import type { IWorker } from '../worker'
 import type { IPool } from '../pool'
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import {
+  DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+  DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+} from '../../utils'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  TaskStatisticsRequirements,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -21,19 +25,37 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   >
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
+  /** @inheritDoc */
+  public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
+    runTime: {
+      aggregate: true,
+      average: true,
+      median: false
+    },
+    waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+    elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
+  }
+
   /**
    * Round id.
-   * This is used to determine the current round weight.
    */
   private roundId: number = 0
+  /**
+   * Default worker weight.
+   */
+  private readonly defaultWorkerWeight: number
   /**
    * Round weights.
    */
   private roundWeights: number[]
   /**
-   * Default worker weight.
+   * Worker node id.
    */
-  private readonly defaultWorkerWeight: number
+  private workerNodeId: number = 0
+  /**
+   * Worker virtual task runtime.
+   */
+  private workerVirtualTaskRunTime: number = 0
 
   /** @inheritDoc */
   public constructor (
@@ -50,6 +72,8 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   public reset (): boolean {
     this.resetWorkerNodeKeyProperties()
     this.roundId = 0
+    this.workerNodeId = 0
+    this.workerVirtualTaskRunTime = 0
     return true
   }
 
@@ -60,47 +84,59 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number | undefined {
-    let roundId!: number
-    let workerNodeId: number | undefined
     for (
       let roundIndex = this.roundId;
       roundIndex < this.roundWeights.length;
       roundIndex++
     ) {
-      roundId = roundIndex
+      this.roundId = roundIndex
       for (
-        let workerNodeKey =
-          this.nextWorkerNodeKey ?? this.previousWorkerNodeKey;
+        let workerNodeKey = this.workerNodeId;
         workerNodeKey < this.pool.workerNodes.length;
         workerNodeKey++
       ) {
+        this.workerNodeId = workerNodeKey
         if (!this.isWorkerNodeEligible(workerNodeKey)) {
           continue
         }
+        if (
+          this.workerNodeId !== this.nextWorkerNodeKey &&
+          this.workerVirtualTaskRunTime !== 0
+        ) {
+          this.workerVirtualTaskRunTime = 0
+        }
         const workerWeight =
           this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
-        if (workerWeight >= this.roundWeights[roundIndex]) {
-          workerNodeId = workerNodeKey
-          break
+        if (
+          workerWeight >= this.roundWeights[roundIndex] &&
+          this.workerVirtualTaskRunTime < workerWeight
+        ) {
+          this.workerVirtualTaskRunTime =
+            this.workerVirtualTaskRunTime +
+            this.getWorkerTaskRunTime(workerNodeKey)
+          this.previousWorkerNodeKey =
+            this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+          this.nextWorkerNodeKey = workerNodeKey
+          return this.nextWorkerNodeKey
         }
       }
     }
-    this.roundId = roundId
-    if (workerNodeId == null) {
-      this.previousWorkerNodeKey =
-        this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
-    }
-    this.nextWorkerNodeKey = workerNodeId
-    const chosenWorkerNodeKey = this.nextWorkerNodeKey
-    if (this.nextWorkerNodeKey === this.pool.workerNodes.length - 1) {
-      this.nextWorkerNodeKey = 0
-      this.roundId =
-        this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
+    this.interleavedWeightedRoundRobinNextWorkerNodeId()
+  }
+
+  private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
+    if (
+      this.roundId === this.roundWeights.length - 1 &&
+      this.workerNodeId === this.pool.workerNodes.length - 1
+    ) {
+      this.roundId = 0
+      this.workerNodeId = 0
+    } else if (this.workerNodeId === this.pool.workerNodes.length - 1) {
+      this.roundId = this.roundId + 1
+      this.workerNodeId = 0
     } else {
-      this.nextWorkerNodeKey =
-        (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+      this.workerNodeId = this.workerNodeId + 1
     }
-    return chosenWorkerNodeKey
   }
 
   /** @inheritDoc */
@@ -109,10 +145,11 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
       if (this.pool.workerNodes.length === 0) {
         this.nextWorkerNodeKey = 0
       } else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
-        this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
         this.roundId =
           this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
+        this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
       }
+      this.workerVirtualTaskRunTime = 0
     }
     return true
   }
index 46575b80d851ec719b783a370d20194b8ba72d58..034c88e6a2a51706d2adf9032dc293794ac6f0e7 100644 (file)
@@ -90,14 +90,13 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   }
 
   private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
-    const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
     const workerWeight =
       this.opts.weights?.[
         this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
       ] ?? this.defaultWorkerWeight
-    if (workerVirtualTaskRunTime < workerWeight) {
+    if (this.workerVirtualTaskRunTime < workerWeight) {
       this.workerVirtualTaskRunTime =
-        workerVirtualTaskRunTime +
+        this.workerVirtualTaskRunTime +
         this.getWorkerTaskRunTime(
           this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
         )
index 253238ba313225abc3e384bf6c8ae1b47f56baae..6cc68a8b3cd58f26ff4911bcd9c0100de981a715 100644 (file)
@@ -675,7 +675,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -684,18 +684,18 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
-        waitTime: {
+        }),
+        waitTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         elu: {
           idle: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           },
           active: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           }
         }
       })
@@ -733,7 +733,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -742,18 +742,18 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
-        waitTime: {
+        }),
+        waitTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         elu: {
           idle: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           },
           active: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           }
         }
       })
@@ -872,7 +872,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -882,19 +882,19 @@ describe('Selection strategies test suite', () => {
           failed: 0
         },
         runTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
-        elu: {
-          idle: {
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
             history: expect.any(CircularArray)
-          },
-          active: {
+          }),
+          active: expect.objectContaining({
             history: expect.any(CircularArray)
-          }
-        }
+          })
+        })
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
@@ -936,7 +936,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -946,19 +946,19 @@ describe('Selection strategies test suite', () => {
           failed: 0
         },
         runTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
-        elu: {
-          idle: {
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
             history: expect.any(CircularArray)
-          },
-          active: {
+          }),
+          active: expect.objectContaining({
             history: expect.any(CircularArray)
-          }
-        }
+          })
+        })
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
@@ -1081,7 +1081,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1090,20 +1090,20 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
-        elu: {
-          idle: {
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
             history: expect.any(CircularArray)
-          },
-          active: {
+          }),
+          active: expect.objectContaining({
             history: expect.any(CircularArray)
-          }
-        }
+          })
+        })
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
@@ -1160,7 +1160,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1169,20 +1169,20 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
-        elu: {
-          idle: {
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
             history: expect.any(CircularArray)
-          },
-          active: {
+          }),
+          active: expect.objectContaining({
             history: expect.any(CircularArray)
-          }
-        }
+          })
+        })
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
@@ -1244,7 +1244,7 @@ describe('Selection strategies test suite', () => {
     }
     await Promise.all(promises)
     for (const workerNode of pool.workerNodes) {
-      expect(workerNode.usage).toMatchObject({
+      expect(workerNode.usage).toStrictEqual({
         tasks: {
           executed: expect.any(Number),
           executing: 0,
@@ -1253,20 +1253,20 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
-        elu: {
-          idle: {
+        elu: expect.objectContaining({
+          idle: expect.objectContaining({
             history: expect.any(CircularArray)
-          },
-          active: {
+          }),
+          active: expect.objectContaining({
             history: expect.any(CircularArray)
-          }
-        }
+          })
+        })
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
@@ -1491,14 +1491,14 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
         elu: {
           idle: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           },
           active: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           }
         }
       })
@@ -1559,14 +1559,14 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
         elu: {
           idle: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           },
           active: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           }
         }
       })
@@ -1632,14 +1632,14 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         }),
         waitTime: {
-          history: expect.any(CircularArray)
+          history: new CircularArray()
         },
         elu: {
           idle: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           },
           active: {
-            history: expect.any(CircularArray)
+            history: new CircularArray()
           }
         }
       })
@@ -1789,8 +1789,8 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
     ).toStrictEqual({
       runTime: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       },
       waitTime: {
@@ -1815,8 +1815,8 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
     ).toStrictEqual({
       runTime: {
-        aggregate: false,
-        average: false,
+        aggregate: true,
+        average: true,
         median: false
       },
       waitTime: {
@@ -1853,16 +1853,16 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
           stolen: 0,
           failed: 0
         },
-        runTime: {
-          history: new CircularArray()
-        },
+        runTime: expect.objectContaining({
+          history: expect.any(CircularArray)
+        }),
         waitTime: {
           history: new CircularArray()
         },
@@ -1875,6 +1875,10 @@ describe('Selection strategies test suite', () => {
           }
         }
       })
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1886,6 +1890,11 @@ describe('Selection strategies test suite', () => {
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).roundId
     ).toBe(0)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workerNodeId
+    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -1931,9 +1940,9 @@ describe('Selection strategies test suite', () => {
           stolen: 0,
           failed: 0
         },
-        runTime: {
-          history: new CircularArray()
-        },
+        runTime: expect.objectContaining({
+          history: expect.any(CircularArray)
+        }),
         waitTime: {
           history: new CircularArray()
         },
@@ -1961,6 +1970,11 @@ describe('Selection strategies test suite', () => {
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).roundId
     ).toBe(0)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workerNodeId
+    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy