feat: introduce worker node queue back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
index 06e69a6e3140da3092e75a6ff7040e201c70255c..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644 (file)
@@ -92,10 +92,6 @@ export abstract class AbstractPool<
    * The start timestamp of the pool.
    */
   private readonly startTimestamp
-  /**
-   * The task function names.
-   */
-  private taskFunctions!: string[]
 
   /**
    * Constructs a new poolifier pool.
@@ -510,7 +506,9 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
    */
   private checkMessageWorkerId (message: MessageValue<Response>): void {
-    if (
+    if (message.workerId == null) {
+      throw new Error('Worker message received without worker id')
+    } else if (
       message.workerId != null &&
       this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
     ) {
@@ -648,11 +646,15 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (this.taskFunctions != null) {
-      return this.taskFunctions
-    } else {
-      return []
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctions) &&
+        workerNode.info.taskFunctions.length > 0
+      ) {
+        return workerNode.info.taskFunctions
+      }
     }
+    return []
   }
 
   /** @inheritDoc */
@@ -672,27 +674,28 @@ export abstract class AbstractPool<
       ) {
         reject(new TypeError('name argument must not be an empty string'))
       }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
+      const timestamp = performance.now()
+      const workerNodeKey = this.chooseWorkerNode()
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
       if (
         name != null &&
-        this.taskFunctions != null &&
-        !this.taskFunctions.includes(name)
+        Array.isArray(workerInfo.taskFunctions) &&
+        !workerInfo.taskFunctions.includes(name)
       ) {
         reject(
           new Error(`Task function '${name}' is not registered in the pool`)
         )
       }
-      if (transferList != null && !Array.isArray(transferList)) {
-        reject(new TypeError('transferList argument must be an array'))
-      }
-      const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        workerId: workerInfo.id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -776,11 +779,13 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+      ++taskWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    }
   }
 
   /**
@@ -798,12 +803,24 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) as WorkerUsage
+      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+      this.updateEluWorkerUsage(taskWorkerUsage, message)
+    }
+  }
+
+  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    return (
+      Array.isArray(workerInfo.taskFunctions) &&
+      workerInfo.taskFunctions.length > 1
+    )
   }
 
   private updateTaskStatisticsWorkerUsage (
@@ -1109,7 +1126,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return (message) => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null) {
+      if (message.ready != null && message.taskFunctions != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
@@ -1117,34 +1134,40 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.taskFunctions = message.taskFunctions
+        this.getWorkerInfo(
+          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        ).taskFunctions = message.taskFunctions
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    this.getWorkerInfo(
+    if (message.ready === false) {
+      throw new Error(`Worker ${message.workerId} failed to initialize`)
+    }
+    const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    ).ready = message.ready as boolean
+    )
+    workerInfo.ready = message.ready as boolean
+    workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(
-      message.taskId as string
-    )
+    const { taskId, taskError, data } = message
+    const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (message.taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, message.taskError)
-        promiseResponse.reject(message.taskError.message)
+      if (taskError != null) {
+        this.emitter?.emit(PoolEvents.taskError, taskError)
+        promiseResponse.reject(taskError.message)
       } else {
-        promiseResponse.resolve(message.data as Response)
+        promiseResponse.resolve(data as Response)
       }
       const workerNodeKey = promiseResponse.workerNodeKey
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.promiseResponseMap.delete(message.taskId as string)
+      this.promiseResponseMap.delete(taskId as string)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0 &&
@@ -1189,7 +1212,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1197,7 +1224,7 @@ export abstract class AbstractPool<
     this.workerNodes.push(workerNode)
     const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
     if (workerNodeKey === -1) {
-      throw new Error('Worker node not found')
+      throw new Error('Worker node added not found')
     }
     return workerNodeKey
   }
@@ -1227,6 +1254,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      this.emitter?.emit(PoolEvents.backPressure, {
+        workerId: this.getWorkerInfo(workerNodeKey).id,
+        ...this.info
+      })
+    }
     return this.workerNodes[workerNodeKey].enqueueTask(task)
   }