Merge dependabot/npm_and_yarn/examples/typescript/http-client-pool/types/node-20...
[poolifier.git] / src / pools / abstract-pool.ts
index 628ad32ff8bac28bfb71a4355e79f7c7594e2236..604c7af8ad32dde3959dcb75166427458999c9f7 100644 (file)
@@ -17,6 +17,7 @@ import {
   max,
   median,
   min,
+  once,
   round
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
@@ -34,6 +35,7 @@ import type {
   IWorker,
   IWorkerNode,
   WorkerInfo,
+  WorkerNodeEventDetail,
   WorkerType,
   WorkerUsage
 } from './worker'
@@ -494,10 +496,7 @@ export abstract class AbstractPool<
   private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
     if (message.workerId == null) {
       throw new Error('Worker message received without worker id')
-    } else if (
-      message.workerId != null &&
-      this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
-    ) {
+    } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
       throw new Error(
         `Worker message received from unknown worker '${message.workerId}'`
       )
@@ -619,27 +618,37 @@ export abstract class AbstractPool<
 
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].onEmptyQueue =
-        this.taskStealingOnEmptyQueue.bind(this)
+      this.workerNodes[workerNodeKey].addEventListener(
+        'emptyqueue',
+        this.handleEmptyQueueEvent as EventListener
+      )
     }
   }
 
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      delete this.workerNodes[workerNodeKey].onEmptyQueue
+      this.workerNodes[workerNodeKey].removeEventListener(
+        'emptyqueue',
+        this.handleEmptyQueueEvent as EventListener
+      )
     }
   }
 
   private setTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].onBackPressure =
-        this.tasksStealingOnBackPressure.bind(this)
+      this.workerNodes[workerNodeKey].addEventListener(
+        'backpressure',
+        this.handleBackPressureEvent as EventListener
+      )
     }
   }
 
   private unsetTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      delete this.workerNodes[workerNodeKey].onBackPressure
+      this.workerNodes[workerNodeKey].removeEventListener(
+        'backpressure',
+        this.handleBackPressureEvent as EventListener
+      )
     }
   }
 
@@ -966,6 +975,7 @@ export abstract class AbstractPool<
           )
         }
       }
+      // FIXME: should be registered only once
       this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
       this.sendToWorker(workerNodeKey, { kill: true })
     })
@@ -1211,10 +1221,10 @@ export abstract class AbstractPool<
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
     worker.on('error', error => {
       const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
+      this.flagWorkerNodeAsNotReady(workerNodeKey)
       const workerInfo = this.getWorkerInfo(workerNodeKey)
-      workerInfo.ready = false
-      this.workerNodes[workerNodeKey].closeChannel()
       this.emitter?.emit(PoolEvents.error, error)
+      this.workerNodes[workerNodeKey].closeChannel()
       if (
         this.started &&
         !this.starting &&
@@ -1265,6 +1275,8 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
+        // Flag the worker node as not ready immediately
+        this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
         this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
           this.emitter?.emit(PoolEvents.error, error)
         })
@@ -1353,12 +1365,16 @@ export abstract class AbstractPool<
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
-        this.workerNodes[workerNodeKey].onEmptyQueue =
-          this.taskStealingOnEmptyQueue.bind(this)
+        this.workerNodes[workerNodeKey].addEventListener(
+          'emptyqueue',
+          this.handleEmptyQueueEvent as EventListener
+        )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
-        this.workerNodes[workerNodeKey].onBackPressure =
-          this.tasksStealingOnBackPressure.bind(this)
+        this.workerNodes[workerNodeKey].addEventListener(
+          'backpressure',
+          this.handleBackPressureEvent as EventListener
+        )
       }
     }
   }
@@ -1427,8 +1443,12 @@ export abstract class AbstractPool<
     }
   }
 
-  private taskStealingOnEmptyQueue (workerId: number): void {
-    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
+  private readonly handleEmptyQueueEvent = (
+    event: CustomEvent<WorkerNodeEventDetail>
+  ): void => {
+    const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
+      event.detail.workerId
+    )
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1438,7 +1458,7 @@ export abstract class AbstractPool<
     const sourceWorkerNode = workerNodes.find(
       workerNode =>
         workerNode.info.ready &&
-        workerNode.info.id !== workerId &&
+        workerNode.info.id !== event.detail.workerId &&
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
@@ -1455,13 +1475,15 @@ export abstract class AbstractPool<
     }
   }
 
-  private tasksStealingOnBackPressure (workerId: number): void {
+  private readonly handleBackPressureEvent = (
+    event: CustomEvent<WorkerNodeEventDetail>
+  ): void => {
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
       return
     }
     const sourceWorkerNode =
-      this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
+      this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1472,7 +1494,7 @@ export abstract class AbstractPool<
       if (
         sourceWorkerNode.usage.tasks.queued > 0 &&
         workerNode.info.ready &&
-        workerNode.info.id !== workerId &&
+        workerNode.info.id !== event.detail.workerId &&
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
@@ -1521,7 +1543,11 @@ export abstract class AbstractPool<
     workerInfo.ready = message.ready as boolean
     workerInfo.taskFunctionNames = message.taskFunctionNames
     if (this.ready) {
-      this.emitter?.emit(PoolEvents.ready, this.info)
+      const emitPoolReadyEventOnce = once(
+        () => this.emitter?.emit(PoolEvents.ready, this.info),
+        this
+      )
+      emitPoolReadyEventOnce()
     }
   }
 
@@ -1580,7 +1606,7 @@ export abstract class AbstractPool<
    * @returns The worker information.
    */
   protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
-    return this.workerNodes[workerNodeKey].info
+    return this.workerNodes[workerNodeKey]?.info
   }
 
   /**
@@ -1620,6 +1646,10 @@ export abstract class AbstractPool<
     }
   }
 
+  protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
+    this.getWorkerInfo(workerNodeKey).ready = false
+  }
+
   /** @inheritDoc */
   public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
     return (