From b0d6ed8f66e9636805462c83e4c9290dccebb690 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 7 May 2023 15:45:03 +0200 Subject: [PATCH] fix: fix fair share worker choice strategy internals update MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 ++ src/pools/abstract-pool.ts | 2 +- .../fair-share-worker-choice-strategy.ts | 56 ++++++++++--------- .../worker-choice-strategy-context.ts | 7 ++- .../selection-strategies.test.js | 34 +++++------ 5 files changed, 59 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e567704..ecbb7e77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix fair share worker choice strategy internals update: ensure virtual task end timestamp is computed at task submission. + ## [2.4.12] - 2023-05-06 ### Added diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ab93fc00..5114bead 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -354,6 +354,7 @@ export abstract class AbstractPool< } else { this.executeTask(workerNodeKey, submittedTask) } + this.workerChoiceStrategyContext.update(workerNodeKey) this.checkAndEmitEvents() // eslint-disable-next-line @typescript-eslint/return-await return res @@ -433,7 +434,6 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } - this.workerChoiceStrategyContext.update(workerNodeKey) } /** diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index e65575a1..3a2e71ee 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -8,14 +8,6 @@ import type { WorkerChoiceStrategyOptions } from './selection-strategies-types' -/** - * Worker virtual task timestamp. - */ -interface WorkerVirtualTaskTimestamp { - start: number - end?: number -} - /** * Selects the next worker with a fair share scheduling algorithm. * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing. @@ -39,9 +31,9 @@ export class FairShareWorkerChoiceStrategy< } /** - * Workers' virtual task execution timestamp. + * Workers' virtual task end execution timestamp. */ - private workersVirtualTaskTimestamp: WorkerVirtualTaskTimestamp[] = [] + private workersVirtualTaskEndTimestamp: number[] = [] /** @inheritDoc */ public constructor ( @@ -54,13 +46,13 @@ export class FairShareWorkerChoiceStrategy< /** @inheritDoc */ public reset (): boolean { - this.workersVirtualTaskTimestamp = [] + this.workersVirtualTaskEndTimestamp = [] return true } /** @inheritDoc */ public update (workerNodeKey: number): boolean { - this.computeWorkerVirtualTaskTimestamp(workerNodeKey) + this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey) return true } @@ -69,8 +61,11 @@ export class FairShareWorkerChoiceStrategy< let minWorkerVirtualTaskEndTimestamp = Infinity let chosenWorkerNodeKey!: number for (const [workerNodeKey] of this.pool.workerNodes.entries()) { + if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) { + this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey) + } const workerVirtualTaskEndTimestamp = - this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? 0 + this.workersVirtualTaskEndTimestamp[workerNodeKey] if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) { minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp chosenWorkerNodeKey = workerNodeKey @@ -81,26 +76,37 @@ export class FairShareWorkerChoiceStrategy< /** @inheritDoc */ public remove (workerNodeKey: number): boolean { - this.workersVirtualTaskTimestamp.splice(workerNodeKey, 1) + this.workersVirtualTaskEndTimestamp.splice(workerNodeKey, 1) return true } /** - * Computes worker virtual task timestamp. + * Computes the worker node key virtual task end timestamp. * * @param workerNodeKey - The worker node key. */ - private computeWorkerVirtualTaskTimestamp (workerNodeKey: number): void { - const workerVirtualTaskStartTimestamp = Math.max( - performance.now(), - this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? -Infinity - ) - const workerVirtualTaskTRunTime = this.requiredStatistics.medRunTime + private computeWorkerVirtualTaskEndTimestamp (workerNodeKey: number): void { + this.workersVirtualTaskEndTimestamp[workerNodeKey] = + this.getWorkerVirtualTaskEndTimestamp( + workerNodeKey, + this.getWorkerVirtualTaskStartTimestamp(workerNodeKey) + ) + } + + private getWorkerVirtualTaskEndTimestamp ( + workerNodeKey: number, + workerVirtualTaskStartTimestamp: number + ): number { + const workerVirtualTaskRunTime = this.requiredStatistics.medRunTime ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime - this.workersVirtualTaskTimestamp[workerNodeKey] = { - start: workerVirtualTaskStartTimestamp, - end: workerVirtualTaskStartTimestamp + workerVirtualTaskTRunTime - } + return workerVirtualTaskStartTimestamp + workerVirtualTaskRunTime + } + + private getWorkerVirtualTaskStartTimestamp (workerNodeKey: number): number { + return Math.max( + performance.now(), + this.workersVirtualTaskEndTimestamp[workerNodeKey] ?? -Infinity + ) } } diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 2b3c9fb6..124addd6 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -131,13 +131,18 @@ export class WorkerChoiceStrategyContext< * Executes the worker choice strategy algorithm in the context. * * @returns The key of the worker node. + * @throws {@link Error} If the worker node key is null or undefined. */ public execute (): number { - return ( + const workerNodeKey = ( this.workerChoiceStrategies.get( this.workerChoiceStrategy ) as IWorkerChoiceStrategy ).choose() + if (workerNodeKey == null) { + throw new Error('Worker node key chosen is null or undefined') + } + return workerNodeKey } /** diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 392aafcf..1ecd270f 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -80,12 +80,12 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp + ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(0) } else if ( workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN @@ -433,7 +433,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() @@ -460,7 +460,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() @@ -492,7 +492,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( pool.workerChoiceStrategyContext.workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() @@ -507,31 +507,31 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp + ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(0) pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp[0] = 0 + ).workersVirtualTaskEndTimestamp[0] = performance.now() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(1) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp + ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(0) await pool.destroy() pool = new DynamicThreadPool( @@ -542,31 +542,31 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp + ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(0) pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp[0] = 0 + ).workersVirtualTaskEndTimestamp[0] = performance.now() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(1) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp + ).workersVirtualTaskEndTimestamp ).toBeInstanceOf(Array) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( workerChoiceStrategy - ).workersVirtualTaskTimestamp.length + ).workersVirtualTaskEndTimestamp.length ).toBe(0) // We need to clean up the resources after our test await pool.destroy() -- 2.34.1