feat: add dedicated message channel for threads pool
[poolifier.git] / src / pools / abstract-pool.ts
index 10a2d514246748a7d21187f22b6b8400b1697864..ddf4cbccd64f0f29e70b7f2994b4c2cc543626f3 100644 (file)
@@ -30,7 +30,6 @@ import {
 import type {
   IWorker,
   IWorkerNode,
-  MessageHandler,
   WorkerInfo,
   WorkerType,
   WorkerUsage
@@ -878,6 +877,11 @@ export abstract class AbstractPool<
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
+      const workerInfo = this.getWorkerInfoByWorker(worker)
+      if (workerInfo.messageChannel != null) {
+        workerInfo.messageChannel?.port1.close()
+        workerInfo.messageChannel?.port1.close()
+      }
       this.removeWorkerNode(worker)
     })
 
@@ -928,12 +932,9 @@ export abstract class AbstractPool<
    * @param worker - The worker which should register a listener.
    * @param listener - The message listener callback.
    */
-  private registerWorkerMessageListener<Message extends Data | Response>(
-    worker: Worker,
-    listener: (message: MessageValue<Message>) => void
-  ): void {
-    worker.on('message', listener as MessageHandler<Worker>)
-  }
+  protected abstract registerWorkerMessageListener<
+    Message extends Data | Response
+  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
    * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
@@ -944,18 +945,18 @@ export abstract class AbstractPool<
   protected afterWorkerSetup (worker: Worker): void {
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
-    // Send startup message to worker.
-    this.sendWorkerStartupMessage(worker)
+    // Send the startup message to worker.
+    this.sendStartupMessageToWorker(worker)
     // Setup worker task statistics computation.
     this.setWorkerStatistics(worker)
   }
 
-  private sendWorkerStartupMessage (worker: Worker): void {
-    this.sendToWorker(worker, {
-      ready: false,
-      workerId: this.getWorkerInfoByWorker(worker).id as number
-    })
-  }
+  /**
+   * Sends the startup message to the given worker.
+   *
+   * @param worker - The worker which should receive the startup message.
+   */
+  protected abstract sendStartupMessageToWorker (worker: Worker): void
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1064,7 +1065,7 @@ export abstract class AbstractPool<
    *
    * @param worker - The worker.
    */
-  private getWorkerInfoByWorker (worker: Worker): WorkerInfo {
+  protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
     return this.workerNodes[this.getWorkerNodeKey(worker)].info
   }