fix: ensure task function worker usage is updated if there's at least 2
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 16:41:25 +0000 (18:41 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 18 Aug 2023 16:41:25 +0000 (18:41 +0200)
task functions

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts

index 05f5c7a944c8db3cb0023b8da573f4b8556071a0..3fb91510df4f0dab9b69005e38252a83b82825c9 100644 (file)
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Fix race condition between ready and task functions worker message handling at startup.
 - Fix duplicate task usage statistics computation per task function.
+- Update task function worker usage statistics if and only if there's at least two different task functions.
 
 ### Added
 
index 64fda2545627eec3fbcd3a7bdc3b7edd4ecb7ed2..06d09e6b72cc8937a950a3f129e7a97699d4f415 100644 (file)
@@ -799,12 +799,12 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
-      ++taskWorkerUsage.tasks.executing
-      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+      ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
+      ++taskFunctionWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
     }
   }
 
@@ -823,23 +823,29 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
-      const taskWorkerUsage = this.workerNodes[
+    if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
+      const taskFunctionWorkerUsage = this.workerNodes[
         workerNodeKey
-      ].getTaskWorkerUsage(
+      ].getTaskFunctionWorkerUsage(
         message.taskPerformance?.name ?? DEFAULT_TASK_NAME
       ) as WorkerUsage
-      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-      this.updateEluWorkerUsage(taskWorkerUsage, message)
+      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
+      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
     }
   }
 
-  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+  /**
+   * Whether the worker node shall update its task function worker usage or not.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
+   */
+  private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
       Array.isArray(workerInfo.taskFunctions) &&
-      workerInfo.taskFunctions.length > 1
+      workerInfo.taskFunctions.length > 2
     )
   }
 
index 812d14448f6fc4c7b221a74a23958920627f94ac..7667dd16a6b99c2994eaeda37866662b43f8d178 100644 (file)
@@ -28,7 +28,7 @@ implements IWorkerNode<Worker, Data> {
   public messageChannel?: MessageChannel
   /** @inheritdoc */
   public usage: WorkerUsage
-  private readonly tasksUsage: Map<string, WorkerUsage>
+  private readonly taskFunctionsUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
   private readonly tasksQueueBackPressureSize: number
 
@@ -46,7 +46,7 @@ implements IWorkerNode<Worker, Data> {
       this.messageChannel = new MessageChannel()
     }
     this.usage = this.initWorkerUsage()
-    this.tasksUsage = new Map<string, WorkerUsage>()
+    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
     this.tasksQueue = new Queue<Task<Data>>()
     this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2)
   }
@@ -88,7 +88,7 @@ implements IWorkerNode<Worker, Data> {
   /** @inheritdoc */
   public resetUsage (): void {
     this.usage = this.initWorkerUsage()
-    this.tasksUsage.clear()
+    this.taskFunctionsUsage.clear()
   }
 
   /** @inheritdoc */
@@ -103,23 +103,27 @@ implements IWorkerNode<Worker, Data> {
   }
 
   /** @inheritdoc */
-  public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
+  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
     if (!Array.isArray(this.info.taskFunctions)) {
       throw new Error(
-        `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
+        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
       )
     }
     if (
-      name === DEFAULT_TASK_NAME &&
       Array.isArray(this.info.taskFunctions) &&
-      this.info.taskFunctions.length > 1
+      this.info.taskFunctions.length < 3
     ) {
+      throw new Error(
+        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
+      )
+    }
+    if (name === DEFAULT_TASK_NAME) {
       name = this.info.taskFunctions[1]
     }
-    if (!this.tasksUsage.has(name)) {
-      this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
+    if (!this.taskFunctionsUsage.has(name)) {
+      this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
     }
-    return this.tasksUsage.get(name)
+    return this.taskFunctionsUsage.get(name)
   }
 
   private initWorkerInfo (worker: Worker, workerType: WorkerType): WorkerInfo {
@@ -167,7 +171,7 @@ implements IWorkerNode<Worker, Data> {
     }
   }
 
-  private initTaskWorkerUsage (name: string): WorkerUsage {
+  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
     const getTaskQueueSize = (): number => {
       let taskQueueSize = 0
       for (const task of this.tasksQueue) {
index d6c236591341c2fb73086d62cdb111cecd2a012e..0bdda982b16819ba1abdb65062c086e439344b97 100644 (file)
@@ -262,5 +262,5 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    * @param name - The task function name.
    * @returns The task function worker usage statistics if the task function worker usage statistics are initialized, `undefined` otherwise.
    */
-  readonly getTaskWorkerUsage: (name: string) => WorkerUsage | undefined
+  readonly getTaskFunctionWorkerUsage: (name: string) => WorkerUsage | undefined
 }