fix: ensure worker choice strategies internals are updated after tasks
[poolifier.git] / src / pools / abstract-pool.ts
index c3565446f254c0292978f1f3eed1c23c1b1ac267..8363ecba9f8d3c98d9071023e99f4b7b47053e7a 100644 (file)
@@ -379,6 +379,11 @@ export abstract class AbstractPool<
    */
   protected abstract get busy (): boolean
 
+  /**
+   * Whether worker nodes are executing at least one task.
+   *
+   * @returns Worker nodes busyness boolean status.
+   */
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
@@ -434,7 +439,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Shutdowns the given worker.
+   * Terminates the given worker.
    *
    * @param worker - A worker within `workerNodes`.
    */
@@ -609,18 +614,28 @@ export abstract class AbstractPool<
   /**
    * Chooses a worker node for the next task.
    *
-   * The default worker choice strategy uses a round robin algorithm to distribute the load.
+   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
    *
    * @returns The worker node key
    */
-  protected chooseWorkerNode (): number {
+  private chooseWorkerNode (): number {
     if (this.shallCreateDynamicWorker()) {
-      return this.getWorkerNodeKey(this.createAndSetupDynamicWorker())
+      const worker = this.createAndSetupDynamicWorker()
+      if (
+        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+      ) {
+        return this.getWorkerNodeKey(worker)
+      }
     }
     return this.workerChoiceStrategyContext.execute()
   }
 
-  protected shallCreateDynamicWorker (): boolean {
+  /**
+   * Conditions for dynamic worker creation.
+   *
+   * @returns Whether to create a dynamic worker or not.
+   */
+  private shallCreateDynamicWorker (): boolean {
     return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
   }
 
@@ -646,7 +661,9 @@ export abstract class AbstractPool<
   >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
-   * Returns a newly created worker.
+   * Creates a new worker.
+   *
+   * @returns Newly created worker.
    */
   protected abstract createWorker (): Worker
 
@@ -704,12 +721,15 @@ export abstract class AbstractPool<
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (message.kill != null &&
-          this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
-            0)
+          ((this.opts.enableTasksQueue === false &&
+            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+              .executing === 0) ||
+            (this.opts.enableTasksQueue === true &&
+              this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+                .executing === 0 &&
+              this.tasksQueueSize(currentWorkerNodeKey) === 0)))
       ) {
         // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        this.flushTasksQueue(currentWorkerNodeKey)
-        // FIXME: wait for tasks to be finished
         void (this.destroyWorker(worker) as Promise<void>)
       }
     })
@@ -746,6 +766,7 @@ export abstract class AbstractPool<
               workerNodeKey,
               this.dequeueTask(workerNodeKey) as Task<Data>
             )
+            this.workerChoiceStrategyContext.update(workerNodeKey)
           }
         }
       }