fix: fix 'queued' value in worker usage
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 8 Jun 2023 19:14:29 +0000 (21:14 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 8 Jun 2023 19:14:29 +0000 (21:14 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker.ts

index e941fabb772690f0797c204226430f113ea97d43..db0d5455d7987002ae5cd8fd5fe8a4ea231942bb 100644 (file)
@@ -21,7 +21,13 @@ import {
   type TasksQueueOptions,
   type WorkerType
 } from './pool'
-import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
+import type {
+  IWorker,
+  Task,
+  TaskStatistics,
+  WorkerNode,
+  WorkerUsage
+} from './worker'
 import {
   WorkerChoiceStrategies,
   type WorkerChoiceStrategy,
@@ -309,30 +315,10 @@ export abstract class AbstractPool<
       this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
     }
     for (const workerNode of this.workerNodes) {
-      this.setWorkerNodeTasksUsage(workerNode, {
-        tasks: {
-          executed: 0,
-          executing: 0,
-          queued:
-            this.opts.enableTasksQueue === true
-              ? workerNode.tasksQueue.size
-              : 0,
-          failed: 0
-        },
-        runTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        waitTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        elu: undefined
-      })
+      this.setWorkerNodeTasksUsage(
+        workerNode,
+        this.getWorkerUsage(workerNode.worker)
+      )
       this.setWorkerStatistics(workerNode.worker)
     }
   }
@@ -479,10 +465,6 @@ export abstract class AbstractPool<
    */
   protected beforeTaskExecutionHook (workerNodeKey: number): void {
     ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
-    if (this.opts.enableTasksQueue === true) {
-      this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
-        this.tasksQueueSize(workerNodeKey)
-    }
   }
 
   /**
@@ -759,52 +741,31 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     return this.workerNodes.push({
       worker,
-      workerUsage: {
-        tasks: {
-          executed: 0,
-          executing: 0,
-          queued: 0,
-          failed: 0
-        },
-        runTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-
-        waitTime: {
-          aggregation: 0,
-          average: 0,
-          median: 0,
-          history: new CircularArray()
-        },
-        elu: undefined
-      },
+      workerUsage: this.getWorkerUsage(worker),
       tasksQueue: new Queue<Task<Data>>()
     })
   }
 
-  /**
-   * Sets the given worker in the pool worker nodes.
-   *
-   * @param workerNodeKey - The worker node key.
-   * @param worker - The worker.
-   * @param workerUsage - The worker usage.
-   * @param tasksQueue - The worker task queue.
-   */
-  private setWorkerNode (
-    workerNodeKey: number,
-    worker: Worker,
-    workerUsage: WorkerUsage,
-    tasksQueue: Queue<Task<Data>>
-  ): void {
-    this.workerNodes[workerNodeKey] = {
-      worker,
-      workerUsage,
-      tasksQueue
-    }
-  }
+  // /**
+  //  * Sets the given worker in the pool worker nodes.
+  //  *
+  //  * @param workerNodeKey - The worker node key.
+  //  * @param worker - The worker.
+  //  * @param workerUsage - The worker usage.
+  //  * @param tasksQueue - The worker task queue.
+  //  */
+  // private setWorkerNode (
+  //   workerNodeKey: number,
+  //   worker: Worker,
+  //   workerUsage: WorkerUsage,
+  //   tasksQueue: Queue<Task<Data>>
+  // ): void {
+  //   this.workerNodes[workerNodeKey] = {
+  //     worker,
+  //     workerUsage,
+  //     tasksQueue
+  //   }
+  // }
 
   /**
    * Removes the given worker from the pool worker nodes.
@@ -867,4 +828,37 @@ export abstract class AbstractPool<
       }
     })
   }
+
+  private getWorkerUsage (worker: Worker): WorkerUsage {
+    return {
+      tasks: this.getTaskStatistics(this, worker),
+      runTime: {
+        aggregation: 0,
+        average: 0,
+        median: 0,
+        history: new CircularArray()
+      },
+      waitTime: {
+        aggregation: 0,
+        average: 0,
+        median: 0,
+        history: new CircularArray()
+      },
+      elu: undefined
+    }
+  }
+
+  private getTaskStatistics (
+    self: AbstractPool<Worker, Data, Response>,
+    worker: Worker
+  ): TaskStatistics {
+    return {
+      executed: 0,
+      executing: 0,
+      get queued (): number {
+        return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+      },
+      failed: 0
+    }
+  }
 }
index ba9393ab9ba8241e5f1a1184165d42055b859835..b6dddd1d59ac24916dd8ab49591dc31c1edfa93a 100644 (file)
@@ -97,7 +97,7 @@ export interface TaskStatistics {
   /**
    * Number of tasks queued.
    */
-  queued: number
+  readonly queued: number
   /**
    * Number of tasks failed.
    */