fix: properly handle response for add/remove/set task function operaions
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Sep 2023 19:27:13 +0000 (21:27 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Sep 2023 19:27:13 +0000 (21:27 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/index.ts
src/pools/abstract-pool.ts
src/pools/pool.ts
src/worker/abstract-worker.ts
src/worker/task-functions.ts

index d16109532be4620adc94e436e2941be6b6a282cd..8faba2552bb4cb772724aa350016471a36d659d2 100644 (file)
@@ -60,6 +60,7 @@ export type {
 export type {
   TaskAsyncFunction,
   TaskFunction,
+  TaskFunctionOperationReturnType,
   TaskFunctions,
   TaskSyncFunction
 } from './worker/task-functions'
index 1b53065c54e18318f1295bd68e6340dc5cbc76c1..c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4 100644 (file)
@@ -723,16 +723,44 @@ export abstract class AbstractPool<
     }
   }
 
-  private sendToWorkers (message: Omit<MessageValue<Data>, 'workerId'>): number {
-    let messagesCount = 0
-    for (const [workerNodeKey] of this.workerNodes.entries()) {
-      this.sendToWorker(workerNodeKey, {
-        ...message,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number
-      })
-      ++messagesCount
-    }
-    return messagesCount
+  private async sendTaskFunctionOperationToWorker (
+    message: Omit<MessageValue<Data>, 'workerId'>
+  ): Promise<boolean> {
+    return await new Promise<boolean>((resolve, reject) => {
+      const responsesReceived = new Array<MessageValue<Data | Response>>()
+      for (const [workerNodeKey] of this.workerNodes.entries()) {
+        this.registerWorkerMessageListener(workerNodeKey, message => {
+          if (message.taskFunctionOperationStatus != null) {
+            responsesReceived.push(message)
+            if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.every(
+                message => message.taskFunctionOperationStatus === true
+              )
+            ) {
+              resolve(true)
+            } else if (
+              responsesReceived.length === this.workerNodes.length &&
+              responsesReceived.some(
+                message => message.taskFunctionOperationStatus === false
+              )
+            ) {
+              reject(
+                new Error(
+                  `Task function operation ${
+                    message.taskFunctionOperation as string
+                  } failed on worker ${message.workerId}`
+                )
+              )
+            }
+          }
+        })
+        this.sendToWorker(workerNodeKey, {
+          ...message,
+          workerId: this.getWorkerInfo(workerNodeKey).id as number
+        })
+      }
+    })
   }
 
   /** @inheritDoc */
@@ -749,22 +777,23 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public addTaskFunction (name: string, taskFunction: TaskFunction): boolean {
-    this.sendToWorkers({
+  public async addTaskFunction (
+    name: string,
+    taskFunction: TaskFunction
+  ): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: taskFunction.toString()
     })
-    return true
   }
 
   /** @inheritDoc */
-  public removeTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async removeTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
-    return true
   }
 
   /** @inheritDoc */
@@ -781,12 +810,11 @@ export abstract class AbstractPool<
   }
 
   /** @inheritDoc */
-  public setDefaultTaskFunction (name: string): boolean {
-    this.sendToWorkers({
+  public async setDefaultTaskFunction (name: string): Promise<boolean> {
+    return await this.sendTaskFunctionOperationToWorker({
       taskFunctionOperation: 'default',
       taskFunctionName: name
     })
-    return true
   }
 
   private shallExecuteTask (workerNodeKey: number): boolean {
index 6156a0421170e6baa8ad7c2071b755da554a03ed..a19b70a387665bf13d8c7f796390928da52ed0d8 100644 (file)
@@ -252,14 +252,14 @@ export interface IPool<
   readonly addTaskFunction: (
     name: string,
     taskFunction: TaskFunction
-  ) => boolean
+  ) => Promise<boolean>
   /**
    * Removes a task function from this pool.
    *
    * @param name - The name of the task function.
    * @returns `true` if the task function was removed, `false` otherwise.
    */
-  readonly removeTaskFunction: (name: string) => boolean
+  readonly removeTaskFunction: (name: string) => Promise<boolean>
   /**
    * Lists the names of task function available in this pool.
    *
@@ -272,7 +272,7 @@ export interface IPool<
    * @param name - The name of the task function.
    * @returns `true` if the default task function was set, `false` otherwise.
    */
-  readonly setDefaultTaskFunction: (name: string) => boolean
+  readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
   /**
    * Sets the worker choice strategy in this pool.
    *
index 8cd370de2e3b7671c2f697c106acff3dc12e267f..107d72384166053e5783f121ef14d0a7abbe69ca 100644 (file)
@@ -18,15 +18,11 @@ import { KillBehaviors, type WorkerOptions } from './worker-options'
 import type {
   TaskAsyncFunction,
   TaskFunction,
+  TaskFunctionOperationReturnType,
   TaskFunctions,
   TaskSyncFunction
 } from './task-functions'
 
-interface TaskFunctionOperationReturnType {
-  status: boolean
-  error?: Error
-}
-
 const DEFAULT_MAX_INACTIVE_TIME = 60000
 const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
   /**
@@ -217,7 +213,7 @@ export abstract class AbstractWorker<
         this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
       }
       this.taskFunctions.set(name, boundFn)
-      this.sendTaskFunctionsListToMainWorker()
+      this.sendTaskFunctionNamesToMainWorker()
       return { status: true }
     } catch (error) {
       return { status: false, error: error as Error }
@@ -247,7 +243,7 @@ export abstract class AbstractWorker<
         )
       }
       const deleteStatus = this.taskFunctions.delete(name)
-      this.sendTaskFunctionsListToMainWorker()
+      this.sendTaskFunctionNamesToMainWorker()
       return { status: deleteStatus }
     } catch (error) {
       return { status: false, error: error as Error }
@@ -485,9 +481,9 @@ export abstract class AbstractWorker<
   ): void
 
   /**
-   * Sends the list of task function names to the main worker.
+   * Sends task function names to the main worker.
    */
-  protected sendTaskFunctionsListToMainWorker (): void {
+  protected sendTaskFunctionNamesToMainWorker (): void {
     this.sendToMainWorker({
       taskFunctionNames: this.listTaskFunctionNames(),
       workerId: this.id
index a61d69627ce6eeebd36e462f5801c36e94cc8e06..353b8b0c1f8b329d71c89e6a89bbbdd921d3d548 100644 (file)
@@ -43,3 +43,11 @@ export type TaskFunctions<Data = unknown, Response = unknown> = Record<
 string,
 TaskFunction<Data, Response>
 >
+
+/**
+ * Task function operation return type.
+ */
+export interface TaskFunctionOperationReturnType {
+  status: boolean
+  error?: Error
+}