fix: unregister worker callbacks after usage
[poolifier.git] / src / pools / abstract-pool.ts
index 52f9c0e3fc25aeb6a2169cf49243cfacb03d2153..88e5fe79e782b06cbf507c794ab85c54d1bceaa9 100644 (file)
@@ -30,12 +30,12 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import type {
-  IWorker,
-  IWorkerNode,
-  WorkerInfo,
-  WorkerType,
-  WorkerUsage
+import {
+  type IWorker,
+  type IWorkerNode,
+  type WorkerInfo,
+  type WorkerType,
+  type WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -682,28 +682,38 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
-      const workerId = this.getWorkerInfo(workerNodeKey).id as number
-      this.registerWorkerMessageListener(workerNodeKey, message => {
+      const taskFunctionOperationListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        const workerId = this.getWorkerInfo(workerNodeKey).id as number
         if (
-          message.workerId === workerId &&
-          message.taskFunctionOperationStatus === true
-        ) {
-          resolve(true)
-        } else if (
-          message.workerId === workerId &&
-          message.taskFunctionOperationStatus === false
+          message.taskFunctionOperationStatus != null &&
+          message.workerId === workerId
         ) {
-          reject(
-            new Error(
-              `Task function operation '${
-                message.taskFunctionOperation as string
-              }' failed on worker ${message.workerId} with error: '${
-                message.workerError?.message as string
-              }'`
+          if (message.taskFunctionOperationStatus) {
+            resolve(true)
+          } else if (!message.taskFunctionOperationStatus) {
+            reject(
+              new Error(
+                `Task function operation '${
+                  message.taskFunctionOperation as string
+                }' failed on worker ${message.workerId} with error: '${
+                  message.workerError?.message as string
+                }'`
+              )
             )
+          }
+          this.deregisterWorkerMessageListener(
+            this.getWorkerNodeKeyByWorkerId(message.workerId),
+            taskFunctionOperationListener
           )
         }
-      })
+      }
+      this.registerWorkerMessageListener(
+        workerNodeKey,
+        taskFunctionOperationListener
+      )
       this.sendToWorker(workerNodeKey, message)
     })
   }
@@ -712,20 +722,21 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
-      const responsesReceived = new Array<MessageValue<Data | Response>>()
-      for (const [workerNodeKey] of this.workerNodes.entries()) {
-        this.registerWorkerMessageListener(workerNodeKey, message => {
-          if (message.taskFunctionOperationStatus != null) {
-            responsesReceived.push(message)
+      const responsesReceived = new Array<MessageValue<Response>>()
+      const taskFunctionOperationsListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        if (message.taskFunctionOperationStatus != null) {
+          responsesReceived.push(message)
+          if (responsesReceived.length === this.workerNodes.length) {
             if (
-              responsesReceived.length === this.workerNodes.length &&
               responsesReceived.every(
                 message => message.taskFunctionOperationStatus === true
               )
             ) {
               resolve(true)
             } else if (
-              responsesReceived.length === this.workerNodes.length &&
               responsesReceived.some(
                 message => message.taskFunctionOperationStatus === false
               )
@@ -745,8 +756,18 @@ export abstract class AbstractPool<
                 )
               )
             }
+            this.deregisterWorkerMessageListener(
+              this.getWorkerNodeKeyByWorkerId(message.workerId),
+              taskFunctionOperationsListener
+            )
           }
-        })
+        }
+      }
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(
+          workerNodeKey,
+          taskFunctionOperationsListener
+        )
         this.sendToWorker(workerNodeKey, message)
       }
     })
@@ -924,19 +945,21 @@ export abstract class AbstractPool<
     workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
-      this.registerWorkerMessageListener(workerNodeKey, message => {
+      const killMessageListener = (message: MessageValue<Response>): void => {
+        this.checkMessageWorkerId(message)
         if (message.kill === 'success') {
           resolve()
         } else if (message.kill === 'failure') {
           reject(
             new Error(
-              `Worker ${
+              `Kill message handling failed on worker ${
                 message.workerId as number
-              } kill message handling failed`
+              }`
             )
           )
         }
-      })
+      }
+      this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
       this.sendToWorker(workerNodeKey, { kill: true })
     })
   }
@@ -1278,6 +1301,32 @@ export abstract class AbstractPool<
     listener: (message: MessageValue<Message>) => void
   ): void
 
+  /**
+   * Registers once a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract registerOnceWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+
+  /**
+   * Deregisters a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract deregisterWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+
   /**
    * Method hooked up after a worker node has been newly created.
    * Can be overridden.