test: cleanup helpers
[poolifier.git] / src / pools / abstract-pool.ts
index f082b8d144993b28621cc58e2941551c8da4a264..058b43b8a37104b29d1d15978613c42735072547 100644 (file)
@@ -29,6 +29,7 @@ import type {
   WorkerUsage
 } from './worker'
 import {
+  Measurements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
   type WorkerChoiceStrategyOptions
@@ -68,8 +69,6 @@ export abstract class AbstractPool<
 
   /**
    * Worker choice strategy context referencing a worker choice algorithm implementation.
-   *
-   * Default to a round robin algorithm.
    */
   protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
   Worker,
@@ -201,6 +200,16 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must have a weight for each worker node'
       )
     }
+    if (
+      workerChoiceStrategyOptions.measurement != null &&
+      !Object.values(Measurements).includes(
+        workerChoiceStrategyOptions.measurement
+      )
+    ) {
+      throw new Error(
+        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+      )
+    }
   }
 
   private checkValidTasksQueueOptions (
@@ -209,11 +218,20 @@ export abstract class AbstractPool<
     if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
       throw new TypeError('Invalid tasks queue options: must be a plain object')
     }
-    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+    if (
+      tasksQueueOptions?.concurrency != null &&
+      !Number.isSafeInteger(tasksQueueOptions.concurrency)
+    ) {
+      throw new TypeError(
+        'Invalid worker tasks concurrency: must be an integer'
+      )
+    }
+    if (
+      tasksQueueOptions?.concurrency != null &&
+      tasksQueueOptions.concurrency <= 0
+    ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${
-          tasksQueueOptions.concurrency as number
-        }'`
+        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
       )
     }
   }
@@ -381,6 +399,11 @@ export abstract class AbstractPool<
    */
   protected abstract get busy (): boolean
 
+  /**
+   * Whether worker nodes are executing at least one task.
+   *
+   * @returns Worker nodes busyness boolean status.
+   */
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
@@ -418,7 +441,6 @@ export abstract class AbstractPool<
     } else {
       this.executeTask(workerNodeKey, submittedTask)
     }
-    this.workerChoiceStrategyContext.update(workerNodeKey)
     this.checkAndEmitEvents()
     // eslint-disable-next-line @typescript-eslint/return-await
     return res
@@ -436,7 +458,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Shutdowns the given worker.
+   * Terminates the given worker.
    *
    * @param worker - A worker within `workerNodes`.
    */
