chore: v3.1.7
[poolifier.git] / src / pools / worker-node.ts
index 59c4de7a388e81434c373f1835b144c1b3c88f57..d80c03d80f00edc474da050ca41c8297c12142dd 100644 (file)
@@ -5,15 +5,20 @@ import type { Task } from '../utility-types'
 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
+  type ErrorHandler,
+  type ExitHandler,
   type IWorker,
   type IWorkerNode,
+  type MessageHandler,
+  type OnlineHandler,
   type StrategyData,
   type WorkerInfo,
+  type WorkerNodeOptions,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
 } from './worker'
-import { checkWorkerNodeArguments } from './utils'
+import { checkWorkerNodeArguments, createWorker } from './utils'
 
 /**
  * Worker node.
@@ -43,19 +48,23 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /**
    * Constructs a new worker node.
    *
-   * @param worker - The worker.
-   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
+   * @param type - The worker type.
+   * @param filePath - Path to the worker file.
+   * @param opts - The worker node options.
    */
-  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
     super()
-    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
-    this.worker = worker
-    this.info = this.initWorkerInfo(worker)
+    checkWorkerNodeArguments(type, filePath, opts)
+    this.worker = createWorker<Worker>(type, filePath, {
+      env: opts.env,
+      workerOptions: opts.workerOptions
+    })
+    this.info = this.initWorkerInfo(this.worker)
     this.usage = this.initWorkerUsage()
     if (this.info.type === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
-    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
@@ -115,14 +124,47 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   }
 
   /** @inheritdoc */
-  public closeChannel (): void {
-    if (this.messageChannel != null) {
-      this.messageChannel.port1.unref()
-      this.messageChannel.port2.unref()
-      this.messageChannel.port1.close()
-      this.messageChannel.port2.close()
-      delete this.messageChannel
+  public async terminate (): Promise<void> {
+    const waitWorkerExit = new Promise<void>(resolve => {
+      this.registerOnceWorkerEventHandler('exit', () => {
+        resolve()
+      })
+    })
+    this.closeMessageChannel()
+    this.removeAllListeners()
+    if (this.info.type === WorkerTypes.thread) {
+      await this.worker.terminate?.()
+    } else if (this.info.type === WorkerTypes.cluster) {
+      this.registerOnceWorkerEventHandler('disconnect', () => {
+        this.worker.kill?.()
+      })
+      this.worker.disconnect?.()
     }
+    await waitWorkerExit
+  }
+
+  /** @inheritdoc */
+  public registerWorkerEventHandler (
+    event: string,
+    handler:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.on(event, handler)
+  }
+
+  /** @inheritdoc */
+  public registerOnceWorkerEventHandler (
+    event: string,
+    handler:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.once(event, handler)
   }
 
   /** @inheritdoc */
@@ -154,6 +196,16 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.taskFunctionsUsage.delete(name)
   }
 
+  private closeMessageChannel (): void {
+    if (this.messageChannel != null) {
+      this.messageChannel.port1.unref()
+      this.messageChannel.port2.unref()
+      this.messageChannel.port1.close()
+      this.messageChannel.port2.close()
+      delete this.messageChannel
+    }
+  }
+
   private initWorkerInfo (worker: Worker): WorkerInfo {
     return {
       id: getWorkerId(worker),