feat: internal messaging strict worker id checking
authorJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Jul 2023 20:42:41 +0000 (22:42 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sat, 8 Jul 2023 20:42:41 +0000 (22:42 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/pools/worker.ts
src/utility-types.ts
src/worker/abstract-worker.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/thread/fixed.test.js

index 6c9a0fd185f7c560f4f7fa5ac597ab784f6d6dbd..687ae25f7f43154b484fafc27bb27ea7102b3961 100644 (file)
@@ -162,6 +162,10 @@ export abstract class AbstractPool<
       throw new RangeError(
         'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
       )
+    } else if (this.type === PoolTypes.dynamic && min === 0 && max === 0) {
+      throw new RangeError(
+        'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+      )
     } else if (this.type === PoolTypes.dynamic && min === max) {
       throw new RangeError(
         'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
@@ -462,6 +466,17 @@ export abstract class AbstractPool<
       ?.worker
   }
 
+  private checkMessageWorkerId (message: MessageValue<Response>): void {
+    if (
+      message.workerId != null &&
+      this.getWorkerById(message.workerId) == null
+    ) {
+      throw new Error(
+        `Worker message received from unknown worker '${message.workerId}'`
+      )
+    }
+  }
+
   /**
    * Gets the given worker its worker node key.
    *
@@ -573,6 +588,7 @@ export abstract class AbstractPool<
       // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
       data: data ?? ({} as Data),
       timestamp,
+      workerId: this.getWorkerInfo(workerNodeKey).id as number,
       id: randomUUID()
     }
     const res = new Promise<Response>((resolve, reject) => {
@@ -894,7 +910,7 @@ export abstract class AbstractPool<
     // Send startup message to worker.
     this.sendToWorker(worker, {
       ready: false,
-      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id
+      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
     })
     // Setup worker task statistics computation.
     this.setWorkerStatistics(worker)
@@ -988,8 +1004,12 @@ export abstract class AbstractPool<
         void (this.destroyWorker(worker) as Promise<void>)
       }
     })
-    this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
-    this.sendToWorker(worker, { checkAlive: true })
+    const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+    workerInfo.dynamic = true
+    this.sendToWorker(worker, {
+      checkAlive: true,
+      workerId: workerInfo.id as number
+    })
     return worker
   }
 
@@ -1000,6 +1020,7 @@ export abstract class AbstractPool<
    */
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
+      this.checkMessageWorkerId(message)
       if (message.ready != null && message.workerId != null) {
         // Worker ready message received
         this.handleWorkerReadyMessage(message)
@@ -1011,17 +1032,9 @@ export abstract class AbstractPool<
   }
 
   private handleWorkerReadyMessage (message: MessageValue<Response>): void {
-    const worker = this.getWorkerById(message.workerId as number)
-    if (worker != null) {
-      this.getWorkerInfo(this.getWorkerNodeKey(worker)).ready =
-        message.ready as boolean
-    } else {
-      throw new Error(
-        `Worker ready message received from unknown worker '${
-          message.workerId as number
-        }'`
-      )
-    }
+    const worker = this.getWorkerById(message.workerId)
+    this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
+      message.ready as boolean
     if (this.emitter != null && this.ready) {
       this.emitter.emit(PoolEvents.ready, this.info)
     }
@@ -1138,7 +1151,8 @@ export abstract class AbstractPool<
             .runTime.aggregate,
         elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
           .elu.aggregate
-      }
+      },
+      workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
     })
   }
 }
