Merge branch 'master' into combine-prs-branch
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 17 Aug 2023 13:39:19 +0000 (15:39 +0200)
committerGitHub <noreply@github.com>
Thu, 17 Aug 2023 13:39:19 +0000 (15:39 +0200)
README.md
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/worker/abstract-worker.ts

index 0faebec4220e2a7e9208b6cd6fe8e1fa8d97401e..d104bd528487a4da5e017c1f0e569184bdf82d7c 100644 (file)
--- a/README.md
+++ b/README.md
@@ -128,7 +128,7 @@ pool.emitter.on(PoolEvents.ready, () => console.info('Pool is ready'))
 pool.emitter.on(PoolEvents.busy, () => console.info('Pool is busy'))
 
 // the execute method signature is the same for both implementations,
-// so you can easy switch from one to another
+// so you can easily switch from one to another
 pool
   .execute()
   .then((res) => {
index 6d71e87e34378abe5c75ad1c1806eacaad979a3b..e7c27cdae8a881392aa3cb4c51e8ab0386365f99 100644 (file)
@@ -92,10 +92,6 @@ export abstract class AbstractPool<
    * The start timestamp of the pool.
    */
   private readonly startTimestamp
-  /**
-   * The task function names.
-   */
-  private taskFunctions!: string[]
 
   /**
    * Constructs a new poolifier pool.
@@ -650,8 +646,11 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public listTaskFunctions (): string[] {
-    if (this.taskFunctions != null) {
-      return this.taskFunctions
+    if (
+      Array.isArray(this.getWorkerInfo(0).taskFunctions) &&
+      (this.getWorkerInfo(0).taskFunctions as string[]).length > 0
+    ) {
+      return this.getWorkerInfo(0).taskFunctions as string[]
     } else {
       return []
     }
@@ -674,20 +673,22 @@ export abstract class AbstractPool<
       ) {
         reject(new TypeError('name argument must not be an empty string'))
       }
+      if (transferList != null && !Array.isArray(transferList)) {
+        reject(new TypeError('transferList argument must be an array'))
+      }
+      const timestamp = performance.now()
+      const workerNodeKey = this.chooseWorkerNode()
       if (
         name != null &&
-        this.taskFunctions != null &&
-        !this.taskFunctions.includes(name)
+        Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+        !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
+          name
+        )
       ) {
         reject(
           new Error(`Task function '${name}' is not registered in the pool`)
         )
       }
-      if (transferList != null && !Array.isArray(transferList)) {
-        reject(new TypeError('transferList argument must be an array'))
-      }
-      const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode()
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
@@ -778,11 +779,13 @@ export abstract class AbstractPool<
     const workerUsage = this.workerNodes[workerNodeKey].usage
     ++workerUsage.tasks.executing
     this.updateWaitTimeWorkerUsage(workerUsage, task)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      task.name as string
-    ) as WorkerUsage
-    ++taskWorkerUsage.tasks.executing
-    this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(task.name as string) as WorkerUsage
+      ++taskWorkerUsage.tasks.executing
+      this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
+    }
   }
 
   /**
@@ -800,12 +803,23 @@ export abstract class AbstractPool<
     this.updateTaskStatisticsWorkerUsage(workerUsage, message)
     this.updateRunTimeWorkerUsage(workerUsage, message)
     this.updateEluWorkerUsage(workerUsage, message)
-    const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
-      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
-    ) as WorkerUsage
-    this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
-    this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
-    this.updateEluWorkerUsage(taskWorkerUsage, message)
+    if (this.canUpdateTaskWorkerUsage(workerNodeKey)) {
+      const taskWorkerUsage = this.workerNodes[
+        workerNodeKey
+      ].getTaskWorkerUsage(
+        message.taskPerformance?.name ?? DEFAULT_TASK_NAME
+      ) as WorkerUsage
+      this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
+      this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
+      this.updateEluWorkerUsage(taskWorkerUsage, message)
+    }
+  }
+
+  private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+    return (
+      Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
+      (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+    )
   }
 
   private updateTaskStatisticsWorkerUsage (
@@ -1119,7 +1133,9 @@ export abstract class AbstractPool<
         this.handleTaskExecutionResponse(message)
       } else if (message.taskFunctions != null) {
         // Task functions message received from worker
-        this.taskFunctions = message.taskFunctions
+        this.getWorkerInfo(
+          this.getWorkerNodeKeyByWorkerId(message.workerId)
+        ).taskFunctions = message.taskFunctions
       }
     }
   }
@@ -1137,19 +1153,18 @@ export abstract class AbstractPool<
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const promiseResponse = this.promiseResponseMap.get(
-      message.taskId as string
-    )
+    const { taskId, taskError, data } = message
+    const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (message.taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, message.taskError)
-        promiseResponse.reject(message.taskError.message)
+      if (taskError != null) {
+        this.emitter?.emit(PoolEvents.taskError, taskError)
+        promiseResponse.reject(taskError.message)
       } else {
-        promiseResponse.resolve(message.data as Response)
+        promiseResponse.resolve(data as Response)
       }
       const workerNodeKey = promiseResponse.workerNodeKey
       this.afterTaskExecutionHook(workerNodeKey, message)
-      this.promiseResponseMap.delete(message.taskId as string)
+      this.promiseResponseMap.delete(taskId as string)
       if (
         this.opts.enableTasksQueue === true &&
         this.tasksQueueSize(workerNodeKey) > 0 &&
index 3b69d3854e07015aa0019ee84f164b8569ffe9aa..4349ed72471b3614e227ab4d374df7f0aba17186 100644 (file)
@@ -2,6 +2,7 @@ import { MessageChannel } from 'node:worker_threads'
 import { CircularArray } from '../circular-array'
 import { Queue } from '../queue'
 import type { Task } from '../utility-types'
+import { DEFAULT_TASK_NAME } from '../utils'
 import {
   type IWorker,
   type IWorkerNode,
@@ -87,6 +88,18 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
+    if (name === DEFAULT_TASK_NAME && !Array.isArray(this.info.taskFunctions)) {
+      throw new Error(
+        'Cannot get task worker usage for default task function name when task function names list is not yet defined'
+      )
+    }
+    if (
+      name === DEFAULT_TASK_NAME &&
+      Array.isArray(this.info.taskFunctions) &&
+      this.info.taskFunctions.length > 1
+    ) {
+      name = this.info.taskFunctions[1]
+    }
     if (!this.tasksUsage.has(name)) {
       this.tasksUsage.set(name, this.initTaskWorkerUsage(name))
     }
index 5dbb6ac5fc16cdb2afd48f968513b1292c257f66..5ac90fbf1783e9e04a7e820e37c961e8527385ff 100644 (file)
@@ -137,6 +137,10 @@ export interface WorkerInfo {
    * Ready flag.
    */
   ready: boolean
+  /**
+   * Task function names.
+   */
+  taskFunctions?: string[]
   /**
    * Message channel.
    */
index 98b32fe4cb9525efba1ab151b8d7b7f3efb2310b..44c392bcf75aba937a694b7dab43f72e07cc7a34 100644 (file)
@@ -265,7 +265,24 @@ export abstract class AbstractWorker<
    * @returns The names of the worker's task functions.
    */
   public listTaskFunctions (): string[] {
-    return [...this.taskFunctions.keys()]
+    const names: string[] = [...this.taskFunctions.keys()]
+    let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
+    for (const [name, fn] of this.taskFunctions) {
+      if (
+        name !== DEFAULT_TASK_NAME &&
+        fn === this.taskFunctions.get(DEFAULT_TASK_NAME)
+      ) {
+        defaultTaskFunctionName = name
+        break
+      }
+    }
+    return [
+      names[names.indexOf(DEFAULT_TASK_NAME)],
+      defaultTaskFunctionName,
+      ...names.filter(
+        (name) => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
+      )
+    ]
   }
 
   /**