refactor: move worker setup into worker node constructor
[poolifier.git] / src / pools / worker-node.ts
index 59c4de7a388e81434c373f1835b144c1b3c88f57..9a98458e59547378cef70ad00dcbb36fe6be695e 100644 (file)
@@ -5,15 +5,20 @@ import type { Task } from '../utility-types'
 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils'
 import { Deque } from '../deque'
 import {
+  type ErrorHandler,
+  type ExitHandler,
   type IWorker,
   type IWorkerNode,
+  type MessageHandler,
+  type OnlineHandler,
   type StrategyData,
   type WorkerInfo,
+  type WorkerNodeOptions,
   type WorkerType,
   WorkerTypes,
   type WorkerUsage
 } from './worker'
-import { checkWorkerNodeArguments } from './utils'
+import { checkWorkerNodeArguments, createWorker } from './utils'
 
 /**
  * Worker node.
@@ -43,19 +48,23 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /**
    * Constructs a new worker node.
    *
-   * @param worker - The worker.
-   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
+   * @param type - The worker type.
+   * @param filePath - The worker file path.
+   * @param opts - The worker node options.
    */
-  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
+  constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
     super()
-    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
-    this.worker = worker
-    this.info = this.initWorkerInfo(worker)
+    checkWorkerNodeArguments(type, filePath, opts)
+    this.worker = createWorker<Worker>(type, filePath, {
+      env: opts.env,
+      workerOptions: opts.workerOptions
+    })
+    this.info = this.initWorkerInfo(this.worker)
     this.usage = this.initWorkerUsage()
     if (this.info.type === WorkerTypes.thread) {
       this.messageChannel = new MessageChannel()
     }
-    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
+    this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
     this.tasksQueue = new Deque<Task<Data>>()
     this.onBackPressureStarted = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
@@ -125,6 +134,30 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
   }
 
+  /** @inheritdoc */
+  public registerWorkerEventHandler (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.on(event, listener)
+  }
+
+  /** @inheritdoc */
+  public registerOnceWorkerEventHandler (
+    event: string,
+    listener:
+    | OnlineHandler<Worker>
+    | MessageHandler<Worker>
+    | ErrorHandler<Worker>
+    | ExitHandler<Worker>
+  ): void {
+    this.worker.once(event, listener)
+  }
+
   /** @inheritdoc */
   public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
     if (!Array.isArray(this.info.taskFunctionNames)) {