fix: reset worker choice strategies retries count
[poolifier.git] / src / pools / abstract-pool.ts
index eb2c2f7818a687ac8f8016eb27489895cadfaaee..64fda2545627eec3fbcd3a7bdc3b7edd4ecb7ed2 100644 (file)
@@ -204,9 +204,10 @@ export abstract class AbstractPool<
       this.opts.workerChoiceStrategy =
         opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
       this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
-      this.opts.workerChoiceStrategyOptions =
-        opts.workerChoiceStrategyOptions ??
-        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+      this.opts.workerChoiceStrategyOptions = {
+        ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+        ...opts.workerChoiceStrategyOptions
+      }
       this.checkValidWorkerChoiceStrategyOptions(
         this.opts.workerChoiceStrategyOptions
       )
@@ -244,6 +245,22 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must be a plain object'
       )
     }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+    ) {
+      throw new TypeError(
+        'Invalid worker choice strategy options: choice retries must be an integer'
+      )
+    }
+    if (
+      workerChoiceStrategyOptions.choiceRetries != null &&
+      workerChoiceStrategyOptions.choiceRetries <= 0
+    ) {
+      throw new RangeError(
+        `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+      )
+    }
     if (
       workerChoiceStrategyOptions.weights != null &&
       Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
@@ -566,7 +583,10 @@ export abstract class AbstractPool<
     workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
   ): void {
     this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
-    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+    this.opts.workerChoiceStrategyOptions = {
+      ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+      ...workerChoiceStrategyOptions
+    }
     this.workerChoiceStrategyContext.setOptions(
       this.opts.workerChoiceStrategyOptions
     )
@@ -1242,6 +1262,17 @@ export abstract class AbstractPool<
     }
   }
 
+  /** @inheritDoc */
+  public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+    if (
+      this.opts.enableTasksQueue === true &&
+      this.workerNodes[workerNodeKey].hasBackPressure()
+    ) {
+      return true
+    }
+    return false
+  }
+
   /**
    * Executes the given task on the worker given its worker node key.
    *
@@ -1254,16 +1285,14 @@ export abstract class AbstractPool<
   }
 
   private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
-    if (
-      this.opts.enableTasksQueue === true &&
-      this.workerNodes[workerNodeKey].hasBackPressure()
-    ) {
+    const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
+    if (this.hasWorkerNodeBackPressure(workerNodeKey)) {
       this.emitter?.emit(PoolEvents.backPressure, {
         workerId: this.getWorkerInfo(workerNodeKey).id,
         ...this.info
       })
     }
-    return this.workerNodes[workerNodeKey].enqueueTask(task)
+    return tasksQueueSize
   }
 
   private dequeueTask (workerNodeKey: number): Task<Data> | undefined {