feat: add minimum and maximum to internal measurements
[poolifier.git] / src / pools / abstract-pool.ts
index 24e8a5b0c670f275a91a93dec22227fa12659d25..ad657b17cffd21b373473444a802fba34c6cefb8 100644 (file)
@@ -38,6 +38,7 @@ import {
   type WorkerChoiceStrategyOptions
 } from './selection-strategies/selection-strategies-types'
 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
+import { version } from './version'
 
 /**
  * Base class that implements some shared logic for all poolifier pools.
@@ -249,11 +250,15 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public get info (): PoolInfo {
     return {
+      version,
       type: this.type,
       worker: this.worker,
       minSize: this.minSize,
       maxSize: this.maxSize,
-      utilization: round(this.utilization),
+      ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+        .runTime.aggregate &&
+        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+          .waitTime.aggregate && { utilization: round(this.utilization) }),
       workerNodes: this.workerNodes.length,
       idleWorkerNodes: this.workerNodes.reduce(
         (accumulator, workerNode) =>
@@ -295,22 +300,14 @@ export abstract class AbstractPool<
     }
   }
 
-  /**
-   * Gets the pool run time.
-   *
-   * @returns The pool run time in milliseconds.
-   */
-  private get runTime (): number {
-    return performance.now() - this.startTimestamp
-  }
-
   /**
    * Gets the approximate pool utilization.
    *
    * @returns The pool utilization.
    */
   private get utilization (): number {
-    const poolRunTimeCapacity = this.runTime * this.maxSize
+    const poolRunTimeCapacity =
+      (performance.now() - this.startTimestamp) * this.maxSize
     const totalTasksRunTime = this.workerNodes.reduce(
       (accumulator, workerNode) =>
         accumulator + workerNode.usage.runTime.aggregate,
@@ -502,7 +499,13 @@ export abstract class AbstractPool<
       this.workerNodes.map(async (workerNode, workerNodeKey) => {
         this.flushTasksQueue(workerNodeKey)
         // FIXME: wait for tasks to be finished
+        const workerExitPromise = new Promise<void>(resolve => {
+          workerNode.worker.on('exit', () => {
+            resolve()
+          })
+        })
         await this.destroyWorker(workerNode.worker)
+        await workerExitPromise
       })
     )
   }
@@ -582,7 +585,16 @@ export abstract class AbstractPool<
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
         .aggregate
     ) {
-      workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
+      const taskRunTime = message.taskPerformance?.runTime ?? 0
+      workerUsage.runTime.aggregate += taskRunTime
+      workerUsage.runTime.minimum = Math.min(
+        taskRunTime,
+        workerUsage.runTime.minimum
+      )
+      workerUsage.runTime.maximum = Math.max(
+        taskRunTime,
+        workerUsage.runTime.maximum
+      )
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
           .average &&
@@ -613,7 +625,15 @@ export abstract class AbstractPool<
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
         .aggregate
     ) {
-      workerUsage.waitTime.aggregate += taskWaitTime ?? 0
+      workerUsage.waitTime.aggregate += taskWaitTime
+      workerUsage.waitTime.minimum = Math.min(
+        taskWaitTime,
+        workerUsage.waitTime.minimum
+      )
+      workerUsage.waitTime.maximum = Math.max(
+        taskWaitTime,
+        workerUsage.waitTime.maximum
+      )
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .waitTime.average &&
@@ -642,17 +662,33 @@ export abstract class AbstractPool<
       this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
         .aggregate
     ) {
-      if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+      if (message.taskPerformance?.elu != null) {
         workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
         workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
-        workerUsage.elu.utilization =
-          (workerUsage.elu.utilization +
-            message.taskPerformance.elu.utilization) /
-          2
-      } else if (message.taskPerformance?.elu != null) {
-        workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
-        workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
-        workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+        if (workerUsage.elu.utilization != null) {
+          workerUsage.elu.utilization =
+            (workerUsage.elu.utilization +
+              message.taskPerformance.elu.utilization) /
+            2
+        } else {
+          workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+        }
+        workerUsage.elu.idle.minimum = Math.min(
+          message.taskPerformance.elu.idle,
+          workerUsage.elu.idle.minimum
+        )
+        workerUsage.elu.idle.maximum = Math.max(
+          message.taskPerformance.elu.idle,
+          workerUsage.elu.idle.maximum
+        )
+        workerUsage.elu.active.minimum = Math.min(
+          message.taskPerformance.elu.active,
+          workerUsage.elu.active.minimum
+        )
+        workerUsage.elu.active.maximum = Math.max(
+          message.taskPerformance.elu.active,
+          workerUsage.elu.active.maximum
+        )
       }
       if (
         this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
@@ -763,6 +799,33 @@ export abstract class AbstractPool<
       if (this.emitter != null) {
         this.emitter.emit(PoolEvents.error, error)
       }
+      if (this.opts.enableTasksQueue === true) {
+        const workerNodeKey = this.getWorkerNodeKey(worker)
+        while (this.tasksQueueSize(workerNodeKey) > 0) {
+          let targetWorkerNodeKey: number = workerNodeKey
+          let minQueuedTasks = Infinity
+          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued === 0
+            ) {
+              targetWorkerNodeKey = workerNodeId
+              break
+            }
+            if (
+              workerNodeId !== workerNodeKey &&
+              workerNode.usage.tasks.queued < minQueuedTasks
+            ) {
+              minQueuedTasks = workerNode.usage.tasks.queued
+              targetWorkerNodeKey = workerNodeId
+            }
+          }
+          this.enqueueTask(
+            targetWorkerNodeKey,
+            this.dequeueTask(workerNodeKey) as Task<Data>
+          )
+        }
+      }
       if (this.opts.restartWorkerOnError === true) {
         this.createAndSetupWorker()
       }
