perf: optimize tasks stealing in corner case
[poolifier.git] / src / pools / abstract-pool.ts
index 5da69ca1cfd8190cdf39e78c0d5867e4218b140d..7ea9379665050a982259a4181c934a7eecc81adb 100644 (file)
@@ -594,11 +594,13 @@ export abstract class AbstractPool<
         this.buildTasksQueueOptions(tasksQueueOptions)
       this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
       if (this.opts.tasksQueueOptions.taskStealing === true) {
+        this.unsetTaskStealing()
         this.setTaskStealing()
       } else {
         this.unsetTaskStealing()
       }
       if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
+        this.unsetTasksStealingOnBackPressure()
         this.setTasksStealingOnBackPressure()
       } else {
         this.unsetTasksStealingOnBackPressure()
@@ -630,36 +632,36 @@ export abstract class AbstractPool<
 
   private setTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].addEventListener(
+      this.workerNodes[workerNodeKey].on(
         'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent as EventListener
+        this.handleIdleWorkerNodeEvent
       )
     }
   }
 
   private unsetTaskStealing (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].removeEventListener(
+      this.workerNodes[workerNodeKey].off(
         'idleWorkerNode',
-        this.handleIdleWorkerNodeEvent as EventListener
+        this.handleIdleWorkerNodeEvent
       )
     }
   }
 
   private setTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].addEventListener(
+      this.workerNodes[workerNodeKey].on(
         'backPressure',
-        this.handleBackPressureEvent as EventListener
+        this.handleBackPressureEvent
       )
     }
   }
 
   private unsetTasksStealingOnBackPressure (): void {
     for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.workerNodes[workerNodeKey].removeEventListener(
+      this.workerNodes[workerNodeKey].off(
         'backPressure',
-        this.handleBackPressureEvent as EventListener
+        this.handleBackPressureEvent
       )
     }
   }
@@ -983,12 +985,13 @@ export abstract class AbstractPool<
     }
     this.destroying = true
     await Promise.all(
-      this.workerNodes.map(async (_, workerNodeKey) => {
+      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
       })
     )
     this.emitter?.emit(PoolEvents.destroy, this.info)
     this.emitter?.emitDestroy()
+    this.emitter?.removeAllListeners()
     this.readyEventEmitted = false
     this.destroying = false
     this.started = false
@@ -1395,7 +1398,7 @@ export abstract class AbstractPool<
     // Listen to worker messages.
     this.registerWorkerMessageListener(
       workerNodeKey,
-      this.workerMessageListener.bind(this)
+      this.workerMessageListener
     )
     // Send the startup message to worker.
     this.sendStartupMessageToWorker(workerNodeKey)
@@ -1403,15 +1406,15 @@ export abstract class AbstractPool<
     this.sendStatisticsMessageToWorker(workerNodeKey)
     if (this.opts.enableTasksQueue === true) {
       if (this.opts.tasksQueueOptions?.taskStealing === true) {
-        this.workerNodes[workerNodeKey].addEventListener(
+        this.workerNodes[workerNodeKey].on(
           'idleWorkerNode',
-          this.handleIdleWorkerNodeEvent as EventListener
+          this.handleIdleWorkerNodeEvent
         )
       }
       if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
-        this.workerNodes[workerNodeKey].addEventListener(
+        this.workerNodes[workerNodeKey].on(
           'backPressure',
-          this.handleBackPressureEvent as EventListener
+          this.handleBackPressureEvent
         )
       }
     }
@@ -1442,6 +1445,9 @@ export abstract class AbstractPool<
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
+    if (this.workerNodes.length <= 1) {
+      return
+    }
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       const destinationWorkerNodeKey = this.workerNodes.reduce(
         (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
@@ -1532,10 +1538,13 @@ export abstract class AbstractPool<
   }
 
   private readonly handleIdleWorkerNodeEvent = (
-    event: CustomEvent<WorkerNodeEventDetail>,
+    eventDetail: WorkerNodeEventDetail,
     previousStolenTask?: Task<Data>
   ): void => {
-    const { workerNodeKey } = event.detail
+    if (this.workerNodes.length <= 1) {
+      return
+    }
+    const { workerNodeKey } = eventDetail
     if (workerNodeKey == null) {
       throw new Error(
         'WorkerNode event detail workerNodeKey attribute must be defined'
@@ -1586,7 +1595,7 @@ export abstract class AbstractPool<
     }
     sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
       .then(() => {
-        this.handleIdleWorkerNodeEvent(event, stolenTask)
+        this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
         return undefined
       })
       .catch(EMPTY_FUNCTION)
@@ -1624,9 +1633,12 @@ export abstract class AbstractPool<
   }
 
   private readonly handleBackPressureEvent = (
-    event: CustomEvent<WorkerNodeEventDetail>
+    eventDetail: WorkerNodeEventDetail
   ): void => {
-    const { workerId } = event.detail
+    if (this.workerNodes.length <= 1) {
+      return
+    }
+    const { workerId } = eventDetail
     const sizeOffset = 1
     if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
       return
@@ -1664,7 +1676,9 @@ export abstract class AbstractPool<
   /**
    * This method is the message listener registered on each worker.
    */
-  protected workerMessageListener (message: MessageValue<Response>): void {
+  protected readonly workerMessageListener = (
+    message: MessageValue<Response>
+  ): void => {
     this.checkMessageWorkerId(message)
     const { workerId, ready, taskId, taskFunctionNames } = message
     if (ready != null && taskFunctionNames != null) {
@@ -1728,11 +1742,10 @@ export abstract class AbstractPool<
           this.tasksQueueSize(workerNodeKey) === 0 &&
           workerNodeTasksUsage.sequentiallyStolen === 0
         ) {
-          this.workerNodes[workerNodeKey].dispatchEvent(
-            new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
-              detail: { workerId: workerId as number, workerNodeKey }
-            })
-          )
+          this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
+            workerId: workerId as number,
+            workerNodeKey
+          })
         }
       }
     }