refactor: factor out worker node termination code
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Dec 2023 09:39:35 +0000 (10:39 +0100)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 16 Dec 2023 09:39:35 +0000 (10:39 +0100)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/pools/worker-node.ts
src/pools/worker.ts

index 56be9a1a9ffe0ca22131b40d9308adab87b9894a..282979852bf564de38942b3d8c86f1dddf0c1831 100644 (file)
@@ -1043,7 +1043,14 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
+  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
+    this.flagWorkerNodeAsNotReady(workerNodeKey)
+    this.flushTasksQueue(workerNodeKey)
+    // FIXME: wait for tasks to be finished
+    const workerNode = this.workerNodes[workerNodeKey]
+    await this.sendKillMessageToWorker(workerNodeKey)
+    await workerNode.terminate()
+  }
 
   /**
    * Setup hook to execute code before worker nodes are created in the abstract constructor.
@@ -1279,17 +1286,15 @@ export abstract class AbstractPool<
     )
     workerNode.registerWorkerEventHandler('error', (error: Error) => {
       const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker)
-      this.flagWorkerNodeAsNotReady(workerNodeKey)
-      const workerInfo = this.getWorkerInfo(workerNodeKey)
+      workerNode.info.ready = false
       this.emitter?.emit(PoolEvents.error, error)
-      this.workerNodes[workerNodeKey].closeChannel()
       if (
         this.started &&
         !this.starting &&
         !this.destroying &&
         this.opts.restartWorkerOnError === true
       ) {
-        if (workerInfo.dynamic) {
+        if (workerNode.info.dynamic) {
           this.createAndSetupDynamicWorkerNode()
         } else {
           this.createAndSetupWorkerNode()
@@ -1298,6 +1303,9 @@ export abstract class AbstractPool<
       if (this.started && this.opts.enableTasksQueue === true) {
         this.redistributeQueuedTasks(workerNodeKey)
       }
+      workerNode.terminate().catch(error => {
+        this.emitter?.emit(PoolEvents.error, error)
+      })
     })
     workerNode.registerWorkerEventHandler(
       'exit',
@@ -1846,7 +1854,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Removes the worker node associated to the give given worker from the pool worker nodes.
+   * Removes the worker node associated to the given worker from the pool worker nodes.
    *
    * @param worker - The worker.
    */
index f3fb54bbf4a08bd0ee6d28f623f3ac04f55630f1..46cabdcb75787700bab966bed3ce3312056659ad 100644 (file)
@@ -41,26 +41,6 @@ export class FixedClusterPool<
     return cluster.isPrimary
   }
 
-  /** @inheritDoc */
-  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
-    this.flagWorkerNodeAsNotReady(workerNodeKey)
-    this.flushTasksQueue(workerNodeKey)
-    // FIXME: wait for tasks to be finished
-    const workerNode = this.workerNodes[workerNodeKey]
-    const waitWorkerExit = new Promise<void>(resolve => {
-      workerNode.registerOnceWorkerEventHandler('exit', () => {
-        resolve()
-      })
-    })
-    workerNode.registerOnceWorkerEventHandler('disconnect', () => {
-      workerNode.worker.kill()
-    })
-    await this.sendKillMessageToWorker(workerNodeKey)
-    workerNode.removeAllListeners()
-    workerNode.worker.disconnect()
-    await waitWorkerExit
-  }
-
   /** @inheritDoc */
   protected sendToWorker (
     workerNodeKey: number,
index e5074e392b45a94c1e9b80376cc08d3624a5c155..197830279be020e45741fb00930bb82ffcc12ea3 100644 (file)
@@ -42,24 +42,6 @@ export class FixedThreadPool<
     return isMainThread
   }
 
-  /** @inheritDoc */
-  protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
-    this.flagWorkerNodeAsNotReady(workerNodeKey)
-    this.flushTasksQueue(workerNodeKey)
-    // FIXME: wait for tasks to be finished
-    const workerNode = this.workerNodes[workerNodeKey]
-    const waitWorkerExit = new Promise<void>(resolve => {
-      workerNode.registerOnceWorkerEventHandler('exit', () => {
-        resolve()
-      })
-    })
-    await this.sendKillMessageToWorker(workerNodeKey)
-    workerNode.closeChannel()
-    workerNode.removeAllListeners()
-    await workerNode.worker.terminate()
-    await waitWorkerExit
-  }
-
   /** @inheritDoc */
   protected sendToWorker (
     workerNodeKey: number,
index 9a98458e59547378cef70ad00dcbb36fe6be695e..7a6a1500f962e4cc948ad593f3f40e45d49e53ad 100644 (file)
@@ -124,14 +124,23 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   }
 
   /** @inheritdoc */
-  public closeChannel (): void {
-    if (this.messageChannel != null) {
-      this.messageChannel.port1.unref()
-      this.messageChannel.port2.unref()
-      this.messageChannel.port1.close()
-      this.messageChannel.port2.close()
-      delete this.messageChannel
+  public async terminate (): Promise<void> {
+    const waitWorkerExit = new Promise<void>(resolve => {
+      this.registerOnceWorkerEventHandler('exit', () => {
+        resolve()
+      })
+    })
+    this.closeMessageChannel()
+    this.removeAllListeners()
+    if (this.info.type === WorkerTypes.thread) {
+      await this.worker.terminate?.()
+    } else if (this.info.type === WorkerTypes.cluster) {
+      this.registerOnceWorkerEventHandler('disconnect', () => {
+        this.worker.kill?.()
+      })
+      this.worker.disconnect?.()
     }
+    await waitWorkerExit
   }
 
   /** @inheritdoc */
@@ -187,6 +196,16 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
+  private closeMessageChannel (): void {
+    if (this.messageChannel != null) {
+      this.messageChannel.port1.unref()
+      this.messageChannel.port2.unref()
+      this.messageChannel.port1.close()
+      this.messageChannel.port2.close()
+      delete this.messageChannel
+    }
+  }
+
   private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
       id: getWorkerId(worker),
index 1a94ef60950dffe84e9430374dcd13395a956bab..da4cf172d98ee4fcc3384fe4f11bc50800f4a378 100644 (file)
@@ -198,9 +198,12 @@ export interface StrategyData {
  */
 export interface IWorker {
   /**
-   * Worker id.
+   * Cluster worker id.
    */
   readonly id?: number
+  /**
+   * Worker thread worker id.
+   */
   readonly threadId?: number
   /**
    * Registers an event listener.
@@ -230,6 +233,19 @@ export interface IWorker {
     | ErrorHandler<this>
     | ExitHandler<this>
   ) => void
+  /**
+   * Stop all JavaScript execution in the worker thread as soon as possible.
+   * Returns a Promise for the exit code that is fulfilled when the `'exit' event` is emitted.
+   */
+  readonly terminate?: () => Promise<number>
+  /**
+   * Cluster worker disconnect.
+   */
+  readonly disconnect?: () => void
+  /**
+   * Cluster worker kill.
+   */
+  readonly kill?: (signal?: string) => void
 }
 
 /**
@@ -270,7 +286,7 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    */
   strategyData?: StrategyData
   /**
-   * Message channel (worker_threads only).
+   * Message channel (worker thread only).
    */
   readonly messageChannel?: MessageChannel
   /**
@@ -325,9 +341,9 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    */
   readonly resetUsage: () => void
   /**
-   * Closes communication channel.
+   * Terminates the worker node.
    */
-  readonly closeChannel: () => void
+  readonly terminate: () => Promise<void>
   /**
    * Registers a worker event handler.
    *