From: Jérôme Benoit Date: Tue, 30 May 2023 08:54:58 +0000 (+0200) Subject: Merge branch 'master' of github.com:poolifier/poolifier into interleaved-weighted... X-Git-Tag: v2.5.0~3^2~16 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=5a5fc090d6f7eb9248df1ba5c0ff4d001461b6d4;hp=e4543b1428fd6b52f5832ea75f21ac082b52684e;p=poolifier.git Merge branch 'master' of github.com:poolifier/poolifier into interleaved-weighted-round-robin-worker-choice-strategy Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts new file mode 100644 index 00000000..12a8055e --- /dev/null +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -0,0 +1,149 @@ +import { cpus } from 'node:os' +import type { IWorker } from '../worker' +import type { IPool } from '../pool' +import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + WorkerChoiceStrategyOptions +} from './selection-strategies-types' + +/** + * Selects the next worker with an interleaved weighted round robin scheduling algorithm. + * + * @typeParam Worker - Type of worker which manages the strategy. + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< + Worker extends IWorker, + Data = unknown, + Response = unknown + > + extends AbstractWorkerChoiceStrategy + implements IWorkerChoiceStrategy { + /** + * Worker node id where the current task will be submitted. + */ + private currentWorkerNodeId: number = 0 + /** + * Current round id. + * This is used to determine the current round weight. + */ + private currentRoundId: number = 0 + /** + * Round weights. + */ + private roundWeights: number[] + /** + * Default worker weight. + */ + private readonly defaultWorkerWeight: number + + /** @inheritDoc */ + public constructor ( + pool: IPool, + opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + ) { + super(pool, opts) + this.setRequiredStatistics(this.opts) + this.defaultWorkerWeight = this.computeDefaultWorkerWeight() + this.roundWeights = this.getRoundWeights() + } + + /** @inheritDoc */ + public reset (): boolean { + this.currentWorkerNodeId = 0 + this.currentRoundId = 0 + return true + } + + /** @inheritDoc */ + public update (): boolean { + return true + } + + /** @inheritDoc */ + public choose (): number { + let roundId: number | undefined + let workerNodeId: number | undefined + for ( + let roundIndex = this.currentRoundId; + roundIndex < this.roundWeights.length; + roundIndex++ + ) { + for ( + let workerNodeKey = this.currentWorkerNodeId; + workerNodeKey < this.pool.workerNodes.length; + workerNodeKey++ + ) { + const workerWeight = + this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight + if (workerWeight >= this.roundWeights[roundIndex]) { + roundId = roundIndex + workerNodeId = workerNodeKey + break + } + } + } + this.currentRoundId = roundId ?? 0 + this.currentWorkerNodeId = workerNodeId ?? 0 + const chosenWorkerNodeKey = this.currentWorkerNodeId + if (this.currentWorkerNodeId === this.pool.workerNodes.length - 1) { + this.currentWorkerNodeId = 0 + this.currentRoundId = + this.currentRoundId === this.roundWeights.length - 1 + ? 0 + : this.currentRoundId + 1 + } else { + this.currentWorkerNodeId = this.currentWorkerNodeId + 1 + } + return chosenWorkerNodeKey + } + + /** @inheritDoc */ + public remove (workerNodeKey: number): boolean { + if (this.currentWorkerNodeId === workerNodeKey) { + if (this.pool.workerNodes.length === 0) { + this.currentWorkerNodeId = 0 + } else if (this.currentWorkerNodeId > this.pool.workerNodes.length - 1) { + this.currentWorkerNodeId = this.pool.workerNodes.length - 1 + this.currentRoundId = + this.currentRoundId === this.roundWeights.length - 1 + ? 0 + : this.currentRoundId + 1 + } + } + return true + } + + /** @inheritDoc */ + public setOptions (opts: WorkerChoiceStrategyOptions): void { + super.setOptions(opts) + this.roundWeights = this.getRoundWeights() + } + + private computeDefaultWorkerWeight (): number { + let cpusCycleTimeWeight = 0 + for (const cpu of cpus()) { + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / cpus().length) + } + + private getRoundWeights (): number[] { + if (this.opts.weights == null) { + return [this.defaultWorkerWeight] + } + return [ + ...new Set( + Object.values(this.opts.weights) + .slice() + .sort((a, b) => a - b) + ) + ] + } +} diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 282e306d..29f1ca81 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -21,7 +21,13 @@ export const WorkerChoiceStrategies = Object.freeze({ /** * Weighted round robin worker selection strategy. */ - WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN' + WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN', + /** + * Interleaved weighted round robin worker selection strategy. + * + * @experimental + */ + INTERLEAVED_WEIGHTED_ROUND_ROBIN: 'INTERLEAVED_WEIGHTED_ROUND_ROBIN' } as const) /** diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 21c24f7c..43bcb3ad 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -2,6 +2,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import type { IPool } from '../pool' import type { IWorker } from '../worker' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy' +import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy' import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy' import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy' import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' @@ -83,6 +84,14 @@ export class WorkerChoiceStrategyContext< Data, Response >(pool, opts) + ], + [ + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN, + new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))< + Worker, + Data, + Response + >(pool, opts) ] ]) } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 9e3e99e3..ea5fc04a 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -18,6 +18,9 @@ describe('Selection strategies test suite', () => { expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' ) + expect(WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN).toBe( + 'INTERLEAVED_WEIGHTED_ROUND_ROBIN' + ) }) it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => {