fix: fix task stealing related options handling at runtime
[poolifier.git] / src / pools / abstract-pool.ts
index fea2e3a1f4cec5b5c8f6187515f707524dc41dfe..eb4c415b4a21be72446bb2da22db83ea9891acb7 100644 (file)
@@ -68,8 +68,7 @@ export abstract class AbstractPool<
   public readonly emitter?: PoolEmitter
 
   /**
-   * The task execution response promise map.
-   *
+   * The task execution response promise map:
    * - `key`: The message id of each submitted task.
    * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
    *
@@ -92,14 +91,14 @@ export abstract class AbstractPool<
    */
   protected readonly max?: number
 
-  /**
-   * Whether the pool is starting or not.
-   */
-  private readonly starting: boolean
   /**
    * Whether the pool is started or not.
    */
   private started: boolean
+  /**
+   * Whether the pool is starting or not.
+   */
+  private starting: boolean
   /**
    * The start timestamp of the pool.
    */
@@ -145,10 +144,11 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
-    this.starting = true
-    this.startPool()
+    this.started = false
     this.starting = false
-    this.started = true
+    if (this.opts.startWorkers === true) {
+      this.start()
+    }
 
     this.startTimestamp = performance.now()
   }
@@ -212,6 +212,7 @@ export abstract class AbstractPool<
 
   private checkPoolOptions (opts: PoolOptions<Worker>): void {
     if (isPlainObject(opts)) {
+      this.opts.startWorkers = opts.startWorkers ?? true
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
@@ -314,11 +315,6 @@ export abstract class AbstractPool<
         `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
       )
     }
-    if (tasksQueueOptions?.queueMaxSize != null) {
-      throw new Error(
-        'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
-      )
-    }
     if (
       tasksQueueOptions?.size != null &&
       !Number.isSafeInteger(tasksQueueOptions?.size)
@@ -334,24 +330,13 @@ export abstract class AbstractPool<
     }
   }
 
-  private startPool (): void {
-    while (
-      this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
-        0
-      ) < this.numberOfWorkers
-    ) {
-      this.createAndSetupWorkerNode()
-    }
-  }
-
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
       version,
       type: this.type,
       worker: this.worker,
+      started: this.started,
       ready: this.ready,
       strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
       minSize: this.minSize,
@@ -645,6 +630,8 @@ export abstract class AbstractPool<
     tasksQueueOptions?: TasksQueueOptions
   ): void {
     if (this.opts.enableTasksQueue === true && !enable) {
+      this.unsetTaskStealing()
+      this.unsetTasksStealingOnBackPressure()
       this.flushTasksQueues()
     }
     this.opts.enableTasksQueue = enable
@@ -658,6 +645,16 @@ export abstract class AbstractPool<
       this.opts.tasksQueueOptions =
         this.buildTasksQueueOptions(tasksQueueOptions)
       this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
+      if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.setTaskStealing()
+      } else {
+        this.unsetTaskStealing()
+      }
+      if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.setTasksStealingOnBackPressure()
+      } else {
+        this.unsetTasksStealingOnBackPressure()
+      }
     } else if (this.opts.tasksQueueOptions != null) {
       delete this.opts.tasksQueueOptions
     }
@@ -669,13 +666,41 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onEmptyQueue =
+        this.taskStealingOnEmptyQueue.bind(this)
+    }
+  }
+
+  private unsetTaskStealing (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onEmptyQueue
+    }
+  }
+
+  private setTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.workerNodes[workerNodeKey].onBackPressure =
+        this.tasksStealingOnBackPressure.bind(this)
+    }
+  }
+
+  private unsetTasksStealingOnBackPressure (): void {
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      delete this.workerNodes[workerNodeKey].onBackPressure
+    }
+  }
+
   private buildTasksQueueOptions (
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
       ...{
         size: Math.pow(this.maxSize, 2),
-        concurrency: 1
+        concurrency: 1,
+        taskStealing: true,
+        tasksStealingOnBackPressure: true
       },
       ...tasksQueueOptions
     }
@@ -712,14 +737,13 @@ export abstract class AbstractPool<
               (this.opts.tasksQueueOptions?.concurrency as number)
         ) === -1
       )
-    } else {
-      return (
-        this.workerNodes.findIndex(
-          workerNode =>
-            workerNode.info.ready && workerNode.usage.tasks.executing === 0
-        ) === -1
-      )
     }
+    return (
+      this.workerNodes.findIndex(
+        workerNode =>
+          workerNode.info.ready && workerNode.usage.tasks.executing === 0
+      ) === -1
+    )
   }
 
   /** @inheritDoc */
@@ -751,7 +775,7 @@ export abstract class AbstractPool<
   ): Promise<Response> {
     return await new Promise<Response>((resolve, reject) => {
       if (!this.started) {
-        reject(new Error('Cannot execute a task on destroyed pool'))
+        reject(new Error('Cannot execute a task on not started pool'))
         return
       }
       if (name != null && typeof name !== 'string') {
@@ -772,14 +796,13 @@ export abstract class AbstractPool<
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: workerInfo.id as number,
+        workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -799,6 +822,22 @@ export abstract class AbstractPool<
     })
   }
 
+  /** @inheritdoc */
+  public start (): void {
+    this.starting = true
+    while (
+      this.workerNodes.reduce(
+        (accumulator, workerNode) =>
+          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+        0
+      ) < this.numberOfWorkers
+    ) {
+      this.createAndSetupWorkerNode()
+    }
+    this.starting = false
+    this.started = true
+  }
+
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
@@ -1072,8 +1111,8 @@ export abstract class AbstractPool<
       this.emitter?.emit(PoolEvents.error, error)
       if (
         this.opts.restartWorkerOnError === true &&
-        !this.starting &&
-        this.started
+        this.started &&
+        !this.starting
       ) {
         if (workerInfo.dynamic) {
           this.createAndSetupDynamicWorkerNode()
@@ -1167,10 +1206,14 @@ export abstract class AbstractPool<
     // Send the statistics message to worker.
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
-      this.workerNodes[workerNodeKey].onEmptyQueue =
-        this.taskStealingOnEmptyQueue.bind(this)
-      this.workerNodes[workerNodeKey].onBackPressure =
-        this.tasksStealingOnBackPressure.bind(this)
+      if (this.opts.tasksQueueOptions?.taskStealing === true) {
+        this.workerNodes[workerNodeKey].onEmptyQueue =
+          this.taskStealingOnEmptyQueue.bind(this)
+      }
+      if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+        this.workerNodes[workerNodeKey].onBackPressure =
+          this.tasksStealingOnBackPressure.bind(this)
+      }
     }
   }