import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
-import { type TransferListItem } from 'node:worker_threads'
+import type { TransferListItem } from 'node:worker_threads'
+import { type EventEmitter, EventEmitterAsyncResource } from 'node:events'
import type {
MessageValue,
PromiseResponseWrapper,
import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
- PoolEmitter,
PoolEvents,
type PoolInfo,
type PoolOptions,
PoolTypes,
type TasksQueueOptions
} from './pool'
-import {
- type IWorker,
- type IWorkerNode,
- type WorkerInfo,
- type WorkerType,
- type WorkerUsage
+import type {
+ IWorker,
+ IWorkerNode,
+ WorkerInfo,
+ WorkerType,
+ WorkerUsage
} from './worker'
import {
type MeasurementStatisticsRequirements,
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
- public readonly emitter?: PoolEmitter
+ public emitter?: EventEmitter | EventEmitterAsyncResource
/**
* The task execution response promise map:
'Cannot start a pool from a worker with the same type as the pool'
)
}
- this.checkNumberOfWorkers(this.numberOfWorkers)
checkFilePath(this.filePath)
+ this.checkNumberOfWorkers(this.numberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.emitter = new PoolEmitter()
+ this.initializeEventEmitter()
}
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
Worker,
}
}
+ private initializeEventEmitter (): void {
+ this.emitter = new EventEmitterAsyncResource({
+ name: `poolifier:${this.type}-${this.worker}-pool`
+ })
+ }
+
/** @inheritDoc */
public get info (): PoolInfo {
return {
* @param message - The received message.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
*/
- private checkMessageWorkerId (message: MessageValue<Response>): void {
+ private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
if (message.workerId == null) {
throw new Error('Worker message received without worker id')
} else if (
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
+ if (this.emitter instanceof EventEmitterAsyncResource) {
+ this.emitter?.emitDestroy()
+ }
this.started = false
}
protected createAndSetupDynamicWorkerNode (): number {
const workerNodeKey = this.createAndSetupWorkerNode()
this.registerWorkerMessageListener(workerNodeKey, message => {
+ this.checkMessageWorkerId(message)
const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
message.workerId
)