feat: add task function properties support
[poolifier.git] / src / pools / abstract-pool.ts
index 01e621c297a1bc0cb818b807d1e13fe33344952e..88c04a854fde383dfa8f93de6bd3be670a036aa9 100644 (file)
@@ -7,10 +7,12 @@ import type { TransferListItem } from 'node:worker_threads'
 import type {
   MessageValue,
   PromiseResponseWrapper,
-  Task
+  Task,
+  TaskFunctionProperties
 } from '../utility-types.js'
 import {
   average,
+  buildTaskFunctionProperties,
   DEFAULT_TASK_NAME,
   EMPTY_FUNCTION,
   exponentialDelay,
@@ -22,7 +24,10 @@ import {
   round,
   sleep
 } from '../utils.js'
-import type { TaskFunction } from '../worker/task-functions.js'
+import type {
+  TaskFunction,
+  TaskFunctionObject
+} from '../worker/task-functions.js'
 import { KillBehaviors } from '../worker/worker-options.js'
 import {
   type IPool,
@@ -101,9 +106,12 @@ export abstract class AbstractPool<
   /**
    * The task functions added at runtime map:
    * - `key`: The task function name.
-   * - `value`: The task function itself.
+   * - `value`: The task function object.
    */
-  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+  private readonly taskFunctions: Map<
+  string,
+  TaskFunctionObject<Data, Response>
+  >
 
   /**
    * Whether the pool is started or not.
@@ -173,7 +181,7 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
-    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+    this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
 
     this.started = false
     this.starting = false
@@ -805,8 +813,10 @@ export abstract class AbstractPool<
   public hasTaskFunction (name: string): boolean {
     for (const workerNode of this.workerNodes) {
       if (
-        Array.isArray(workerNode.info.taskFunctionNames) &&
-        workerNode.info.taskFunctionNames.includes(name)
+        Array.isArray(workerNode.info.taskFunctionsProperties) &&
+        workerNode.info.taskFunctionsProperties.some(
+          taskFunctionProperties => taskFunctionProperties.name === name
+        )
       ) {
         return true
       }
@@ -817,7 +827,7 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async addTaskFunction (
     name: string,
-    fn: TaskFunction<Data, Response>
+    fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
   ): Promise<boolean> {
     if (typeof name !== 'string') {
       throw new TypeError('name argument must be a string')
@@ -825,13 +835,16 @@ export abstract class AbstractPool<
     if (typeof name === 'string' && name.trim().length === 0) {
       throw new TypeError('name argument must not be an empty string')
     }
-    if (typeof fn !== 'function') {
-      throw new TypeError('fn argument must be a function')
+    if (typeof fn === 'function') {
+      fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
+    }
+    if (typeof fn.taskFunction !== 'function') {
+      throw new TypeError('taskFunction property must be a function')
     }
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
-      taskFunctionName: name,
-      taskFunction: fn.toString()
+      taskFunctionProperties: buildTaskFunctionProperties(name, fn),
+      taskFunction: fn.taskFunction.toString()
     })
     this.taskFunctions.set(name, fn)
     return opResult
@@ -846,7 +859,10 @@ export abstract class AbstractPool<
     }
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'remove',
-      taskFunctionName: name
+      taskFunctionProperties: buildTaskFunctionProperties(
+        name,
+        this.taskFunctions.get(name)
+      )
     })
     this.deleteTaskFunctionWorkerUsages(name)
     this.taskFunctions.delete(name)
@@ -854,13 +870,13 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public listTaskFunctionNames (): string[] {
+  public listTaskFunctionsProperties (): TaskFunctionProperties[] {
     for (const workerNode of this.workerNodes) {
       if (
-        Array.isArray(workerNode.info.taskFunctionNames) &&
-        workerNode.info.taskFunctionNames.length > 0
+        Array.isArray(workerNode.info.taskFunctionsProperties) &&
+        workerNode.info.taskFunctionsProperties.length > 0
       ) {
-        return workerNode.info.taskFunctionNames
+        return workerNode.info.taskFunctionsProperties
       }
     }
     return []
@@ -870,7 +886,10 @@ export abstract class AbstractPool<
   public async setDefaultTaskFunction (name: string): Promise<boolean> {
     return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'default',
-      taskFunctionName: name
+      taskFunctionProperties: buildTaskFunctionProperties(
+        name,
+        this.taskFunctions.get(name)
+      )
     })
   }
 
@@ -1185,8 +1204,8 @@ export abstract class AbstractPool<
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
       workerInfo != null &&
-      Array.isArray(workerInfo.taskFunctionNames) &&
-      workerInfo.taskFunctionNames.length > 2
+      Array.isArray(workerInfo.taskFunctionsProperties) &&
+      workerInfo.taskFunctionsProperties.length > 2
     )
   }
 
@@ -1329,11 +1348,14 @@ export abstract class AbstractPool<
       checkActive: true
     })
     if (this.taskFunctions.size > 0) {
-      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+      for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
         this.sendTaskFunctionOperationToWorker(workerNodeKey, {
           taskFunctionOperation: 'add',
-          taskFunctionName,
-          taskFunction: taskFunction.toString()
+          taskFunctionProperties: buildTaskFunctionProperties(
+            taskFunctionName,
+            taskFunctionObject
+          ),
+          taskFunction: taskFunctionObject.taskFunction.toString()
         }).catch((error: unknown) => {
           this.emitter?.emit(PoolEvents.error, error)
         })
@@ -1587,10 +1609,10 @@ export abstract class AbstractPool<
     ) {
       workerInfo.stealing = false
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      for (const taskName of workerInfo.taskFunctionNames!) {
+      for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
         this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
           workerNodeKey,
-          taskName
+          taskFunctionProperties.name
         )
       }
       this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
@@ -1726,21 +1748,21 @@ export abstract class AbstractPool<
     message: MessageValue<Response>
   ): void => {
     this.checkMessageWorkerId(message)
-    const { workerId, ready, taskId, taskFunctionNames } = message
-    if (ready != null && taskFunctionNames != null) {
+    const { workerId, ready, taskId, taskFunctionsProperties } = message
+    if (ready != null && taskFunctionsProperties != null) {
       // Worker ready response received from worker
       this.handleWorkerReadyResponse(message)
-    } else if (taskId != null) {
-      // Task execution response received from worker
-      this.handleTaskExecutionResponse(message)
-    } else if (taskFunctionNames != null) {
-      // Task function names message received from worker
+    } else if (taskFunctionsProperties != null) {
+      // Task function properties message received from worker
       const workerInfo = this.getWorkerInfo(
         this.getWorkerNodeKeyByWorkerId(workerId)
       )
       if (workerInfo != null) {
-        workerInfo.taskFunctionNames = taskFunctionNames
+        workerInfo.taskFunctionsProperties = taskFunctionsProperties
       }
+    } else if (taskId != null) {
+      // Task execution response received from worker
+      this.handleTaskExecutionResponse(message)
     }
   }
 
@@ -1752,14 +1774,14 @@ export abstract class AbstractPool<
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
-    const { workerId, ready, taskFunctionNames } = message
+    const { workerId, ready, taskFunctionsProperties } = message
     if (ready == null || !ready) {
       throw new Error(`Worker ${workerId} failed to initialize`)
     }
     const workerNode =
       this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
     workerNode.info.ready = ready
-    workerNode.info.taskFunctionNames = taskFunctionNames
+    workerNode.info.taskFunctionsProperties = taskFunctionsProperties
     this.checkAndEmitReadyEvent()
   }