fix: avoid cascading tasks stealing under back pressure
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 08:16:16 +0000 (10:16 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Thu, 24 Aug 2023 08:16:16 +0000 (10:16 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts

index ab21a07781f819151919de0ce36a53bf9e7e638f..2b2a87416377aeaac5a3f089f8425dbd14133c8f 100644 (file)
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixes
+
+- Avoid cascading tasks stealing under back pressure.
+
+### Changed
+
+- Add fastpath to queued tasks rescheduling.
+
 ## [2.6.33] - 2023-08-24
 
 ### Fixed
index 7b0d541d8bcafaaaab1150fa01067c66acea30c2..507fcc10e91e809b13948b85325e413c1ccac621 100644 (file)
@@ -1190,7 +1190,7 @@ export abstract class AbstractPool<
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
-      let destinationWorkerNodeKey: number = workerNodeKey
+      let destinationWorkerNodeKey!: number
       let minQueuedTasks = Infinity
       let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
@@ -1211,15 +1211,17 @@ export abstract class AbstractPool<
           }
         }
       }
-      const task = {
-        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-        workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
-          .id as number
-      }
-      if (executeTask) {
-        this.executeTask(destinationWorkerNodeKey, task)
-      } else {
-        this.enqueueTask(destinationWorkerNodeKey, task)
+      if (destinationWorkerNodeKey != null) {
+        const task = {
+          ...(this.dequeueTask(workerNodeKey) as Task<Data>),
+          workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
+            .id as number
+        }
+        if (executeTask) {
+          this.executeTask(destinationWorkerNodeKey, task)
+        } else {
+          this.enqueueTask(destinationWorkerNodeKey, task)
+        }
       }
     }
   }
@@ -1234,6 +1236,9 @@ export abstract class AbstractPool<
           workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
       )
     for (const sourceWorkerNode of workerNodes) {
+      if (sourceWorkerNode.usage.tasks.queued === 0) {
+        break
+      }
       if (
         sourceWorkerNode.info.ready &&
         sourceWorkerNode.info.id !== workerId &&
@@ -1267,10 +1272,11 @@ export abstract class AbstractPool<
       )
     for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
       if (
+        sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
         workerNode.info.id !== workerId &&
-        sourceWorkerNode.usage.tasks.queued > 0 &&
-        !workerNode.hasBackPressure()
+        workerNode.usage.tasks.queued <
+          (this.opts.tasksQueueOptions?.size as number) - 1
       ) {
         const task = {
           ...(sourceWorkerNode.popTask() as Task<Data>),
index 0fc1e0da3da3e8319cb71e90308e324e64fa82bd..86a7f5ca8f801c6b5e98dc380e9ee5dea80d37e3 100644 (file)
@@ -60,7 +60,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
 
   /** @inheritDoc */
   public choose (): number | undefined {
-    let roundId: number = this.roundId
+    let roundId!: number
     let workerNodeId: number | undefined
     for (
       let roundIndex = this.roundId;