index 3c69ec2f74318188d1a2b8ac1dcd40b59369aa80..6e9b4b07113bac62000a84ae0a2d8aa05b333532 100644 (file)
@@ -61,7 +61,7 @@ export class FixedClusterPool<
 
   /** @inheritDoc */
   protected destroyWorker (worker: Worker): void {
-    this.sendToWorker(worker, { kill: true })
+    this.sendToWorker(worker, { kill: true, workerId: worker.id })
     worker.on('disconnect', () => {
       worker.kill()
     })
index 29f008806663aea87b837cbf412a5ed561d1974e..c9145f571186a9d5783531236414aff7eadd463e 100644 (file)
@@ -55,7 +55,7 @@ export class FixedThreadPool<
 
   /** @inheritDoc */
   protected async destroyWorker (worker: Worker): Promise<void> {
-    this.sendToWorker(worker, { kill: true })
+    this.sendToWorker(worker, { kill: true, workerId: worker.threadId })
     await worker.terminate()
   }
 
index 5b427a5620d93740d642625e30d69f2bfd3cba0a..c6303221c9d2da4badc9af17ac6e8a80fda27d98 100644 (file)
@@ -36,6 +36,10 @@ export type ExitHandler<Worker extends IWorker> = (
  * @internal
  */
 export interface Task<Data = unknown> {
+  /**
+   * Worker id.
+   */
+  readonly workerId: number
   /**
    * Task name.
    */
index f1b773eb31196c5ca827759bbbf3f70a37fc1ae9..a8da491e16fe62143d1d716672c4b4db44aa8e9f 100644 (file)
@@ -8,10 +8,6 @@ import type { IWorker, Task } from './pools/worker'
  * @typeParam Data - Type of data sent to the worker triggering an error. This can only be structured-cloneable data.
  */
 export interface TaskError<Data = unknown> {
-  /**
-   * Worker id.
-   */
-  readonly workerId: number
   /**
    * Error message.
    */
@@ -61,10 +57,6 @@ export interface WorkerStatistics {
  */
 export interface MessageValue<Data = unknown, ErrorData = unknown>
   extends Task<Data> {
-  /**
-   * Worker id.
-   */
-  readonly workerId?: number
   /**
    * Kill code.
    */
index 4e92be01ae340da26d4699595ff692566075f006..8122f909af89d93e76d5a0c1730e19323b3391da 100644 (file)
@@ -148,13 +148,17 @@ export abstract class AbstractWorker<
     if (message.ready != null && message.workerId === this.id) {
       // Startup message received
       this.workerReady()
-    } else if (message.statistics != null) {
+    } else if (message.statistics != null && message.workerId === this.id) {
       // Statistics message received
       this.statistics = message.statistics
-    } else if (message.checkAlive != null) {
+    } else if (message.checkAlive != null && message.workerId === this.id) {
       // Check alive message received
       message.checkAlive ? this.startCheckAlive() : this.stopCheckAlive()
-    } else if (message.id != null && message.data != null) {
+    } else if (
+      message.id != null &&
+      message.data != null &&
+      message.workerId === this.id
+    ) {
       // Task message received
       const fn = this.getTaskFunction(message.name)
       if (isAsyncFunction(fn)) {
@@ -162,7 +166,7 @@ export abstract class AbstractWorker<
       } else {
         this.runInAsyncScope(this.runSync.bind(this), this, fn, message)
       }
-    } else if (message.kill === true) {
+    } else if (message.kill === true && message.workerId === this.id) {
       // Kill message received
       this.stopCheckAlive()
       this.emitDestroy()
@@ -203,7 +207,7 @@ export abstract class AbstractWorker<
       performance.now() - this.lastTaskTimestamp >
       (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
     ) {
-      this.sendToMainWorker({ kill: this.opts.killBehavior })
+      this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
     }
   }
 
@@ -262,10 +266,10 @@ export abstract class AbstractWorker<
       const errorMessage = this.handleError(e as Error | string)
       this.sendToMainWorker({
         taskError: {
-          workerId: this.id,
           message: errorMessage,
           data: message.data
         },
+        workerId: this.id,
         id: message.id
       })
     } finally {
@@ -301,10 +305,10 @@ export abstract class AbstractWorker<
         const errorMessage = this.handleError(e as Error | string)
         this.sendToMainWorker({
           taskError: {
-            workerId: this.id,
             message: errorMessage,
             data: message.data
           },
+          workerId: this.id,
           id: message.id
         })
       })
index caec34fd47a0ae267f9c6b5dcfe7648a352e1a2a..6c340ed9f089ea6f9fd5deeb126480dc0e0ab3df 100644 (file)
@@ -16,13 +16,6 @@ const { waitPoolEvents } = require('../../test-utils')
 
 describe('Abstract pool test suite', () => {
   const numberOfWorkers = 2
-  class StubPoolWithRemoveAllWorker extends FixedThreadPool {
-    removeAllWorker () {
-      this.workerNodes = []
-      this.promiseResponseMap.clear()
-      this.handleWorkerReadyMessage = () => {}
-    }
-  }
   class StubPoolWithIsMain extends FixedThreadPool {
     isMain () {
       return false
@@ -99,6 +92,14 @@ describe('Abstract pool test suite', () => {
         'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
       )
     )
+    expect(
+      () =>
+        new DynamicThreadPool(0, 0, './tests/worker-files/thread/testWorker.js')
+    ).toThrowError(
+      new RangeError(
+        'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
+      )
+    )
   })
 
   it('Verify that pool options are checked', async () => {
@@ -457,21 +458,6 @@ describe('Abstract pool test suite', () => {
     await pool.destroy()
   })
 
-  it('Simulate worker not found', async () => {
-    const pool = new StubPoolWithRemoveAllWorker(
-      numberOfWorkers,
-      './tests/worker-files/thread/testWorker.js',
-      {
-        errorHandler: e => console.error(e)
-      }
-    )
-    expect(pool.workerNodes.length).toBe(numberOfWorkers)
-    // Simulate worker not found.
-    pool.removeAllWorker()
-    expect(pool.workerNodes.length).toBe(0)
-    await pool.destroy()
-  })
-
   it('Verify that pool worker tasks usage are initialized', async () => {
     const pool = new FixedClusterPool(
       numberOfWorkers,
index e0ad0ccce3078a947473438921efb3464640156d..f9423e731cbd93a20417b2124d7f4c4f4c8b783d 100644 (file)
@@ -147,7 +147,6 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker')
     expect(taskError).toStrictEqual({
-      workerId: expect.any(Number),
       message: 'Error Message from ClusterWorker',
       data
     })
@@ -174,7 +173,6 @@ describe('Fixed cluster pool test suite', () => {
     expect(typeof inError === 'string').toBe(true)
     expect(inError).toBe('Error Message from ClusterWorker:async')
     expect(taskError).toStrictEqual({
-      workerId: expect.any(Number),
       message: 'Error Message from ClusterWorker:async',
       data
     })
index 4ffc354771f49e3636172a8dc7d16f65e8a9ed02..829c8ade42d48ca2938a58c38323f1f7c9298bf8 100644 (file)
@@ -149,7 +149,6 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker')
     expect(taskError).toStrictEqual({
-      workerId: expect.any(Number),
       message: new Error('Error Message from ThreadWorker'),
       data
     })
@@ -178,7 +177,6 @@ describe('Fixed thread pool test suite', () => {
     expect(typeof inError.message === 'string').toBe(true)
     expect(inError.message).toBe('Error Message from ThreadWorker:async')
     expect(taskError).toStrictEqual({
-      workerId: expect.any(Number),
       message: new Error('Error Message from ThreadWorker:async'),
       data
     })