X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=c56beaa6d801843c043f091963cea385b72eccee;hb=c726f66c5c7ce127bfd2010a60fa07761f21cbd1;hp=88e5fe79e782b06cbf507c794ab85c54d1bceaa9;hpb=ae036c3e73796126b7f1138129b6b18ef6bcef8c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 88e5fe79..c56beaa6 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' -import { type TransferListItem } from 'node:worker_threads' +import type { TransferListItem } from 'node:worker_threads' +import { type EventEmitter, EventEmitterAsyncResource } from 'node:events' import type { MessageValue, PromiseResponseWrapper, @@ -22,7 +23,6 @@ import { KillBehaviors } from '../worker/worker-options' import type { TaskFunction } from '../worker/task-functions' import { type IPool, - PoolEmitter, PoolEvents, type PoolInfo, type PoolOptions, @@ -30,12 +30,12 @@ import { PoolTypes, type TasksQueueOptions } from './pool' -import { - type IWorker, - type IWorkerNode, - type WorkerInfo, - type WorkerType, - type WorkerUsage +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerType, + WorkerUsage } from './worker' import { type MeasurementStatisticsRequirements, @@ -70,7 +70,7 @@ export abstract class AbstractPool< public readonly workerNodes: Array> = [] /** @inheritDoc */ - public readonly emitter?: PoolEmitter + public emitter?: EventEmitter | EventEmitterAsyncResource /** * The task execution response promise map: @@ -133,8 +133,8 @@ export abstract class AbstractPool< 'Cannot start a pool from a worker with the same type as the pool' ) } - this.checkNumberOfWorkers(this.numberOfWorkers) checkFilePath(this.filePath) + this.checkNumberOfWorkers(this.numberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -142,7 +142,7 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.emitter = new PoolEmitter() + this.initializeEventEmitter() } this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< Worker, @@ -261,6 +261,12 @@ export abstract class AbstractPool< } } + private initializeEventEmitter (): void { + this.emitter = new EventEmitterAsyncResource({ + name: `poolifier:${this.type}-${this.worker}-pool` + }) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -485,7 +491,7 @@ export abstract class AbstractPool< * @param message - The received message. * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ - private checkMessageWorkerId (message: MessageValue): void { + private checkMessageWorkerId (message: MessageValue): void { if (message.workerId == null) { throw new Error('Worker message received without worker id') } else if ( @@ -938,6 +944,9 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + if (this.emitter instanceof EventEmitterAsyncResource) { + this.emitter?.emitDestroy() + } this.started = false } @@ -1243,6 +1252,7 @@ export abstract class AbstractPool< protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() this.registerWorkerMessageListener(workerNodeKey, message => { + this.checkMessageWorkerId(message) const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId )