chore: generate documentation
[poolifier.git] / src / pools / abstract-pool.ts
index 81012fa0b4f736559fca73eb395bf7c622bce48a..a558ef504568ad95abcf91d8f413968a2f07bf8f 100644 (file)
@@ -149,6 +149,7 @@ export abstract class AbstractPool<
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
+      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
       this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
       if (this.opts.enableTasksQueue) {
@@ -415,10 +416,7 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected beforeTaskExecutionHook (
-    workerNodeKey: number,
-    task: Task<Data>
-  ): void {
+  protected beforeTaskExecutionHook (workerNodeKey: number): void {
     ++this.workerNodes[workerNodeKey].tasksUsage.running
   }
 
@@ -433,13 +431,21 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
+    const workerTasksUsage =
+      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
       ++workerTasksUsage.error
     }
+    this.updateRunTimeTasksUsage(workerTasksUsage, message)
+    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
+  }
+
+  private updateRunTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
     if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
       workerTasksUsage.runTime += message.runTime ?? 0
       if (
@@ -457,6 +463,12 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
+  }
+
+  private updateWaitTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
     if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
       workerTasksUsage.waitTime += message.waitTime ?? 0
       if (
@@ -551,6 +563,16 @@ export abstract class AbstractPool<
 
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+    worker.on('error', error => {
+      if (this.emitter != null) {
+        this.emitter.emit(PoolEvents.error, error)
+      }
+    })
+    if (this.opts.restartWorkerOnError === true) {
+      worker.on('error', () => {
+        this.createAndSetupWorker()
+      })
+    }
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
@@ -598,7 +620,7 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitEvents (): void {
-    if (this.opts.enableEvents === true) {
+    if (this.emitter != null) {
       if (this.busy) {
         this.emitter?.emit(PoolEvents.busy)
       }
@@ -675,12 +697,14 @@ export abstract class AbstractPool<
    */
   private removeWorkerNode (worker: Worker): void {
     const workerNodeKey = this.getWorkerNodeKey(worker)
-    this.workerNodes.splice(workerNodeKey, 1)
-    this.workerChoiceStrategyContext.remove(workerNodeKey)
+    if (workerNodeKey !== -1) {
+      this.workerNodes.splice(workerNodeKey, 1)
+      this.workerChoiceStrategyContext.remove(workerNodeKey)
+    }
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey, task)
+    this.beforeTaskExecutionHook(workerNodeKey)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }