fix: unregister worker callbacks after usage
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 20 Sep 2023 16:14:36 +0000 (18:14 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 20 Sep 2023 16:14:36 +0000 (18:14 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
.eslintrc.js
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/worker/abstract-worker.ts

index 97f787aeb53f70fba974107339ec3d95182ffb51..564849169f6898f8d659d1866169139707b6f016 100644 (file)
@@ -94,7 +94,6 @@ module.exports = defineConfig({
           'typedoc',
           'unlink',
           'unref',
-          'unregister',
           'utf8',
           'webcrypto',
           'workerpool',
index 083f81797d9962e7e24355a8e3a09b56a2064383..7b37a041b6b627b0c44775e7b9fa5f713fd5f5ed 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Ensure worker message listener used one time are removed after usage.
+
 ## [2.7.0] - 2023-09-19
 
 ### Fixed
index 52f9c0e3fc25aeb6a2169cf49243cfacb03d2153..88e5fe79e782b06cbf507c794ab85c54d1bceaa9 100644 (file)
@@ -30,12 +30,12 @@ import {
   PoolTypes,
   type TasksQueueOptions
 } from './pool'
-import type {
-  IWorker,
-  IWorkerNode,
-  WorkerInfo,
-  WorkerType,
-  WorkerUsage
+import {
+  type IWorker,
+  type IWorkerNode,
+  type WorkerInfo,
+  type WorkerType,
+  type WorkerUsage
 } from './worker'
 import {
   type MeasurementStatisticsRequirements,
@@ -682,28 +682,38 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
-      const workerId = this.getWorkerInfo(workerNodeKey).id as number
-      this.registerWorkerMessageListener(workerNodeKey, message => {
+      const taskFunctionOperationListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        const workerId = this.getWorkerInfo(workerNodeKey).id as number
         if (
-          message.workerId === workerId &&
-          message.taskFunctionOperationStatus === true
-        ) {
-          resolve(true)
-        } else if (
-          message.workerId === workerId &&
-          message.taskFunctionOperationStatus === false
+          message.taskFunctionOperationStatus != null &&
+          message.workerId === workerId
         ) {
-          reject(
-            new Error(
-              `Task function operation '${
-                message.taskFunctionOperation as string
-              }' failed on worker ${message.workerId} with error: '${
-                message.workerError?.message as string
-              }'`
+          if (message.taskFunctionOperationStatus) {
+            resolve(true)
+          } else if (!message.taskFunctionOperationStatus) {
+            reject(
+              new Error(
+                `Task function operation '${
+                  message.taskFunctionOperation as string
+                }' failed on worker ${message.workerId} with error: '${
+                  message.workerError?.message as string
+                }'`
+              )
             )
+          }
+          this.deregisterWorkerMessageListener(
+            this.getWorkerNodeKeyByWorkerId(message.workerId),
+            taskFunctionOperationListener
           )
         }
-      })
+      }
+      this.registerWorkerMessageListener(
+        workerNodeKey,
+        taskFunctionOperationListener
+      )
       this.sendToWorker(workerNodeKey, message)
     })
   }
@@ -712,20 +722,21 @@ export abstract class AbstractPool<
     message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
-      const responsesReceived = new Array<MessageValue<Data | Response>>()
-      for (const [workerNodeKey] of this.workerNodes.entries()) {
-        this.registerWorkerMessageListener(workerNodeKey, message => {
-          if (message.taskFunctionOperationStatus != null) {
-            responsesReceived.push(message)
+      const responsesReceived = new Array<MessageValue<Response>>()
+      const taskFunctionOperationsListener = (
+        message: MessageValue<Response>
+      ): void => {
+        this.checkMessageWorkerId(message)
+        if (message.taskFunctionOperationStatus != null) {
+          responsesReceived.push(message)
+          if (responsesReceived.length === this.workerNodes.length) {
             if (
-              responsesReceived.length === this.workerNodes.length &&
               responsesReceived.every(
                 message => message.taskFunctionOperationStatus === true
               )
             ) {
               resolve(true)
             } else if (
-              responsesReceived.length === this.workerNodes.length &&
               responsesReceived.some(
                 message => message.taskFunctionOperationStatus === false
               )
@@ -745,8 +756,18 @@ export abstract class AbstractPool<
                 )
               )
             }
+            this.deregisterWorkerMessageListener(
+              this.getWorkerNodeKeyByWorkerId(message.workerId),
+              taskFunctionOperationsListener
+            )
           }
-        })
+        }
+      }
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(
+          workerNodeKey,
+          taskFunctionOperationsListener
+        )
         this.sendToWorker(workerNodeKey, message)
       }
     })
@@ -924,19 +945,21 @@ export abstract class AbstractPool<
     workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
-      this.registerWorkerMessageListener(workerNodeKey, message => {
+      const killMessageListener = (message: MessageValue<Response>): void => {
+        this.checkMessageWorkerId(message)
         if (message.kill === 'success') {
           resolve()
         } else if (message.kill === 'failure') {
           reject(
             new Error(
-              `Worker ${
+              `Kill message handling failed on worker ${
                 message.workerId as number
-              } kill message handling failed`
+              }`
             )
           )
         }
-      })
+      }
+      this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
       this.sendToWorker(workerNodeKey, { kill: true })
     })
   }
