Revert "fix: use version from package.json"
[poolifier.git] / src / pools / abstract-pool.ts
index f71bf2008c6fa955aea588e7b2c5ab905e690f47..796cff1d56c6f85e87f48609c938eaa84535edb7 100644 (file)
@@ -4,11 +4,12 @@ import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
 import {
   DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
   EMPTY_FUNCTION,
+  isKillBehavior,
   isPlainObject,
   median,
   round
 } from '../utils'
-import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
+import { KillBehaviors } from '../worker/worker-options'
 import { CircularArray } from '../circular-array'
 import { Queue } from '../queue'
 import {
@@ -37,6 +38,7 @@ import {
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -248,11 +250,15 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
+      version,
       type: this.type,
       worker: this.worker,
       minSize: this.minSize,
       maxSize: this.maxSize,
-      utilization: round(this.utilization),
+      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+        .runTime.aggregate &&
+        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          .waitTime.aggregate && { utilization: round(this.utilization) }),
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
@@ -501,7 +507,13 @@ export abstract class AbstractPool<
       this.workerNodes.map(async (workerNode, workerNodeKey) => {
         this.flushTasksQueue(workerNodeKey)
         // FIXME: wait for tasks to be finished
+        const workerExitPromise = new Promise<void>(resolve => {
+          workerNode.worker.on('exit', () => {
+            resolve()
+          })
+        })
         await this.destroyWorker(workerNode.worker)
+        await workerExitPromise
       })
     )
   }
@@ -762,6 +774,33 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
+      if (this.opts.enableTasksQueue === true) {
+        const workerNodeKey = this.getWorkerNodeKey(worker)
+        while (this.tasksQueueSize(workerNodeKey) > 0) {
+          let targetWorkerNodeKey: number = workerNodeKey
+          let minQueuedTasks = Infinity
+          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued === 0
+            ) {
+              targetWorkerNodeKey = workerNodeId
+              break
+            }
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued < minQueuedTasks
+            ) {
+              minQueuedTasks = workerNode.usage.tasks.queued
+              targetWorkerNodeKey = workerNodeId
+            }
+          }
+          this.enqueueTask(
+            targetWorkerNodeKey,
+            this.dequeueTask(workerNodeKey) as Task<Data>
+          )
+        }
+      }
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }
@@ -815,44 +854,53 @@ export abstract class AbstractPool<
     return message => {
       if (message.workerId != null && message.started != null) {
         // Worker started message received
-        const worker = this.getWorkerById(message.workerId)
-        if (worker != null) {
-          this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
-            message.started
-        } else {
-          throw new Error(
-            `Worker started message received from unknown worker '${message.workerId}'`
-          )
-        }
+        this.handleWorkerStartedMessage(message)
       } else if (message.id != null) {
         // Task execution response received
-        const promiseResponse = this.promiseResponseMap.get(message.id)
-        if (promiseResponse != null) {
-          if (message.taskError != null) {
-            if (this.emitter != null) {
-              this.emitter.emit(PoolEvents.taskError, message.taskError)
-            }
-            promiseResponse.reject(
-              `${message.taskError.message} on worker '${message.taskError.workerId}'`
-            )
-          } else {
-            promiseResponse.resolve(message.data as Response)
-          }
-          this.afterTaskExecutionHook(promiseResponse.worker, message)
-          this.promiseResponseMap.delete(message.id)
-          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
-          if (
-            this.opts.enableTasksQueue === true &&
-            this.tasksQueueSize(workerNodeKey) > 0
-          ) {
-            this.executeTask(
-              workerNodeKey,
-              this.dequeueTask(workerNodeKey) as Task<Data>
-            )
-          }
-          this.workerChoiceStrategyContext.update(workerNodeKey)
+        this.handleTaskExecutionResponse(message)
+      }
+    }
+  }
+
+  private handleWorkerStartedMessage (message: MessageValue<Response>): void {
+    // Worker started message received
+    const worker = this.getWorkerById(message.workerId as number)
+    if (worker != null) {
+      this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+        message.started as boolean
+    } else {
+      throw new Error(
+        `Worker started message received from unknown worker '${
+          message.workerId as number
+        }'`
+      )
+    }
+  }
+
+  private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+    const promiseResponse = this.promiseResponseMap.get(message.id as string)
+    if (promiseResponse != null) {
+      if (message.taskError != null) {
+        if (this.emitter != null) {
+          this.emitter.emit(PoolEvents.taskError, message.taskError)
         }
+        promiseResponse.reject(message.taskError.message)
+      } else {
+        promiseResponse.resolve(message.data as Response)
       }
+      this.afterTaskExecutionHook(promiseResponse.worker, message)
+      this.promiseResponseMap.delete(message.id as string)
+      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+      if (
+        this.opts.enableTasksQueue === true &&
+        this.tasksQueueSize(workerNodeKey) > 0
+      ) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
+      }
+      this.workerChoiceStrategyContext.update(workerNodeKey)
     }
   }
 
@@ -974,13 +1022,11 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
-    if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
-      }
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      this.executeTask(
+        workerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
     }
     this.workerNodes[workerNodeKey].tasksQueue.clear()
   }