From 619f403be01e2954cef4c490f76240b192f16276 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Wed, 30 Aug 2023 12:47:19 +0200 Subject: [PATCH] feat: improve IWRR implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 + docs/worker-choice-strategies.md | 6 +- .../abstract-worker-choice-strategy.ts | 2 +- ...hted-round-robin-worker-choice-strategy.ts | 91 ++++++--- ...hted-round-robin-worker-choice-strategy.ts | 5 +- .../selection-strategies.test.js | 178 ++++++++++-------- 6 files changed, 170 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b28d6eb6..286623fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bundle typescript types declaration into one file. +### Changed + +- Improve interleaved weighted round robin worker choice strategy implementation. + ## [2.6.37] - 2023-08-28 ### Fixed diff --git a/docs/worker-choice-strategies.md b/docs/worker-choice-strategies.md index b26819a4..49f7c65c 100644 --- a/docs/worker-choice-strategies.md +++ b/docs/worker-choice-strategies.md @@ -20,12 +20,12 @@ By default, the strategy uses the simple moving average task execution time for ### Weighted round robin -The worker weights are maximum tasks execution time, once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers. +The worker weights are maximum tasks execution time. Once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers. -### Interleaved weighted round robin +### Interleaved weighted round robin (experimental) The worker weights are maximum tasks execution time. The rounds are the deduplicated worker weights. -During a round, if the worker weight is superior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts. +During a round, if the worker weight is superior or equal to the current round weight and its tasks execution time is inferior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts. The default worker weights is the same for each and computed given the CPU cores speed and theirs numbers. So the default 'rounds' consists of a unique worker weight. ## Statistics diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 2fba4e9f..cd8736c2 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -197,7 +197,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** * Check the next worker node eligibility. * - * @param chosenNextWorkerNodeKey - The chosen worker node key. + * @param chosenNextWorkerNodeKey - The chosen next worker node key. */ protected checkNextWorkerNodeEligibility ( chosenNextWorkerNodeKey: number | undefined diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index e3754382..ca5b7db9 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -1,9 +1,13 @@ import type { IWorker } from '../worker' import type { IPool } from '../pool' -import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' +import { + DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, + DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS +} from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, + TaskStatisticsRequirements, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -21,19 +25,37 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< > extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly taskStatisticsRequirements: TaskStatisticsRequirements = { + runTime: { + aggregate: true, + average: true, + median: false + }, + waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, + elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS + } + /** * Round id. - * This is used to determine the current round weight. */ private roundId: number = 0 + /** + * Default worker weight. + */ + private readonly defaultWorkerWeight: number /** * Round weights. */ private roundWeights: number[] /** - * Default worker weight. + * Worker node id. */ - private readonly defaultWorkerWeight: number + private workerNodeId: number = 0 + /** + * Worker virtual task runtime. + */ + private workerVirtualTaskRunTime: number = 0 /** @inheritDoc */ public constructor ( @@ -50,6 +72,8 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< public reset (): boolean { this.resetWorkerNodeKeyProperties() this.roundId = 0 + this.workerNodeId = 0 + this.workerVirtualTaskRunTime = 0 return true } @@ -60,47 +84,59 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number | undefined { - let roundId!: number - let workerNodeId: number | undefined for ( let roundIndex = this.roundId; roundIndex < this.roundWeights.length; roundIndex++ ) { - roundId = roundIndex + this.roundId = roundIndex for ( - let workerNodeKey = - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey; + let workerNodeKey = this.workerNodeId; workerNodeKey < this.pool.workerNodes.length; workerNodeKey++ ) { + this.workerNodeId = workerNodeKey if (!this.isWorkerNodeEligible(workerNodeKey)) { continue } + if ( + this.workerNodeId !== this.nextWorkerNodeKey && + this.workerVirtualTaskRunTime !== 0 + ) { + this.workerVirtualTaskRunTime = 0 + } const workerWeight = this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight - if (workerWeight >= this.roundWeights[roundIndex]) { - workerNodeId = workerNodeKey - break + if ( + workerWeight >= this.roundWeights[roundIndex] && + this.workerVirtualTaskRunTime < workerWeight + ) { + this.workerVirtualTaskRunTime = + this.workerVirtualTaskRunTime + + this.getWorkerTaskRunTime(workerNodeKey) + this.previousWorkerNodeKey = + this.nextWorkerNodeKey ?? this.previousWorkerNodeKey + this.nextWorkerNodeKey = workerNodeKey + return this.nextWorkerNodeKey } } } - this.roundId = roundId - if (workerNodeId == null) { - this.previousWorkerNodeKey = - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey - } - this.nextWorkerNodeKey = workerNodeId - const chosenWorkerNodeKey = this.nextWorkerNodeKey - if (this.nextWorkerNodeKey === this.pool.workerNodes.length - 1) { - this.nextWorkerNodeKey = 0 - this.roundId = - this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1 + this.interleavedWeightedRoundRobinNextWorkerNodeId() + } + + private interleavedWeightedRoundRobinNextWorkerNodeId (): void { + if ( + this.roundId === this.roundWeights.length - 1 && + this.workerNodeId === this.pool.workerNodes.length - 1 + ) { + this.roundId = 0 + this.workerNodeId = 0 + } else if (this.workerNodeId === this.pool.workerNodes.length - 1) { + this.roundId = this.roundId + 1 + this.workerNodeId = 0 } else { - this.nextWorkerNodeKey = - (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + this.workerNodeId = this.workerNodeId + 1 } - return chosenWorkerNodeKey } /** @inheritDoc */ @@ -109,10 +145,11 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< if (this.pool.workerNodes.length === 0) { this.nextWorkerNodeKey = 0 } else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) { - this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 this.roundId = this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1 + this.nextWorkerNodeKey = this.pool.workerNodes.length - 1 } + this.workerVirtualTaskRunTime = 0 } return true } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 46575b80..034c88e6 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -90,14 +90,13 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } private weightedRoundRobinNextWorkerNodeKey (): number | undefined { - const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime const workerWeight = this.opts.weights?.[ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey ] ?? this.defaultWorkerWeight - if (workerVirtualTaskRunTime < workerWeight) { + if (this.workerVirtualTaskRunTime < workerWeight) { this.workerVirtualTaskRunTime = - workerVirtualTaskRunTime + + this.workerVirtualTaskRunTime + this.getWorkerTaskRunTime( this.nextWorkerNodeKey ?? this.previousWorkerNodeKey ) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 253238ba..6cc68a8b 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -675,7 +675,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -684,18 +684,18 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, - waitTime: { + }), + waitTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -733,7 +733,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -742,18 +742,18 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, - waitTime: { + }), + waitTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -872,7 +872,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -882,19 +882,19 @@ describe('Selection strategies test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: { - idle: { + elu: expect.objectContaining({ + idle: expect.objectContaining({ history: expect.any(CircularArray) - }, - active: { + }), + active: expect.objectContaining({ history: expect.any(CircularArray) - } - } + }) + }) }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( @@ -936,7 +936,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -946,19 +946,19 @@ describe('Selection strategies test suite', () => { failed: 0 }, runTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: { - idle: { + elu: expect.objectContaining({ + idle: expect.objectContaining({ history: expect.any(CircularArray) - }, - active: { + }), + active: expect.objectContaining({ history: expect.any(CircularArray) - } - } + }) + }) }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( @@ -1081,7 +1081,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1090,20 +1090,20 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: { - idle: { + elu: expect.objectContaining({ + idle: expect.objectContaining({ history: expect.any(CircularArray) - }, - active: { + }), + active: expect.objectContaining({ history: expect.any(CircularArray) - } - } + }) + }) }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( @@ -1160,7 +1160,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1169,20 +1169,20 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: { - idle: { + elu: expect.objectContaining({ + idle: expect.objectContaining({ history: expect.any(CircularArray) - }, - active: { + }), + active: expect.objectContaining({ history: expect.any(CircularArray) - } - } + }) + }) }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( @@ -1244,7 +1244,7 @@ describe('Selection strategies test suite', () => { } await Promise.all(promises) for (const workerNode of pool.workerNodes) { - expect(workerNode.usage).toMatchObject({ + expect(workerNode.usage).toStrictEqual({ tasks: { executed: expect.any(Number), executing: 0, @@ -1253,20 +1253,20 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { + runTime: expect.objectContaining({ history: expect.any(CircularArray) - }, + }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, - elu: { - idle: { + elu: expect.objectContaining({ + idle: expect.objectContaining({ history: expect.any(CircularArray) - }, - active: { + }), + active: expect.objectContaining({ history: expect.any(CircularArray) - } - } + }) + }) }) expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( @@ -1491,14 +1491,14 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -1559,14 +1559,14 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -1632,14 +1632,14 @@ describe('Selection strategies test suite', () => { history: expect.any(CircularArray) }), waitTime: { - history: expect.any(CircularArray) + history: new CircularArray() }, elu: { idle: { - history: expect.any(CircularArray) + history: new CircularArray() }, active: { - history: expect.any(CircularArray) + history: new CircularArray() } } }) @@ -1789,8 +1789,8 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, waitTime: { @@ -1815,8 +1815,8 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { - aggregate: false, - average: false, + aggregate: true, + average: true, median: false }, waitTime: { @@ -1853,16 +1853,16 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, stolen: 0, failed: 0 }, - runTime: { - history: new CircularArray() - }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), waitTime: { history: new CircularArray() }, @@ -1875,6 +1875,10 @@ describe('Selection strategies test suite', () => { } } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) } expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -1886,6 +1890,11 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategy ).roundId ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy @@ -1931,9 +1940,9 @@ describe('Selection strategies test suite', () => { stolen: 0, failed: 0 }, - runTime: { - history: new CircularArray() - }, + runTime: expect.objectContaining({ + history: expect.any(CircularArray) + }), waitTime: { history: new CircularArray() }, @@ -1961,6 +1970,11 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategy ).roundId ).toBe(0) + expect( + pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategyContext.workerChoiceStrategy + ).workerNodeId + ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy -- 2.34.1