fix: fix task stealing related options handling at runtime
[poolifier.git] / src / pools / abstract-pool.ts
index 9aa6ec3dfeab90f8ba462920290f8a7b74b91f80..eb4c415b4a21be72446bb2da22db83ea9891acb7 100644 (file)
@@ -630,6 +630,8 @@ export abstract class AbstractPool<
     tasksQueueOptions?: TasksQueueOptions
   ): void {
     if (this.opts.enableTasksQueue === true && !enable) {
+      this.unsetTaskStealing()
+      this.unsetTasksStealingOnBackPressure()
       this.flushTasksQueues()
     }
     this.opts.enableTasksQueue = enable
@@ -643,6 +645,16 @@ export abstract class AbstractPool<
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
       this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+      if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.setTaskStealing()
+      } else {
+        this.unsetTaskStealing()
+      }
+      if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.setTasksStealingOnBackPressure()
+      } else {
+        this.unsetTasksStealingOnBackPressure()
+      }
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
@@ -654,6 +666,32 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
+    }
+  }
+
+  private unsetTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onEmptyQueue
+    }
+  }
+
+  private setTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onBackPressure =
+        this.tasksStealingOnBackPressure.bind(this)
+    }
+  }
+
+  private unsetTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onBackPressure
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {