From b560403426309944ad775794161773a745263190 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 24 Sep 2023 14:01:49 +0200 Subject: [PATCH] feat: change pool event emitter to event emitter async resource MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- @types/events.d.ts | 49 +++++++++++++++++++++++++++++++ CHANGELOG.md | 4 +++ docs/api.md | 2 +- src/pools/abstract-pool.ts | 11 +++++-- src/pools/pool.ts | 11 +++---- src/worker/cluster-worker.ts | 2 +- src/worker/thread-worker.ts | 2 +- tests/pools/abstract-pool.test.js | 4 +-- tsconfig.json | 1 + 9 files changed, 74 insertions(+), 12 deletions(-) create mode 100644 @types/events.d.ts diff --git a/@types/events.d.ts b/@types/events.d.ts new file mode 100644 index 00000000..e5504f0a --- /dev/null +++ b/@types/events.d.ts @@ -0,0 +1,49 @@ +import type { AsyncResource, AsyncResourceOptions } from 'node:async_hooks' +import { EventEmitter } from 'node:events' + +declare module 'node:events' { + interface EventEmitterOptions { + /** + * Enables automatic capturing of promise rejection. + */ + captureRejections?: boolean | undefined + } + + interface EventEmitterAsyncResourceOptions + extends AsyncResourceOptions, + EventEmitterOptions { + /** + * The type of async event. + * @default new.target.name + */ + name?: string + } + + /** + * Integrates `EventEmitter` with `AsyncResource` for `EventEmitters` that require + * manual async tracking. Specifically, all events emitted by instances of + * `EventEmitterAsyncResource` will run within its async context. + * + * The EventEmitterAsyncResource class has the same methods and takes the + * same options as EventEmitter and AsyncResource themselves. + */ + export class EventEmitterAsyncResource extends EventEmitter { + constructor (options?: EventEmitterAsyncResourceOptions) + /** + * Call all `destroy` hooks. This should only ever be called once. An error will + * be thrown if it is called more than once. This **must** be manually called. If + * the resource is left to be collected by the GC then the `destroy` hooks will + * never be called. + * @return A reference to `asyncResource`. + */ + emitDestroy (): AsyncResource + /** The unique asyncId assigned to the resource. */ + get asyncId (): number + /** The same triggerAsyncId that is passed to the AsyncResource constructor. */ + get triggerAsyncId (): number + /** The underlying AsyncResource */ + get asyncResource (): AsyncResource & { + readonly eventEmitter: EventEmitterAsyncResource + } + } +} diff --git a/CHANGELOG.md b/CHANGELOG.md index dd22f49e..61249856 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Convert pool event emitter to event emitter async resource. + ## [2.7.2] - 2023-09-23 ### Changed diff --git a/docs/api.md b/docs/api.md index 4f7c074d..91185f77 100644 --- a/docs/api.md +++ b/docs/api.md @@ -126,7 +126,7 @@ An object with these properties: Default: `true` - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool. Default: `true` -- `enableEvents` (optional) - Events emission enablement in this pool. +- `enableEvents` (optional) - Events integrated with async resource emission enablement in this pool. Default: `true` - `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool. Default: `false` diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 70459dbb..f7e6f8d2 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -70,7 +70,7 @@ export abstract class AbstractPool< public readonly workerNodes: Array> = [] /** @inheritDoc */ - public readonly emitter?: PoolEmitter + public emitter?: PoolEmitter /** * The task execution response promise map: @@ -142,7 +142,7 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.emitter = new PoolEmitter() + this.initializeEventEmitter() } this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< Worker, @@ -261,6 +261,12 @@ export abstract class AbstractPool< } } + private initializeEventEmitter (): void { + this.emitter = new PoolEmitter({ + name: `poolifier:${this.type}-${this.worker}-pool` + }) + } + /** @inheritDoc */ public get info (): PoolInfo { return { @@ -938,6 +944,7 @@ export abstract class AbstractPool< }) ) this.emitter?.emit(PoolEvents.destroy, this.info) + this.emitter?.emitDestroy() this.started = false } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f0c7d886..e6388b03 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,4 +1,4 @@ -import { EventEmitter } from 'node:events' +import { EventEmitterAsyncResource } from 'node:events' import type { TransferListItem } from 'node:worker_threads' import type { TaskFunction } from '../worker/task-functions' import type { @@ -35,9 +35,9 @@ export const PoolTypes = Object.freeze({ export type PoolType = keyof typeof PoolTypes /** - * Pool events emitter. + * Pool event emitter integrated with async resource. */ -export class PoolEmitter extends EventEmitter {} +export class PoolEmitter extends EventEmitterAsyncResource {} /** * Enumeration of pool events. @@ -179,7 +179,7 @@ export interface PoolOptions { */ restartWorkerOnError?: boolean /** - * Pool events emission. + * Pool events integrated with async resource emission. * * @defaultValue true */ @@ -227,7 +227,8 @@ export interface IPool< */ readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean /** - * Emitter on which events can be listened to. + * Event emitter integrated with `AsyncResource` on which events can be listened to. + * The async tracking tooling identifier is `poolifier:--pool`. * * Events that can currently be listened to: * diff --git a/src/worker/cluster-worker.ts b/src/worker/cluster-worker.ts index 201a516c..0fd90956 100644 --- a/src/worker/cluster-worker.ts +++ b/src/worker/cluster-worker.ts @@ -33,7 +33,7 @@ export class ClusterWorker< opts: WorkerOptions = {} ) { super( - 'worker-cluster-pool:poolifier', + 'poolifier:cluster-worker', cluster.isPrimary, cluster.worker as Worker, taskFunctions, diff --git a/src/worker/thread-worker.ts b/src/worker/thread-worker.ts index 7b92caf6..bfd1c13c 100644 --- a/src/worker/thread-worker.ts +++ b/src/worker/thread-worker.ts @@ -42,7 +42,7 @@ export class ThreadWorker< opts: WorkerOptions = {} ) { super( - 'worker-thread-pool:poolifier', + 'poolifier:thread-worker', isMainThread, parentPort as MessagePort, taskFunctions, diff --git a/tests/pools/abstract-pool.test.js b/tests/pools/abstract-pool.test.js index 85d48d6a..cb04292b 100644 --- a/tests/pools/abstract-pool.test.js +++ b/tests/pools/abstract-pool.test.js @@ -1,4 +1,4 @@ -const { EventEmitter } = require('node:events') +const { EventEmitterAsyncResource } = require('node:events') const { expect } = require('expect') const sinon = require('sinon') const { @@ -186,7 +186,7 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/thread/testWorker.js' ) - expect(pool.emitter).toBeInstanceOf(EventEmitter) + expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource) expect(pool.opts).toStrictEqual({ startWorkers: true, enableEvents: true, diff --git a/tsconfig.json b/tsconfig.json index 79ab1b82..1bd56853 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,6 +4,7 @@ "target": "ES2022", "module": "ES2022", "moduleResolution": "Node", + "typeRoots": ["./node_modules/@types", "./@types"], "declaration": true, "declarationDir": "./lib/dts", "strict": true, -- 2.34.1