@@ -486,14 +508,21 @@ export abstract class AbstractPool<
   ): void {
     const workerUsage =
       this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+    this.updateRunTimeWorkerUsage(workerUsage, message)
+    this.updateEluWorkerUsage(workerUsage, message)
+  }
+
+  private updateTaskStatisticsWorkerUsage (
+    workerUsage: WorkerUsage,
+    message: MessageValue<Response>
+  ): void {
     const workerTaskStatistics = workerUsage.tasks
     --workerTaskStatistics.executing
     ++workerTaskStatistics.executed
     if (message.taskError != null) {
       ++workerTaskStatistics.failed
     }
-    this.updateRunTimeWorkerUsage(workerUsage, message)
-    this.updateEluWorkerUsage(workerUsage, message)
   }
 
   private updateRunTimeWorkerUsage (
@@ -511,7 +540,8 @@ export abstract class AbstractPool<
         workerUsage.tasks.executed !== 0
       ) {
         workerUsage.runTime.average =
-          workerUsage.runTime.aggregate / workerUsage.tasks.executed
+          workerUsage.runTime.aggregate /
+          (workerUsage.tasks.executed - workerUsage.tasks.failed)
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
@@ -541,7 +571,8 @@ export abstract class AbstractPool<
         workerUsage.tasks.executed !== 0
       ) {
         workerUsage.waitTime.average =
-          workerUsage.waitTime.aggregate / workerUsage.tasks.executed
+          workerUsage.waitTime.aggregate /
+          (workerUsage.tasks.executed - workerUsage.tasks.failed)
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
@@ -563,10 +594,8 @@ export abstract class AbstractPool<
         .aggregate
     ) {
       if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
-        workerUsage.elu.idle.aggregate =
-          workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle
-        workerUsage.elu.active.aggregate =
-          workerUsage.elu.active.aggregate + message.taskPerformance.elu.active
+        workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+        workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
         workerUsage.elu.utilization =
           (workerUsage.elu.utilization +
             message.taskPerformance.elu.utilization) /
@@ -581,10 +610,12 @@ export abstract class AbstractPool<
           .average &&
         workerUsage.tasks.executed !== 0
       ) {
+        const executedTasks =
+          workerUsage.tasks.executed - workerUsage.tasks.failed
         workerUsage.elu.idle.average =
-          workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
+          workerUsage.elu.idle.aggregate / executedTasks
         workerUsage.elu.active.average =
-          workerUsage.elu.active.aggregate / workerUsage.tasks.executed
+          workerUsage.elu.active.aggregate / executedTasks
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
@@ -602,33 +633,29 @@ export abstract class AbstractPool<
   /**
    * Chooses a worker node for the next task.
    *
-   * The default worker choice strategy uses a round robin algorithm to distribute the load.
+   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
    *
    * @returns The worker node key
    */
-  protected chooseWorkerNode (): number {
-    let workerNodeKey: number
-    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
-      const workerCreated = this.createAndSetupWorker()
-      this.registerWorkerMessageListener(workerCreated, message => {
-        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
-        if (
-          isKillBehavior(KillBehaviors.HARD, message.kill) ||
-          (message.kill != null &&
-            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
-              .executing === 0)
-        ) {
-          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          this.flushTasksQueue(currentWorkerNodeKey)
-          // FIXME: wait for tasks to be finished
-          void (this.destroyWorker(workerCreated) as Promise<void>)
-        }
-      })
-      workerNodeKey = this.getWorkerNodeKey(workerCreated)
-    } else {
-      workerNodeKey = this.workerChoiceStrategyContext.execute()
+  private chooseWorkerNode (): number {
+    if (this.shallCreateDynamicWorker()) {
+      const worker = this.createAndSetupDynamicWorker()
+      if (
+        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+      ) {
+        return this.getWorkerNodeKey(worker)
+      }
     }
-    return workerNodeKey
+    return this.workerChoiceStrategyContext.execute()
+  }
+
+  /**
+   * Conditions for dynamic worker creation.
+   *
+   * @returns Whether to create a dynamic worker or not.
+   */
+  private shallCreateDynamicWorker (): boolean {
+    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
   }
 
   /**
@@ -653,7 +680,9 @@ export abstract class AbstractPool<
   >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
-   * Returns a newly created worker.
+   * Creates a new worker.
+   *
+   * @returns Newly created worker.
    */
   protected abstract createWorker (): Worker
 
@@ -680,8 +709,6 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
-    })
-    worker.on('error', () => {
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }
@@ -701,6 +728,33 @@ export abstract class AbstractPool<
     return worker
   }
 
+  /**
+   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+   *
+   * @returns New, completely set up dynamic worker.
+   */
+  protected createAndSetupDynamicWorker (): Worker {
+    const worker = this.createAndSetupWorker()
+    this.registerWorkerMessageListener(worker, message => {
+      const workerNodeKey = this.getWorkerNodeKey(worker)
+      if (
+        isKillBehavior(KillBehaviors.HARD, message.kill) ||
+        (message.kill != null &&
+          ((this.opts.enableTasksQueue === false &&
+            this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+              0) ||
+            (this.opts.enableTasksQueue === true &&
+              this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+                0 &&
+              this.tasksQueueSize(workerNodeKey) === 0)))
+      ) {
+        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+        void (this.destroyWorker(worker) as Promise<void>)
+      }
+    })
+    return worker
+  }
+
   /**
    * This function is the listener registered for each worker message.
    *
@@ -713,10 +767,10 @@ export abstract class AbstractPool<
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
           if (message.taskError != null) {
-            promiseResponse.reject(message.taskError.message)
             if (this.emitter != null) {
               this.emitter.emit(PoolEvents.taskError, message.taskError)
             }
+            promiseResponse.reject(message.taskError.message)
           } else {
             promiseResponse.resolve(message.data as Response)
           }
@@ -732,6 +786,7 @@ export abstract class AbstractPool<
               this.dequeueTask(workerNodeKey) as Task<Data>
             )
           }
+          this.workerChoiceStrategyContext.update(workerNodeKey)
         }
       }
     }