From 3f7d99f5a6b9783554886f1850aa45c5bc237b65 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 10 Jul 2023 20:36:14 +0200 Subject: [PATCH] feat: make IWRR strategy worker readiness aware MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- README.md | 2 +- ...ed-weighted-round-robin-worker-choice-strategy.ts | 6 ++++-- src/worker/worker-options.ts | 2 +- tests/pools/cluster/fixed.test.js | 4 +++- .../selection-strategies.test.js | 12 ++++++++++-- tests/pools/thread/fixed.test.js | 4 +++- 6 files changed, 22 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 682a356f..e76dc427 100644 --- a/README.md +++ b/README.md @@ -250,7 +250,7 @@ This method is available on both pool implementations and will call the terminat If `killBehavior` is set to `KillBehaviors.SOFT` your tasks have no timeout and your workers will not be terminated until your task is completed. Default: `60000` -- `killBehavior` (optional) - Dictates if your async unit (worker/process) will be deleted in case that a task is active on it. +- `killBehavior` (optional) - Dictates if your worker will be deleted in case that a task is active on it. **KillBehaviors.SOFT**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted. **KillBehaviors.HARD**: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted. This option only apply to the newly created workers. diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 907bd911..26f9d909 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -80,8 +80,10 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< ) { const workerWeight = this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight - // if (this.isWorkerNodeReady(workerNodeKey) && workerWeight >= this.roundWeights[roundIndex]) { - if (workerWeight >= this.roundWeights[roundIndex]) { + if ( + this.isWorkerNodeReady(workerNodeKey) && + workerWeight >= this.roundWeights[roundIndex] + ) { roundId = roundIndex workerNodeId = workerNodeKey break diff --git a/src/worker/worker-options.ts b/src/worker/worker-options.ts index 75d3dd5b..93f56512 100644 --- a/src/worker/worker-options.ts +++ b/src/worker/worker-options.ts @@ -42,7 +42,7 @@ export interface WorkerOptions { */ async?: boolean /** - * `killBehavior` dictates if your async unit (worker/process) will be deleted in case that a task is active on it. + * `killBehavior` dictates if your worker will be deleted in case that a task is active on it. * * - SOFT: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker **won't** be deleted. * - HARD: If `currentTime - lastActiveTime` is greater than `maxInactiveTime` but a task is still executing or queued, then the worker will be deleted. diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 1d0f7b8c..c93155b0 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -87,7 +87,9 @@ describe('Fixed cluster pool test suite', () => { ) let poolReady = 0 pool1.emitter.on(PoolEvents.ready, () => ++poolReady) - await waitPoolEvents(pool1, PoolEvents.ready, 1) + if (!pool1.info.ready) { + await waitPoolEvents(pool1, PoolEvents.ready, 1) + } expect(poolReady).toBe(1) }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 6671b577..535f5f97 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -3,9 +3,11 @@ const { DynamicThreadPool, FixedClusterPool, FixedThreadPool, + PoolEvents, WorkerChoiceStrategies } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') +const { waitPoolEvents } = require('../../test-utils') describe('Selection strategies test suite', () => { const min = 0 @@ -1715,6 +1717,9 @@ describe('Selection strategies test suite', () => { WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN } ) + if (!pool.info.ready) { + await waitPoolEvents(pool, PoolEvents.ready, 1) + } // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` const promises = new Set() const maxMultiplier = 2 @@ -1785,6 +1790,9 @@ describe('Selection strategies test suite', () => { WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN } ) + if (!pool.info.ready) { + await waitPoolEvents(pool, PoolEvents.ready, 1) + } // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose` const promises = new Set() const maxMultiplier = 2 @@ -1795,7 +1803,7 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, @@ -1831,7 +1839,7 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBe(1) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index de81cb68..636121fd 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -87,7 +87,9 @@ describe('Fixed thread pool test suite', () => { ) let poolReady = 0 pool1.emitter.on(PoolEvents.ready, () => ++poolReady) - await waitPoolEvents(pool1, PoolEvents.ready, 1) + if (!pool1.info.ready) { + await waitPoolEvents(pool1, PoolEvents.ready, 1) + } expect(poolReady).toBe(1) }) -- 2.34.1