feat: make IWRR strategy worker readiness aware
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 18:36:14 +0000 (20:36 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 10 Jul 2023 18:36:14 +0000 (20:36 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
README.md
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/worker/worker-options.ts
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/fixed.test.js

index 682a356f75654ed6eb4d8986ed5d4275c4e441de..e76dc427453b2062645d34f992a098e2093c3952 100644 (file)
--- a/README.md
+++ b/README.md
@@ -250,7 +250,7 @@ This method is available on both pool implementations and will call the terminat
   If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed.  
   Default: `60000`
 
-- `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it.  
+- `killBehavior` (optional) - Dictates if your worker will be deleted in case that a task is active on it.  
   **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted.  
   **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted.  
   This option only apply to the newly created workers.  
index 907bd911d6e607966bfddc2cd02087c11cdab44f..26f9d909eed84f448e0244042d23dc289a26b96a 100644 (file)
@@ -80,8 +80,10 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
       ) {
         const workerWeight =
           this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
-        // if (this.isWorkerNodeReady(workerNodeKey) && workerWeight >= this.roundWeights[roundIndex]) {
-        if (workerWeight >= this.roundWeights[roundIndex]) {
+        if (
+          this.isWorkerNodeReady(workerNodeKey) &&
+          workerWeight >= this.roundWeights[roundIndex]
+        ) {
           roundId = roundIndex
           workerNodeId = workerNodeKey
           break
index 75d3dd5b98bf1d8e633d3b58c0190cd3c1f20f93..93f56512a42ec02587d3aa21170b36a04698bee4 100644 (file)
@@ -42,7 +42,7 @@ export interface WorkerOptions {
    */
   async?: boolean
   /**
-   * `killBehavior` dictates if your async unit (worker/process) will be deleted in case that a task is active on it.
+   * `killBehavior` dictates if your worker will be deleted in case that a task is active on it.
    *
    * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted.
    * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted.
index 1d0f7b8c29fc2992f7415fc71cb8ad53e58bc1a9..c93155b0bfbb3e8e8a29a579928e38b74363146e 100644 (file)
@@ -87,7 +87,9 @@ describe('Fixed cluster pool test suite', () => {
     )
     let poolReady = 0
     pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
-    await waitPoolEvents(pool1, PoolEvents.ready, 1)
+    if (!pool1.info.ready) {
+      await waitPoolEvents(pool1, PoolEvents.ready, 1)
+    }
     expect(poolReady).toBe(1)
   })
 
index 6671b57781b1f51c15bdf0717f3c96cafcf0ca76..535f5f974559ab7748ac885c476f064084e148e6 100644 (file)
@@ -3,9 +3,11 @@ const {
   DynamicThreadPool,
   FixedClusterPool,
   FixedThreadPool,
+  PoolEvents,
   WorkerChoiceStrategies
 } = require('../../../lib')
 const { CircularArray } = require('../../../lib/circular-array')
+const { waitPoolEvents } = require('../../test-utils')
 
 describe('Selection strategies test suite', () => {
   const min = 0
@@ -1715,6 +1717,9 @@ describe('Selection strategies test suite', () => {
           WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
       }
     )
+    if (!pool.info.ready) {
+      await waitPoolEvents(pool, PoolEvents.ready, 1)
+    }
     // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = new Set()
     const maxMultiplier = 2
@@ -1785,6 +1790,9 @@ describe('Selection strategies test suite', () => {
           WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
       }
     )
+    if (!pool.info.ready) {
+      await waitPoolEvents(pool, PoolEvents.ready, 1)
+    }
     // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = new Set()
     const maxMultiplier = 2
@@ -1795,7 +1803,7 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
@@ -1831,7 +1839,7 @@ describe('Selection strategies test suite', () => {
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).nextWorkerNodeKey
-    ).toBe(0)
+    ).toBe(1)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
index de81cb68c304e8ef703ff4bf819c4e5b20829237..636121fd4812eb8705bb2aed57119e8b84f8bcfb 100644 (file)
@@ -87,7 +87,9 @@ describe('Fixed thread pool test suite', () => {
     )
     let poolReady = 0
     pool1.emitter.on(PoolEvents.ready, () => ++poolReady)
-    await waitPoolEvents(pool1, PoolEvents.ready, 1)
+    if (!pool1.info.ready) {
+      await waitPoolEvents(pool1, PoolEvents.ready, 1)
+    }
     expect(poolReady).toBe(1)
   })