From 97a2abc3c559695c4fae99c48d1a2dc636275ccb Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 3 Apr 2023 19:09:06 +0200 Subject: [PATCH] fix: ensure worker removal impact is propated to worker choice strategy MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + src/pools/abstract-pool.ts | 9 ++-- .../abstract-worker-choice-strategy.ts | 3 ++ .../dynamic-pool-worker-choice-strategy.ts | 5 ++ .../fair-share-worker-choice-strategy.ts | 11 +++++ .../less-busy-worker-choice-strategy.ts | 5 ++ .../less-used-worker-choice-strategy.ts | 5 ++ .../round-robin-worker-choice-strategy.ts | 11 +++++ .../selection-strategies-types.ts | 6 +++ ...hted-round-robin-worker-choice-strategy.ts | 17 +++++++ .../worker-choice-strategy-context.ts | 21 +++++++++ .../selection-strategies.test.js | 46 +++++++++++-------- 12 files changed, 115 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79788cf7..55c8239e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ensure trimmable characters are checked at pool initialization. - Fix message id integer overflow. +- Fix pool worker removal in worker choice strategy internals. ## [2.3.10] - 2023-03-18 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 172feda0..095a4f7b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -260,10 +260,7 @@ export abstract class AbstractPool< if (message.error != null) { ++workerTasksUsage.error } - if ( - this.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime - ) { + if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) { workerTasksUsage.runTime += message.taskRunTime ?? 0 if (workerTasksUsage.run !== 0) { workerTasksUsage.avgRunTime = @@ -278,7 +275,9 @@ export abstract class AbstractPool< * @param worker - The worker that will be removed. */ protected removeWorker (worker: Worker): void { - this.workers.splice(this.getWorkerKey(worker), 1) + const workerKey = this.getWorkerKey(worker) + this.workers.splice(workerKey, 1) + this.workerChoiceStrategyContext.remove(workerKey) } /** diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 5b526331..b1812733 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -41,4 +41,7 @@ export abstract class AbstractWorkerChoiceStrategy< /** {@inheritDoc} */ public abstract choose (): number + + /** {@inheritDoc} */ + public abstract remove (workerKey: number): boolean } diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts index c122c2b1..1658d176 100644 --- a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts @@ -61,4 +61,9 @@ export class DynamicPoolWorkerChoiceStrategy< // All workers are busy, create a new worker return this.createWorkerCallback() } + + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + return this.workerChoiceStrategy.remove(workerKey) + } } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index af8eb75d..dbe10ce7 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -60,6 +60,17 @@ export class FairShareWorkerChoiceStrategy< return chosenWorkerKey } + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + const workerDeleted = this.workerLastVirtualTaskTimestamp.delete(workerKey) + for (const [key, value] of this.workerLastVirtualTaskTimestamp.entries()) { + if (key > workerKey) { + this.workerLastVirtualTaskTimestamp.set(key - 1, value) + } + } + return workerDeleted + } + /** * Computes worker last virtual task timestamp. * diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index 2bbafb59..f2e40162 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -39,4 +39,9 @@ export class LessBusyWorkerChoiceStrategy< } return lessBusyWorkerKey } + + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + return true + } } diff --git a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts index 2ed58c96..9cae4a54 100644 --- a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts @@ -34,4 +34,9 @@ export class LessUsedWorkerChoiceStrategy< } return lessUsedWorkerKey } + + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + return true + } } diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index e265172f..3880ca39 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -33,4 +33,15 @@ export class RoundRobinWorkerChoiceStrategy< : this.nextWorkerId + 1 return chosenWorkerKey } + + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + if (this.nextWorkerId === workerKey) { + this.nextWorkerId = + this.nextWorkerId > this.pool.workers.length - 1 + ? this.pool.workers.length - 1 + : this.nextWorkerId + } + return true + } } diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index a05fd9b7..c0987685 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -56,4 +56,10 @@ export interface IWorkerChoiceStrategy { * Chooses a worker in the pool and returns its key. */ choose: () => number + /** + * Removes a worker reference from strategy internals. + * + * @param workerKey - The worker key. + */ + remove: (workerKey: number) => boolean } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index e3d2be3f..24a25b4d 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -93,6 +93,23 @@ export class WeightedRoundRobinWorkerChoiceStrategy< return chosenWorkerKey } + /** {@inheritDoc} */ + public remove (workerKey: number): boolean { + if (this.currentWorkerId === workerKey) { + this.currentWorkerId = + this.currentWorkerId > this.pool.workers.length - 1 + ? this.pool.workers.length - 1 + : this.currentWorkerId + } + const workerDeleted = this.workersTaskRunTime.delete(workerKey) + for (const [key, value] of this.workersTaskRunTime) { + if (key > workerKey) { + this.workersTaskRunTime.set(key - 1, value) + } + } + return workerDeleted + } + private initWorkersTaskRunTime (): void { for (const [index] of this.pool.workers.entries()) { this.initWorkerTaskRunTime(index) diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index c149cc2e..011e00db 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -4,6 +4,7 @@ import type { IPoolWorker } from '../pool-worker' import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy' import type { IWorkerChoiceStrategy, + RequiredStatistics, WorkerChoiceStrategy } from './selection-strategies-types' import { WorkerChoiceStrategies } from './selection-strategies-types' @@ -61,11 +62,21 @@ export class WorkerChoiceStrategyContext< * Gets the worker choice strategy used in the context. * * @returns The worker choice strategy. + * @deprecated Scheduled removal. */ public getWorkerChoiceStrategy (): IWorkerChoiceStrategy { return this.workerChoiceStrategy } + /** + * Gets the worker choice strategy required statistics. + * + * @returns The required statistics. + */ + public getRequiredStatistics (): RequiredStatistics { + return this.workerChoiceStrategy.requiredStatistics + } + /** * Sets the worker choice strategy to use in the context. * @@ -87,4 +98,14 @@ export class WorkerChoiceStrategyContext< public execute (): number { return this.workerChoiceStrategy.choose() } + + /** + * Removes a worker in the underlying selection strategy internals. + * + * @param workerKey - The key of the worker to remove. + * @returns `true` if the removal is successful, `false` otherwise. + */ + public remove (workerKey: number): boolean { + return this.workerChoiceStrategy.remove(workerKey) + } } diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index a6257179..d09dde6f 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -70,8 +70,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( @@ -81,8 +80,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(false) // We need to clean up the resources after our test await pool.destroy() @@ -213,8 +211,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( @@ -224,8 +221,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(false) // We need to clean up the resources after our test await pool.destroy() @@ -297,8 +293,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) await pool.destroy() pool = new DynamicThreadPool( @@ -308,8 +303,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) // We need to clean up the resources after our test await pool.destroy() @@ -395,8 +389,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) await pool.destroy() pool = new DynamicThreadPool( @@ -406,8 +399,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) // We need to clean up the resources after our test await pool.destroy() @@ -425,6 +417,10 @@ describe('Selection strategies test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .workerLastVirtualTaskTimestamp.size + ).toBe(pool.workers.length) // We need to clean up the resources after our test await pool.destroy() }) @@ -442,6 +438,10 @@ describe('Selection strategies test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) + // expect( + // pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + // .workerChoiceStrategy.workerLastVirtualTaskTimestamp.size + // ).toBe(pool.workers.length) // We need to clean up the resources after our test await pool.destroy() }) @@ -555,8 +555,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) await pool.destroy() pool = new DynamicThreadPool( @@ -566,8 +565,7 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) expect( - pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() - .requiredStatistics.runTime + pool.workerChoiceStrategyContext.getRequiredStatistics().runTime ).toBe(true) // We need to clean up the resources after our test await pool.destroy() @@ -585,6 +583,10 @@ describe('Selection strategies test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .workersTaskRunTime.size + ).toBe(pool.workers.length) // We need to clean up the resources after our test await pool.destroy() }) @@ -602,6 +604,10 @@ describe('Selection strategies test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) + // expect( + // pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + // .workerChoiceStrategy.workersTaskRunTime.size + // ).toBe(pool.workers.length) // We need to clean up the resources after our test await pool.destroy() }) -- 2.34.1