feat: introduce worker node queue back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
index 5b2bd15012142fa7e075fa8ea4968041ccfdd63a..eb2c2f7818a687ac8f8016eb27489895cadfaaee 100644 (file)
@@ -92,10 +92,6 @@ export abstract class AbstractPool<
    * The start timestamp of the pool.
    */
   private readonly startTimestamp
-  /**
-   * The task function names.
-   */
-  private taskFunctions!: string[]
 
   /**
    * Constructs a new poolifier pool.
@@ -510,7 +506,9 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
    */
   private checkMessageWorkerId (message: MessageValue<Response>): void {
-    if (
+    if (message.workerId == null) {
+      throw new Error('Worker message received without worker id')
+    } else if (
       message.workerId != null &&
       this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
     ) {
@@ -648,11 +646,15 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (this.taskFunctions != null) {
-      return this.taskFunctions
-    } else {
-      return []
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctions) &&
+        workerNode.info.taskFunctions.length > 0
+      ) {
+        return workerNode.info.taskFunctions
+      }
     }
+    return []
   }
 
   /** @inheritDoc */
@@ -670,25 +672,30 @@ export abstract class AbstractPool<
         typeof name === 'string' &&
         name.trim().length === 0
       ) {
-        reject(new Error('name argument must not be an empty string'))
-      }
-      if (name != null && !this.taskFunctions.includes(name)) {
-        reject(
-          new Error(`Task function '${name}' is not registered in the pool`)
-        )
+        reject(new TypeError('name argument must not be an empty string'))
       }
       if (transferList != null && !Array.isArray(transferList)) {
         reject(new TypeError('transferList argument must be an array'))
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      if (
+        name != null &&
+        Array.isArray(workerInfo.taskFunctions) &&
+        !workerInfo.taskFunctions.includes(name)
+      ) {
+        reject(
+          new Error(`Task function '${name}' is not registered in the pool`)
+        )
+      }
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        workerId: workerInfo.id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -717,6 +724,7 @@ export abstract class AbstractPool<
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
+    this.emitter?.emit(PoolEvents.destroy)
   }
 
   protected async sendKillMessageToWorker (
@@ -771,11 +779,13 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+      ++taskWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    }
   }
 
   /**
@@ -793,12 +803,24 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) as WorkerUsage
+      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+      this.updateEluWorkerUsage(taskWorkerUsage, message)
+    }
+  }
+
+  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
+    return (
+      Array.isArray(workerInfo.taskFunctions) &&
+      workerInfo.taskFunctions.length > 1
+    )
   }
 
   private updateTaskStatisticsWorkerUsage (
@@ -928,6 +950,7 @@ export abstract class AbstractPool<
   protected createAndSetupWorkerNode (): number {
     const worker = this.createWorker()
 
+    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', (error) => {
@@ -947,7 +970,6 @@ export abstract class AbstractPool<
         this.redistributeQueuedTasks(workerNodeKey)
       }
     })
-    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
       this.removeWorkerNode(worker)
@@ -1104,7 +1126,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return (message) => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null) {
+      if (message.ready != null && message.taskFunctions != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
@@ -1112,34 +1134,40 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.taskFunctions = message.taskFunctions
+        this.getWorkerInfo(
+          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        ).taskFunctions = message.taskFunctions
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    this.getWorkerInfo(
+    if (message.ready === false) {
+      throw new Error(`Worker ${message.workerId} failed to initialize`)
+    }
+    const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    ).ready = message.ready as boolean
+    )
+    workerInfo.ready = message.ready as boolean
+    workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(
-      message.taskId as string
-    )
+    const { taskId, taskError, data } = message
+    const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (message.taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, message.taskError)
-        promiseResponse.reject(message.taskError.message)
+      if (taskError != null) {
+        this.emitter?.emit(PoolEvents.taskError, taskError)
+        promiseResponse.reject(taskError.message)
       } else {
-        promiseResponse.resolve(message.data as Response)
+        promiseResponse.resolve(data as Response)
       }
       const workerNodeKey = promiseResponse.workerNodeKey
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.promiseResponseMap.delete(message.taskId as string)
+      this.promiseResponseMap.delete(taskId as string)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0 &&
@@ -1184,7 +1212,11 @@ export abstract class AbstractPool<
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
    */
   private addWorkerNode (worker: Worker): number {
-    const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
+    const workerNode = new WorkerNode<Worker, Data>(
+      worker,
+      this.worker,
+      this.maxSize
+    )
     // Flag the worker node as ready at pool startup.
     if (this.starting) {
       workerNode.info.ready = true
@@ -1192,7 +1224,7 @@ export abstract class AbstractPool<
     this.workerNodes.push(workerNode)
     const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
     if (workerNodeKey === -1) {
-      throw new Error('Worker node not found')
+      throw new Error('Worker node added not found')
     }
     return workerNodeKey
   }
@@ -1222,6 +1254,15 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      this.emitter?.emit(PoolEvents.backPressure, {
+        workerId: this.getWorkerInfo(workerNodeKey).id,
+        ...this.info
+      })
+    }
     return this.workerNodes[workerNodeKey].enqueueTask(task)
   }