Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
index 6761882589bb14bd9d205c33684ac7e7b3ab5128..0cb4c30ce14ca40b3c2784ffa9cab3e65de0004b 100644 (file)
@@ -640,6 +640,8 @@ export abstract class AbstractPool<
     tasksQueueOptions?: TasksQueueOptions
   ): void {
     if (this.opts.enableTasksQueue === true && !enable) {
+      this.unsetTaskStealing()
+      this.unsetTasksStealingOnBackPressure()
       this.flushTasksQueues()
     }
     this.opts.enableTasksQueue = enable
@@ -653,6 +655,16 @@ export abstract class AbstractPool<
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
       this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+      if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.setTaskStealing()
+      } else {
+        this.unsetTaskStealing()
+      }
+      if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.setTasksStealingOnBackPressure()
+      } else {
+        this.unsetTasksStealingOnBackPressure()
+      }
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
@@ -664,6 +676,32 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
+    }
+  }
+
+  private unsetTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onEmptyQueue
+    }
+  }
+
+  private setTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onBackPressure =
+        this.tasksStealingOnBackPressure.bind(this)
+    }
+  }
+
+  private unsetTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onBackPressure
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {