refactor: factor out worker message handlers
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 2 Jul 2023 20:53:31 +0000 (22:53 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 2 Jul 2023 20:53:31 +0000 (22:53 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts

index 24e8a5b0c670f275a91a93dec22227fa12659d25..6b28833d71ffac9176d60eab835aa5bfaffff41e 100644 (file)
@@ -816,42 +816,53 @@ export abstract class AbstractPool<
     return message => {
       if (message.workerId != null && message.started != null) {
         // Worker started message received
-        const worker = this.getWorkerById(message.workerId)
-        if (worker != null) {
-          this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
-            message.started
-        } else {
-          throw new Error(
-            `Worker started message received from unknown worker '${message.workerId}'`
-          )
-        }
+        this.handleWorkerStartedMessage(message)
       } else if (message.id != null) {
         // Task execution response received
-        const promiseResponse = this.promiseResponseMap.get(message.id)
-        if (promiseResponse != null) {
-          if (message.taskError != null) {
-            if (this.emitter != null) {
-              this.emitter.emit(PoolEvents.taskError, message.taskError)
-            }
-            promiseResponse.reject(message.taskError.message)
-          } else {
-            promiseResponse.resolve(message.data as Response)
-          }
-          this.afterTaskExecutionHook(promiseResponse.worker, message)
-          this.promiseResponseMap.delete(message.id)
-          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
-          if (
-            this.opts.enableTasksQueue === true &&
-            this.tasksQueueSize(workerNodeKey) > 0
-          ) {
-            this.executeTask(
-              workerNodeKey,
-              this.dequeueTask(workerNodeKey) as Task<Data>
-            )
-          }
-          this.workerChoiceStrategyContext.update(workerNodeKey)
+        this.handleTaskExecutionResponse(message)
+      }
+    }
+  }
+
+  private handleWorkerStartedMessage (message: MessageValue<Response>): void {
+    // Worker started message received
+    const worker = this.getWorkerById(message.workerId as number)
+    if (worker != null) {
+      this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+        message.started as boolean
+    } else {
+      throw new Error(
+        `Worker started message received from unknown worker '${
+          message.workerId as number
+        }'`
+      )
+    }
+  }
+
+  private handleTaskExecutionResponse (message: MessageValue<Response>): void {
+    const promiseResponse = this.promiseResponseMap.get(message.id as string)
+    if (promiseResponse != null) {
+      if (message.taskError != null) {
+        if (this.emitter != null) {
+          this.emitter.emit(PoolEvents.taskError, message.taskError)
         }
+        promiseResponse.reject(message.taskError.message)
+      } else {
+        promiseResponse.resolve(message.data as Response)
+      }
+      this.afterTaskExecutionHook(promiseResponse.worker, message)
+      this.promiseResponseMap.delete(message.id as string)
+      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
+      if (
+        this.opts.enableTasksQueue === true &&
+        this.tasksQueueSize(workerNodeKey) > 0
+      ) {
+        this.executeTask(
+          workerNodeKey,
+          this.dequeueTask(workerNodeKey) as Task<Data>
+        )
       }
+      this.workerChoiceStrategyContext.update(workerNodeKey)
     }
   }