From 2377984dc95d13e0a684210368e3e4d4ba6239d6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 10 Oct 2022 21:22:42 +0200 Subject: [PATCH] Fix WRR worker choice strategy implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- ...hted-round-robin-worker-choice-strategy.ts | 80 +++++++++++-------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 6e48f82f..19c81f99 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -5,7 +5,7 @@ import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { RequiredStatistics } from './selection-strategies-types' /** - * Task run time. + * Virtual task run time. */ type TaskRunTime = { weight: number @@ -43,9 +43,9 @@ export class WeightedRoundRobinWorkerChoiceStrategy< */ private defaultWorkerWeight: number /** - * Per worker task runtime map. + * Per worker virtual task runtime map. */ - private workerTaskRunTime: Map = new Map< + private workersTaskRunTime: Map = new Map< Worker, TaskRunTime >() @@ -58,41 +58,39 @@ export class WeightedRoundRobinWorkerChoiceStrategy< public constructor (pool: IPoolInternal) { super(pool) this.defaultWorkerWeight = this.computeWorkerWeight() - this.initWorkerTaskRunTime() + this.initWorkersTaskRunTime() } /** @inheritDoc */ public choose (): Worker { const currentWorker = this.pool.workers[this.currentWorkerIndex] - if (this.isDynamicPool === true) { - this.workerTaskRunTime.has(currentWorker) === false && - this.workerTaskRunTime.set(currentWorker, { - weight: this.defaultWorkerWeight, - runTime: 0 - }) + if ( + this.isDynamicPool === true && + this.workersTaskRunTime.has(currentWorker) === false + ) { + this.initWorkerTaskRunTime(currentWorker) } const workerVirtualTaskRunTime = this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0 const workerTaskWeight = - this.workerTaskRunTime.get(currentWorker)?.weight ?? + this.workersTaskRunTime.get(currentWorker)?.weight ?? this.defaultWorkerWeight if (this.currentWorkerIndex === this.previousWorkerIndex) { const workerTaskRunTime = - (this.workerTaskRunTime.get(currentWorker)?.runTime ?? 0) + + (this.workersTaskRunTime.get(currentWorker)?.runTime ?? 0) + workerVirtualTaskRunTime - this.workerTaskRunTime.set(currentWorker, { - weight: workerTaskWeight, - runTime: workerTaskRunTime - }) + this.setWorkerTaskRunTime( + currentWorker, + workerTaskWeight, + workerTaskRunTime + ) } else { - this.workerTaskRunTime.set(currentWorker, { - weight: workerTaskWeight, - runTime: 0 - }) + this.setWorkerTaskRunTime(currentWorker, workerTaskWeight, 0) } if ( workerVirtualTaskRunTime < - (this.workerTaskRunTime.get(currentWorker) ?? this.defaultWorkerWeight) + (this.workersTaskRunTime.get(currentWorker)?.weight ?? + this.defaultWorkerWeight) ) { this.previousWorkerIndex = this.currentWorkerIndex } else { @@ -105,7 +103,32 @@ export class WeightedRoundRobinWorkerChoiceStrategy< return this.pool.workers[this.currentWorkerIndex] } - private computeWorkerWeight () { + private initWorkersTaskRunTime (): void { + for (const worker of this.pool.workers) { + this.initWorkerTaskRunTime(worker) + } + } + + private initWorkerTaskRunTime (worker: Worker): void { + this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0) + } + + private setWorkerTaskRunTime ( + worker: Worker, + weight: number, + runTime: number + ): void { + this.workersTaskRunTime.set(worker, { + weight, + runTime + }) + } + + private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined { + return this.pool.getWorkerAverageTasksRunTime(worker) + } + + private computeWorkerWeight (): number { let cpusCycleTimeWeight = 0 for (const cpu of cpus()) { // CPU estimated cycle time @@ -115,17 +138,4 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } return cpusCycleTimeWeight / cpus().length } - - private initWorkerTaskRunTime () { - for (const worker of this.pool.workers) { - this.workerTaskRunTime.set(worker, { - weight: this.defaultWorkerWeight, - runTime: 0 - }) - } - } - - private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined { - return this.pool.getWorkerAverageTasksRunTime(worker) - } } -- 2.34.1