feat: handle worker node readyness in IWRR strategy
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 17 Oct 2023 15:23:26 +0000 (17:23 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 17 Oct 2023 15:23:26 +0000 (17:23 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts

index 57cfceb4190a0e14ab0fb6ced8ee721b475fab60..2509e78a032efe7986e3be7f29118f023fd29081 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Fix race condition at dynamic worker node task assignment and scheduled removal. See issue [#1468](https://github.com/poolifier/poolifier/issues/1468) and [#1496](https://github.com/poolifier/poolifier/issues/1496)
+
 ## [3.0.1] - 2023-10-16
 
 ### Fixed
index 75c7450d8ccc5f8b73d74b5404f4410f4376a46e..7a64a7e563f9d6d8e1fffdac9ce91d1bc64d5b1e 100644 (file)
@@ -126,16 +126,6 @@ export abstract class AbstractWorkerChoiceStrategy<
     return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false
   }
 
-  /**
-   * Whether the worker node has back pressure or not (i.e. its tasks queue is full).
-   *
-   * @param workerNodeKey - The worker node key.
-   * @returns `true` if the worker node has back pressure, `false` otherwise.
-   */
-  protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
-    return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
-  }
-
   /**
    * Gets the worker node task runtime.
    * If the task statistics require the average runtime, the average runtime is returned.
index b034cba1d08d69761b3c8f05fdedf395c4702c0a..15a6c7f48eaac7f0a81e5a30bfd74762a426435d 100644 (file)
@@ -105,6 +105,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
         const workerWeight =
           this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
         if (
+          this.isWorkerNodeReady(workerNodeKey) &&
           workerWeight >= this.roundWeights[roundIndex] &&
           this.workerNodeVirtualTaskRunTime < workerWeight
         ) {
@@ -121,18 +122,20 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   }
 
   private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
-    if (
-      this.roundId === this.roundWeights.length - 1 &&
-      this.workerNodeId === this.pool.workerNodes.length - 1
-    ) {
-      this.roundId = 0
-      this.workerNodeId = 0
-    } else if (this.workerNodeId === this.pool.workerNodes.length - 1) {
-      this.roundId = this.roundId + 1
-      this.workerNodeId = 0
-    } else {
-      this.workerNodeId = this.workerNodeId + 1
-    }
+    do {
+      if (
+        this.roundId === this.roundWeights.length - 1 &&
+        this.workerNodeId === this.pool.workerNodes.length - 1
+      ) {
+        this.roundId = 0
+        this.workerNodeId = 0
+      } else if (this.workerNodeId === this.pool.workerNodes.length - 1) {
+        this.roundId = this.roundId + 1
+        this.workerNodeId = 0
+      } else {
+        this.workerNodeId = this.workerNodeId + 1
+      }
+    } while (!this.isWorkerNodeReady(this.workerNodeId))
   }
 
   /** @inheritDoc */