Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
index 1bd5ee2cc73152e912cc08ecc3492f8e4a9ae6b7..33585218249e599cd1998fdc19158e7ab4b22168 100644 (file)
@@ -768,8 +768,8 @@ export abstract class AbstractPool<
     workerNodeKey: number,
     message: MessageValue<Data>
   ): Promise<boolean> {
-    const workerId = this.getWorkerInfo(workerNodeKey).id as number
     return await new Promise<boolean>((resolve, reject) => {
+      const workerId = this.getWorkerInfo(workerNodeKey).id as number
       this.registerWorkerMessageListener(workerNodeKey, message => {
         if (
           message.workerId === workerId &&
@@ -794,7 +794,7 @@ export abstract class AbstractPool<
   }
 
   private async sendTaskFunctionOperationToWorkers (
-    message: Omit<MessageValue<Data>, 'workerId'>
+    message: MessageValue<Data>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
       const responsesReceived = new Array<MessageValue<Data | Response>>()
@@ -857,12 +857,13 @@ export abstract class AbstractPool<
     if (typeof fn !== 'function') {
       throw new TypeError('fn argument must be a function')
     }
-    this.taskFunctions.set(name, fn)
-    return await this.sendTaskFunctionOperationToWorkers({
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: fn.toString()
     })
+    this.taskFunctions.set(name, fn)
+    return opResult
   }
 
   /** @inheritDoc */
@@ -872,11 +873,13 @@ export abstract class AbstractPool<
         'Cannot remove a task function not handled on the pool side'
       )
     }
-    this.taskFunctions.delete(name)
-    return await this.sendTaskFunctionOperationToWorkers({
+    const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
+    this.deleteTaskFunctionWorkerUsages(name)
+    this.taskFunctions.delete(name)
+    return opResult
   }
 
   /** @inheritDoc */
@@ -900,6 +903,12 @@ export abstract class AbstractPool<
     })
   }
 
+  private deleteTaskFunctionWorkerUsages (name: string): void {
+    for (const workerNode of this.workerNodes) {
+      workerNode.deleteTaskFunctionWorkerUsage(name)
+    }
+  }
+
   private shallExecuteTask (workerNodeKey: number): boolean {
     return (
       this.tasksQueueSize(workerNodeKey) === 0 &&