fix: fix task stealing related options handling at runtime
[poolifier.git] / src / pools / abstract-pool.ts
index d3540098d4ca192ed0e93b6a491358435572c743..eb4c415b4a21be72446bb2da22db83ea9891acb7 100644 (file)
@@ -630,6 +630,8 @@ export abstract class AbstractPool<
     tasksQueueOptions?: TasksQueueOptions
   ): void {
     if (this.opts.enableTasksQueue === true && !enable) {
+      this.unsetTaskStealing()
+      this.unsetTasksStealingOnBackPressure()
       this.flushTasksQueues()
     }
     this.opts.enableTasksQueue = enable
@@ -643,6 +645,16 @@ export abstract class AbstractPool<
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
       this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+      if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.setTaskStealing()
+      } else {
+        this.unsetTaskStealing()
+      }
+      if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.setTasksStealingOnBackPressure()
+      } else {
+        this.unsetTasksStealingOnBackPressure()
+      }
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
@@ -654,6 +666,32 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
+    }
+  }
+
+  private unsetTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onEmptyQueue
+    }
+  }
+
+  private setTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onBackPressure =
+        this.tasksStealingOnBackPressure.bind(this)
+    }
+  }
+
+  private unsetTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onBackPressure
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
@@ -699,14 +737,13 @@ export abstract class AbstractPool<
               (this.opts.tasksQueueOptions?.concurrency as number)
         ) === -1
       )
-    } else {
-      return (
-        this.workerNodes.findIndex(
-          workerNode =>
-            workerNode.info.ready && workerNode.usage.tasks.executing === 0
-        ) === -1
-      )
     }
+    return (
+      this.workerNodes.findIndex(
+        workerNode =>
+          workerNode.info.ready && workerNode.usage.tasks.executing === 0
+      ) === -1
+    )
   }
 
   /** @inheritDoc */