feat: change pool event emitter to event emitter async resource
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 24 Sep 2023 12:01:49 +0000 (14:01 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 24 Sep 2023 12:01:49 +0000 (14:01 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
@types/events.d.ts [new file with mode: 0644]
CHANGELOG.md
docs/api.md
src/pools/abstract-pool.ts
src/pools/pool.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract-pool.test.js
tsconfig.json

diff --git a/@types/events.d.ts b/@types/events.d.ts
new file mode 100644 (file)
index 0000000..e5504f0
--- /dev/null
@@ -0,0 +1,49 @@
+import type { AsyncResource, AsyncResourceOptions } from 'node:async_hooks'
+import { EventEmitter } from 'node:events'
+
+declare module 'node:events' {
+  interface EventEmitterOptions {
+    /**
+     * Enables automatic capturing of promise rejection.
+     */
+    captureRejections?: boolean | undefined
+  }
+
+  interface EventEmitterAsyncResourceOptions
+    extends AsyncResourceOptions,
+    EventEmitterOptions {
+    /**
+     * The type of async event.
+     * @default new.target.name
+     */
+    name?: string
+  }
+
+  /**
+   * Integrates `EventEmitter` with `AsyncResource` for `EventEmitters` that require
+   * manual async tracking. Specifically, all events emitted by instances of
+   * `EventEmitterAsyncResource` will run within its async context.
+   *
+   * The EventEmitterAsyncResource class has the same methods and takes the
+   * same options as EventEmitter and AsyncResource themselves.
+   */
+  export class EventEmitterAsyncResource extends EventEmitter {
+    constructor (options?: EventEmitterAsyncResourceOptions)
+    /**
+     * Call all `destroy` hooks. This should only ever be called once. An error will
+     * be thrown if it is called more than once. This **must** be manually called. If
+     * the resource is left to be collected by the GC then the `destroy` hooks will
+     * never be called.
+     * @return A reference to `asyncResource`.
+     */
+    emitDestroy (): AsyncResource
+    /** The unique asyncId assigned to the resource. */
+    get asyncId (): number
+    /** The same triggerAsyncId that is passed to the AsyncResource constructor. */
+    get triggerAsyncId (): number
+    /** The underlying AsyncResource */
+    get asyncResource (): AsyncResource & {
+      readonly eventEmitter: EventEmitterAsyncResource
+    }
+  }
+}
index dd22f49e57b88084caed4f33d6a0c1e01b445c78..612498563d83d251e95bd36740103591d5b37605 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Convert pool event emitter to event emitter async resource.
+
 ## [2.7.2] - 2023-09-23
 
 ### Changed
index 4f7c074dee3b704f4c1857b006bd111b1cc822b7..91185f77d7abb7bbc9acbd306251dbcb9e62b041 100644 (file)
@@ -126,7 +126,7 @@ An object with these properties:
   Default: `true`
 - `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.  
   Default: `true`
-- `enableEvents` (optional) - Events emission enablement in this pool.  
+- `enableEvents` (optional) - Events integrated with async resource emission enablement in this pool.  
   Default: `true`
 - `enableTasksQueue` (optional) - Tasks queue per worker enablement in this pool.  
   Default: `false`
index 70459dbbee743d82f158eb9739e9bda8367478e4..f7e6f8d2c17b1e1ef0773f7273541c42bad55dde 100644 (file)
@@ -70,7 +70,7 @@ export abstract class AbstractPool<
   public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
 
   /** @inheritDoc */
-  public readonly emitter?: PoolEmitter
+  public emitter?: PoolEmitter
 
   /**
    * The task execution response promise map:
@@ -142,7 +142,7 @@ export abstract class AbstractPool<
     this.enqueueTask = this.enqueueTask.bind(this)
 
     if (this.opts.enableEvents === true) {
-      this.emitter = new PoolEmitter()
+      this.initializeEventEmitter()
     }
     this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
     Worker,
@@ -261,6 +261,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private initializeEventEmitter (): void {
+    this.emitter = new PoolEmitter({
+      name: `poolifier:${this.type}-${this.worker}-pool`
+    })
+  }
+
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
@@ -938,6 +944,7 @@ export abstract class AbstractPool<
       })
     )
     this.emitter?.emit(PoolEvents.destroy, this.info)
+    this.emitter?.emitDestroy()
     this.started = false
   }
 
index f0c7d886fb5258db41f4c6c7277d474afe200369..e6388b03fc8544f1a5ee3b6eac62d862b51d323e 100644 (file)
@@ -1,4 +1,4 @@
-import { EventEmitter } from 'node:events'
+import { EventEmitterAsyncResource } from 'node:events'
 import type { TransferListItem } from 'node:worker_threads'
 import type { TaskFunction } from '../worker/task-functions'
 import type {
@@ -35,9 +35,9 @@ export const PoolTypes = Object.freeze({
 export type PoolType = keyof typeof PoolTypes
 
 /**
- * Pool events emitter.
+ * Pool event emitter integrated with async resource.
  */
-export class PoolEmitter extends EventEmitter {}
+export class PoolEmitter extends EventEmitterAsyncResource {}
 
 /**
  * Enumeration of pool events.
@@ -179,7 +179,7 @@ export interface PoolOptions<Worker extends IWorker> {
    */
   restartWorkerOnError?: boolean
   /**
-   * Pool events emission.
+   * Pool events integrated with async resource emission.
    *
    * @defaultValue true
    */
@@ -227,7 +227,8 @@ export interface IPool<
    */
   readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean
   /**
-   * Emitter on which events can be listened to.
+   * Event emitter integrated with `AsyncResource` on which events can be listened to.
+   * The async tracking tooling identifier is `poolifier:<PoolType>-<WorkerType>-pool`.
    *
    * Events that can currently be listened to:
    *
index 201a516c56e1d8ab2a3d2d569c4d54d7f0fd484d..0fd9095625fa7ab6bf4bd9203cbf9b4e2ef03348 100644 (file)
@@ -33,7 +33,7 @@ export class ClusterWorker<
     opts: WorkerOptions = {}
   ) {
     super(
-      'worker-cluster-pool:poolifier',
+      'poolifier:cluster-worker',
       cluster.isPrimary,
       cluster.worker as Worker,
       taskFunctions,
index 7b92caf67201795797f6e3e06fff4fc593f4ee94..bfd1c13cdb41775c0c01d031cb0aedf9d364959b 100644 (file)
@@ -42,7 +42,7 @@ export class ThreadWorker<
     opts: WorkerOptions = {}
   ) {
     super(
-      'worker-thread-pool:poolifier',
+      'poolifier:thread-worker',
       isMainThread,
       parentPort as MessagePort,
       taskFunctions,
index 85d48d6a4e4eecaab910c20814a8abc471cc6fe3..cb04292b2efc58614b5e95d1655e4ccc4237a927 100644 (file)
@@ -1,4 +1,4 @@
-const { EventEmitter } = require('node:events')
+const { EventEmitterAsyncResource } = require('node:events')
 const { expect } = require('expect')
 const sinon = require('sinon')
 const {
@@ -186,7 +186,7 @@ describe('Abstract pool test suite', () => {
       numberOfWorkers,
       './tests/worker-files/thread/testWorker.js'
     )
-    expect(pool.emitter).toBeInstanceOf(EventEmitter)
+    expect(pool.emitter).toBeInstanceOf(EventEmitterAsyncResource)
     expect(pool.opts).toStrictEqual({
       startWorkers: true,
       enableEvents: true,
index 79ab1b8258e732583dae5ddac8c3736899e85ff3..1bd56853f33732df968911426eddccc9ea1c1a46 100644 (file)
@@ -4,6 +4,7 @@
     "target": "ES2022",
     "module": "ES2022",
     "moduleResolution": "Node",
+    "typeRoots": ["./node_modules/@types", "./@types"],
     "declaration": true,
     "declarationDir": "./lib/dts",
     "strict": true,