From a4958de2101f06e7096b83adbca82fcfd532a721 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 6 May 2023 20:25:18 +0200 Subject: [PATCH] fix: fix fair share algorithm implementation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 32 +++---------- .../abstract-worker-choice-strategy.ts | 8 ++-- .../fair-share-worker-choice-strategy.ts | 8 ++-- .../less-busy-worker-choice-strategy.ts | 2 +- .../less-used-worker-choice-strategy.ts | 2 +- .../selection-strategies-types.ts | 8 +++- .../worker-choice-strategy-context.ts | 4 +- src/utils.ts | 2 +- .../selection-strategies.test.js | 48 +++++-------------- 9 files changed, 37 insertions(+), 77 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b778fe17..ab93fc00 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -322,7 +322,7 @@ export abstract class AbstractPool< protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { - return workerNode.tasksUsage?.running === 0 + return workerNode.tasksUsage.running === 0 }) === -1 ) } @@ -412,7 +412,8 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerTasksUsage = this.getWorkerTasksUsage(worker) + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage --workerTasksUsage.running ++workerTasksUsage.run if (message.error != null) { @@ -432,7 +433,7 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } - this.workerChoiceStrategyContext.update() + this.workerChoiceStrategyContext.update(workerNodeKey) } /** @@ -447,13 +448,14 @@ export abstract class AbstractPool< if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) { const workerCreated = this.createAndSetupWorker() this.registerWorkerMessageListener(workerCreated, message => { + const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated) if ( isKillBehavior(KillBehaviors.HARD, message.kill) || (message.kill != null && - this.getWorkerTasksUsage(workerCreated)?.running === 0) + this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0) ) { // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime) - this.flushTasksQueueByWorker(workerCreated) + this.flushTasksQueue(currentWorkerNodeKey) void (this.destroyWorker(workerCreated) as Promise) } }) @@ -579,21 +581,6 @@ export abstract class AbstractPool< workerNode.tasksUsage = tasksUsage } - /** - * Gets the given worker its tasks usage in the pool. - * - * @param worker - The worker. - * @throws Error if the worker is not found in the pool worker nodes. - * @returns The worker tasks usage. - */ - private getWorkerTasksUsage (worker: Worker): TasksUsage { - const workerNodeKey = this.getWorkerNodeKey(worker) - if (workerNodeKey !== -1) { - return this.workerNodes[workerNodeKey].tasksUsage - } - throw new Error('Worker could not be found in the pool worker nodes') - } - /** * Pushes the given worker in the pool worker nodes. * @@ -676,11 +663,6 @@ export abstract class AbstractPool< } } - private flushTasksQueueByWorker (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKey(worker) - this.flushTasksQueue(workerNodeKey) - } - private flushTasksQueues (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.flushTasksQueue(workerNodeKey) diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 9aabdfd2..e2dd626d 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -66,7 +66,7 @@ export abstract class AbstractWorkerChoiceStrategy< public abstract reset (): boolean /** @inheritDoc */ - public abstract update (): boolean + public abstract update (workerNodeKey: number): boolean /** @inheritDoc */ public abstract choose (): number @@ -106,7 +106,7 @@ export abstract class AbstractWorkerChoiceStrategy< */ private findFirstFreeWorkerNodeKey (): number { return this.pool.workerNodes.findIndex(workerNode => { - return workerNode.tasksUsage?.running === 0 + return workerNode.tasksUsage.running === 0 }) } @@ -122,10 +122,10 @@ export abstract class AbstractWorkerChoiceStrategy< private findLastFreeWorkerNodeKey (): number { // It requires node >= 18.0.0: // return this.workerNodes.findLastIndex(workerNode => { - // return workerNode.tasksUsage?.running === 0 + // return workerNode.tasksUsage.running === 0 // }) for (let i = this.pool.workerNodes.length - 1; i >= 0; i--) { - if (this.pool.workerNodes[i].tasksUsage?.running === 0) { + if (this.pool.workerNodes[i].tasksUsage.running === 0) { return i } } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 17133408..e65575a1 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -13,7 +13,7 @@ import type { */ interface WorkerVirtualTaskTimestamp { start: number - end: number + end?: number } /** @@ -59,10 +59,8 @@ export class FairShareWorkerChoiceStrategy< } /** @inheritDoc */ - public update (): boolean { - for (const [workerNodeKey] of this.pool.workerNodes.entries()) { - this.computeWorkerVirtualTaskTimestamp(workerNodeKey) - } + public update (workerNodeKey: number): boolean { + this.computeWorkerVirtualTaskTimestamp(workerNodeKey) return true } diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index 8feac71c..40a3f649 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -69,7 +69,7 @@ export class LessBusyWorkerChoiceStrategy< } /** @inheritDoc */ - public remove (workerNodeKey: number): boolean { + public remove (): boolean { return true } } diff --git a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts index 078dd8fd..295493a4 100644 --- a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts @@ -62,7 +62,7 @@ export class LessUsedWorkerChoiceStrategy< } /** @inheritDoc */ - public remove (workerNodeKey: number): boolean { + public remove (): boolean { return true } } diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 968d2996..114ae8c5 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -78,16 +78,20 @@ export interface IWorkerChoiceStrategy { readonly requiredStatistics: RequiredStatistics /** * Resets strategy internals. + * + * @returns `true` if the reset is successful, `false` otherwise. */ reset: () => boolean /** - * Updates strategy internals. + * Updates worker node strategy internals. * * @returns `true` if the update is successful, `false` otherwise. */ - update: () => boolean + update: (workerNodeKey: number) => boolean /** * Chooses a worker node in the pool and returns its key. + * + * @returns The worker node key. */ choose: () => number /** diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 0a488549..e5fddc1b 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -119,12 +119,12 @@ export class WorkerChoiceStrategyContext< * * @returns `true` if the update is successful, `false` otherwise. */ - public update (): boolean { + public update (workerNodeKey: number): boolean { return ( this.workerChoiceStrategies.get( this.workerChoiceStrategy ) as IWorkerChoiceStrategy - ).update() + ).update(workerNodeKey) } /** diff --git a/src/utils.ts b/src/utils.ts index f30e175c..ed118da6 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -28,7 +28,7 @@ export const median = (dataSet: number[]): number => { const sortedDataSet = dataSet.slice().sort((a, b) => a - b) const middleIndex = Math.floor(sortedDataSet.length / 2) if (sortedDataSet.length % 2 === 0) { - return sortedDataSet[middleIndex / 2] + return sortedDataSet[middleIndex] } return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2 } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index e8d9d66b..392aafcf 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -153,12 +153,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -171,12 +169,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } ) // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -290,12 +286,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } ) // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -308,12 +302,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED } ) // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -361,12 +353,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } ) // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -379,12 +369,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY } ) // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) // We need to clean up the resources after our test await pool.destroy() }) @@ -432,12 +420,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) @@ -461,12 +447,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) @@ -495,12 +479,10 @@ describe('Selection strategies test suite', () => { } ) // TODO: Create a better test to cover `FairShareChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBe(0) @@ -633,12 +615,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) @@ -667,12 +647,10 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) @@ -706,12 +684,10 @@ describe('Selection strategies test suite', () => { } ) // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose` - const promises = [] const maxMultiplier = 2 for (let i = 0; i < max * maxMultiplier; i++) { - promises.push(pool.execute()) + await pool.execute() } - await Promise.all(promises) for (const workerNode of pool.workerNodes) { expect(workerNode.tasksUsage.avgRunTime).toBeDefined() expect(workerNode.tasksUsage.avgRunTime).toBe(0) -- 2.34.1