fix: handle added function at runtime with dynamic worker
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Sep 2023 20:23:58 +0000 (22:23 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 13 Sep 2023 20:23:58 +0000 (22:23 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/pool.ts
src/pools/thread/fixed.ts
src/utility-types.ts

index c1c0a58c784e71dd3d049d5b16298f86dfd8d0a4..6a26c7b7a63129c1d51e1bcfac0804b2d9962608 100644 (file)
@@ -93,6 +93,13 @@ export abstract class AbstractPool<
    */
   protected readonly max?: number
 
+  /**
+   * The task functions added at runtime map:
+   * - `key`: The task function name.
+   * - `value`: The task function itself.
+   */
+  private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
+
   /**
    * Whether the pool is starting or not.
    */
@@ -146,6 +153,8 @@ export abstract class AbstractPool<
 
     this.setupHook()
 
+    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
+
     this.starting = true
     this.startPool()
     this.starting = false
@@ -601,7 +610,7 @@ export abstract class AbstractPool<
    * @param workerId - The worker id.
    * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
    */
-  private getWorkerNodeKeyByWorkerId (workerId: number): number {
+  private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
     return this.workerNodes.findIndex(
       workerNode => workerNode.info.id === workerId
     )
@@ -724,6 +733,35 @@ export abstract class AbstractPool<
   }
 
   private async sendTaskFunctionOperationToWorker (
+    workerNodeKey: number,
+    message: MessageValue<Data>
+  ): Promise<boolean> {
+    const workerId = this.getWorkerInfo(workerNodeKey).id as number
+    return await new Promise<boolean>((resolve, reject) => {
+      this.registerWorkerMessageListener(workerNodeKey, message => {
+        if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === true
+        ) {
+          resolve(true)
+        } else if (
+          message.workerId === workerId &&
+          message.taskFunctionOperationStatus === false
+        ) {
+          reject(
+            new Error(
+              `Task function operation ${
+                message.taskFunctionOperation as string
+              } failed on worker ${message.workerId}`
+            )
+          )
+        }
+      })
+      this.sendToWorker(workerNodeKey, message)
+    })
+  }
+
+  private async sendTaskFunctionOperationToWorkers (
     message: Omit<MessageValue<Data>, 'workerId'>
   ): Promise<boolean> {
     return await new Promise<boolean>((resolve, reject) => {
@@ -749,16 +787,13 @@ export abstract class AbstractPool<
                 new Error(
                   `Task function operation ${
                     message.taskFunctionOperation as string
-                  } failed on worker ${message.workerId}`
+                  } failed on worker ${message.workerId as number}`
                 )
               )
             }
           }
         })
-        this.sendToWorker(workerNodeKey, {
-          ...message,
-          workerId: this.getWorkerInfo(workerNodeKey).id as number
-        })
+        this.sendToWorker(workerNodeKey, message)
       }
     })
   }
@@ -779,9 +814,10 @@ export abstract class AbstractPool<
   /** @inheritDoc */
   public async addTaskFunction (
     name: string,
-    taskFunction: TaskFunction
+    taskFunction: TaskFunction<Data, Response>
   ): Promise<boolean> {
-    return await this.sendTaskFunctionOperationToWorker({
+    this.taskFunctions.set(name, taskFunction)
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionName: name,
       taskFunction: taskFunction.toString()
@@ -790,7 +826,8 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async removeTaskFunction (name: string): Promise<boolean> {
-    return await this.sendTaskFunctionOperationToWorker({
+    this.taskFunctions.delete(name)
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'remove',
       taskFunctionName: name
     })
@@ -811,7 +848,7 @@ export abstract class AbstractPool<
 
   /** @inheritDoc */
   public async setDefaultTaskFunction (name: string): Promise<boolean> {
-    return await this.sendTaskFunctionOperationToWorker({
+    return await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'default',
       taskFunctionName: name
     })
@@ -860,7 +897,6 @@ export abstract class AbstractPool<
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -892,18 +928,23 @@ export abstract class AbstractPool<
   }
 
   protected async sendKillMessageToWorker (
-    workerNodeKey: number,
-    workerId: number
+    workerNodeKey: number
   ): Promise<void> {
     await new Promise<void>((resolve, reject) => {
       this.registerWorkerMessageListener(workerNodeKey, message => {
         if (message.kill === 'success') {
           resolve()
         } else if (message.kill === 'failure') {
-          reject(new Error(`Worker ${workerId} kill message handling failed`))
+          reject(
+            new Error(
+              `Worker ${
+                message.workerId as number
+              } kill message handling failed`
+            )
+          )
         }
       })
-      this.sendToWorker(workerNodeKey, { kill: true, workerId })
+      this.sendToWorker(workerNodeKey, { kill: true })
     })
   }
 
@@ -1210,6 +1251,17 @@ export abstract class AbstractPool<
       checkActive: true,
       workerId: workerInfo.id as number
     })
+    if (this.taskFunctions.size > 0) {
+      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
+        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
+          taskFunctionOperation: 'add',
+          taskFunctionName,
+          taskFunction: taskFunction.toString()
+        }).catch(error => {
+          this.emitter?.emit(PoolEvents.error, error)
+        })
+      }
+    }
     workerInfo.dynamic = true
     if (
       this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
@@ -1275,8 +1327,7 @@ export abstract class AbstractPool<
             .runTime.aggregate,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
-      },
-      workerId: this.getWorkerInfo(workerNodeKey).id as number
+      }
     })
   }
 
@@ -1292,11 +1343,7 @@ export abstract class AbstractPool<
         },
         0
       )
-      const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
-      const task = {
-        ...(this.dequeueTask(workerNodeKey) as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = this.dequeueTask(workerNodeKey) as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1326,7 +1373,6 @@ export abstract class AbstractPool<
 
   private taskStealingOnEmptyQueue (workerId: number): void {
     const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
-    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
     const workerNodes = this.workerNodes
       .slice()
       .sort(
@@ -1340,10 +1386,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued > 0
     )
     if (sourceWorkerNode != null) {
-      const task = {
-        ...(sourceWorkerNode.popTask() as Task<Data>),
-        workerId: destinationWorkerNode.info.id as number
-      }
+      const task = sourceWorkerNode.popTask() as Task<Data>
       if (this.shallExecuteTask(destinationWorkerNodeKey)) {
         this.executeTask(destinationWorkerNodeKey, task)
       } else {
@@ -1377,10 +1420,7 @@ export abstract class AbstractPool<
         workerNode.usage.tasks.queued <
           (this.opts.tasksQueueOptions?.size as number) - sizeOffset
       ) {
-        const task = {
-          ...(sourceWorkerNode.popTask() as Task<Data>),
-          workerId: workerNode.info.id as number
-        }
+        const task = sourceWorkerNode.popTask() as Task<Data>
         if (this.shallExecuteTask(workerNodeKey)) {
           this.executeTask(workerNodeKey, task)
         } else {
@@ -1413,15 +1453,15 @@ export abstract class AbstractPool<
         this.getWorkerInfo(
           this.getWorkerNodeKeyByWorkerId(message.workerId)
         ).taskFunctionNames = message.taskFunctionNames
-      } else if (message.taskFunctionOperation != null) {
-        // Task function operation response received from worker
       }
     }
   }
 
   private handleWorkerReadyResponse (message: MessageValue<Response>): void {
     if (message.ready === false) {
-      throw new Error(`Worker ${message.workerId} failed to initialize`)
+      throw new Error(
+        `Worker ${message.workerId as number} failed to initialize`
+      )
     }
     const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
index 17ef7e9e57adcb91967b30be12cef158d1fc37fd..20215df650c8ce6102d5ad9eaad3253a7c0ccf63 100644 (file)
@@ -73,10 +73,7 @@ export class FixedClusterPool<
     worker.on('disconnect', () => {
       worker.kill()
     })
-    await this.sendKillMessageToWorker(
-      workerNodeKey,
-      workerNode.info.id as number
-    )
+    await this.sendKillMessageToWorker(workerNodeKey)
     worker.disconnect()
     await waitWorkerExit
   }
@@ -86,7 +83,10 @@ export class FixedClusterPool<
     workerNodeKey: number,
     message: MessageValue<Data>
   ): void {
-    this.workerNodes[workerNodeKey].worker.send(message)
+    this.workerNodes[workerNodeKey].worker.send({
+      ...message,
+      workerId: this.workerNodes[workerNodeKey].info.id as number
+    })
   }
 
   /** @inheritDoc */
index a19b70a387665bf13d8c7f796390928da52ed0d8..cb7580898656ccd58a379edf24cdc55631301f34 100644 (file)
@@ -251,7 +251,7 @@ export interface IPool<
    */
   readonly addTaskFunction: (
     name: string,
-    taskFunction: TaskFunction
+    taskFunction: TaskFunction<Data, Response>
   ) => Promise<boolean>
   /**
    * Removes a task function from this pool.
index 6e234e2ea3dab6f045bfbfca84f7556afbff9f3c..51123204b8666ac4a1ef6c08b41d1ffdac1065bb 100644 (file)
@@ -67,10 +67,7 @@ export class FixedThreadPool<
         resolve()
       })
     })
-    await this.sendKillMessageToWorker(
-      workerNodeKey,
-      workerNode.info.id as number
-    )
+    await this.sendKillMessageToWorker(workerNodeKey)
     workerNode.closeChannel()
     await worker.terminate()
     await waitWorkerExit
@@ -84,7 +81,10 @@ export class FixedThreadPool<
   ): void {
     (
       this.workerNodes[workerNodeKey].messageChannel as MessageChannel
-    ).port1.postMessage(message, transferList)
+    ).port1.postMessage(
+      { ...message, workerId: this.workerNodes[workerNodeKey].info.id },
+      transferList
+    )
   }
 
   /** @inheritDoc */
index 3d5673f93111d552a4cdce0364d7c036cf3e175e..3817515a585120bb8e8c3c369dc57e1c59ac0c65 100644 (file)
@@ -72,7 +72,7 @@ export interface Task<Data = unknown> {
   /**
    * Worker id.
    */
-  readonly workerId: number
+  readonly workerId?: number
   /**
    * Task name.
    */