refactor: factor out worker communication channel closing
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 19 Jul 2023 23:48:21 +0000 (01:48 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 19 Jul 2023 23:48:21 +0000 (01:48 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/pools/worker.ts

index be1402a7eccf3e9a81d5e7736f61ed7751d1720a..93023794ceb8f27519fc73e4c3e79c73efb21338 100644 (file)
@@ -516,7 +516,7 @@ export abstract class AbstractPool<
    * @param worker - The worker.
    * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKey (worker: Worker): number {
+  protected getWorkerNodeKey (worker: Worker): number {
     return this.workerNodes.findIndex(
       workerNode => workerNode.worker === worker
     )
@@ -877,11 +877,7 @@ export abstract class AbstractPool<
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
-      const workerInfo = this.getWorkerInfoByWorker(worker)
-      if (workerInfo.messageChannel != null) {
-        workerInfo.messageChannel?.port1.close()
-        workerInfo.messageChannel?.port1.close()
-      }
+      this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel()
       this.removeWorkerNode(worker)
     })
 
@@ -1055,6 +1051,7 @@ export abstract class AbstractPool<
    * Gets the worker information from the given worker node key.
    *
    * @param workerNodeKey - The worker node key.
+   * @returns The worker information.
    */
   private getWorkerInfo (workerNodeKey: number): WorkerInfo {
     return this.workerNodes[workerNodeKey].info
@@ -1064,6 +1061,8 @@ export abstract class AbstractPool<
    * Gets the worker information from the given worker.
    *
    * @param worker - The worker.
+   * @returns The worker information.
+   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
    */
   protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
     const workerNodeKey = this.getWorkerNodeKey(worker)
index e3a94b329d4cfcee6f05cd6e2f8ad06e555a6dfc..bef82826350d772c146656d4676f942923d33165 100644 (file)
@@ -58,9 +58,7 @@ export class FixedThreadPool<
   /** @inheritDoc */
   protected async destroyWorker (worker: Worker): Promise<void> {
     this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
-    const workerInfo = this.getWorkerInfoByWorker(worker)
-    workerInfo.messageChannel?.port1.close()
-    workerInfo.messageChannel?.port2.close()
+    this.workerNodes[this.getWorkerNodeKey(worker)].closeChannel()
     await worker.terminate()
   }
 
index 2c39393b05eb416b16f7aba82cdba417ab5b4758..fcc091378565a4b78c7f9085619c9017169474d3 100644 (file)
@@ -74,6 +74,15 @@ implements IWorkerNode<Worker, Data> {
     this.tasksUsage.clear()
   }
 
+  /** @inheritdoc */
+  public closeChannel (): void {
+    if (this.info.messageChannel != null) {
+      this.info.messageChannel?.port1.close()
+      this.info.messageChannel?.port2.close()
+      delete this.info.messageChannel
+    }
+  }
+
   /** @inheritdoc */
   public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
     if (!this.tasksUsage.has(name)) {
index b7cd7f6799b9fe5b3c5da9c35be9d417a700c1b7..f1c29b3659a3c1c9da5569fe3f008fe70cfd78df 100644 (file)
@@ -242,6 +242,10 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * Resets usage statistics .
    */
   readonly resetUsage: () => void
+  /**
+   * Close communication channel.
+   */
+  readonly closeChannel: () => void
   /**
    * Gets task worker usage statistics.
    */