--- /dev/null
+import type { AsyncResource, AsyncResourceOptions } from 'node:async_hooks'
+import { EventEmitter } from 'node:events'
+
+declare module 'node:events' {
+ interface EventEmitterOptions {
+ /**
+ * Enables automatic capturing of promise rejection.
+ */
+ captureRejections?: boolean | undefined
+ }
+
+ interface EventEmitterAsyncResourceOptions
+ extends AsyncResourceOptions,
+ EventEmitterOptions {
+ /**
+ * The type of async event.
+ * @default new.target.name
+ */
+ name?: string
+ }
+
+ /**
+ * Integrates `EventEmitter` with `AsyncResource` for `EventEmitters` that require
+ * manual async tracking. Specifically, all events emitted by instances of
+ * `EventEmitterAsyncResource` will run within its async context.
+ *
+ * The EventEmitterAsyncResource class has the same methods and takes the
+ * same options as EventEmitter and AsyncResource themselves.
+ */
+ export class EventEmitterAsyncResource extends EventEmitter {
+ constructor (options?: EventEmitterAsyncResourceOptions)
+ /**
+ * Call all `destroy` hooks. This should only ever be called once. An error will
+ * be thrown if it is called more than once. This **must** be manually called. If
+ * the resource is left to be collected by the GC then the `destroy` hooks will
+ * never be called.
+ * @return A reference to `asyncResource`.
+ */
+ emitDestroy (): AsyncResource
+ /** The unique asyncId assigned to the resource. */
+ get asyncId (): number
+ /** The same triggerAsyncId that is passed to the AsyncResource constructor. */
+ get triggerAsyncId (): number
+ /** The underlying AsyncResource */
+ get asyncResource (): AsyncResource & {
+ readonly eventEmitter: EventEmitterAsyncResource
+ }
+ }
+}
## [Unreleased]
+### Changed
+
+- Convert pool event emitter to event emitter async resource.
+
## [2.7.2] - 2023-09-23
### Changed
Default: `true`
- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
Default: `true`
-- `enableEvents` (optional) - Events emission enablement in this pool.
+- `enableEvents` (optional) - Events integrated with async resource emission enablement in this pool.
Default: `true`
- `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool.
Default: `false`
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
/** @inheritDoc */
- public readonly emitter?: PoolEmitter
+ public emitter?: PoolEmitter
/**
* The task execution response promise map:
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.emitter = new PoolEmitter()
+ this.initializeEventEmitter()
}
this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
Worker,
}
}
+ private initializeEventEmitter (): void {
+ this.emitter = new PoolEmitter({
+ name: `poolifier:${this.type}-${this.worker}-pool`
+ })
+ }
+
/** @inheritDoc */
public get info (): PoolInfo {
return {
})
)
this.emitter?.emit(PoolEvents.destroy, this.info)
+ this.emitter?.emitDestroy()
this.started = false
}
-import { EventEmitter } from 'node:events'
+import { EventEmitterAsyncResource } from 'node:events'
import type { TransferListItem } from 'node:worker_threads'
import type { TaskFunction } from '../worker/task-functions'
import type {
export type PoolType = keyof typeof PoolTypes
/**
- * Pool events emitter.
+ * Pool event emitter integrated with async resource.
*/
-export class PoolEmitter extends EventEmitter {}
+export class PoolEmitter extends EventEmitterAsyncResource {}
/**
* Enumeration of pool events.
*/
restartWorkerOnError?: boolean
/**
- * Pool events emission.
+ * Pool events integrated with async resource emission.
*
* @defaultValue true
*/
*/
readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean
/**
- * Emitter on which events can be listened to.
+ * Event emitter integrated with `AsyncResource` on which events can be listened to.
+ * The async tracking tooling identifier is `poolifier:<PoolType>-<WorkerType>-pool`.
*
* Events that can currently be listened to:
*
opts: WorkerOptions = {}
) {
super(
- 'worker-cluster-pool:poolifier',
+ 'poolifier:cluster-worker',
cluster.isPrimary,
cluster.worker as Worker,
taskFunctions,
opts: WorkerOptions = {}
) {
super(
- 'worker-thread-pool:poolifier',
+ 'poolifier:thread-worker',
isMainThread,
parentPort as MessagePort,
taskFunctions,
-const { EventEmitter } = require('node:events')
+const { EventEmitterAsyncResource } = require('node:events')
const { expect } = require('expect')
const sinon = require('sinon')
const {
numberOfWorkers,
'./tests/worker-files/thread/testWorker.js'
)
- expect(pool.emitter).toBeInstanceOf(EventEmitter)
+ expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
expect(pool.opts).toStrictEqual({
startWorkers: true,
enableEvents: true,
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "Node",
+ "typeRoots": ["./node_modules/@types", "./@types"],
"declaration": true,
"declarationDir": "./lib/dts",
"strict": true,