feat: make worker node an event target
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 13 Oct 2023 19:38:49 +0000 (21:38 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Fri, 13 Oct 2023 19:38:49 +0000 (21:38 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/index.ts
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
tests/pools/abstract-pool.test.mjs

index f3bbe5031e3d8b94e65c1e0dfb343fa6c5e6c301..1e5ecbff62f82623a76feaa591c71a334a3c36c3 100644 (file)
@@ -26,7 +26,6 @@ export type {
   StrategyData,
   TaskStatistics,
   WorkerInfo,
-  WorkerNodeEventCallback,
   WorkerType,
   WorkerUsage
 } from './pools/worker'
index 08a955a93846ec406ee8a57053722549c56e476d..ba87b704879abb5f8171f97681fc9a9148c0059e 100644 (file)
@@ -34,6 +34,7 @@ import type {
   IWorker,
   IWorkerNode,
   WorkerInfo,
+  WorkerNodeEventDetail,
   WorkerType,
   WorkerUsage
 } from './worker'
@@ -619,27 +620,37 @@ export abstract class AbstractPool<
 
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].onEmptyQueue =
-        this.taskStealingOnEmptyQueue.bind(this)
+      this.workerNodes[workerNodeKey].addEventListener(
+        'emptyqueue',
+        this.handleEmptyQueueEvent
+      )
     }
   }
 
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      delete this.workerNodes[workerNodeKey].onEmptyQueue
+      this.workerNodes[workerNodeKey].removeEventListener(
+        'emptyqueue',
+        this.handleEmptyQueueEvent
+      )
     }
   }
 
   private setTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].onBackPressure =
-        this.tasksStealingOnBackPressure.bind(this)
+      this.workerNodes[workerNodeKey].addEventListener(
+        'backpressure',
+        this.handleBackPressureEvent
+      )
     }
   }
 
   private unsetTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      delete this.workerNodes[workerNodeKey].onBackPressure
+      this.workerNodes[workerNodeKey].removeEventListener(
+        'backpressure',
+        this.handleBackPressureEvent
+      )
     }
   }
 
@@ -1353,12 +1364,16 @@ export abstract class AbstractPool<
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
-        this.workerNodes[workerNodeKey].onEmptyQueue =
-          this.taskStealingOnEmptyQueue.bind(this)
+        this.workerNodes[workerNodeKey].addEventListener(
+          'emptyqueue',
+          this.handleEmptyQueueEvent.bind(this)
+        )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
-        this.workerNodes[workerNodeKey].onBackPressure =
-          this.tasksStealingOnBackPressure.bind(this)
+        this.workerNodes[workerNodeKey].addEventListener(
+          'backpressure',
+          this.handleBackPressureEvent.bind(this)
+        )
       }
     }
   }
@@ -1427,8 +1442,12 @@ export abstract class AbstractPool<
     }
   }
 
-  private taskStealingOnEmptyQueue (workerId: number): void {
-    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+  private readonly handleEmptyQueueEvent = (
+    event: CustomEvent<WorkerNodeEventDetail>
+  ): void => {
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+      event.detail.workerId
+    )
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1438,7 +1457,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       workerNode =>
         workerNode.info.ready &&
-        workerNode.info.id !== workerId &&
+        workerNode.info.id !== event.detail.workerId &&
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
@@ -1455,13 +1474,15 @@ export abstract class AbstractPool<
     }
   }
 
