feat: add dedicated message channel for threads pool
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 19 Jul 2023 21:41:53 +0000 (23:41 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 19 Jul 2023 21:41:53 +0000 (23:41 +0200)
Reference: #801

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/worker/thread-worker.test.js

index 10a2d514246748a7d21187f22b6b8400b1697864..ddf4cbccd64f0f29e70b7f2994b4c2cc543626f3 100644 (file)
@@ -30,7 +30,6 @@ import {
 import type {
   IWorker,
   IWorkerNode,
-  MessageHandler,
   WorkerInfo,
   WorkerType,
   WorkerUsage
@@ -878,6 +877,11 @@ export abstract class AbstractPool<
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
+      const workerInfo = this.getWorkerInfoByWorker(worker)
+      if (workerInfo.messageChannel != null) {
+        workerInfo.messageChannel?.port1.close()
+        workerInfo.messageChannel?.port1.close()
+      }
       this.removeWorkerNode(worker)
     })
 
@@ -928,12 +932,9 @@ export abstract class AbstractPool<
    * @param worker - The worker which should register a listener.
    * @param listener - The message listener callback.
    */
-  private registerWorkerMessageListener<Message extends Data | Response>(
-    worker: Worker,
-    listener: (message: MessageValue<Message>) => void
-  ): void {
-    worker.on('message', listener as MessageHandler<Worker>)
-  }
+  protected abstract registerWorkerMessageListener<
+    Message extends Data | Response
+  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
    * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
@@ -944,18 +945,18 @@ export abstract class AbstractPool<
   protected afterWorkerSetup (worker: Worker): void {
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
-    // Send startup message to worker.
-    this.sendWorkerStartupMessage(worker)
+    // Send the startup message to worker.
+    this.sendStartupMessageToWorker(worker)
     // Setup worker task statistics computation.
     this.setWorkerStatistics(worker)
   }
 
-  private sendWorkerStartupMessage (worker: Worker): void {
-    this.sendToWorker(worker, {
-      ready: false,
-      workerId: this.getWorkerInfoByWorker(worker).id as number
-    })
-  }
+  /**
+   * Sends the startup message to the given worker.
+   *
+   * @param worker - The worker which should receive the startup message.
+   */
+  protected abstract sendStartupMessageToWorker (worker: Worker): void
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
@@ -1064,7 +1065,7 @@ export abstract class AbstractPool<
    *
    * @param worker - The worker.
    */
-  private getWorkerInfoByWorker (worker: Worker): WorkerInfo {
+  protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
     return this.workerNodes[this.getWorkerNodeKey(worker)].info
   }
 
index 6e9b4b07113bac62000a84ae0a2d8aa05b333532..d7e3cc4bce0c4291bac5a71df3b9878933f96148 100644 (file)
@@ -73,6 +73,22 @@ export class FixedClusterPool<
     worker.send(message)
   }
 
+  /** @inheritDoc */
+  protected sendStartupMessageToWorker (worker: Worker): void {
+    this.sendToWorker(worker, {
+      ready: false,
+      workerId: this.getWorkerInfoByWorker(worker).id as number
+    })
+  }
+
+  /** @inheritDoc */
+  protected registerWorkerMessageListener<Message extends Data | Response>(
+    worker: Worker,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    worker.on('message', listener)
+  }
+
   /** @inheritDoc */
   protected createWorker (): Worker {
     return cluster.fork(this.opts.env)
index c9145f571186a9d5783531236414aff7eadd463e..52868fd7ec4360e62d9ab565995bb4ce532fde02 100644 (file)
@@ -1,4 +1,6 @@
 import {
+  type MessageChannel,
+  type MessagePort,
   SHARE_ENV,
   Worker,
   type WorkerOptions,
@@ -56,12 +58,42 @@ export class FixedThreadPool<
   /** @inheritDoc */
   protected async destroyWorker (worker: Worker): Promise<void> {
     this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
+    const workerInfo = this.getWorkerInfoByWorker(worker)
+    workerInfo.messageChannel?.port1.close()
+    workerInfo.messageChannel?.port2.close()
     await worker.terminate()
   }
 
   /** @inheritDoc */
   protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
-    worker.postMessage(message)
+    (
+      this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+    ).port1.postMessage(message)
+  }
+
+  /** @inheritDoc */
+  protected sendStartupMessageToWorker (worker: Worker): void {
+    const port2: MessagePort = (
+      this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+    ).port2
+    worker.postMessage(
+      {
+        ready: false,
+        workerId: this.getWorkerInfoByWorker(worker).id as number,
+        port: port2
+      },
+      [port2]
+    )
+  }
+
+  /** @inheritDoc */
+  protected registerWorkerMessageListener<Message extends Data | Response>(
+    worker: Worker,
+    listener: (message: MessageValue<Message>) => void
+  ): void {
+    (
+      this.getWorkerInfoByWorker(worker).messageChannel as MessageChannel
+    ).port1.on('message', listener)
   }
 
   /** @inheritDoc */
index 76169c8988b28a7621a716decb96fb9771fbfe67..2c39393b05eb416b16f7aba82cdba417ab5b4758 100644 (file)
@@ -1,3 +1,4 @@
+import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import { Queue } from '../queue'
 import type { Task } from '../utility-types'
@@ -86,7 +87,10 @@ implements IWorkerNode<Worker, Data> {
       id: this.getWorkerId(worker, workerType),
       type: workerType,
       dynamic: false,
-      ready: false
+      ready: false,
+      ...(workerType === WorkerTypes.thread && {
+        messageChannel: new MessageChannel()
+      })
     }
   }
 
