Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
index 90cede3a8227f7e21cd7a0f32ec84e4f8e6bdbf6..eb517943a6dc74b6ec88ca9cd17f4ca5c5eb5365 100644 (file)
@@ -93,6 +93,13 @@ export abstract class AbstractPool<
    */
   protected readonly max?: number
 
+  /**
+   * The task functions added at runtime map:
+   * - `key`: The task function name.
+   * - `value`: The task function itself.
+   */
+  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
   /**
    * Whether the pool is starting or not.
    */
@@ -146,6 +153,8 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
+    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
     this.starting = true
     this.startPool()
     this.starting = false
@@ -601,7 +610,7 @@ export abstract class AbstractPool<
    * @param workerId - The worker id.
    * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKeyByWorkerId (workerId: number): number {
+  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
     return this.workerNodes.findIndex(
       workerNode => workerNode.info.id === workerId
     )
@@ -723,44 +732,105 @@ export abstract class AbstractPool<
     }
   }
 
-  private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
-    let messagesCount = 0
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.sendToWorker(workerNodeKey, {
-        ...message,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number
+  private async sendTaskFunctionOperationToWorker (
+    workerNodeKey: number,
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    const workerId = this.getWorkerInfo(workerNodeKey).id as number
+    return await new Promise<boolean>((resolve, reject) => {
+      this.registerWorkerMessageListener(workerNodeKey, message => {
+        if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === true
+        ) {
+          resolve(true)
+        } else if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === false
+        ) {
+          reject(
+            new Error(
+              `Task function operation ${
+                message.taskFunctionOperation as string
+              } failed on worker ${message.workerId}`
+            )
+          )
+        }
       })
-      ++messagesCount
-    }
-    return messagesCount
+      this.sendToWorker(workerNodeKey, message)
+    })
+  }
+
+  private async sendTaskFunctionOperationToWorkers (
+    message: Omit<MessageValue<Data>, 'workerId'>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const responsesReceived = new Array<MessageValue<Data | Response>>()
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(workerNodeKey, message => {
+          if (message.taskFunctionOperationStatus != null) {
+            responsesReceived.push(message)
+            if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.every(
+                message => message.taskFunctionOperationStatus === true
+              )
+            ) {
+              resolve(true)
+            } else if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.some(
+                message => message.taskFunctionOperationStatus === false
+              )
+            ) {
+              reject(
+                new Error(
+                  `Task function operation ${
+                    message.taskFunctionOperation as string
+                  } failed on worker ${message.workerId as number}`
+                )
+              )
+            }
+          }
+        })
+        this.sendToWorker(workerNodeKey, message)
+      }
+    })
   }
 
   /** @inheritDoc */
   public hasTaskFunction (name: string): boolean {
-    this.sendToWorkers({
-      taskFunctionOperation: 'has',
-      taskFunctionName: name
-    })
-    return true
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.includes(name)
+      ) {
+        return true
+      }
+    }
+    return false
   }
 
   /** @inheritDoc */
-  public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
-    this.sendToWorkers({
+  public async addTaskFunction (
+    name: string,
+    taskFunction: TaskFunction<Data, Response>
+  ): Promise<boolean> {
+    this.taskFunctions.set(name, taskFunction)
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: taskFunction.toString()
     })
-    return true
   }
 
   /** @inheritDoc */
-  public removeTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async removeTaskFunction (name: string): Promise<boolean> {
+    this.taskFunctions.delete(name)
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
-    return true
   }
 
   /** @inheritDoc */
@@ -777,12 +847,11 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public setDefaultTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async setDefaultTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'default',
       taskFunctionName: name
     })
-    return true
   }
 
   private shallExecuteTask (workerNodeKey: number): boolean {
@@ -828,7 +897,6 @@ export abstract class AbstractPool<
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -860,18 +928,23 @@ export abstract class AbstractPool<
   }
 
   protected async sendKillMessageToWorker (
-    workerNodeKey: number,
-    workerId: number
+    workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
       this.registerWorkerMessageListener(workerNodeKey, message => {
         if (message.kill === 'success') {
           resolve()
         } else if (message.kill === 'failure') {
-          reject(new Error(`Worker ${workerId} kill message handling failed`))
+          reject(
+            new Error(
+              `Worker ${
+                message.workerId as number
+              } kill message handling failed`
+            )
+          )
         }
       })
-      this.sendToWorker(workerNodeKey, { kill: true, workerId })
+      this.sendToWorker(workerNodeKey, { kill: true })
     })
   }
 
@@ -1175,9 +1248,19 @@ export abstract class AbstractPool<
     })
     const workerInfo = this.getWorkerInfo(workerNodeKey)
     this.sendToWorker(workerNodeKey, {
-      checkActive: true,
-      workerId: workerInfo.id as number
+      checkActive: true
     })
+    if (this.taskFunctions.size > 0) {
+      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+          taskFunctionOperation: 'add',
+          taskFunctionName,
+          taskFunction: taskFunction.toString()
+        }).catch(error => {
+          this.emitter?.emit(PoolEvents.error, error)
+        })
+      }
+    }
     workerInfo.dynamic = true
     if (
       this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
@@ -1243,8 +1326,7 @@ export abstract class AbstractPool<
             .runTime.aggregate,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
-      },
-      workerId: this.getWorkerInfo(workerNodeKey).id as number
+      }
     })
   }
 
@@ -1260,11 +1342,7 @@ export abstract class AbstractPool<
         },
         0
       )
-      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
-      const task = {
-        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = this.dequeueTask(workerNodeKey) as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1294,7 +1372,6 @@ export abstract class AbstractPool<
 
   private taskStealingOnEmptyQueue (workerId: number): void {
     const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
-    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1308,10 +1385,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
-      const task = {
-        ...(sourceWorkerNode.popTask() as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = sourceWorkerNode.popTask() as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1345,10 +1419,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
-        const task = {
-          ...(sourceWorkerNode.popTask() as Task<Data>),
-          workerId: workerNode.info.id as number
-        }
+        const task = sourceWorkerNode.popTask() as Task<Data>
         if (this.shallExecuteTask(workerNodeKey)) {
           this.executeTask(workerNodeKey, task)
         } else {
@@ -1381,15 +1452,15 @@ export abstract class AbstractPool<
         this.getWorkerInfo(
           this.getWorkerNodeKeyByWorkerId(message.workerId)
         ).taskFunctionNames = message.taskFunctionNames
-      } else if (message.taskFunctionOperation != null) {
-        // Task function operation response received from worker
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     if (message.ready === false) {
-      throw new Error(`Worker ${message.workerId} failed to initialize`)
+      throw new Error(
+        `Worker ${message.workerId as number} failed to initialize`
+      )
     }
     const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)