chore: v2.5.1
[poolifier.git] / src / pools / abstract-pool.ts
index e396744603eed1011518cbb3f62b59bb3cedd305..a558ef504568ad95abcf91d8f413968a2f07bf8f 100644 (file)
@@ -149,6 +149,7 @@ export abstract class AbstractPool<
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
+      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
       this.opts.enableEvents = opts.enableEvents ?? true
       this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
       if (this.opts.enableTasksQueue) {
@@ -415,22 +416,8 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected beforeTaskExecutionHook (
-    workerNodeKey: number,
-    task: Task<Data>
-  ): void {
-    const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
-    ++workerTasksUsage.running
-    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
-      const waitTime = performance.now() - (task.submissionTimestamp ?? 0)
-      workerTasksUsage.waitTime += waitTime
-      if (
-        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime
-      ) {
-        workerTasksUsage.waitTimeHistory.push(waitTime)
-        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
-      }
-    }
+  protected beforeTaskExecutionHook (workerNodeKey: number): void {
+    ++this.workerNodes[workerNodeKey].tasksUsage.running
   }
 
   /**
@@ -444,13 +431,21 @@ export abstract class AbstractPool<
     worker: Worker,
     message: MessageValue<Response>
   ): void {
-    const workerNodeKey = this.getWorkerNodeKey(worker)
-    const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
+    const workerTasksUsage =
+      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
     --workerTasksUsage.running
     ++workerTasksUsage.run
     if (message.error != null) {
       ++workerTasksUsage.error
     }
+    this.updateRunTimeTasksUsage(workerTasksUsage, message)
+    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
+  }
+
+  private updateRunTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
     if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
       workerTasksUsage.runTime += message.runTime ?? 0
       if (
@@ -468,13 +463,28 @@ export abstract class AbstractPool<
         workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
       }
     }
-    if (
-      this.workerChoiceStrategyContext.getRequiredStatistics().waitTime &&
-      this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
-      workerTasksUsage.run !== 0
-    ) {
-      workerTasksUsage.avgWaitTime =
-        workerTasksUsage.waitTime / workerTasksUsage.run
+  }
+
+  private updateWaitTimeTasksUsage (
+    workerTasksUsage: TasksUsage,
+    message: MessageValue<Response>
+  ): void {
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+      workerTasksUsage.waitTime += message.waitTime ?? 0
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+        workerTasksUsage.run !== 0
+      ) {
+        workerTasksUsage.avgWaitTime =
+          workerTasksUsage.waitTime / workerTasksUsage.run
+      }
+      if (
+        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+        message.waitTime != null
+      ) {
+        workerTasksUsage.waitTimeHistory.push(message.waitTime)
+        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+      }
     }
   }
 
@@ -553,6 +563,16 @@ export abstract class AbstractPool<
 
     worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
     worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+    worker.on('error', error => {
+      if (this.emitter != null) {
+        this.emitter.emit(PoolEvents.error, error)
+      }
+    })
+    if (this.opts.restartWorkerOnError === true) {
+      worker.on('error', () => {
+        this.createAndSetupWorker()
+      })
+    }
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
     worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
     worker.once('exit', () => {
@@ -600,7 +620,7 @@ export abstract class AbstractPool<
   }
 
   private checkAndEmitEvents (): void {
-    if (this.opts.enableEvents === true) {
+    if (this.emitter != null) {
       if (this.busy) {
         this.emitter?.emit(PoolEvents.busy)
       }
@@ -677,12 +697,14 @@ export abstract class AbstractPool<
    */
   private removeWorkerNode (worker: Worker): void {
     const workerNodeKey = this.getWorkerNodeKey(worker)
-    this.workerNodes.splice(workerNodeKey, 1)
-    this.workerChoiceStrategyContext.remove(workerNodeKey)
+    if (workerNodeKey !== -1) {
+      this.workerNodes.splice(workerNodeKey, 1)
+      this.workerChoiceStrategyContext.remove(workerNodeKey)
+    }
   }
 
   private executeTask (workerNodeKey: number, task: Task<Data>): void {
-    this.beforeTaskExecutionHook(workerNodeKey, task)
+    this.beforeTaskExecutionHook(workerNodeKey)
     this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
   }