chore: v2.7.4
[poolifier.git] / src / pools / abstract-pool.ts
index 88e5fe79e782b06cbf507c794ab85c54d1bceaa9..c56beaa6d801843c043f091963cea385b72eccee 100644 (file)
@@ -1,6 +1,7 @@
 import { randomUUID } from 'node:crypto'
 import { performance } from 'node:perf_hooks'
-import { type TransferListItem } from 'node:worker_threads'
+import type { TransferListItem } from 'node:worker_threads'
+import { type EventEmitter, EventEmitterAsyncResource } from 'node:events'
 import type {
   MessageValue,
   PromiseResponseWrapper,
@@ -22,7 +23,6 @@ import { KillBehaviors } from '../worker/worker-options'
 import type { TaskFunction } from '../worker/task-functions'
 import {
   type IPool,
-  PoolEmitter,
   PoolEvents,
   type PoolInfo,
   type PoolOptions,
@@ -30,12 +30,12 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import {
-  type IWorker,
-  type IWorkerNode,
-  type WorkerInfo,
-  type WorkerType,
-  type WorkerUsage
+import type {
+  IWorker,
+  IWorkerNode,
+  WorkerInfo,
+  WorkerType,
+  WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -70,7 +70,7 @@ export abstract class AbstractPool<
   public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
 
   /** @inheritDoc */
-  public readonly emitter?: PoolEmitter
+  public emitter?: EventEmitter | EventEmitterAsyncResource
 
   /**
    * The task execution response promise map:
@@ -133,8 +133,8 @@ export abstract class AbstractPool<
         'Cannot start a pool from a worker with the same type as the pool'
       )
     }
-    this.checkNumberOfWorkers(this.numberOfWorkers)
     checkFilePath(this.filePath)
+    this.checkNumberOfWorkers(this.numberOfWorkers)
     this.checkPoolOptions(this.opts)
 
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
@@ -142,7 +142,7 @@ export abstract class AbstractPool<
     this.enqueueTask = this.enqueueTask.bind(this)
 
     if (this.opts.enableEvents === true) {
-      this.emitter = new PoolEmitter()
+      this.initializeEventEmitter()
     }
     this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
     Worker,
@@ -261,6 +261,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private initializeEventEmitter (): void {
+    this.emitter = new EventEmitterAsyncResource({
+      name: `poolifier:${this.type}-${this.worker}-pool`
+    })
+  }
+
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
@@ -485,7 +491,7 @@ export abstract class AbstractPool<
    * @param message - The received message.
    * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
    */
-  private checkMessageWorkerId (message: MessageValue<Response>): void {
+  private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
     if (message.workerId == null) {
       throw new Error('Worker message received without worker id')
     } else if (
@@ -938,6 +944,9 @@ export abstract class AbstractPool<
       })
     )
     this.emitter?.emit(PoolEvents.destroy, this.info)
+    if (this.emitter instanceof EventEmitterAsyncResource) {
+      this.emitter?.emitDestroy()
+    }
     this.started = false
   }
 
@@ -1243,6 +1252,7 @@ export abstract class AbstractPool<
   protected createAndSetupDynamicWorkerNode (): number {
     const workerNodeKey = this.createAndSetupWorkerNode()
     this.registerWorkerMessageListener(workerNodeKey, message => {
+      this.checkMessageWorkerId(message)
       const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
         message.workerId
       )