refactor: cleanup cluster ESM issue workaround
[poolifier.git] / src / pools / abstract-pool.ts
index 56dd7fa9296265078f863d342338da0457309919..30b4d01f9846cd2c2327d5a52e4a6be594e8d62c 100644 (file)
@@ -10,7 +10,6 @@ import {
   DEFAULT_TASK_NAME,
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
-  isAsyncFunction,
   isKillBehavior,
   isPlainObject,
   median,
@@ -335,16 +334,20 @@ export abstract class AbstractPool<
           accumulator + workerNode.usage.tasks.executing,
         0
       ),
-      queuedTasks: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          accumulator + workerNode.usage.tasks.queued,
-        0
-      ),
-      maxQueuedTasks: this.workerNodes.reduce(
-        (accumulator, workerNode) =>
-          accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
-        0
-      ),
+      ...(this.opts.enableTasksQueue === true && {
+        queuedTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + workerNode.usage.tasks.queued,
+          0
+        )
+      }),
+      ...(this.opts.enableTasksQueue === true && {
+        maxQueuedTasks: this.workerNodes.reduce(
+          (accumulator, workerNode) =>
+            accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
+          0
+        )
+      }),
       failedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
           accumulator + workerNode.usage.tasks.failed,
@@ -618,9 +621,10 @@ export abstract class AbstractPool<
    */
   protected internalBusy (): boolean {
     return (
-      this.workerNodes.findIndex(workerNode => {
-        return workerNode.usage.tasks.executing === 0
-      }) === -1
+      this.workerNodes.findIndex(
+        workerNode =>
+          workerNode.info.ready && workerNode.usage.tasks.executing === 0
+      ) === -1
     )
   }
 
@@ -643,14 +647,14 @@ export abstract class AbstractPool<
         workerNodeKey
       })
       if (
-        this.opts.enableTasksQueue === true &&
-        (this.busy ||
-          this.workerNodes[workerNodeKey].usage.tasks.executing >=
+        this.opts.enableTasksQueue === false ||
+        (this.opts.enableTasksQueue === true &&
+          this.workerNodes[workerNodeKey].usage.tasks.executing <
             (this.opts.tasksQueueOptions?.concurrency as number))
       ) {
-        this.enqueueTask(workerNodeKey, task)
-      } else {
         this.executeTask(workerNodeKey, task)
+      } else {
+        this.enqueueTask(workerNodeKey, task)
       }
       this.checkAndEmitEvents()
     })
@@ -659,16 +663,8 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async destroy (): Promise<void> {
     await Promise.all(
-      this.workerNodes.map(async (workerNode, workerNodeKey) => {
-        this.flushTasksQueue(workerNodeKey)
-        // FIXME: wait for tasks to be finished
-        const workerExitPromise = new Promise<void>(resolve => {
-          workerNode.worker.on('exit', () => {
-            resolve()
-          })
-        })
+      this.workerNodes.map(async (_, workerNodeKey) => {
         await this.destroyWorkerNode(workerNodeKey)
-        await workerExitPromise
       })
     )
   }
@@ -678,9 +674,7 @@ export abstract class AbstractPool<
    *
    * @param workerNodeKey - The worker node key.
    */
-  protected abstract destroyWorkerNode (
-    workerNodeKey: number
-  ): void | Promise<void>
+  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
 
   /**
    * Setup hook to execute code before worker nodes are created in the abstract constructor.
@@ -910,6 +904,7 @@ export abstract class AbstractPool<
         message.workerId
       )
       const workerUsage = this.workerNodes[localWorkerNodeKey].usage
+      // Kill message received from worker
       if (
         isKillBehavior(KillBehaviors.HARD, message.kill) ||
         (message.kill != null &&
@@ -919,17 +914,7 @@ export abstract class AbstractPool<
               workerUsage.tasks.executing === 0 &&
               this.tasksQueueSize(localWorkerNodeKey) === 0)))
       ) {
-        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
-        const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
-        if (isAsyncFunction(destroyWorkerNodeBounded)) {
-          (
-            destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
-          )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
-        } else {
-          (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
-            localWorkerNodeKey
-          )
-        }
+        this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
       }
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
@@ -1050,10 +1035,10 @@ export abstract class AbstractPool<
     return message => {
       this.checkMessageWorkerId(message)
       if (message.ready != null) {
-        // Worker ready response received
+        // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.id != null) {
-        // Task execution response received
+        // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
       }
     }
@@ -1173,7 +1158,7 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueueSize()
   }
 
-  private flushTasksQueue (workerNodeKey: number): void {
+  protected flushTasksQueue (workerNodeKey: number): void {
     while (this.tasksQueueSize(workerNodeKey) > 0) {
       this.executeTask(
         workerNodeKey,