From: Jérôme Benoit Date: Sun, 9 Oct 2022 20:06:28 +0000 (+0200) Subject: Add WRR worker choice strategy X-Git-Tag: v2.3.1~53 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=b3432a63039e7cb70c0448da5518690e457cd47e;p=poolifier.git Add WRR worker choice strategy Close #363 Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index edefcfaa..359cfb94 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -11,7 +11,11 @@ export const WorkerChoiceStrategies = Object.freeze({ /** * Less recently used worker selection strategy. */ - LESS_RECENTLY_USED: 'LESS_RECENTLY_USED' + LESS_RECENTLY_USED: 'LESS_RECENTLY_USED', + /** + * Weighted round robin worker selection strategy. + */ + WEIGHTED_ROUND_ROBIN: 'WEIGHTED_ROUND_ROBIN' } as const) /** diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index e76a6998..34c425b3 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -7,6 +7,7 @@ import type { WorkerChoiceStrategy } from './selection-strategies-types' import { WorkerChoiceStrategies } from './selection-strategies-types' +import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-choice-strategy' /** * Worker selection strategies helpers class. @@ -32,6 +33,8 @@ export class SelectionStrategiesUtils { return new RoundRobinWorkerChoiceStrategy(pool) case WorkerChoiceStrategies.LESS_RECENTLY_USED: return new LessRecentlyUsedWorkerChoiceStrategy(pool) + case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN: + return new WeightedRoundRobinWorkerChoiceStrategy(pool) default: throw new Error( // eslint-disable-next-line @typescript-eslint/restrict-template-expressions diff --git a/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts new file mode 100644 index 00000000..cdb88016 --- /dev/null +++ b/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts @@ -0,0 +1,125 @@ +import { cpus } from 'os' +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolInternal } from '../pool-internal' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' + +/** + * Task run time. + */ +type TaskRunTime = { + weight: number + runTime: number +} + +/** + * Selects the next worker with a weighted round robin scheduling algorithm. + * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class WeightedRoundRobinWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response +> extends AbstractWorkerChoiceStrategy { + /** + * Worker index where the previous task was submitted. + */ + private previousWorkerIndex: number = 0 + /** + * Worker index where the current task will be submitted. + */ + private currentWorkerIndex: number = 0 + /** + * Default worker weight. + */ + private defaultWorkerWeight: number + /** + * Per worker task runtime map. + */ + private workerTaskRunTime: Map = new Map< + Worker, + TaskRunTime + >() + + /** + * Constructs a worker choice strategy that selects based a weighted round robin scheduling algorithm. + * + * @param pool The pool instance. + */ + public constructor (pool: IPoolInternal) { + super(pool) + this.defaultWorkerWeight = this.computeWorkerWeight() + this.initWorkerTaskRunTime() + } + + /** @inheritDoc */ + public choose (): Worker { + const currentWorker = this.pool.workers[this.currentWorkerIndex] + if (this.isDynamicPool === true) { + this.workerTaskRunTime.has(currentWorker) === false && + this.workerTaskRunTime.set(currentWorker, { + weight: this.defaultWorkerWeight, + runTime: 0 + }) + } + const workerVirtualTaskRunTime = + this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0 + const workerTaskWeight = + this.workerTaskRunTime.get(currentWorker)?.weight ?? + this.defaultWorkerWeight + if (this.currentWorkerIndex === this.previousWorkerIndex) { + const workerTaskRunTime = + (this.workerTaskRunTime.get(currentWorker)?.runTime ?? 0) + + workerVirtualTaskRunTime + this.workerTaskRunTime.set(currentWorker, { + weight: workerTaskWeight, + runTime: workerTaskRunTime + }) + } else { + this.workerTaskRunTime.set(currentWorker, { + weight: workerTaskWeight, + runTime: 0 + }) + } + if ( + workerVirtualTaskRunTime < + (this.workerTaskRunTime.get(currentWorker) ?? this.defaultWorkerWeight) + ) { + this.previousWorkerIndex = this.currentWorkerIndex + } else { + this.previousWorkerIndex = this.currentWorkerIndex + this.currentWorkerIndex = + this.pool.workers.length - 1 === this.currentWorkerIndex + ? 0 + : this.currentWorkerIndex + 1 + } + return this.pool.workers[this.currentWorkerIndex] + } + + private computeWorkerWeight () { + let cpusCycleTimeWeight = 0 + for (let cpu = 0; cpu < cpus().length; cpu++) { + // CPU estimated cycle time + const numberOfDigit = cpus()[cpu].speed.toString().length - 1 + const cpuCycleTime = 1 / (cpus()[cpu].speed / Math.pow(10, numberOfDigit)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit) + } + return cpusCycleTimeWeight / cpus().length + } + + private initWorkerTaskRunTime () { + for (const worker of this.pool.workers) { + this.workerTaskRunTime.set(worker, { + weight: this.defaultWorkerWeight, + runTime: 0 + }) + } + } + + private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined { + return this.pool.getWorkerAverageTasksRunTime(worker) + } +} diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 6448c667..93649c39 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -9,6 +9,9 @@ describe('Selection strategies test suite', () => { it('Verify that WorkerChoiceStrategies enumeration provides string values', () => { expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED') + expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( + 'WEIGHTED_ROUND_ROBIN' + ) }) it('Verify ROUND_ROBIN strategy is the default at pool creation', async () => { @@ -148,6 +151,70 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => { + const max = 3 + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + ) + expect(pool.opts.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy can be set after pool creation', async () => { + const max = 3 + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + expect(pool.opts.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const max = 3 + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + ) + // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const min = 0 + const max = 3 + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + ) + // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify unknown strategies throw error', () => { const min = 1 const max = 3