fix: rely on node event emitter for worker node events
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 25 Nov 2023 13:50:00 +0000 (14:50 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 25 Nov 2023 13:50:00 +0000 (14:50 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts

index 5da69ca1cfd8190cdf39e78c0d5867e4218b140d..eb92eb548903a719169f15e658ccbfe86cefe4be 100644 (file)
@@ -630,36 +630,36 @@ export abstract class AbstractPool<
 
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].addEventListener(
+      this.workerNodes[workerNodeKey].on(
         'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent as EventListener
+        this.handleIdleWorkerNodeEvent
       )
     }
   }
 
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].removeEventListener(
+      this.workerNodes[workerNodeKey].off(
         'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent as EventListener
+        this.handleIdleWorkerNodeEvent
       )
     }
   }
 
   private setTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].addEventListener(
+      this.workerNodes[workerNodeKey].on(
         'backPressure',
-        this.handleBackPressureEvent as EventListener
+        this.handleBackPressureEvent
       )
     }
   }
 
   private unsetTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].removeEventListener(
+      this.workerNodes[workerNodeKey].off(
         'backPressure',
-        this.handleBackPressureEvent as EventListener
+        this.handleBackPressureEvent
       )
     }
   }
@@ -1403,15 +1403,15 @@ export abstract class AbstractPool<
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
-        this.workerNodes[workerNodeKey].addEventListener(
+        this.workerNodes[workerNodeKey].on(
           'idleWorkerNode',
-          this.handleIdleWorkerNodeEvent as EventListener
+          this.handleIdleWorkerNodeEvent
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
-        this.workerNodes[workerNodeKey].addEventListener(
+        this.workerNodes[workerNodeKey].on(
           'backPressure',
-          this.handleBackPressureEvent as EventListener
+          this.handleBackPressureEvent
         )
       }
     }
@@ -1532,10 +1532,10 @@ export abstract class AbstractPool<
   }
 
   private readonly handleIdleWorkerNodeEvent = (
-    event: CustomEvent<WorkerNodeEventDetail>,
+    eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
-    const { workerNodeKey } = event.detail
+    const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
         'WorkerNode event detail workerNodeKey attribute must be defined'
@@ -1586,7 +1586,7 @@ export abstract class AbstractPool<
     }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
-        this.handleIdleWorkerNodeEvent(event, stolenTask)
+        this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
         return undefined
       })
       .catch(EMPTY_FUNCTION)
@@ -1624,9 +1624,9 @@ export abstract class AbstractPool<
   }
 
   private readonly handleBackPressureEvent = (
-    event: CustomEvent<WorkerNodeEventDetail>
+    eventDetail: WorkerNodeEventDetail
   ): void => {
-    const { workerId } = event.detail
+    const { workerId } = eventDetail
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
       return
@@ -1728,11 +1728,10 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          this.workerNodes[workerNodeKey].dispatchEvent(
-            new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
-              detail: { workerId: workerId as number, workerNodeKey }
-            })
-          )
+          this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+            workerId: workerId as number,
+            workerNodeKey
+          })
         }
       }
     }
index 3de3cb809cda9a3a3f2787579371a467e3f9d200..59c4de7a388e81434c373f1835b144c1b3c88f57 100644 (file)
@@ -1,4 +1,5 @@
 import { MessageChannel } from 'node:worker_threads'
+import { EventEmitter } from 'node:events'
 import { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
@@ -8,7 +9,6 @@ import {
   type IWorkerNode,
   type StrategyData,
   type WorkerInfo,
-  type WorkerNodeEventDetail,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
@@ -22,7 +22,7 @@ import { checkWorkerNodeArguments } from './utils'
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  */
 export class WorkerNode<Worker extends IWorker, Data = unknown>
-  extends EventTarget
+  extends EventEmitter
   implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public readonly worker: Worker
@@ -71,11 +71,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     const tasksQueueSize = this.tasksQueue.push(task)
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backPressure', {
-          detail: { workerId: this.info.id as number }
-        })
-      )
+      this.emit('backPressure', { workerId: this.info.id as number })
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
@@ -86,11 +82,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     const tasksQueueSize = this.tasksQueue.unshift(task)
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.dispatchEvent(
-        new CustomEvent<WorkerNodeEventDetail>('backPressure', {
-          detail: { workerId: this.info.id as number }
-        })
-      )
+      this.emit('backPressure', { workerId: this.info.id as number })
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
index 776986d5000f2c2d33cdf8f9daf444038df74ef6..5439606d420f0abd165b84c49157808de230aee6 100644 (file)
@@ -1,4 +1,5 @@
 import type { MessageChannel } from 'node:worker_threads'
+import type { EventEmitter } from 'node:events'
 import type { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
 
@@ -238,7 +239,7 @@ export interface WorkerNodeEventDetail {
  * @internal
  */
 export interface IWorkerNode<Worker extends IWorker, Data = unknown>
-  extends EventTarget {
+  extends EventEmitter {
   /**
    * Worker.
    */