fix: properly handle response for add/remove/set task function operaions
[poolifier.git] / src / pools / abstract-pool.ts
index 90cede3a8227f7e21cd7a0f32ec84e4f8e6bdbf6..c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4 100644 (file)
@@ -723,44 +723,77 @@ export abstract class AbstractPool<
     }
   }
 
-  private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
-    let messagesCount = 0
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.sendToWorker(workerNodeKey, {
-        ...message,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number
-      })
-      ++messagesCount
-    }
-    return messagesCount
+  private async sendTaskFunctionOperationToWorker (
+    message: Omit<MessageValue<Data>, 'workerId'>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const responsesReceived = new Array<MessageValue<Data | Response>>()
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(workerNodeKey, message => {
+          if (message.taskFunctionOperationStatus != null) {
+            responsesReceived.push(message)
+            if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.every(
+                message => message.taskFunctionOperationStatus === true
+              )
+            ) {
+              resolve(true)
+            } else if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.some(
+                message => message.taskFunctionOperationStatus === false
+              )
+            ) {
+              reject(
+                new Error(
+                  `Task function operation ${
+                    message.taskFunctionOperation as string
+                  } failed on worker ${message.workerId}`
+                )
+              )
+            }
+          }
+        })
+        this.sendToWorker(workerNodeKey, {
+          ...message,
+          workerId: this.getWorkerInfo(workerNodeKey).id as number
+        })
+      }
+    })
   }
 
   /** @inheritDoc */
   public hasTaskFunction (name: string): boolean {
-    this.sendToWorkers({
-      taskFunctionOperation: 'has',
-      taskFunctionName: name
-    })
-    return true
+    for (const workerNode of this.workerNodes) {
+      if (
+        Array.isArray(workerNode.info.taskFunctionNames) &&
+        workerNode.info.taskFunctionNames.includes(name)
+      ) {
+        return true
+      }
+    }
+    return false
   }
 
   /** @inheritDoc */
-  public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
-    this.sendToWorkers({
+  public async addTaskFunction (
+    name: string,
+    taskFunction: TaskFunction
+  ): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: taskFunction.toString()
     })
-    return true
   }
 
   /** @inheritDoc */
-  public removeTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async removeTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
-    return true
   }
 
   /** @inheritDoc */
@@ -777,12 +810,11 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public setDefaultTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async setDefaultTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'default',
       taskFunctionName: name
     })
-    return true
   }
 
   private shallExecuteTask (workerNodeKey: number): boolean {