From feec6e8c04165c3c70a12c9a1c5e64a32b40311c Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 7 May 2023 18:28:22 +0200 Subject: [PATCH] feat: initial IWRR worker choice implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- ...hted-round-robin-worker-choice-strategy.ts | 158 ++++++++++++++++++ .../selection-strategies-types.ts | 6 +- .../worker-choice-strategy-context.ts | 9 + .../selection-strategies.test.js | 3 + 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts new file mode 100644 index 00000000..09598355 --- /dev/null +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -0,0 +1,158 @@ +import { cpus } from 'node:os' +import type { IWorker } from '../worker' +import type { IPool } from '../pool' +import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + RequiredStatistics, + WorkerChoiceStrategyOptions +} from './selection-strategies-types' + +/** + * Selects the next worker with an interleaved weighted round robin scheduling algorithm. + * + * @typeParam Worker - Type of worker which manages the strategy. + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< + Worker extends IWorker, + Data = unknown, + Response = unknown + > + extends AbstractWorkerChoiceStrategy + implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly requiredStatistics: RequiredStatistics = { + runTime: true, + avgRunTime: true, + medRunTime: false + } + + /** + * Worker node id where the current task will be submitted. + */ + private currentWorkerNodeId: number = 0 + /** + * Current round id. + * This is used to determine the current round weight. + */ + private currentRoundId: number = 0 + /** + * Round weights. + */ + private readonly roundWeights: number[] + /** + * Default worker weight. + */ + private readonly defaultWorkerWeight: number + + /** @inheritDoc */ + public constructor ( + pool: IPool, + opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + ) { + super(pool, opts) + this.checkOptions(this.opts) + this.defaultWorkerWeight = this.computeDefaultWorkerWeight() + this.roundWeights = this.getRoundWeights() + } + + /** @inheritDoc */ + public reset (): boolean { + this.currentWorkerNodeId = 0 + this.currentRoundId = 0 + return true + } + + /** @inheritDoc */ + public update (): boolean { + return true + } + + /** @inheritDoc */ + public choose (): number { + let chosenWorkerNodeKey: number + const workerWeight = + this.opts.weights?.[this.currentWorkerNodeId] ?? this.defaultWorkerWeight + if (workerWeight >= this.roundWeights[this.currentRoundId]) { + chosenWorkerNodeKey = this.currentWorkerNodeId + this.currentWorkerNodeId = + this.currentWorkerNodeId === this.pool.workerNodes.length - 1 + ? 0 + : this.currentWorkerNodeId + 1 + if (this.currentWorkerNodeId === this.pool.workerNodes.length - 1) { + this.currentRoundId = + this.currentRoundId === this.roundWeights.length - 1 + ? 0 + : this.currentRoundId + 1 + } + } else { + let roundId: number | undefined + let workerNodeId: number | undefined + for ( + let round = this.currentRoundId; + round < this.roundWeights.length; + round++ + ) { + for ( + let workerNodeKey = this.currentWorkerNodeId + 1; + workerNodeKey < this.pool.workerNodes.length; + workerNodeKey++ + ) { + const workerWeight = + this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight + if (workerWeight >= this.roundWeights[round]) { + roundId = round + workerNodeId = workerNodeKey + break + } + } + } + this.currentRoundId = roundId ?? 0 + this.currentWorkerNodeId = workerNodeId ?? 0 + chosenWorkerNodeKey = this.currentWorkerNodeId + } + return chosenWorkerNodeKey + } + + /** @inheritDoc */ + public remove (workerNodeKey: number): boolean { + if (this.currentWorkerNodeId === workerNodeKey) { + if (this.pool.workerNodes.length === 0) { + this.currentWorkerNodeId = 0 + } else { + this.currentWorkerNodeId = + this.currentWorkerNodeId > this.pool.workerNodes.length - 1 + ? this.pool.workerNodes.length - 1 + : this.currentWorkerNodeId + } + } + return true + } + + private computeDefaultWorkerWeight (): number { + let cpusCycleTimeWeight = 0 + for (const cpu of cpus()) { + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / cpus().length) + } + + private getRoundWeights (): number[] { + if (this.opts.weights == null) { + return [this.defaultWorkerWeight] + } + return [ + ...new Set( + Object.values(this.opts.weights) + .slice() + .sort((a, b) => a - b) + ) + ] + } +} diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index b95cf7f3..f2b401d6 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -21,7 +21,11 @@ export const WorkerChoiceStrategies = Object.freeze({ /** * Weighted round robin worker selection strategy. */ - WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN' + WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN', + /** + * Interleaved weighted round robin worker selection strategy. + */ + INTERLEAVED_WEIGHTED_ROUND_ROBIN: 'INTERLEAVED_WEIGHTED_ROUND_ROBIN' } as const) /** diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 124addd6..9ea46316 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -2,6 +2,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy' +import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy' import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy' import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy' import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' @@ -83,6 +84,14 @@ export class WorkerChoiceStrategyContext< Data, Response >(pool, opts) + ], + [ + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN, + new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))< + Worker, + Data, + Response + >(pool, opts) ] ]) } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 1ecd270f..59067afc 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -18,6 +18,9 @@ describe('Selection strategies test suite', () => { expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' ) + expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe( + 'INTERLEAVED_WEIGHTED_ROUND_ROBIN' + ) }) it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => { -- 2.34.1