refactor: use pool side task functions list for hasTaskFunction()
[poolifier.git] / src / pools / abstract-pool.ts
index 3dcbceaba1bfb6012f98c80c73f81a332f80e468..1b53065c54e18318f1295bd68e6340dc5cbc76c1 100644 (file)
@@ -21,6 +21,7 @@ import {
   updateMeasurementStatistics
 } from '../utils'
 import { KillBehaviors } from '../worker/worker-options'
+import type { TaskFunction } from '../worker/task-functions'
 import {
   type IPool,
   PoolEmitter,
@@ -722,19 +723,72 @@ export abstract class AbstractPool<
     }
   }
 
+  private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
+    let messagesCount = 0
+    for (const [workerNodeKey] of this.workerNodes.entries()) {
+      this.sendToWorker(workerNodeKey, {
+        ...message,
+        workerId: this.getWorkerInfo(workerNodeKey).id as number
+      })
+      ++messagesCount
+    }
+    return messagesCount
+  }
+
   /** @inheritDoc */
-  public listTaskFunctions (): string[] {
+  public hasTaskFunction (name: string): boolean {
     for (const workerNode of this.workerNodes) {
       if (
-        Array.isArray(workerNode.info.taskFunctions) &&
-        workerNode.info.taskFunctions.length > 0
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.includes(name)
       ) {
-        return workerNode.info.taskFunctions
+        return true
+      }
+    }
+    return false
+  }
+
+  /** @inheritDoc */
+  public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
+    this.sendToWorkers({
+      taskFunctionOperation: 'add',
+      taskFunctionName: name,
+      taskFunction: taskFunction.toString()
+    })
+    return true
+  }
+
+  /** @inheritDoc */
+  public removeTaskFunction (name: string): boolean {
+    this.sendToWorkers({
+      taskFunctionOperation: 'remove',
+      taskFunctionName: name
+    })
+    return true
+  }
+
+  /** @inheritDoc */
+  public listTaskFunctionNames (): string[] {
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.length > 0
+      ) {
+        return workerNode.info.taskFunctionNames
       }
     }
     return []
   }
 
+  /** @inheritDoc */
+  public setDefaultTaskFunction (name: string): boolean {
+    this.sendToWorkers({
+      taskFunctionOperation: 'default',
+      taskFunctionName: name
+    })
+    return true
+  }
+
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&
@@ -921,8 +975,8 @@ export abstract class AbstractPool<
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
       workerInfo != null &&
-      Array.isArray(workerInfo.taskFunctions) &&
-      workerInfo.taskFunctions.length > 2
+      Array.isArray(workerInfo.taskFunctionNames) &&
+      workerInfo.taskFunctionNames.length > 2
     )
   }
 
@@ -937,7 +991,7 @@ export abstract class AbstractPool<
     ) {
       --workerTaskStatistics.executing
     }
-    if (message.taskError == null) {
+    if (message.workerError == null) {
       ++workerTaskStatistics.executed
     } else {
       ++workerTaskStatistics.failed
@@ -948,7 +1002,7 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
-    if (message.taskError != null) {
+    if (message.workerError != null) {
       return
     }
     updateMeasurementStatistics(
@@ -975,7 +1029,7 @@ export abstract class AbstractPool<
     workerUsage: WorkerUsage,
     message: MessageValue<Response>
   ): void {
-    if (message.taskError != null) {
+    if (message.workerError != null) {
       return
     }
     const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
@@ -1071,8 +1125,8 @@ export abstract class AbstractPool<
       this.emitter?.emit(PoolEvents.error, error)
       if (
         this.opts.restartWorkerOnError === true &&
-        !this.starting &&
-        this.started
+        this.started &&
+        !this.starting
       ) {
         if (workerInfo.dynamic) {
           this.createAndSetupDynamicWorkerNode()
@@ -1320,17 +1374,19 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null && message.taskFunctions != null) {
+      if (message.ready != null && message.taskFunctionNames != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
         // Task execution response received from worker
         this.handleTaskExecutionResponse(message)
-      } else if (message.taskFunctions != null) {
-        // Task functions message received from worker
+      } else if (message.taskFunctionNames != null) {
+        // Task function names message received from worker
         this.getWorkerInfo(
           this.getWorkerNodeKeyByWorkerId(message.workerId)
-        ).taskFunctions = message.taskFunctions
+        ).taskFunctionNames = message.taskFunctionNames
+      } else if (message.taskFunctionOperation != null) {
+        // Task function operation response received from worker
       }
     }
   }
@@ -1343,19 +1399,19 @@ export abstract class AbstractPool<
       this.getWorkerNodeKeyByWorkerId(message.workerId)
     )
     workerInfo.ready = message.ready as boolean
-    workerInfo.taskFunctions = message.taskFunctions
+    workerInfo.taskFunctionNames = message.taskFunctionNames
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
   }
 
   private handleTaskExecutionResponse (message: MessageValue<Response>): void {
-    const { taskId, taskError, data } = message
+    const { taskId, workerError, data } = message
     const promiseResponse = this.promiseResponseMap.get(taskId as string)
     if (promiseResponse != null) {
-      if (taskError != null) {
-        this.emitter?.emit(PoolEvents.taskError, taskError)
-        promiseResponse.reject(taskError.message)
+      if (workerError != null) {
+        this.emitter?.emit(PoolEvents.taskError, workerError)
+        promiseResponse.reject(workerError.message)
       } else {
         promiseResponse.resolve(data as Response)
       }