@@ -816,42 +879,53 @@ export abstract class AbstractPool<
     return message => {
       if (message.workerId != null && message.started != null) {
         // Worker started message received
-        const worker = this.getWorkerById(message.workerId)
-        if (worker != null) {
-          this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
-            message.started
-        } else {
-          throw new Error(
-            `Worker started message received from unknown worker '${message.workerId}'`
-          )
-        }
+        this.handleWorkerStartedMessage(message)
       } else if (message.id != null) {
         // Task execution response received
-        const promiseResponse = this.promiseResponseMap.get(message.id)
-        if (promiseResponse != null) {
-          if (message.taskError != null) {
-            if (this.emitter != null) {
-              this.emitter.emit(PoolEvents.taskError, message.taskError)
-            }
-            promiseResponse.reject(message.taskError.message)
-          } else {
-            promiseResponse.resolve(message.data as Response)
-          }
-          this.afterTaskExecutionHook(promiseResponse.worker, message)
-          this.promiseResponseMap.delete(message.id)
-          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
-          if (
-            this.opts.enableTasksQueue === true &&
-            this.tasksQueueSize(workerNodeKey) > 0
-          ) {
-            this.executeTask(
-              workerNodeKey,
-              this.dequeueTask(workerNodeKey) as Task<Data>
-            )
-          }
-          this.workerChoiceStrategyContext.update(workerNodeKey)
+        this.handleTaskExecutionResponse(message)
+      }
+    }
+  }
+
+  private handleWorkerStartedMessage (message: MessageValue<Response>): void {
+    // Worker started message received
+    const worker = this.getWorkerById(message.workerId as number)
+    if (worker != null) {
+      this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+        message.started as boolean
+    } else {
+      throw new Error(
+        `Worker started message received from unknown worker '${
+          message.workerId as number
+        }'`
+      )
+    }
+  }
+
+  private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+    const promiseResponse = this.promiseResponseMap.get(message.id as string)
+    if (promiseResponse != null) {
+      if (message.taskError != null) {
+        if (this.emitter != null) {
+          this.emitter.emit(PoolEvents.taskError, message.taskError)
         }
+        promiseResponse.reject(message.taskError.message)
+      } else {
+        promiseResponse.resolve(message.data as Response)
+      }
+      this.afterTaskExecutionHook(promiseResponse.worker, message)
+      this.promiseResponseMap.delete(message.id as string)
+      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+      if (
+        this.opts.enableTasksQueue === true &&
+        this.tasksQueueSize(workerNodeKey) > 0
+      ) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
       }
+      this.workerChoiceStrategyContext.update(workerNodeKey)
     }
   }
 
@@ -973,13 +1047,11 @@ export abstract class AbstractPool<
   }
 
   private flushTasksQueue (workerNodeKey: number): void {
-    if (this.tasksQueueSize(workerNodeKey) > 0) {
-      for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
-        this.executeTask(
-          workerNodeKey,
-          this.dequeueTask(workerNodeKey) as Task<Data>
-        )
-      }
+    while (this.tasksQueueSize(workerNodeKey) > 0) {
+      this.executeTask(
+        workerNodeKey,
+        this.dequeueTask(workerNodeKey) as Task<Data>
+      )
     }
     this.workerNodes[workerNodeKey].tasksQueue.clear()
   }
@@ -1023,12 +1095,16 @@ export abstract class AbstractPool<
       },
       runTime: {
         aggregate: 0,
+        maximum: 0,
+        minimum: 0,
         average: 0,
         median: 0,
         history: new CircularArray()
       },
       waitTime: {
         aggregate: 0,
+        maximum: 0,
+        minimum: 0,
         average: 0,
         median: 0,
         history: new CircularArray()
@@ -1036,17 +1112,20 @@ export abstract class AbstractPool<
       elu: {
         idle: {
           aggregate: 0,
+          maximum: 0,
+          minimum: 0,
           average: 0,
           median: 0,
           history: new CircularArray()
         },
         active: {
           aggregate: 0,
+          maximum: 0,
+          minimum: 0,
           average: 0,
           median: 0,
           history: new CircularArray()
-        },
-        utilization: 0
+        }
       }
     }
   }