fix: recreate the right worker type on uncaught error
authorJérôme Benoit <jerome.benoit@sap.com>
Tue, 4 Jul 2023 20:00:44 +0000 (22:00 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Tue, 4 Jul 2023 20:00:44 +0000 (22:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker.ts

index ad657b17cffd21b373473444a802fba34c6cefb8..e12023f1de4c8a913e382028445585bc06893528 100644 (file)
@@ -28,6 +28,7 @@ import type {
   IWorker,
   MessageHandler,
   Task,
+  WorkerInfo,
   WorkerNode,
   WorkerUsage
 } from './worker'
@@ -379,10 +380,10 @@ export abstract class AbstractPool<
     if (workerChoiceStrategyOptions != null) {
       this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     }
-    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
+    for (const workerNode of this.workerNodes) {
       this.setWorkerNodeTasksUsage(
         workerNode,
-        this.getWorkerUsage(workerNodeKey)
+        this.getInitialWorkerUsage(workerNode.worker)
       )
       this.setWorkerStatistics(workerNode.worker)
     }
@@ -827,7 +828,11 @@ export abstract class AbstractPool<
         }
       }
       if (this.opts.restartWorkerOnError === true) {
-        this.createAndSetupWorker()
+        if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
+          this.createAndSetupDynamicWorker()
+        } else {
+          this.createAndSetupWorker()
+        }
       }
     })
     worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
@@ -852,6 +857,7 @@ export abstract class AbstractPool<
    */
   protected createAndSetupDynamicWorker (): Worker {
     const worker = this.createAndSetupWorker()
+    this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
     this.registerWorkerMessageListener(worker, message => {
       const workerNodeKey = this.getWorkerNodeKey(worker)
       if (
@@ -953,6 +959,15 @@ export abstract class AbstractPool<
     workerNode.usage = workerUsage
   }
 
+  /**
+   * Gets the worker information.
+   *
+   * @param workerNodeKey - The worker node key.
+   */
+  private getWorkerInfo (workerNodeKey: number): WorkerInfo {
+    return this.workerNodes[workerNodeKey].info
+  }
+
   /**
    * Pushes the given worker in the pool worker nodes.
    *
@@ -962,14 +977,13 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     this.workerNodes.push({
       worker,
-      info: { id: this.getWorkerId(worker), started: true },
-      usage: this.getWorkerUsage(),
+      info: this.getInitialWorkerInfo(worker),
+      usage: this.getInitialWorkerUsage(),
       tasksQueue: new Queue<Task<Data>>()
     })
-    const workerNodeKey = this.getWorkerNodeKey(worker)
     this.setWorkerNodeTasksUsage(
-      this.workerNodes[workerNodeKey],
-      this.getWorkerUsage(workerNodeKey)
+      this.workerNodes[this.getWorkerNodeKey(worker)],
+      this.getInitialWorkerUsage(worker)
     )
     return this.workerNodes.length
   }
@@ -1074,22 +1088,26 @@ export abstract class AbstractPool<
     })
   }
 
-  private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
-    const getTasksQueueSize = (workerNodeKey?: number): number => {
-      return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
+  private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
+    const getTasksQueueSize = (worker?: Worker): number => {
+      return worker != null
+        ? this.tasksQueueSize(this.getWorkerNodeKey(worker))
+        : 0
     }
-    const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
-      return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
+    const getTasksMaxQueueSize = (worker?: Worker): number => {
+      return worker != null
+        ? this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
+        : 0
     }
     return {
       tasks: {
         executed: 0,
         executing: 0,
         get queued (): number {
-          return getTasksQueueSize(workerNodeKey)
+          return getTasksQueueSize(worker)
         },
         get maxQueued (): number {
-          return getTasksMaxQueueSize(workerNodeKey)
+          return getTasksMaxQueueSize(worker)
         },
         failed: 0
       },
@@ -1129,4 +1147,8 @@ export abstract class AbstractPool<
       }
     }
   }
+
+  private getInitialWorkerInfo (worker: Worker): WorkerInfo {
+    return { id: this.getWorkerId(worker), dynamic: false, started: true }
+  }
 }
index 7f9cef0a6632aa354f07eb04db5dea6d7ad38c4e..20fac7ccce33fe1b32ac16874b1d721758cef137 100644 (file)
@@ -136,6 +136,10 @@ export interface WorkerInfo {
    * Worker id.
    */
   readonly id: number | undefined
+  /**
+   * Dynamic flag.
+   */
+  dynamic: boolean
   /**
    * Started flag.
    */