test: cleanup helpers
[poolifier.git] / src / pools / abstract-pool.ts
index f64e9ea41cd6ee3779e75e92cd158d963bd56a7b..058b43b8a37104b29d1d15978613c42735072547 100644 (file)
@@ -29,6 +29,7 @@ import type {
   WorkerUsage
 } from './worker'
 import {
+  Measurements,
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
   type WorkerChoiceStrategyOptions
@@ -199,6 +200,16 @@ export abstract class AbstractPool<
         'Invalid worker choice strategy options: must have a weight for each worker node'
       )
     }
+    if (
+      workerChoiceStrategyOptions.measurement != null &&
+      !Object.values(Measurements).includes(
+        workerChoiceStrategyOptions.measurement
+      )
+    ) {
+      throw new Error(
+        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
+      )
+    }
   }
 
   private checkValidTasksQueueOptions (
@@ -207,11 +218,20 @@ export abstract class AbstractPool<
     if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
       throw new TypeError('Invalid tasks queue options: must be a plain object')
     }
-    if ((tasksQueueOptions?.concurrency as number) <= 0) {
+    if (
+      tasksQueueOptions?.concurrency != null &&
+      !Number.isSafeInteger(tasksQueueOptions.concurrency)
+    ) {
+      throw new TypeError(
+        'Invalid worker tasks concurrency: must be an integer'
+      )
+    }
+    if (
+      tasksQueueOptions?.concurrency != null &&
+      tasksQueueOptions.concurrency <= 0
+    ) {
       throw new Error(
-        `Invalid worker tasks concurrency '${
-          tasksQueueOptions.concurrency as number
-        }'`
+        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
       )
     }
   }
@@ -379,6 +399,11 @@ export abstract class AbstractPool<
    */
   protected abstract get busy (): boolean
 
+  /**
+   * Whether worker nodes are executing at least one task.
+   *
+   * @returns Worker nodes busyness boolean status.
+   */
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
@@ -416,7 +441,6 @@ export abstract class AbstractPool<
     } else {
       this.executeTask(workerNodeKey, submittedTask)
     }
-    this.workerChoiceStrategyContext.update(workerNodeKey)
     this.checkAndEmitEvents()
     // eslint-disable-next-line @typescript-eslint/return-await
     return res
@@ -434,7 +458,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Shutdowns the given worker.
+   * Terminates the given worker.
    *
    * @param worker - A worker within `workerNodes`.
    */
@@ -609,33 +633,29 @@ export abstract class AbstractPool<
   /**
    * Chooses a worker node for the next task.
    *
-   * The default worker choice strategy uses a round robin algorithm to distribute the load.
+   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
    *
    * @returns The worker node key
    */
-  protected chooseWorkerNode (): number {
-    let workerNodeKey: number
-    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
-      const workerCreated = this.createAndSetupWorker()
-      this.registerWorkerMessageListener(workerCreated, message => {
-        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
-        if (
-          isKillBehavior(KillBehaviors.HARD, message.kill) ||
-          (message.kill != null &&
-            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
-              .executing === 0)
-        ) {
-          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-          this.flushTasksQueue(currentWorkerNodeKey)
-          // FIXME: wait for tasks to be finished
-          void (this.destroyWorker(workerCreated) as Promise<void>)
-        }
-      })
-      workerNodeKey = this.getWorkerNodeKey(workerCreated)
-    } else {
-      workerNodeKey = this.workerChoiceStrategyContext.execute()
+  private chooseWorkerNode (): number {
+    if (this.shallCreateDynamicWorker()) {
+      const worker = this.createAndSetupDynamicWorker()
+      if (
+        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+      ) {
+        return this.getWorkerNodeKey(worker)
+      }
     }
-    return workerNodeKey
+    return this.workerChoiceStrategyContext.execute()
+  }
+
+  /**
+   * Conditions for dynamic worker creation.
+   *
+   * @returns Whether to create a dynamic worker or not.
+   */
+  private shallCreateDynamicWorker (): boolean {
+    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
   }
 
   /**
@@ -660,7 +680,9 @@ export abstract class AbstractPool<
   >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
-   * Returns a newly created worker.
+   * Creates a new worker.
+   *
+   * @returns Newly created worker.
    */
   protected abstract createWorker (): Worker
 
@@ -687,8 +709,6 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
-    })
-    worker.on('error', () => {
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }
@@ -708,6 +728,33 @@ export abstract class AbstractPool<
     return worker
   }
 
+  /**
+   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
+   *
+   * @returns New, completely set up dynamic worker.
+   */
+  protected createAndSetupDynamicWorker (): Worker {
+    const worker = this.createAndSetupWorker()
+    this.registerWorkerMessageListener(worker, message => {
+      const workerNodeKey = this.getWorkerNodeKey(worker)
+      if (
+        isKillBehavior(KillBehaviors.HARD, message.kill) ||
+        (message.kill != null &&
+          ((this.opts.enableTasksQueue === false &&
+            this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+              0) ||
+            (this.opts.enableTasksQueue === true &&
+              this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
+                0 &&
+              this.tasksQueueSize(workerNodeKey) === 0)))
+      ) {
+        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+        void (this.destroyWorker(worker) as Promise<void>)
+      }
+    })
+    return worker
+  }
+
   /**
    * This function is the listener registered for each worker message.
    *
@@ -720,10 +767,10 @@ export abstract class AbstractPool<
         const promiseResponse = this.promiseResponseMap.get(message.id)
         if (promiseResponse != null) {
           if (message.taskError != null) {
-            promiseResponse.reject(message.taskError.message)
             if (this.emitter != null) {
               this.emitter.emit(PoolEvents.taskError, message.taskError)
             }
+            promiseResponse.reject(message.taskError.message)
           } else {
             promiseResponse.resolve(message.data as Response)
           }
@@ -739,6 +786,7 @@ export abstract class AbstractPool<
               this.dequeueTask(workerNodeKey) as Task<Data>
             )
           }
+          this.workerChoiceStrategyContext.update(workerNodeKey)
         }
       }
     }