fix: ensure worker started message cannot be received for non existing
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 30 Jun 2023 22:55:35 +0000 (00:55 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 30 Jun 2023 22:55:35 +0000 (00:55 +0200)
worker

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts

index 6fde2491b6e1a565ba48ce26d821407eff67c8a2..ba05d92959a83f362fd721b4ca3615e66c86d196 100644 (file)
@@ -19,7 +19,8 @@ import {
   type PoolType,
   PoolTypes,
   type TasksQueueOptions,
-  type WorkerType
+  type WorkerType,
+  WorkerTypes
 } from './pool'
 import type {
   IWorker,
@@ -784,9 +785,13 @@ export abstract class AbstractPool<
     return message => {
       if (message.workerId != null && message.started != null) {
         // Worker started message received
-        this.workerNodes[
-          this.getWorkerNodeKey(this.getWorkerById(message.workerId) as Worker)
-        ].info.started = message.started
+        const worker = this.getWorkerById(message.workerId)
+        if (worker != null) {
+          this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
+            message.started
+        } else {
+          throw new Error('Worker started message received from unknown worker')
+        }
       } else if (message.id != null) {
         // Task execution response received
         const promiseResponse = this.promiseResponseMap.get(message.id)
@@ -850,7 +855,7 @@ export abstract class AbstractPool<
   private pushWorkerNode (worker: Worker): number {
     this.workerNodes.push({
       worker,
-      info: { id: worker.threadId ?? worker.id, started: false },
+      info: { id: this.getWorkerId(worker), started: false },
       usage: this.getWorkerUsage(),
       tasksQueue: new Queue<Task<Data>>()
     })
@@ -862,6 +867,20 @@ export abstract class AbstractPool<
     return this.workerNodes.length
   }
 
+  /**
+   * Gets the worker id.
+   *
+   * @param worker - The worker.
+   * @returns The worker id.
+   */
+  private getWorkerId (worker: Worker): number | undefined {
+    if (this.worker === WorkerTypes.thread) {
+      return worker.threadId
+    } else if (this.worker === WorkerTypes.cluster) {
+      return worker.id
+    }
+  }
+
   // /**
   //  * Sets the given worker in the pool worker nodes.
   //  *
index c92478d4a8436512853384f65ccce026f6528a19..e0636d9920db6ddf4e5d7a20ea412cfbee612b04 100644 (file)
@@ -125,7 +125,7 @@ export interface TaskStatistics {
  */
 export interface WorkerInfo {
   /**
-   * Worker Id.
+   * Worker id.
    */
   id: number | undefined
   /**
@@ -163,7 +163,7 @@ export interface WorkerUsage {
  */
 export interface IWorker {
   /**
-   * Worker Id.
+   * Worker id.
    */
   id?: number
   threadId?: number
index 1f724274150f401833df2dec47750528235a578e..0443b4ceef2374e28ba8201bf5d1d6a61ebbce93 100644 (file)
@@ -54,7 +54,7 @@ export interface WorkerStatistics {
 export interface MessageValue<Data = unknown, ErrorData = unknown>
   extends Task<Data> {
   /**
-   * Worker Id.
+   * Worker id.
    */
   readonly workerId?: number
   /**
index 242c46982236357626ae50fae5f3a33d7c3f04eb..0646a9bfa0e423bd466ccc16efeba8b4782067c7 100644 (file)
@@ -37,7 +37,7 @@ export abstract class AbstractWorker<
   Response = unknown
 > extends AsyncResource {
   /**
-   * Worker Id.
+   * Worker id.
    */
   protected abstract id: number
   /**