fix: avoid duplicate per task function name usage statistics
authorJérôme Benoit <jerome.benoit@sap.com>
Wed, 16 Aug 2023 22:50:42 +0000 (00:50 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Wed, 16 Aug 2023 22:50:42 +0000 (00:50 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/worker/abstract-worker.ts

index 6d71e87e34378abe5c75ad1c1806eacaad979a3b..25c0eba870ee62560e7d47c69cad7909fa4ccc4e 100644 (file)
@@ -92,10 +92,6 @@ export abstract class AbstractPool<
    * The start timestamp of the pool.
    */
   private readonly startTimestamp
-  /**
-   * The task function names.
-   */
-  private taskFunctions!: string[]
 
   /**
    * Constructs a new poolifier pool.
@@ -650,8 +646,11 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (this.taskFunctions != null) {
-      return this.taskFunctions
+    if (
+      Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
+      (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
+    ) {
+      return this.getWorkerInfo(0).taskFunctions as string[]
     } else {
       return []
     }
@@ -674,20 +673,22 @@ export abstract class AbstractPool<
       ) {
         reject(new TypeError('name argument must not be an empty string'))
       }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
+      const timestamp = performance.now()
+      const workerNodeKey = this.chooseWorkerNode()
       if (
         name != null &&
-        this.taskFunctions != null &&
-        !this.taskFunctions.includes(name)
+        Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+        !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
+          name
+        )
       ) {
         reject(
           new Error(`Task function '${name}' is not registered in the pool`)
         )
       }
-      if (transferList != null && !Array.isArray(transferList)) {
-        reject(new TypeError('transferList argument must be an array'))
-      }
-      const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -778,11 +779,13 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+      ++taskWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    }
   }
 
   /**
@@ -800,12 +803,23 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) as WorkerUsage
+      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+      this.updateEluWorkerUsage(taskWorkerUsage, message)
+    }
+  }
+
+  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+    return (
+      Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+      (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+    )
   }
 
   private updateTaskStatisticsWorkerUsage (
@@ -1119,7 +1133,9 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.taskFunctions = message.taskFunctions
+        this.getWorkerInfo(
+          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        ).taskFunctions = message.taskFunctions
       }
     }
   }
index 3b69d3854e07015aa0019ee84f164b8569ffe9aa..923fe9b222a4ea9dc4279294d51d0755f67f1ba7 100644 (file)
@@ -2,6 +2,7 @@ import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import { Queue } from '../queue'
 import type { Task } from '../utility-types'
+import { DEFAULT_TASK_NAME } from '../utils'
 import {
   type IWorker,
   type IWorkerNode,
@@ -22,6 +23,7 @@ implements IWorkerNode<Worker, Data> {
   public readonly worker: Worker
   public readonly info: WorkerInfo
   public usage: WorkerUsage
+  public taskFunctions!: string[]
   private readonly tasksUsage: Map<string, WorkerUsage>
   private readonly tasksQueue: Queue<Task<Data>>
 
@@ -87,6 +89,13 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
+    if (
+      name === DEFAULT_TASK_NAME &&
+      Array.isArray(this.taskFunctions) &&
+      this.taskFunctions.length > 1
+    ) {
+      name = this.taskFunctions[1]
+    }
     if (!this.tasksUsage.has(name)) {
       this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
     }
index 5dbb6ac5fc16cdb2afd48f968513b1292c257f66..5ac90fbf1783e9e04a7e820e37c961e8527385ff 100644 (file)
@@ -137,6 +137,10 @@ export interface WorkerInfo {
    * Ready flag.
    */
   ready: boolean
+  /**
+   * Task function names.
+   */
+  taskFunctions?: string[]
   /**
    * Message channel.
    */
index 98b32fe4cb9525efba1ab151b8d7b7f3efb2310b..44c392bcf75aba937a694b7dab43f72e07cc7a34 100644 (file)
@@ -265,7 +265,24 @@ export abstract class AbstractWorker<
    * @returns The names of the worker's task functions.
    */
   public listTaskFunctions (): string[] {
-    return [...this.taskFunctions.keys()]
+    const names: string[] = [...this.taskFunctions.keys()]
+    let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
+    for (const [name, fn] of this.taskFunctions) {
+      if (
+        name !== DEFAULT_TASK_NAME &&
+        fn === this.taskFunctions.get(DEFAULT_TASK_NAME)
+      ) {
+        defaultTaskFunctionName = name
+        break
+      }
+    }
+    return [
+      names[names.indexOf(DEFAULT_TASK_NAME)],
+      defaultTaskFunctionName,
+      ...names.filter(
+        (name) => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
+      )
+    ]
   }
 
   /**