fix: refine pool statuses handling
[poolifier.git] / src / pools / abstract-pool.ts
index 56bd0348829590429dc465944fef22b9f28c4b4c..b72d8cbcb4c3354294dda16718a6d229e64d04e0 100644 (file)
@@ -130,10 +130,10 @@ export abstract class AbstractPool<
   /**
    * Constructs a new poolifier pool.
    *
-   * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage.
+   * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
    * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
-   * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage.
+   * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
    */
   public constructor (
     protected readonly minimumNumberOfWorkers: number,
@@ -191,20 +191,20 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkMinimumNumberOfWorkers (numberOfWorkers: number): void {
-    if (numberOfWorkers == null) {
+  private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+    if (minimumNumberOfWorkers == null) {
       throw new Error(
         'Cannot instantiate a pool without specifying the number of workers'
       )
-    } else if (!Number.isSafeInteger(numberOfWorkers)) {
+    } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
       throw new TypeError(
         'Cannot instantiate a pool with a non safe integer number of workers'
       )
-    } else if (numberOfWorkers < 0) {
+    } else if (minimumNumberOfWorkers < 0) {
       throw new RangeError(
         'Cannot instantiate a pool with a negative number of workers'
       )
-    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+    } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
       throw new RangeError('Cannot instantiate a fixed pool with zero worker')
     }
   }
@@ -298,6 +298,13 @@ export abstract class AbstractPool<
             : accumulator,
         0
       ),
+      ...(this.opts.enableTasksQueue === true && {
+        stealingWorkerNodes: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            workerNode.info.stealing ? accumulator + 1 : accumulator,
+          0
+        )
+      }),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, _workerNode, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -984,8 +991,8 @@ export abstract class AbstractPool<
 
   private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
     await new Promise<void>((resolve, reject) => {
-      if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
-        reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+      if (this.workerNodes?.[workerNodeKey] == null) {
+        resolve()
         return
       }
       const killMessageListener = (message: MessageValue<Response>): void => {
@@ -1178,9 +1185,7 @@ export abstract class AbstractPool<
    *
    * @returns Whether to create a dynamic worker or not.
    */
-  private shallCreateDynamicWorker (): boolean {
-    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
-  }
+  protected abstract shallCreateDynamicWorker (): boolean
 
   /**
    * Sends a message to worker given its worker node key.
@@ -1219,7 +1224,6 @@ export abstract class AbstractPool<
       this.emitter?.emit(PoolEvents.error, error)
       if (
         this.started &&
-        !this.starting &&
         !this.destroying &&
         this.opts.restartWorkerOnError === true
       ) {
@@ -1229,7 +1233,11 @@ export abstract class AbstractPool<
           this.createAndSetupWorkerNode()
         }
       }
-      if (this.started && this.opts.enableTasksQueue === true) {
+      if (
+        this.started &&
+        !this.destroying &&
+        this.opts.enableTasksQueue === true
+      ) {
         this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
       workerNode?.terminate().catch(error => {
@@ -1399,6 +1407,10 @@ export abstract class AbstractPool<
     })
   }
 
+  private cannotStealTask (): boolean {
+    return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+  }
+
   private handleTask (workerNodeKey: number, task: Task<Data>): void {
     if (this.shallExecuteTask(workerNodeKey)) {
       this.executeTask(workerNodeKey, task)
@@ -1411,7 +1423,7 @@ export abstract class AbstractPool<
     if (workerNodeKey === -1) {
       return
     }
-    if (this.workerNodes.length <= 1) {
+    if (this.cannotStealTask()) {
       return
     }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1505,15 +1517,22 @@ export abstract class AbstractPool<
     eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
-    if (this.workerNodes.length <= 1) {
-      return
-    }
     const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
-        'WorkerNode event detail workerNodeKey attribute must be defined'
+        'WorkerNode event detail workerNodeKey property must be defined'
       )
     }
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
+      if (previousStolenTask != null) {
+        this.getWorkerInfo(workerNodeKey).stealing = false
+      }
+      return
+    }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
       previousStolenTask != null &&
@@ -1521,6 +1540,7 @@ export abstract class AbstractPool<
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
+      this.getWorkerInfo(workerNodeKey).stealing = false
       for (const taskName of this.workerNodes[workerNodeKey].info
         .taskFunctionNames as string[]) {
         this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
@@ -1531,6 +1551,7 @@ export abstract class AbstractPool<
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       return
     }
+    this.getWorkerInfo(workerNodeKey).stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1577,6 +1598,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       (sourceWorkerNode, sourceWorkerNodeKey) =>
         sourceWorkerNode.info.ready &&
+        !sourceWorkerNode.info.stealing &&
         sourceWorkerNodeKey !== workerNodeKey &&
         sourceWorkerNode.usage.tasks.queued > 0
     )
@@ -1595,7 +1617,11 @@ export abstract class AbstractPool<
   private readonly handleBackPressureEvent = (
     eventDetail: WorkerNodeEventDetail
   ): void => {
-    if (this.workerNodes.length <= 1) {
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
       return
     }
     const { workerId } = eventDetail
@@ -1615,16 +1641,19 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
+        !workerNode.info.stealing &&
         workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
+        this.getWorkerInfo(workerNodeKey).stealing = true
         const task = sourceWorkerNode.popTask() as Task<Data>
         this.handleTask(workerNodeKey, task)
         this.updateTaskStolenStatisticsWorkerUsage(
           workerNodeKey,
           task.name as string
         )
+        this.getWorkerInfo(workerNodeKey).stealing = false
       }
     }
   }
@@ -1729,13 +1758,10 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitDynamicWorkerCreationEvents (): void {
-    if (this.type === PoolTypes.dynamic) {
-      if (this.full) {
-        this.emitter?.emit(PoolEvents.full, this.info)
-      }
-    }
-  }
+  /**
+   * Emits dynamic worker creation events.
+   */
+  protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
 
   /**
    * Gets the worker information given its worker node key.