fix: refine pool statuses semantic
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 23:08:36 +0000 (01:08 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 23:08:36 +0000 (01:08 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/utility-types.ts
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/fixed.test.js

index f1392edd5124e25421531fec9958a2fea610bf59..10a5ce42af8d58e183761f454069d00f6b76a593 100644 (file)
@@ -411,17 +411,16 @@ export abstract class AbstractPool<
   }
 
   private get starting (): boolean {
-    return (
-      this.workerNodes.length < this.minSize ||
-      (this.workerNodes.length >= this.minSize &&
-        this.workerNodes.some(workerNode => !workerNode.info.ready))
-    )
+    return this.workerNodes.length < this.minSize
   }
 
   private get ready (): boolean {
     return (
       this.workerNodes.length >= this.minSize &&
-      this.workerNodes.every(workerNode => workerNode.info.ready)
+      this.workerNodes.every(
+        (workerNode, workerNodeKey) =>
+          workerNodeKey < this.minSize && workerNode.info.ready
+      )
     )
   }
 
@@ -980,6 +979,7 @@ export abstract class AbstractPool<
       }
     })
     const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
+    workerInfo.ready = true
     workerInfo.dynamic = true
     this.sendToWorker(worker, {
       checkAlive: true,
@@ -1011,12 +1011,16 @@ export abstract class AbstractPool<
     // Listen to worker messages.
     this.registerWorkerMessageListener(worker, this.workerListener())
     // Send startup message to worker.
+    this.sendWorkerStartupMessage(worker)
+    // Setup worker task statistics computation.
+    this.setWorkerStatistics(worker)
+  }
+
+  private sendWorkerStartupMessage (worker: Worker): void {
     this.sendToWorker(worker, {
       ready: false,
       workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
     })
-    // Setup worker task statistics computation.
-    this.setWorkerStatistics(worker)
   }
 
   private redistributeQueuedTasks (workerNodeKey: number): void {
@@ -1057,7 +1061,7 @@ export abstract class AbstractPool<
   protected workerListener (): (message: MessageValue<Response>) => void {
     return message => {
       this.checkMessageWorkerId(message)
-      if (message.ready != null && message.workerId != null) {
+      if (message.ready != null) {
         // Worker ready message received
         this.handleWorkerReadyMessage(message)
       } else if (message.id != null) {
@@ -1130,6 +1134,10 @@ export abstract class AbstractPool<
    * @returns The worker nodes length.
    */
   private pushWorkerNode (worker: Worker): number {
+    const workerNode = new WorkerNode(worker, this.worker)
+    if (this.starting) {
+      workerNode.info.ready = true
+    }
     return this.workerNodes.push(new WorkerNode(worker, this.worker))
   }
 
index 28bf19e256abaef2f619051773ad55456643b842..135dcefba0cde30cc76aa2a2d10bba89eefb14bc 100644 (file)
@@ -134,11 +134,11 @@ export interface PromiseResponseWrapper<
   /**
    * Resolve callback to fulfill the promise.
    */
-  readonly resolve: (value: Response) => void
+  readonly resolve: (value: Response | PromiseLike<Response>) => void
   /**
    * Reject callback to reject the promise.
    */
-  readonly reject: (reason?: string) => void
+  readonly reject: (reason?: unknown) => void
   /**
    * The worker handling the execution.
    */
index c93155b0bfbb3e8e8a29a579928e38b74363146e..1d0f7b8c29fc2992f7415fc71cb8ad53e58bc1a9 100644 (file)
@@ -87,9 +87,7 @@ describe('Fixed cluster pool test suite', () => {
     )
     let poolReady = 0
     pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
-    if (!pool1.info.ready) {
-      await waitPoolEvents(pool1, PoolEvents.ready, 1)
-    }
+    await waitPoolEvents(pool1, PoolEvents.ready, 1)
     expect(poolReady).toBe(1)
   })
 
index 535f5f974559ab7748ac885c476f064084e148e6..8894ba3126102faef6ab1dbea23ef20676c21a37 100644 (file)
@@ -1717,9 +1717,8 @@ describe('Selection strategies test suite', () => {
           WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
       }
     )
-    if (!pool.info.ready) {
-      await waitPoolEvents(pool, PoolEvents.ready, 1)
-    }
+    // FIXME: shall not be needed
+    await waitPoolEvents(pool, PoolEvents.ready, 1)
     // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = new Set()
     const maxMultiplier = 2
@@ -1790,9 +1789,6 @@ describe('Selection strategies test suite', () => {
           WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
       }
     )
-    if (!pool.info.ready) {
-      await waitPoolEvents(pool, PoolEvents.ready, 1)
-    }
     // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = new Set()
     const maxMultiplier = 2
@@ -1803,7 +1799,7 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: expect.any(Number),
+          executed: maxMultiplier,
           executing: 0,
           queued: 0,
           maxQueued: 0,
@@ -1839,7 +1835,7 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).nextWorkerNodeKey
-    ).toBe(1)
+    ).toBe(0)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
index 636121fd4812eb8705bb2aed57119e8b84f8bcfb..de81cb68c304e8ef703ff4bf819c4e5b20829237 100644 (file)
@@ -87,9 +87,7 @@ describe('Fixed thread pool test suite', () => {
     )
     let poolReady = 0
     pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
-    if (!pool1.info.ready) {
-      await waitPoolEvents(pool1, PoolEvents.ready, 1)
-    }
+    await waitPoolEvents(pool1, PoolEvents.ready, 1)
     expect(poolReady).toBe(1)
   })