@@ -1278,6 +1301,32 @@ export abstract class AbstractPool<
     listener: (message: MessageValue<Message>) => void
   ): void
 
+  /**
+   * Registers once a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract registerOnceWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+
+  /**
+   * Deregisters a listener callback on the worker given its worker node key.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param listener - The message listener callback.
+   */
+  protected abstract deregisterWorkerMessageListener<
+    Message extends Data | Response
+  >(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void
+
   /**
    * Method hooked up after a worker node has been newly created.
    * Can be overridden.
index 9470cccd857321388d70e0bf7f09de1a20505cfe..d457b37790d0537c80e17635fcbb76c4b4599b6d 100644 (file)
@@ -66,11 +66,11 @@ export class FixedClusterPool<
     const workerNode = this.workerNodes[workerNodeKey]
     const worker = workerNode.worker
     const waitWorkerExit = new Promise<void>(resolve => {
-      worker.on('exit', () => {
+      worker.once('exit', () => {
         resolve()
       })
     })
-    worker.on('disconnect', () => {
+    worker.once('disconnect', () => {
       worker.kill()
     })
     await this.sendKillMessageToWorker(workerNodeKey)
@@ -104,6 +104,22 @@ export class FixedClusterPool<
     this.workerNodes[workerNodeKey].worker.on('message', listener)
   }
 
+  /** @inheritDoc */
+  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    this.workerNodes[workerNodeKey].worker.once('message', listener)
+  }
+
+  /** @inheritDoc */
+  protected deregisterWorkerMessageListener<Message extends Data | Response>(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    this.workerNodes[workerNodeKey].worker.off('message', listener)
+  }
+
   /** @inheritDoc */
   protected createWorker (): Worker {
     return cluster.fork(this.opts.env)
index 2ad94a7bf16083072bf1303745834c2b7c06506a..db278485a097fcb40a1846902ee8a7b36b548c3e 100644 (file)
@@ -63,7 +63,7 @@ export class FixedThreadPool<
     const workerNode = this.workerNodes[workerNodeKey]
     const worker = workerNode.worker
     const waitWorkerExit = new Promise<void>(resolve => {
-      worker.on('exit', () => {
+      worker.once('exit', () => {
         resolve()
       })
     })
@@ -112,6 +112,26 @@ export class FixedThreadPool<
     ).port1.on('message', listener)
   }
 
+  /** @inheritDoc */
+  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    (
+      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+    ).port1.once('message', listener)
+  }
+
+  /** @inheritDoc */
+  protected deregisterWorkerMessageListener<Message extends Data | Response>(
+    workerNodeKey: number,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    (
+      this.workerNodes[workerNodeKey].messageChannel as MessageChannel
+    ).port1.off('message', listener)
+  }
+
   /** @inheritDoc */
   protected createWorker (): Worker {
     return new Worker(this.filePath, {
index 311a56ad60e00684835e89ef0507d60f0c9d4432..b8e4b167602dd14306b595d0f1564b13009f636c 100644 (file)
@@ -100,7 +100,7 @@ export abstract class AbstractWorker<
     this.checkTaskFunctions(taskFunctions)
     this.checkWorkerOptions(this.opts)
     if (!this.isMain) {
-      this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
+      this.getMainWorker().once('message', this.handleReadyMessage.bind(this))
     }
   }