-  private tasksStealingOnBackPressure (workerId: number): void {
+  private readonly handleBackPressureEvent = (
+    event: CustomEvent<WorkerNodeEventDetail>
+  ): void => {
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
       return
     }
     const sourceWorkerNode =
-      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1472,7 +1493,7 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
-        workerNode.info.id !== workerId &&
+        workerNode.info.id !== event.detail.workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
index 6eb2d646bdcaa86f93eb07cf341012463f295a9a..718fd4894d72cc76e560374c7a82cc7e2f643533 100644 (file)
@@ -15,7 +15,7 @@ import {
   type IWorkerNode,
   type StrategyData,
   type WorkerInfo,
-  type WorkerNodeEventCallback,
+  type WorkerNodeEventDetail,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
@@ -29,7 +29,8 @@ import { checkWorkerNodeArguments } from './utils'
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  */
 export class WorkerNode<Worker extends IWorker, Data = unknown>
-implements IWorkerNode<Worker, Data> {
+  extends EventTarget
+  implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public readonly worker: Worker
   /** @inheritdoc */
@@ -42,10 +43,6 @@ implements IWorkerNode<Worker, Data> {
   public messageChannel?: MessageChannel
   /** @inheritdoc */
   public tasksQueueBackPressureSize: number
-  /** @inheritdoc */
-  public onBackPressure?: WorkerNodeEventCallback
-  /** @inheritdoc */
-  public onEmptyQueue?: WorkerNodeEventCallback
   private readonly tasksQueue: Deque<Task<Data>>
   private onBackPressureStarted: boolean
   private onEmptyQueueCount: number
@@ -58,6 +55,7 @@ implements IWorkerNode<Worker, Data> {
    * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
    */
   constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+    super()
     checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
     this.worker = worker
     this.info = this.initWorkerInfo(worker)
@@ -80,13 +78,13 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.push(task)
-    if (
-      this.onBackPressure != null &&
-      this.hasBackPressure() &&
-      !this.onBackPressureStarted
-    ) {
+    if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.onBackPressure(this.info.id as number)
+      this.dispatchEvent(
+        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
+          detail: { workerId: this.info.id as number }
+        })
+      )
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
@@ -95,13 +93,13 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public unshiftTask (task: Task<Data>): number {
     const tasksQueueSize = this.tasksQueue.unshift(task)
-    if (
-      this.onBackPressure != null &&
-      this.hasBackPressure() &&
-      !this.onBackPressureStarted
-    ) {
+    if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
-      this.onBackPressure(this.info.id as number)
+      this.dispatchEvent(
+        new CustomEvent<WorkerNodeEventDetail>('backpressure', {
+          detail: { workerId: this.info.id as number }
+        })
+      )
       this.onBackPressureStarted = false
     }
     return tasksQueueSize
@@ -110,11 +108,7 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public dequeueTask (): Task<Data> | undefined {
     const task = this.tasksQueue.shift()
-    if (
-      this.onEmptyQueue != null &&
-      this.tasksQueue.size === 0 &&
-      this.onEmptyQueueCount === 0
-    ) {
+    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
       this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
@@ -123,11 +117,7 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public popTask (): Task<Data> | undefined {
     const task = this.tasksQueue.pop()
-    if (
-      this.onEmptyQueue != null &&
-      this.tasksQueue.size === 0 &&
-      this.onEmptyQueueCount === 0
-    ) {
+    if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
       this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
     }
     return task
@@ -198,7 +188,11 @@ implements IWorkerNode<Worker, Data> {
       return
     }
     ++this.onEmptyQueueCount
-    this.onEmptyQueue?.(this.info.id as number)
+    this.dispatchEvent(
+      new CustomEvent<WorkerNodeEventDetail>('emptyqueue', {
+        detail: { workerId: this.info.id as number }
+      })
+    )
     await sleep(exponentialDelay(this.onEmptyQueueCount))
     await this.startOnEmptyQueue()
   }
index 4c877bc8b1dcc58871a55cd70c87871d918c4596..43e7fc608cb9be8d569ade1acc7dc72fe2194cad 100644 (file)
@@ -209,12 +209,13 @@ export interface IWorker {
 }
 
 /**
- * Worker node event callback.
+ * Worker node event detail.
  *
- * @param workerId - The worker id.
  * @internal
  */
-export type WorkerNodeEventCallback = (workerId: number) => void
+export interface WorkerNodeEventDetail {
+  workerId: number
+}
 
 /**
  * Worker node interface.
@@ -223,7 +224,8 @@ export type WorkerNodeEventCallback = (workerId: number) => void
  * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
  * @internal
  */
-export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
+export interface IWorkerNode<Worker extends IWorker, Data = unknown>
+  extends EventTarget {
   /**
    * Worker.
    */
@@ -250,14 +252,6 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * This is the number of tasks that can be enqueued before the worker node has back pressure.
    */
   tasksQueueBackPressureSize: number
-  /**
-   * Callback invoked when worker node tasks queue is back pressured.
-   */
-  onBackPressure?: WorkerNodeEventCallback
-  /**
-   * Callback invoked when worker node tasks queue is empty.
-   */
-  onEmptyQueue?: WorkerNodeEventCallback
   /**
    * Tasks queue size.
    *
index c42dbc94e7b271668fea4362612680017c616697..896610f869006606c23c81a3d09bdb54a0d90dcb 100644 (file)
@@ -634,10 +634,6 @@ describe('Abstract pool test suite', () => {
     )
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
-    }
     pool.enableTasksQueue(true)
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
@@ -646,10 +642,6 @@ describe('Abstract pool test suite', () => {
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
-    }
     pool.enableTasksQueue(true, { concurrency: 2 })
     expect(pool.opts.enableTasksQueue).toBe(true)
     expect(pool.opts.tasksQueueOptions).toStrictEqual({
@@ -658,17 +650,9 @@ describe('Abstract pool test suite', () => {
       taskStealing: true,
       tasksStealingOnBackPressure: true
     })
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
-    }
     pool.enableTasksQueue(false)
     expect(pool.opts.enableTasksQueue).toBe(false)
     expect(pool.opts.tasksQueueOptions).toBeUndefined()
-    for (const workerNode of pool.workerNodes) {
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
-    }
     await pool.destroy()
   })
 
@@ -688,8 +672,6 @@ describe('Abstract pool test suite', () => {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }
     pool.setTasksQueueOptions({
       concurrency: 2,
@@ -707,8 +689,6 @@ describe('Abstract pool test suite', () => {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeUndefined()
-      expect(workerNode.onBackPressure).toBeUndefined()
     }
     pool.setTasksQueueOptions({
       concurrency: 1,
@@ -725,8 +705,6 @@ describe('Abstract pool test suite', () => {
       expect(workerNode.tasksQueueBackPressureSize).toBe(
         pool.opts.tasksQueueOptions.size
       )
-      expect(workerNode.onEmptyQueue).toBeInstanceOf(Function)
-      expect(workerNode.onBackPressure).toBeInstanceOf(Function)
     }
     expect(() => pool.setTasksQueueOptions('invalidTasksQueueOptions')).toThrow(
       new TypeError('Invalid tasks queue options: must be a plain object')