fix: fix possible null exception with worker_threads pools
[poolifier.git] / src / pools / abstract-pool.ts
index d51970ef216f3c2e60c2fb07c8a6c145058df7b7..da0eb335b26f41092fb57560d255b5b0b624ad6e 100644 (file)
@@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
 import type { TransferListItem } from 'node:worker_threads'
 import { EventEmitterAsyncResource } from 'node:events'
+import { AsyncResource } from 'node:async_hooks'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -933,7 +934,13 @@ export abstract class AbstractPool<
       this.promiseResponseMap.set(task.taskId as string, {
         resolve,
         reject,
-        workerNodeKey
+        workerNodeKey,
+        ...(this.emitter != null && {
+          asyncResource: new AsyncResource('poolifier:task', {
+            triggerAsyncId: this.emitter.asyncId,
+            requireManualDestroy: true
+          })
+        })
       })
       if (
         this.opts.enableTasksQueue === false ||
@@ -1398,7 +1405,7 @@ export abstract class AbstractPool<
     // Listen to worker messages.
     this.registerWorkerMessageListener(
       workerNodeKey,
-      this.workerMessageListener.bind(this)
+      this.workerMessageListener
     )
     // Send the startup message to worker.
     this.sendStartupMessageToWorker(workerNodeKey)
@@ -1445,6 +1452,9 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    if (this.workerNodes.length <= 1) {
+      return
+    }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       const destinationWorkerNodeKey = this.workerNodes.reduce(
         (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
@@ -1538,6 +1548,9 @@ export abstract class AbstractPool<
     eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
+    if (this.workerNodes.length <= 1) {
+      return
+    }
     const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
@@ -1629,6 +1642,9 @@ export abstract class AbstractPool<
   private readonly handleBackPressureEvent = (
     eventDetail: WorkerNodeEventDetail
   ): void => {
+    if (this.workerNodes.length <= 1) {
+      return
+    }
     const { workerId } = eventDetail
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
@@ -1667,7 +1683,9 @@ export abstract class AbstractPool<
   /**
    * This method is the message listener registered on each worker.
    */
-  protected workerMessageListener (message: MessageValue<Response>): void {
+  protected readonly workerMessageListener = (
+    message: MessageValue<Response>
+  ): void => {
     this.checkMessageWorkerId(message)
     const { workerId, ready, taskId, taskFunctionNames } = message
     if (ready != null && taskFunctionNames != null) {
@@ -1704,13 +1722,22 @@ export abstract class AbstractPool<
     const { workerId, taskId, workerError, data } = message
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      const { resolve, reject, workerNodeKey } = promiseResponse
+      const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
       if (workerError != null) {
         this.emitter?.emit(PoolEvents.taskError, workerError)
-        reject(workerError.message)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(
+            reject,
+            this.emitter,
+            workerError.message
+          )
+          : reject(workerError.message)
       } else {
-        resolve(data as Response)
+        asyncResource != null
+          ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
+          : resolve(data as Response)
       }
+      asyncResource?.emitDestroy()
       this.afterTaskExecutionHook(workerNodeKey, message)
       this.workerChoiceStrategyContext.update(workerNodeKey)
       this.promiseResponseMap.delete(taskId as string)