import { randomUUID } from 'node:crypto'
import { performance } from 'node:perf_hooks'
import type { TransferListItem } from 'node:worker_threads'
+import { type EventEmitter, EventEmitterAsyncResource } from 'node:events'
import type {
MessageValue,
PromiseResponseWrapper,
import type { TaskFunction } from '../worker/task-functions'
import {
type IPool,
- PoolEmitter,
PoolEvents,
type PoolInfo,
type PoolOptions,
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
- public emitter?: PoolEmitter
+ public emitter?: EventEmitter | EventEmitterAsyncResource
/**
* The task execution response promise map:
}
private initializeEventEmitter (): void {
- this.emitter = new PoolEmitter({
+ this.emitter = new EventEmitterAsyncResource({
name: `poolifier:${this.type}-${this.worker}-pool`
})
}
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
- this.emitter?.emitDestroy()
+ if (this.emitter instanceof EventEmitterAsyncResource) {
+ this.emitter?.emitDestroy()
+ }
this.started = false
}
-import { EventEmitterAsyncResource } from 'node:events'
import type { TransferListItem } from 'node:worker_threads'
+import type { EventEmitter, EventEmitterAsyncResource } from 'node:events'
import type { TaskFunction } from '../worker/task-functions'
import type {
ErrorHandler,
*/
export type PoolType = keyof typeof PoolTypes
-/**
- * Pool event emitter integrated with async resource.
- */
-export class PoolEmitter extends EventEmitterAsyncResource {}
-
/**
* Enumeration of pool events.
*/
* - `'taskError'`: Emitted when an error occurs while executing a task.
* - `'backPressure'`: Emitted when all worker nodes have back pressure (i.e. their tasks queue is full: queue size \>= maximum queue size).
*/
- readonly emitter?: PoolEmitter
+ readonly emitter?: EventEmitter | EventEmitterAsyncResource
/**
* Executes the specified function in the worker constructor with the task data input parameter.
*