fix: fix race condition between ready and task functions message at
authorJérôme Benoit <jerome.benoit@sap.com>
Thu, 17 Aug 2023 17:23:50 +0000 (19:23 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Thu, 17 Aug 2023 17:23:50 +0000 (19:23 +0200)
startup

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/worker/cluster-worker.ts
src/worker/thread-worker.ts

index e7c27cdae8a881392aa3cb4c51e8ab0386365f99..f89af5acf813afc18eeaaa1b9277039c6692e480 100644 (file)
@@ -678,12 +678,11 @@ export abstract class AbstractPool<
       }
       const timestamp = performance.now()
       const workerNodeKey = this.chooseWorkerNode()
+      const workerInfo = this.getWorkerInfo(workerNodeKey)
       if (
         name != null &&
-        Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
-        !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes(
-          name
-        )
+        Array.isArray(workerInfo.taskFunctions) &&
+        !workerInfo.taskFunctions.includes(name)
       ) {
         reject(
           new Error(`Task function '${name}' is not registered in the pool`)
@@ -695,7 +694,7 @@ export abstract class AbstractPool<
         data: data ?? ({} as Data),
         transferList,
         timestamp,
-        workerId: this.getWorkerInfo(workerNodeKey).id as number,
+        workerId: workerInfo.id as number,
         taskId: randomUUID()
       }
       this.promiseResponseMap.set(task.taskId as string, {
@@ -816,9 +815,10 @@ export abstract class AbstractPool<
   }
 
   private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean {
+    const workerInfo = this.getWorkerInfo(workerNodeKey)
     return (
-      Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) &&
-      (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1
+      Array.isArray(workerInfo.taskFunctions) &&
+      workerInfo.taskFunctions.length > 1
     )
   }
 
@@ -1125,7 +1125,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return (message) => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null) {
+      if (message.ready != null && message.taskFunctions != null) {
         // Worker ready response received from worker
         this.handleWorkerReadyResponse(message)
       } else if (message.taskId != null) {
@@ -1144,9 +1144,11 @@ export abstract class AbstractPool<
     if (message.ready === false) {
       throw new Error(`Worker ${message.workerId} failed to initialize`)
     }
-    this.getWorkerInfo(
+    const workerInfo = this.getWorkerInfo(
       this.getWorkerNodeKeyByWorkerId(message.workerId)
-    ).ready = message.ready as boolean
+    )
+    workerInfo.ready = message.ready as boolean
+    workerInfo.taskFunctions = message.taskFunctions
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
@@ -1217,7 +1219,7 @@ export abstract class AbstractPool<
     this.workerNodes.push(workerNode)
     const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
     if (workerNodeKey === -1) {
-      throw new Error('Worker node not found')
+      throw new Error('Worker node added not found')
     }
     return workerNodeKey
   }
index 4349ed72471b3614e227ab4d374df7f0aba17186..292b39dfa15352e9cf113919a2dc1485226faa73 100644 (file)
@@ -88,9 +88,9 @@ implements IWorkerNode<Worker, Data> {
 
   /** @inheritdoc */
   public getTaskWorkerUsage (name: string): WorkerUsage | undefined {
-    if (name === DEFAULT_TASK_NAME && !Array.isArray(this.info.taskFunctions)) {
+    if (!Array.isArray(this.info.taskFunctions)) {
       throw new Error(
-        'Cannot get task worker usage for default task function name when task function names list is not yet defined'
+        `Cannot get task worker usage for task function name '${name}' when task function names list is not yet defined`
       )
     }
     if (
index 5ac90fbf1783e9e04a7e820e37c961e8527385ff..1de3cf09b6932053605efd6cc4719bfbe8bcf14c 100644 (file)
@@ -247,7 +247,7 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown> {
    */
   readonly resetUsage: () => void
   /**
-   * Close communication channel.
+   * Closes communication channel.
    */
   readonly closeChannel: () => void
   /**
index e5bcb72781680fdcd9c0a3a505ae587947e35609..35899e794f5b373aa5bca3ec5b3dbe2d1f5cd4c3 100644 (file)
@@ -46,10 +46,17 @@ export class ClusterWorker<
     if (message.workerId === this.id && message.ready === false) {
       try {
         this.getMainWorker()?.on('message', this.messageListener.bind(this))
-        this.sendTaskFunctionsListToMainWorker()
-        this.sendToMainWorker({ ready: true, workerId: this.id })
+        this.sendToMainWorker({
+          ready: true,
+          taskFunctions: this.listTaskFunctions(),
+          workerId: this.id
+        })
       } catch {
-        this.sendToMainWorker({ ready: false, workerId: this.id })
+        this.sendToMainWorker({
+          ready: false,
+          taskFunctions: this.listTaskFunctions(),
+          workerId: this.id
+        })
       }
     }
   }
index 6f36664b8ae1a56cebb5058d7d4c56885b5ae88f..8d30ef21a8874dcfb6802e2436a7d6033a4fa8ec 100644 (file)
@@ -60,10 +60,17 @@ export class ThreadWorker<
       try {
         this.port = message.port
         this.port.on('message', this.messageListener.bind(this))
-        this.sendTaskFunctionsListToMainWorker()
-        this.sendToMainWorker({ ready: true, workerId: this.id })
+        this.sendToMainWorker({
+          ready: true,
+          taskFunctions: this.listTaskFunctions(),
+          workerId: this.id
+        })
       } catch {
-        this.sendToMainWorker({ ready: false, workerId: this.id })
+        this.sendToMainWorker({
+          ready: false,
+          taskFunctions: this.listTaskFunctions(),
+          workerId: this.id
+        })
       }
     }
   }