fix: ensure worker node cannot be instantiaed without proper arguments
[poolifier.git] / src / pools / abstract-pool.ts
index e7c27cdae8a881392aa3cb4c51e8ab0386365f99..183344f84b42a2a55411788d1d414fb82383f9dc 100644 (file)
@@ -84,6 +84,11 @@ export abstract class AbstractPool<
   Response
   >
 
+  /**
+   * Dynamic pool maximum size property placeholder.
+   */
+  protected readonly max?: number
+
   /**
    * Whether the pool is starting or not.
    */
@@ -117,8 +122,6 @@ export abstract class AbstractPool<
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
-    this.dequeueTask = this.dequeueTask.bind(this)
-    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
       this.emitter = new PoolEmitter()
@@ -204,9 +207,10 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.opts.workerChoiceStrategyOptions = {
+        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+        ...opts.workerChoiceStrategyOptions
+      }
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -244,6 +248,22 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+    ) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: choice retries must be an integer'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      workerChoiceStrategyOptions.choiceRetries <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+      )
+    }
     if (
       workerChoiceStrategyOptions.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
@@ -283,7 +303,7 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
+        `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
       )
     }
   }
@@ -351,6 +371,9 @@ export abstract class AbstractPool<
           0
         )
       }),
+      ...(this.opts.enableTasksQueue === true && {
+        backPressure: this.hasBackPressure()
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -492,12 +515,16 @@ export abstract class AbstractPool<
   /**
    * The pool minimum size.
    */
-  protected abstract get minSize (): number
+  protected get minSize (): number {
+    return this.numberOfWorkers
+  }
 
   /**
    * The pool maximum size.
    */
-  protected abstract get maxSize (): number
+  protected get maxSize (): number {
+    return this.max ?? this.numberOfWorkers
+  }
 
   /**
    * Checks if the worker id sent in the received message from a worker is valid.
@@ -566,7 +593,10 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.opts.workerChoiceStrategyOptions = {
+      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+      ...workerChoiceStrategyOptions
+    }
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
@@ -646,14 +676,15 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (
-      Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
-      (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
-    ) {
-      return this.getWorkerInfo(0).taskFunctions as string[]
-    } else {
-      return []
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctions) &&
+        workerNode.info.taskFunctions.length > 0
+      ) {
+        return workerNode.info.taskFunctions
+      }
     }
+    return []
   }
 
   /** @inheritDoc */
@@ -678,12 +709,11 @@ export abstract class AbstractPool<
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
       if (
         name != null &&
-        Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
-        !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
-          name
-        )
+        Array.isArray(workerInfo.taskFunctions) &&
+        !workerInfo.taskFunctions.includes(name)
       ) {
         reject(
           new Error(`Task function '${name}' is not registered in the pool`)
@@ -695,7 +725,7 @@ export abstract class AbstractPool<
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        workerId: workerInfo.id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -713,7 +743,6 @@ export abstract class AbstractPool<
       } else {
         this.enqueueTask(workerNodeKey, task)
       }
-      this.checkAndEmitEvents()
     })
   }
 
@@ -724,7 +753,7 @@ export abstract class AbstractPool<
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
-    this.emitter?.emit(PoolEvents.destroy)
+    this.emitter?.emit(PoolEvents.destroy, this.info)
   }
 
   protected async sendKillMessageToWorker (
@@ -779,12 +808,12 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
-      ++taskWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+      ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
     }
   }
 
@@ -803,22 +832,29 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(
+      ].getTaskFunctionWorkerUsage(
         message.taskPerformance?.name ?? DEFAULT_TASK_NAME
       ) as WorkerUsage
-      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-      this.updateEluWorkerUsage(taskWorkerUsage, message)
+      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
     }
   }
 
-  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+  /**
+   * Whether the worker node shall update its task function worker usage or not.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
+   */
+  private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
-      Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
-      (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+      Array.isArray(workerInfo.taskFunctions) &&
+      workerInfo.taskFunctions.length > 2
     )
   }
 
@@ -827,7 +863,19 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     const workerTaskStatistics = workerUsage.tasks
-    --workerTaskStatistics.executing
+    if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing > 0
+    ) {
+      --workerTaskStatistics.executing
+    } else if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing < 0
+    ) {
+      throw new Error(
+        'Worker usage statistic for tasks executing cannot be negative'
+      )
+    }
     if (message.taskError == null) {
       ++workerTaskStatistics.executed
     } else {
@@ -1017,6 +1065,7 @@ export abstract class AbstractPool<
     if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
       workerInfo.ready = true
     }
+    this.checkAndEmitDynamicWorkerCreationEvents()
     return workerNodeKey
   }
 
@@ -1125,7 +1174,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return (message) => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null) {
+      if (message.ready != null && message.taskFunctions != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
@@ -1144,9 +1193,11 @@ export abstract class AbstractPool<
     if (message.ready === false) {
       throw new Error(`Worker ${message.workerId} failed to initialize`)
     }
-    this.getWorkerInfo(
+    const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    ).ready = message.ready as boolean
+    )
+    workerInfo.ready = message.ready as boolean
+    workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
@@ -1180,13 +1231,22 @@ export abstract class AbstractPool<
     }
   }
 
-  private checkAndEmitEvents (): void {
-    if (this.emitter != null) {
-      if (this.busy) {
-        this.emitter.emit(PoolEvents.busy, this.info)
-      }
-      if (this.type === PoolTypes.dynamic && this.full) {
-        this.emitter.emit(PoolEvents.full, this.info)
+  private checkAndEmitTaskExecutionEvents (): void {
+    if (this.busy) {
+      this.emitter?.emit(PoolEvents.busy, this.info)
+    }
+  }
+
+  private checkAndEmitTaskQueuingEvents (): void {
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    }
+  }
+
+  private checkAndEmitDynamicWorkerCreationEvents (): void {
+    if (this.type === PoolTypes.dynamic) {
+      if (this.full) {
+        this.emitter?.emit(PoolEvents.full, this.info)
       }
     }
   }
@@ -1209,7 +1269,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1217,7 +1281,7 @@ export abstract class AbstractPool<
     this.workerNodes.push(workerNode)
     const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
     if (workerNodeKey === -1) {
-      throw new Error('Worker node not found')
+      throw new Error('Worker node added not found')
     }
     return workerNodeKey
   }
@@ -1235,6 +1299,23 @@ export abstract class AbstractPool<
     }
   }
 
+  /** @inheritDoc */
+  public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    )
+  }
+
+  private hasBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.findIndex(
+        (workerNode) => !workerNode.hasBackPressure()
+      ) === -1
+    )
+  }
+
   /**
    * Executes the given task on the worker given its worker node key.
    *
@@ -1244,10 +1325,13 @@ export abstract class AbstractPool<
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
     this.beforeTaskExecutionHook(workerNodeKey, task)
     this.sendToWorker(workerNodeKey, task, task.transferList)
+    this.checkAndEmitTaskExecutionEvents()
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    this.checkAndEmitTaskQueuingEvents()
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {