fix: ensure the task concurrency is respected at queued task
[poolifier.git] / src / pools / abstract-pool.ts
index 086d7ee82550bc40c46781b83616c1639b32155b..0d3d21832a715191e6efad9723c6a47454ae2215 100644 (file)
@@ -10,6 +10,7 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  isAsyncFunction,
   isKillBehavior,
   isPlainObject,
   median,
@@ -114,6 +115,7 @@ export abstract class AbstractPool<
     this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
     this.executeTask = this.executeTask.bind(this)
     this.enqueueTask = this.enqueueTask.bind(this)
+    this.dequeueTask = this.dequeueTask.bind(this)
     this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
 
     if (this.opts.enableEvents === true) {
@@ -171,7 +173,15 @@ export abstract class AbstractPool<
 
   protected checkDynamicPoolSize (min: number, max: number): void {
     if (this.type === PoolTypes.dynamic) {
-      if (min > max) {
+      if (max == null) {
+        throw new Error(
+          'Cannot instantiate a dynamic pool without specifying the maximum pool size'
+        )
+      } else if (!Number.isSafeInteger(max)) {
+        throw new TypeError(
+          'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
+        )
+      } else if (min > max) {
         throw new RangeError(
           'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
         )
@@ -913,7 +923,16 @@ export abstract class AbstractPool<
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
         // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        void (this.destroyWorkerNode(localWorkerNodeKey) as Promise<void>)
+        const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
+        if (isAsyncFunction(destroyWorkerNodeBounded)) {
+          (
+            destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
+          )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
+        } else {
+          (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
+            localWorkerNodeKey
+          )
+        }
       }
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
@@ -985,6 +1004,7 @@ export abstract class AbstractPool<
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       let targetWorkerNodeKey: number = workerNodeKey
       let minQueuedTasks = Infinity
+      let executeTask = false
       for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
         const workerInfo = this.getWorkerInfo(workerNodeId)
         if (
@@ -992,6 +1012,12 @@ export abstract class AbstractPool<
           workerInfo.ready &&
           workerNode.usage.tasks.queued === 0
         ) {
+          if (
+            this.workerNodes[workerNodeId].usage.tasks.executing <
+            (this.opts.tasksQueueOptions?.concurrency as number)
+          ) {
+            executeTask = true
+          }
           targetWorkerNodeKey = workerNodeId
           break
         }
@@ -1004,10 +1030,17 @@ export abstract class AbstractPool<
           targetWorkerNodeKey = workerNodeId
         }
       }
-      this.enqueueTask(
-        targetWorkerNodeKey,
-        this.dequeueTask(workerNodeKey) as Task<Data>
-      )
+      if (executeTask) {
+        this.executeTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      } else {
+        this.enqueueTask(
+          targetWorkerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      }
     }
   }