fix: ensure worker node cannot be instantiaed without proper arguments
[poolifier.git] / src / pools / abstract-pool.ts
index eb2c2f7818a687ac8f8016eb27489895cadfaaee..183344f84b42a2a55411788d1d414fb82383f9dc 100644 (file)
@@ -84,6 +84,11 @@ export abstract class AbstractPool<
   Response
   >
 
+  /**
+   * Dynamic pool maximum size property placeholder.
+   */
+  protected readonly max?: number
+
   /**
    * Whether the pool is starting or not.
    */
@@ -117,8 +122,6 @@ export abstract class AbstractPool<
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
-    this.dequeueTask = this.dequeueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
@@ -204,9 +207,10 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.opts.workerChoiceStrategyOptions = {
+        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+        ...opts.workerChoiceStrategyOptions
+      }
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -244,6 +248,22 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+    ) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: choice retries must be an integer'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      workerChoiceStrategyOptions.choiceRetries <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+      )
+    }
     if (
       workerChoiceStrategyOptions.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
@@ -283,7 +303,7 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
+        `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
       )
     }
   }
@@ -351,6 +371,9 @@ export abstract class AbstractPool<
           0
         )
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -492,12 +515,16 @@ export abstract class AbstractPool<
   /**
    * The pool minimum size.
    */
-  protected abstract get minSize (): number
+  protected get minSize (): number {
+    return this.numberOfWorkers
+  }
 
   /**
    * The pool maximum size.
    */
-  protected abstract get maxSize (): number
+  protected get maxSize (): number {
+    return this.max ?? this.numberOfWorkers
+  }
 
   /**
    * Checks if the worker id sent in the received message from a worker is valid.
@@ -566,7 +593,10 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.opts.workerChoiceStrategyOptions = {
+      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+      ...workerChoiceStrategyOptions
+    }
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
@@ -713,7 +743,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.checkAndEmitEvents()
     })
   }
 
@@ -724,7 +753,7 @@ export abstract class AbstractPool<
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
-    this.emitter?.emit(PoolEvents.destroy)
+    this.emitter?.emit(PoolEvents.destroy, this.info)
   }
 
   protected async sendKillMessageToWorker (
@@ -779,12 +808,12 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
-      ++taskWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+      ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
     }
   }
 
@@ -803,23 +832,29 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(
+      ].getTaskFunctionWorkerUsage(
         message.taskPerformance?.name ?? DEFAULT_TASK_NAME
       ) as WorkerUsage
-      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-      this.updateEluWorkerUsage(taskWorkerUsage, message)
+      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
     }
   }
 
-  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+  /**
+   * Whether the worker node shall update its task function worker usage or not.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
+   */
+  private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
       Array.isArray(workerInfo.taskFunctions) &&
-      workerInfo.taskFunctions.length > 1
+      workerInfo.taskFunctions.length > 2
     )
   }
 
@@ -828,7 +863,19 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     const workerTaskStatistics = workerUsage.tasks
-    --workerTaskStatistics.executing
+    if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing > 0
+    ) {
+      --workerTaskStatistics.executing
+    } else if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing < 0
+    ) {
+      throw new Error(
+        'Worker usage statistic for tasks executing cannot be negative'
+      )
+    }
     if (message.taskError == null) {
       ++workerTaskStatistics.executed
     } else {
@@ -1018,6 +1065,7 @@ export abstract class AbstractPool<
     if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
       workerInfo.ready = true
     }
+    this.checkAndEmitDynamicWorkerCreationEvents()
     return workerNodeKey
   }
 
@@ -1183,13 +1231,22 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitEvents (): void {
-    if (this.emitter != null) {
-      if (this.busy) {
-        this.emitter.emit(PoolEvents.busy, this.info)
-      }
-      if (this.type === PoolTypes.dynamic && this.full) {
-        this.emitter.emit(PoolEvents.full, this.info)
+  private checkAndEmitTaskExecutionEvents (): void {
+    if (this.busy) {
+      this.emitter?.emit(PoolEvents.busy, this.info)
+    }
+  }
+
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    }
+  }
+
+  private checkAndEmitDynamicWorkerCreationEvents (): void {
+    if (this.type === PoolTypes.dynamic) {
+      if (this.full) {
+        this.emitter?.emit(PoolEvents.full, this.info)
       }
     }
   }
@@ -1242,6 +1299,23 @@ export abstract class AbstractPool<
     }
   }
 
+  /** @inheritDoc */
+  public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    )
+  }
+
+  private hasBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.findIndex(
+        (workerNode) => !workerNode.hasBackPressure()
+      ) === -1
+    )
+  }
+
   /**
    * Executes the given task on the worker given its worker node key.
    *
@@ -1251,19 +1325,13 @@ export abstract class AbstractPool<
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    if (
-      this.opts.enableTasksQueue === true &&
-      this.workerNodes[workerNodeKey].hasBackPressure()
-    ) {
-      this.emitter?.emit(PoolEvents.backPressure, {
-        workerId: this.getWorkerInfo(workerNodeKey).id,
-        ...this.info
-      })
-    }
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    this.checkAndEmitTaskQueuingEvents()
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {