fix: refine pool statuses handling
[poolifier.git] / src / pools / abstract-pool.ts
index 11cc9cc15ef419346827e5f627150f04e73ba158..b72d8cbcb4c3354294dda16718a6d229e64d04e0 100644 (file)
@@ -10,7 +10,6 @@ import type {
 } from '../utility-types'
 import {
   DEFAULT_TASK_NAME,
-  DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
   average,
   exponentialDelay,
@@ -81,11 +80,6 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public emitter?: EventEmitterAsyncResource
 
-  /**
-   * Dynamic pool maximum size property placeholder.
-   */
-  protected readonly max?: number
-
   /**
    * The task execution response promise map:
    * - `key`: The message id of each submitted task.
@@ -136,22 +130,25 @@ export abstract class AbstractPool<
   /**
    * Constructs a new poolifier pool.
    *
-   * @param numberOfWorkers - Number of workers that this pool should manage.
+   * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
    * @param filePath - Path to the worker file.
    * @param opts - Options for the pool.
+   * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
    */
   public constructor (
-    protected readonly numberOfWorkers: number,
+    protected readonly minimumNumberOfWorkers: number,
     protected readonly filePath: string,
-    protected readonly opts: PoolOptions<Worker>
+    protected readonly opts: PoolOptions<Worker>,
+    protected readonly maximumNumberOfWorkers?: number
   ) {
     if (!this.isMain()) {
       throw new Error(
         'Cannot start a pool from a worker with the same type as the pool'
       )
     }
+    this.checkPoolType()
     checkFilePath(this.filePath)
-    this.checkNumberOfWorkers(this.numberOfWorkers)
+    this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
     this.checkPoolOptions(this.opts)
 
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
@@ -186,20 +183,28 @@ export abstract class AbstractPool<
     this.startTimestamp = performance.now()
   }
 
-  private checkNumberOfWorkers (numberOfWorkers: number): void {
-    if (numberOfWorkers == null) {
+  private checkPoolType (): void {
+    if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+      throw new Error(
+        'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+      )
+    }
+  }
+
+  private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+    if (minimumNumberOfWorkers == null) {
       throw new Error(
         'Cannot instantiate a pool without specifying the number of workers'
       )
-    } else if (!Number.isSafeInteger(numberOfWorkers)) {
+    } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
       throw new TypeError(
         'Cannot instantiate a pool with a non safe integer number of workers'
       )
-    } else if (numberOfWorkers < 0) {
+    } else if (minimumNumberOfWorkers < 0) {
       throw new RangeError(
         'Cannot instantiate a pool with a negative number of workers'
       )
-    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+    } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
       throw new RangeError('Cannot instantiate a fixed pool with zero worker')
     }
   }
@@ -215,9 +220,8 @@ export abstract class AbstractPool<
       this.checkValidWorkerChoiceStrategyOptions(
         opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
       )
