chore: v2.6.29
[poolifier.git] / src / pools / abstract-pool.ts
index 4c08671e5d99ab362f4b0263a423cfe42b4af5e5..dc1fc53f90bfbda6031a56cfd2c92e62867f07ec 100644 (file)
@@ -92,10 +92,6 @@ export abstract class AbstractPool<
    * The start timestamp of the pool.
    */
   private readonly startTimestamp
-  /**
-   * The task function names.
-   */
-  private taskFunctions!: string[]
 
   /**
    * Constructs a new poolifier pool.
@@ -208,9 +204,10 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.opts.workerChoiceStrategyOptions = {
+        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+        ...opts.workerChoiceStrategyOptions
+      }
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -248,6 +245,22 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+    ) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: choice retries must be an integer'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      workerChoiceStrategyOptions.choiceRetries <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+      )
+    }
     if (
       workerChoiceStrategyOptions.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
@@ -287,7 +300,7 @@ export abstract class AbstractPool<
       tasksQueueOptions.concurrency <= 0
     ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
+        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
       )
     }
   }
@@ -510,7 +523,9 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
    */
   private checkMessageWorkerId (message: MessageValue<Response>): void {
-    if (
+    if (message.workerId == null) {
+      throw new Error('Worker message received without worker id')
+    } else if (
       message.workerId != null &&
       this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
     ) {
@@ -568,7 +583,10 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.opts.workerChoiceStrategyOptions = {
+      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+      ...workerChoiceStrategyOptions
+    }
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
@@ -648,11 +666,15 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (this.taskFunctions != null) {
-      return this.taskFunctions
-    } else {
-      return []
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctions) &&
+        workerNode.info.taskFunctions.length > 0
+      ) {
+        return workerNode.info.taskFunctions
+      }
     }
+    return []
   }
 
   /** @inheritDoc */
@@ -672,23 +694,28 @@ export abstract class AbstractPool<
       ) {
         reject(new TypeError('name argument must not be an empty string'))
       }
-      if (name != null && !this.taskFunctions.includes(name)) {
-        reject(
-          new Error(`Task function '${name}' is not registered in the pool`)
-        )
-      }
       if (transferList != null && !Array.isArray(transferList)) {
         reject(new TypeError('transferList argument must be an array'))
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      if (
+        name != null &&
+        Array.isArray(workerInfo.taskFunctions) &&
+        !workerInfo.taskFunctions.includes(name)
+      ) {
+        reject(
+          new Error(`Task function '${name}' is not registered in the pool`)
+        )
+      }
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        workerId: workerInfo.id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -717,6 +744,7 @@ export abstract class AbstractPool<
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
+    this.emitter?.emit(PoolEvents.destroy)
   }
 
   protected async sendKillMessageToWorker (
@@ -771,11 +799,13 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
+    }
   }
 
   /**
@@ -793,12 +823,30 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskFunctionWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) as WorkerUsage
+      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
+    }
+  }
+
+  /**
+   * Whether the worker node shall update its task function worker usage or not.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
+   */
+  private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    return (
+      Array.isArray(workerInfo.taskFunctions) &&
+      workerInfo.taskFunctions.length > 2
+    )
   }
 
   private updateTaskStatisticsWorkerUsage (
@@ -806,7 +854,19 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void {
     const workerTaskStatistics = workerUsage.tasks
-    --workerTaskStatistics.executing
+    if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing > 0
+    ) {
+      --workerTaskStatistics.executing
+    } else if (
+      workerTaskStatistics.executing != null &&
+      workerTaskStatistics.executing < 0
+    ) {
+      throw new Error(
+        'Worker usage statistic for tasks executing cannot be negative'
+      )
+    }
     if (message.taskError == null) {
       ++workerTaskStatistics.executed
     } else {
@@ -928,6 +988,7 @@ export abstract class AbstractPool<
   protected createAndSetupWorkerNode (): number {
     const worker = this.createWorker()
 
+    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', (error) => {
@@ -947,7 +1008,6 @@ export abstract class AbstractPool<
         this.redistributeQueuedTasks(workerNodeKey)
       }
     })
-    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
       this.removeWorkerNode(worker)
@@ -1104,7 +1164,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return (message) => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null) {
+      if (message.ready != null && message.taskFunctions != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
@@ -1112,34 +1172,40 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.taskFunctions = message.taskFunctions
+        this.getWorkerInfo(
+          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        ).taskFunctions = message.taskFunctions
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    this.getWorkerInfo(
+    if (message.ready === false) {
+      throw new Error(`Worker ${message.workerId} failed to initialize`)
+    }
+    const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    ).ready = message.ready as boolean
+    )
+    workerInfo.ready = message.ready as boolean
+    workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(
-      message.taskId as string
-    )
+    const { taskId, taskError, data } = message
+    const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (message.taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, message.taskError)
-        promiseResponse.reject(message.taskError.message)
+      if (taskError != null) {
+        this.emitter?.emit(PoolEvents.taskError, taskError)
+        promiseResponse.reject(taskError.message)
       } else {
-        promiseResponse.resolve(message.data as Response)
+        promiseResponse.resolve(data as Response)
       }
       const workerNodeKey = promiseResponse.workerNodeKey
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.promiseResponseMap.delete(message.taskId as string)
+      this.promiseResponseMap.delete(taskId as string)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0 &&
@@ -1184,7 +1250,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1192,7 +1262,7 @@ export abstract class AbstractPool<
     this.workerNodes.push(workerNode)
     const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
     if (workerNodeKey === -1) {
-      throw new Error('Worker node not found')
+      throw new Error('Worker node added not found')
     }
     return workerNodeKey
   }
@@ -1210,6 +1280,23 @@ export abstract class AbstractPool<
     }
   }
 
+  /** @inheritDoc */
+  public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    )
+  }
+
+  private hasBackPressure (): boolean {
+    return (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes.findIndex(
+        (workerNode) => !workerNode.hasBackPressure()
+      ) !== -1
+    )
+  }
+
   /**
    * Executes the given task on the worker given its worker node key.
    *
@@ -1222,7 +1309,11 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    if (this.hasBackPressure()) {
+      this.emitter?.emit(PoolEvents.backPressure, this.info)
+    }
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {