Revert "fix: use version from package.json"
[poolifier.git] / src / pools / abstract-pool.ts
index 232b0cc17e702e95c12bb0b083e351af3f624161..796cff1d56c6f85e87f48609c938eaa84535edb7 100644 (file)
@@ -38,6 +38,7 @@ import {
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -249,6 +250,7 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
+      version,
       type: this.type,
       worker: this.worker,
       minSize: this.minSize,
@@ -505,7 +507,13 @@ export abstract class AbstractPool<
       this.workerNodes.map(async (workerNode, workerNodeKey) => {
         this.flushTasksQueue(workerNodeKey)
         // FIXME: wait for tasks to be finished
+        const workerExitPromise = new Promise<void>(resolve => {
+          workerNode.worker.on('exit', () => {
+            resolve()
+          })
+        })
         await this.destroyWorker(workerNode.worker)
+        await workerExitPromise
       })
     )
   }
@@ -766,6 +774,33 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
+      if (this.opts.enableTasksQueue === true) {
+        const workerNodeKey = this.getWorkerNodeKey(worker)
+        while (this.tasksQueueSize(workerNodeKey) > 0) {
+          let targetWorkerNodeKey: number = workerNodeKey
+          let minQueuedTasks = Infinity
+          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued === 0
+            ) {
+              targetWorkerNodeKey = workerNodeId
+              break
+            }
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued < minQueuedTasks
+            ) {
+              minQueuedTasks = workerNode.usage.tasks.queued
+              targetWorkerNodeKey = workerNodeId
+            }
+          }
+          this.enqueueTask(
+            targetWorkerNodeKey,
+            this.dequeueTask(workerNodeKey) as Task<Data>
+          )
+        }
+      }
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }
@@ -987,13 +1022,11 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
-    if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
-      }
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      this.executeTask(
+        workerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
     }
     this.workerNodes[workerNodeKey].tasksQueue.clear()
   }