-      this.opts.workerChoiceStrategyOptions = {
-        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
-        ...opts.workerChoiceStrategyOptions
+      if (opts.workerChoiceStrategyOptions != null) {
+        this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
       }
       this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
@@ -244,25 +248,10 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
-    if (
-      workerChoiceStrategyOptions?.retries != null &&
-      !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
-    ) {
-      throw new TypeError(
-        'Invalid worker choice strategy options: retries must be an integer'
-      )
-    }
-    if (
-      workerChoiceStrategyOptions?.retries != null &&
-      workerChoiceStrategyOptions.retries < 0
-    ) {
-      throw new RangeError(
-        `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
-      )
-    }
     if (
       workerChoiceStrategyOptions?.weights != null &&
-      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+      Object.keys(workerChoiceStrategyOptions.weights).length !==
+        (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
     ) {
       throw new Error(
         'Invalid worker choice strategy options: must have a weight for each worker node'
@@ -295,11 +284,11 @@ export abstract class AbstractPool<
       started: this.started,
       ready: this.ready,
       strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
-      minSize: this.minSize,
-      maxSize: this.maxSize,
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      minSize: this.minimumNumberOfWorkers,
+      maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
         .runTime.aggregate &&
-        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+        this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
           .waitTime.aggregate && { utilization: round(this.utilization) }),
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
@@ -309,6 +298,13 @@ export abstract class AbstractPool<
             : accumulator,
         0
       ),
+      ...(this.opts.enableTasksQueue === true && {
+        stealingWorkerNodes: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            workerNode.info.stealing ? accumulator + 1 : accumulator,
+          0
+        )
+      }),
       busyWorkerNodes: this.workerNodes.reduce(
         (accumulator, _workerNode, workerNodeKey) =>
           this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
@@ -353,7 +349,7 @@ export abstract class AbstractPool<
           accumulator + workerNode.usage.tasks.failed,
         0
       ),
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
         .runTime.aggregate && {
         runTime: {
           minimum: round(
@@ -370,7 +366,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
             .runTime.average && {
             average: round(
               average(
@@ -382,7 +378,7 @@ export abstract class AbstractPool<
               )
             )
           }),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
             .runTime.median && {
             median: round(
               median(
@@ -396,7 +392,7 @@ export abstract class AbstractPool<
           })
         }
       }),
-      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+      ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
         .waitTime.aggregate && {
         waitTime: {
           minimum: round(
@@ -413,7 +409,7 @@ export abstract class AbstractPool<
               )
             )
           ),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
             .waitTime.average && {
             average: round(
               average(
@@ -425,7 +421,7 @@ export abstract class AbstractPool<
               )
             )
           }),
-          ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
             .waitTime.median && {
             median: round(
               median(
@@ -453,7 +449,7 @@ export abstract class AbstractPool<
             ? accumulator + 1
             : accumulator,
         0
-      ) >= this.minSize
+      ) >= this.minimumNumberOfWorkers
     )
   }
 
@@ -464,7 +460,8 @@ export abstract class AbstractPool<
    */
   private get utilization (): number {
     const poolTimeCapacity =
-      (performance.now() - this.startTimestamp) * this.maxSize
+      (performance.now() - this.startTimestamp) *
+      (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
     const totalTasksRunTime = this.workerNodes.reduce(
       (accumulator, workerNode) =>
         accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
@@ -490,20 +487,6 @@ export abstract class AbstractPool<
    */
   protected abstract get worker (): WorkerType
 
-  /**
-   * The pool minimum size.
-   */
-  protected get minSize (): number {
-    return this.numberOfWorkers
-  }
-
-  /**
-   * The pool maximum size.
-   */
-  protected get maxSize (): number {
-    return this.max ?? this.numberOfWorkers
-  }
-
   /**
    * Checks if the worker id sent in the received message from a worker is valid.
    *
@@ -556,11 +539,11 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = {
-      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
-      ...workerChoiceStrategyOptions
+    if (workerChoiceStrategyOptions != null) {
+      this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
     }
     this.workerChoiceStrategyContext.setOptions(
+      this,
       this.opts.workerChoiceStrategyOptions
     )
   }
@@ -607,7 +590,9 @@ export abstract class AbstractPool<
     tasksQueueOptions: TasksQueueOptions
   ): TasksQueueOptions {
     return {
-      ...getDefaultTasksQueueOptions(this.maxSize),
+      ...getDefaultTasksQueueOptions(
+        this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+      ),
       ...tasksQueueOptions
     }
   }
@@ -660,7 +645,10 @@ export abstract class AbstractPool<
    * The pool filling boolean status.
    */
   protected get full (): boolean {
-    return this.workerNodes.length >= this.maxSize
+    return (
+      this.workerNodes.length >=
+      (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+    )
   }
 
   /**
@@ -968,7 +956,7 @@ export abstract class AbstractPool<
         (accumulator, workerNode) =>
           !workerNode.info.dynamic ? accumulator + 1 : accumulator,
         0
-      ) < this.numberOfWorkers
+      ) < this.minimumNumberOfWorkers
     ) {
       this.createAndSetupWorkerNode()
     }
@@ -1001,12 +989,10 @@ export abstract class AbstractPool<
     this.started = false
   }
 
-  protected async sendKillMessageToWorker (
-    workerNodeKey: number
-  ): Promise<void> {
+  private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
     await new Promise<void>((resolve, reject) => {
-      if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
-        reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
+      if (this.workerNodes?.[workerNodeKey] == null) {
+        resolve()
         return
       }
       const killMessageListener = (message: MessageValue<Response>): void => {
@@ -1043,7 +1029,9 @@ export abstract class AbstractPool<
       'taskFinished',
       flushedTasks,
       this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
-        getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+        getDefaultTasksQueueOptions(
+          this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+        ).tasksFinishedTimeout
     )
     await this.sendKillMessageToWorker(workerNodeKey)
     await workerNode.terminate()
@@ -1197,9 +1185,7 @@ export abstract class AbstractPool<
    *
    * @returns Whether to create a dynamic worker or not.
    */
-  private shallCreateDynamicWorker (): boolean {
-    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
-  }
+  protected abstract shallCreateDynamicWorker (): boolean
 
   /**
    * Sends a message to worker given its worker node key.
@@ -1238,7 +1224,6 @@ export abstract class AbstractPool<
       this.emitter?.emit(PoolEvents.error, error)
       if (
         this.started &&
-        !this.starting &&
         !this.destroying &&
         this.opts.restartWorkerOnError === true
       ) {
@@ -1248,7 +1233,11 @@ export abstract class AbstractPool<
           this.createAndSetupWorkerNode()
         }
       }
-      if (this.started && this.opts.enableTasksQueue === true) {
+      if (
+        this.started &&
+        !this.destroying &&
+        this.opts.enableTasksQueue === true
+      ) {
         this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
       }
       workerNode?.terminate().catch(error => {
@@ -1418,6 +1407,10 @@ export abstract class AbstractPool<
     })
   }
 
+  private cannotStealTask (): boolean {
+    return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
+  }
+
   private handleTask (workerNodeKey: number, task: Task<Data>): void {
     if (this.shallExecuteTask(workerNodeKey)) {
       this.executeTask(workerNodeKey, task)
@@ -1430,7 +1423,7 @@ export abstract class AbstractPool<
     if (workerNodeKey === -1) {
       return
     }
-    if (this.workerNodes.length <= 1) {
+    if (this.cannotStealTask()) {
       return
     }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1524,15 +1517,22 @@ export abstract class AbstractPool<
     eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
-    if (this.workerNodes.length <= 1) {
-      return
-    }
     const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
-        'WorkerNode event detail workerNodeKey attribute must be defined'
+        'WorkerNode event detail workerNodeKey property must be defined'
       )
     }
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
+      if (previousStolenTask != null) {
+        this.getWorkerInfo(workerNodeKey).stealing = false
+      }
+      return
+    }
     const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
     if (
       previousStolenTask != null &&
@@ -1540,6 +1540,7 @@ export abstract class AbstractPool<
       (workerNodeTasksUsage.executing > 0 ||
         this.tasksQueueSize(workerNodeKey) > 0)
     ) {
+      this.getWorkerInfo(workerNodeKey).stealing = false
       for (const taskName of this.workerNodes[workerNodeKey].info
         .taskFunctionNames as string[]) {
         this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
@@ -1550,6 +1551,7 @@ export abstract class AbstractPool<
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       return
     }
+    this.getWorkerInfo(workerNodeKey).stealing = true
     const stolenTask = this.workerNodeStealTask(workerNodeKey)
     if (
       this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
@@ -1596,6 +1598,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       (sourceWorkerNode, sourceWorkerNodeKey) =>
         sourceWorkerNode.info.ready &&
+        !sourceWorkerNode.info.stealing &&
         sourceWorkerNodeKey !== workerNodeKey &&
         sourceWorkerNode.usage.tasks.queued > 0
     )
@@ -1614,7 +1617,11 @@ export abstract class AbstractPool<
   private readonly handleBackPressureEvent = (
     eventDetail: WorkerNodeEventDetail
   ): void => {
-    if (this.workerNodes.length <= 1) {
+    if (
+      this.cannotStealTask() ||
+      (this.info.stealingWorkerNodes as number) >
+        Math.floor(this.workerNodes.length / 2)
+    ) {
       return
     }
     const { workerId } = eventDetail
@@ -1634,16 +1641,19 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
+        !workerNode.info.stealing &&
         workerNode.info.id !== workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
+        this.getWorkerInfo(workerNodeKey).stealing = true
         const task = sourceWorkerNode.popTask() as Task<Data>
         this.handleTask(workerNodeKey, task)
         this.updateTaskStolenStatisticsWorkerUsage(
           workerNodeKey,
           task.name as string
         )
+        this.getWorkerInfo(workerNodeKey).stealing = false
       }
     }
   }
@@ -1748,13 +1758,10 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitDynamicWorkerCreationEvents (): void {
-    if (this.type === PoolTypes.dynamic) {
-      if (this.full) {
-        this.emitter?.emit(PoolEvents.full, this.info)
-      }
-    }
-  }
+  /**
+   * Emits dynamic worker creation events.
+   */
+  protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
 
   /**
    * Gets the worker information given its worker node key.
@@ -1780,7 +1787,9 @@ export abstract class AbstractPool<
         workerOptions: this.opts.workerOptions,
         tasksQueueBackPressureSize:
           this.opts.tasksQueueOptions?.size ??
-          getDefaultTasksQueueOptions(this.maxSize).size
+          getDefaultTasksQueueOptions(
+            this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+          ).size
       }
     )
     // Flag the worker node as ready at pool startup.