index 507396dea078ac806c5dda839336f6046df11d5c..b7cd7f6799b9fe5b3c5da9c35be9d417a700c1b7 100644 (file)
@@ -1,3 +1,4 @@
+import type { MessageChannel } from 'node:worker_threads'
 import type { CircularArray } from '../circular-array'
 import type { Task } from '../utility-types'
 
@@ -136,6 +137,10 @@ export interface WorkerInfo {
    * Ready flag.
    */
   ready: boolean
+  /**
+   * Message channel.
+   */
+  messageChannel?: MessageChannel
 }
 
 /**
index 0723b4395407a94b18d3cafcfc6e5db2360017e0..91644713c18d7bcc064ed071a4bb5578bda5ca22 100644 (file)
@@ -1,4 +1,5 @@
 import type { EventLoopUtilization } from 'node:perf_hooks'
+import type { MessagePort } from 'node:worker_threads'
 import type { KillBehavior } from './worker/worker-options'
 import type { IWorker } from './pools/worker'
 
@@ -118,6 +119,10 @@ export interface MessageValue<Data = unknown, ErrorData = unknown>
    * Whether the worker starts or stops its activity check.
    */
   readonly checkActive?: boolean
+  /**
+   * Message port.
+   */
+  readonly port?: MessagePort
 }
 
 /**
index a431f5a7fb0e9e72771dc848c0245fb775078854..77d2e0076f58abbbdf05c33ae994ec4f7cadf44e 100644 (file)
@@ -66,17 +66,17 @@ export abstract class AbstractWorker<
    *
    * @param type - The type of async event.
    * @param isMain - Whether this is the main worker or not.
-   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
    * @param mainWorker - Reference to main worker.
+   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
    * @param opts - Options for the worker.
    */
   public constructor (
     type: string,
     protected readonly isMain: boolean,
+    protected readonly mainWorker: MainWorker,
     taskFunctions:
     | WorkerFunction<Data, Response>
     | TaskFunctions<Data, Response>,
-    protected readonly mainWorker: MainWorker,
     protected readonly opts: WorkerOptions = {
       /**
        * The kill behavior option on this worker or its default value.
@@ -93,7 +93,7 @@ export abstract class AbstractWorker<
     this.checkWorkerOptions(this.opts)
     this.checkTaskFunctions(taskFunctions)
     if (!this.isMain) {
-      this.mainWorker?.on('message', this.messageListener.bind(this))
+      this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this))
     }
   }
 
@@ -289,12 +289,9 @@ export abstract class AbstractWorker<
    *
    * @param message - The received message.
    */
-  protected messageListener (message: MessageValue<Data, Data>): void {
+  protected messageListener (message: MessageValue<Data>): void {
     if (message.workerId === this.id) {
-      if (message.ready != null) {
-        // Startup message received
-        this.sendReadyResponse()
-      } else if (message.statistics != null) {
+      if (message.statistics != null) {
         // Statistics message received
         this.statistics = message.statistics
       } else if (message.checkActive != null) {
@@ -314,11 +311,11 @@ export abstract class AbstractWorker<
   }
 
   /**
-   * Sends the ready response to the main worker.
+   * Handles the ready message sent by the main worker.
+   *
+   * @param message - The ready message.
    */
-  protected sendReadyResponse (): void {
-    !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id })
-  }
+  protected abstract handleReadyMessage (message: MessageValue<Data>): void
 
   /**
    * Starts the worker check active interval.
index c43e7f75a7586d2b2a9fa6633eea7da6e44697bf..c820641ddbb10319b454bf5b25154312703cc446 100644 (file)
@@ -37,10 +37,20 @@ export class ClusterWorker<
     super(
       'worker-cluster-pool:poolifier',
       cluster.isPrimary,
-      taskFunctions,
       cluster.worker as Worker,
+      taskFunctions,
       opts
     )
+    if (!this.isMain) {
+      this.getMainWorker()?.on('message', this.messageListener.bind(this))
+    }
+  }
+
+  /** @inheritDoc */
+  protected handleReadyMessage (message: MessageValue<Data>): void {
+    if (message.workerId === this.id && message.ready != null) {
+      !this.isMain && this.sendToMainWorker({ ready: true, workerId: this.id })
+    }
   }
 
   /** @inheritDoc */
index 09135afffc913dd7b983b7cbac9134bb5573b3b3..a8658f03a797ed649cbd583a14ed5d924949a235 100644 (file)
@@ -27,6 +27,10 @@ export class ThreadWorker<
   Data = unknown,
   Response = unknown
 > extends AbstractWorker<MessagePort, Data, Response> {
+  /**
+   * Message port used to communicate with the main thread.
+   */
+  private port!: MessagePort
   /**
    * Constructs a new poolifier thread worker.
    *
@@ -42,19 +46,35 @@ export class ThreadWorker<
     super(
       'worker-thread-pool:poolifier',
       isMainThread,
-      taskFunctions,
       parentPort as MessagePort,
+      taskFunctions,
       opts
     )
   }
 
+  /** @inheritDoc */
+  protected handleReadyMessage (message: MessageValue<Data>): void {
+    if (
+      message.workerId === this.id &&
+      message.ready != null &&
+      message.port != null
+    ) {
+      if (!this.isMain) {
+        this.port = message.port
+        this.port.on('message', this.messageListener.bind(this))
+        this.sendToMainWorker({ ready: true, workerId: this.id })
+      }
+    }
+  }
+
+  /** @inheritDoc */
   protected get id (): number {
     return threadId
   }
 
   /** @inheritDoc */
   protected sendToMainWorker (message: MessageValue<Response>): void {
-    this.getMainWorker().postMessage(message)
+    this.port.postMessage(message)
   }
 
   /** @inheritDoc */
index 62c461841b82fbf8c014b95b3502ec050e9d6406..b26ca43d6ef0cc37354d598db8dd3851e686fdca 100644 (file)
@@ -1,3 +1,4 @@
+const { MessageChannel } = require('worker_threads')
 const { expect } = require('expect')
 const {
   DynamicClusterPool,
@@ -549,7 +550,8 @@ describe('Abstract pool test suite', () => {
         id: expect.any(Number),
         type: WorkerTypes.thread,
         dynamic: false,
-        ready: true
+        ready: true,
+        messageChannel: expect.any(MessageChannel)
       })
     }
   })
index c08adffd9816a9c0f4bd78337b857a664fa4553f..9eeaaa0a70d44eeced4d4ecc4fa88f885b426156 100644 (file)
@@ -7,8 +7,9 @@ describe('Thread worker test suite', () => {
     ++numberOfMessagesPosted
   }
   class SpyWorker extends ThreadWorker {
-    getMainWorker () {
-      return { postMessage }
+    constructor (fn) {
+      super(fn)
+      this.port = { postMessage }
     }
   }
 
@@ -25,7 +26,7 @@ describe('Thread worker test suite', () => {
     expect(worker.handleError(errorMessage)).toStrictEqual(errorMessage)
   })
 
-  it('Verify worker invokes the getMainWorker() and postMessage() methods', () => {
+  it('Verify worker invokes the postMessage() method on port property', () => {
     const worker = new SpyWorker(() => {})
     worker.sendToMainWorker({ ok: 1 })
     expect(numberOfMessagesPosted).toBe(1)