From 85bbc7ab16c9f69a5dd358b71e3e6d4204dfd630 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 28 Apr 2024 23:35:28 +0200 Subject: [PATCH] fix: ensure task function ops sync worker choice strategies MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .eslintrc.cjs | 1 + .vscode/settings.json | 1 + src/pools/abstract-pool.ts | 64 +++++++++++++------ .../selection-strategies-utils.ts | 6 +- .../worker-choice-strategies-context.ts | 53 ++++++++++++--- src/pools/utils.ts | 4 +- tests/worker/abstract-worker.test.mjs | 6 +- 7 files changed, 99 insertions(+), 36 deletions(-) diff --git a/.eslintrc.cjs b/.eslintrc.cjs index ef9e86c0..cc22b6a4 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -45,6 +45,7 @@ module.exports = defineConfig({ 'cpus', 'cryptographically', 'ctx', + 'deduped', 'deprecations', 'deque', 'dequeue', diff --git a/.vscode/settings.json b/.vscode/settings.json index b014abda..b644a303 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -14,6 +14,7 @@ "cloneable", "codeql", "commitlint", + "deduped", "Dependabot", "deque", "deregisters", diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 63f1f859..c510b2ad 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -548,13 +548,22 @@ export abstract class AbstractPool< workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { checkValidWorkerChoiceStrategy(workerChoiceStrategy) - this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( - this.opts.workerChoiceStrategy, - workerChoiceStrategyOptions - ) - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.sendStatisticsMessageToWorker(workerNodeKey) + if (workerChoiceStrategyOptions != null) { + this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + } + if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { + this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( + this.opts.workerChoiceStrategy, + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const [workerNodeKey] of this.workerNodes.entries()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } } } @@ -809,17 +818,9 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - for (const workerNode of this.workerNodes) { - if ( - Array.isArray(workerNode.info.taskFunctionsProperties) && - workerNode.info.taskFunctionsProperties.some( - taskFunctionProperties => taskFunctionProperties.name === name - ) - ) { - return true - } - } - return false + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.name === name + ) } /** @inheritDoc */ @@ -845,6 +846,9 @@ export abstract class AbstractPool< taskFunction: fn.taskFunction.toString() }) this.taskFunctions.set(name, fn) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies() + ) return opResult } @@ -864,6 +868,9 @@ export abstract class AbstractPool< }) this.deleteTaskFunctionWorkerUsages(name) this.taskFunctions.delete(name) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerWorkerChoiceStrategies() + ) return opResult } @@ -897,6 +904,27 @@ export abstract class AbstractPool< } } + /** + * Gets the worker choice strategies registered in this pool. + * + * @returns The worker choice strategies. + */ + private readonly getWorkerWorkerChoiceStrategies = + (): Set => { + return new Set([ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.workerChoiceStrategy!, + ...(this.listTaskFunctionsProperties() + .map( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.strategy + ) + .filter( + (strategy: WorkerChoiceStrategy | undefined) => strategy != null + ) as WorkerChoiceStrategy[]) + ]) + } + /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { return await this.sendTaskFunctionOperationToWorkers({ diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts index bed0d624..d1e60e03 100644 --- a/src/pools/selection-strategies/selection-strategies-utils.ts +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -17,10 +17,6 @@ import { import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js' import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js' -const clone = (object: T): T => { - return structuredClone(object) -} - const estimatedCpuSpeed = (): number => { const runs = 150000000 const begin = performance.now() @@ -90,7 +86,7 @@ export const buildWorkerChoiceStrategyOptions = < pool: IPool, opts?: WorkerChoiceStrategyOptions ): WorkerChoiceStrategyOptions => { - opts = clone(opts ?? {}) + opts = structuredClone(opts ?? {}) opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) return { ...{ diff --git a/src/pools/selection-strategies/worker-choice-strategies-context.ts b/src/pools/selection-strategies/worker-choice-strategies-context.ts index 6b9d726d..7e633e16 100644 --- a/src/pools/selection-strategies/worker-choice-strategies-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategies-context.ts @@ -79,7 +79,7 @@ export class WorkerChoiceStrategiesContext< } /** - * Gets the active worker choice strategies policy in the context. + * Gets the active worker choice strategies in the context policy. * * @returns The strategies policy. */ @@ -135,8 +135,10 @@ export class WorkerChoiceStrategiesContext< workerChoiceStrategy: WorkerChoiceStrategy, opts?: WorkerChoiceStrategyOptions ): void { - this.defaultWorkerChoiceStrategy = workerChoiceStrategy - this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) + if (workerChoiceStrategy !== this.defaultWorkerChoiceStrategy) { + this.defaultWorkerChoiceStrategy = workerChoiceStrategy + this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) + } } /** @@ -221,6 +223,35 @@ export class WorkerChoiceStrategiesContext< } } + /** + * Synchronizes the active worker choice strategies in the context with the given worker choice strategies. + * + * @param workerChoiceStrategies - The worker choice strategies to synchronize. + * @param opts - The worker choice strategy options. + */ + public syncWorkerChoiceStrategies ( + workerChoiceStrategies: Set, + opts?: WorkerChoiceStrategyOptions + ): void { + for (const workerChoiceStrategy of this.workerChoiceStrategies.keys()) { + if (!workerChoiceStrategies.has(workerChoiceStrategy)) { + this.removeWorkerChoiceStrategy(workerChoiceStrategy) + } + } + for (const workerChoiceStrategy of workerChoiceStrategies) { + if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) { + this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) + } + } + } + + /** + * Adds a worker choice strategy to the context. + * + * @param workerChoiceStrategy - The worker choice strategy to add. + * @param opts - The worker choice strategy options. + * @returns The worker choice strategies. + */ private addWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy, pool: IPool, @@ -240,9 +271,15 @@ export class WorkerChoiceStrategiesContext< return this.workerChoiceStrategies } - // private removeWorkerChoiceStrategy ( - // workerChoiceStrategy: WorkerChoiceStrategy - // ): boolean { - // return this.workerChoiceStrategies.delete(workerChoiceStrategy) - // } + /** + * Removes a worker choice strategy from the context. + * + * @param workerChoiceStrategy - The worker choice strategy to remove. + * @returns `true` if the worker choice strategy is removed, `false` otherwise. + */ + private removeWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy + ): boolean { + return this.workerChoiceStrategies.delete(workerChoiceStrategy) + } } diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 36bd8939..3fa17312 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -89,14 +89,14 @@ export const checkDynamicPoolSize = ( export const checkValidPriority = (priority: number | undefined): void => { if (priority != null && !Number.isSafeInteger(priority)) { - throw new TypeError(`Invalid priority '${priority}'`) + throw new TypeError(`Invalid property 'priority': '${priority}'`) } if ( priority != null && Number.isSafeInteger(priority) && (priority < -20 || priority > 19) ) { - throw new RangeError('Property priority must be between -20 and 19') + throw new RangeError("Property 'priority' must be between -20 and 19") } } diff --git a/tests/worker/abstract-worker.test.mjs b/tests/worker/abstract-worker.test.mjs index d94cd440..38eaab5b 100644 --- a/tests/worker/abstract-worker.test.mjs +++ b/tests/worker/abstract-worker.test.mjs @@ -174,13 +174,13 @@ describe('Abstract worker test suite', () => { ) expect( () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: '' } }) - ).toThrow(new TypeError("Invalid priority ''")) + ).toThrow(new TypeError("Invalid property 'priority': ''")) expect( () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: -21 } }) - ).toThrow(new TypeError('Property priority must be between -20 and 19')) + ).toThrow(new TypeError("Property 'priority' must be between -20 and 19")) expect( () => new ThreadWorker({ fn1: { taskFunction: fn1, priority: 20 } }) - ).toThrow(new RangeError('Property priority must be between -20 and 19')) + ).toThrow(new RangeError("Property 'priority' must be between -20 and 19")) expect( () => new ThreadWorker({ -- 2.34.1