From bcfb06ce041a682baf396a099c633a848d6a4045 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 28 Apr 2024 18:24:19 +0200 Subject: [PATCH] feat: add per task function strategy support MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- docs/api.md | 2 +- src/index.ts | 2 +- src/pools/abstract-pool.ts | 110 ++-- src/pools/pool.ts | 8 +- .../abstract-worker-choice-strategy.ts | 6 +- .../selection-strategies-utils.ts | 137 ++++ .../worker-choice-strategies-context.ts | 245 ++++++++ .../worker-choice-strategy-context.ts | 234 ------- src/pools/utils.ts | 99 +-- src/priority-queue.ts | 98 +++ src/utils.ts | 2 +- src/worker/abstract-worker.ts | 6 +- src/worker/utils.ts | 11 +- tests/pools/abstract-pool.test.mjs | 36 +- tests/pools/cluster/dynamic.test.mjs | 5 +- .../selection-strategies.test.mjs | 594 +++++++++--------- .../strategies-utils.test.mjs | 64 ++ .../worker-choice-strategy-context.test.mjs | 374 ++++++----- tests/pools/thread/dynamic.test.mjs | 5 +- tests/pools/utils.test.mjs | 63 +- 20 files changed, 1121 insertions(+), 980 deletions(-) create mode 100644 src/pools/selection-strategies/selection-strategies-utils.ts create mode 100644 src/pools/selection-strategies/worker-choice-strategies-context.ts delete mode 100644 src/pools/selection-strategies/worker-choice-strategy-context.ts create mode 100644 src/priority-queue.ts create mode 100644 tests/pools/selection-strategies/strategies-utils.test.mjs diff --git a/docs/api.md b/docs/api.md index c62e7f50..731104f4 100644 --- a/docs/api.md +++ b/docs/api.md @@ -95,7 +95,7 @@ An object with these properties: - `exitHandler` (optional) - A function that will listen for exit event on each worker. Default: `() => {}` -- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool: +- `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool: - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks diff --git a/src/index.ts b/src/index.ts index 5b0eb909..a4482d58 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,7 +27,7 @@ export { Measurements, WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types.js' -export type { WorkerChoiceStrategyContext } from './pools/selection-strategies/worker-choice-strategy-context.js' +export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.js' export { DynamicThreadPool } from './pools/thread/dynamic.js' export type { ThreadPoolOptions } from './pools/thread/fixed.js' export { FixedThreadPool } from './pools/thread/fixed.js' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 88c04a85..63f1f859 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -44,7 +44,7 @@ import { type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions } from './selection-strategies/selection-strategies-types.js' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, checkValidTasksQueueOptions, @@ -87,7 +87,7 @@ export abstract class AbstractPool< /** * The task execution response promise map: * - `key`: The message id of each submitted task. - * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. + * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ @@ -95,9 +95,9 @@ export abstract class AbstractPool< new Map>() /** - * Worker choice strategy context referencing a worker choice algorithm implementation. + * Worker choice strategies context referencing worker choice algorithms implementation. */ - protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext< + protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext< Worker, Data, Response @@ -169,13 +169,14 @@ export abstract class AbstractPool< if (this.opts.enableEvents === true) { this.initializeEventEmitter() } - this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< + this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< Worker, Data, Response >( this, - this.opts.workerChoiceStrategy, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + [this.opts.workerChoiceStrategy!], this.opts.workerChoiceStrategyOptions ) @@ -296,13 +297,13 @@ export abstract class AbstractPool< started: this.started, ready: this.ready, // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - strategy: this.opts.workerChoiceStrategy!, - strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0, + defaultStrategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0, minSize: this.minimumNumberOfWorkers, maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .runTime.aggregate === true && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) }), @@ -365,7 +366,7 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .runTime.aggregate === true && { runTime: { minimum: round( @@ -382,7 +383,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -394,7 +395,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -408,7 +409,7 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .waitTime.aggregate === true && { waitTime: { minimum: round( @@ -425,7 +426,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -437,7 +438,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -548,14 +549,11 @@ export abstract class AbstractPool< ): void { checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategyContext?.setWorkerChoiceStrategy( - this.opts.workerChoiceStrategy + this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( + this.opts.workerChoiceStrategy, + workerChoiceStrategyOptions ) - if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - } - for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { - workerNode.resetUsage() + for (const [workerNodeKey] of this.workerNodes.entries()) { this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -568,7 +566,7 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } - this.workerChoiceStrategyContext?.setOptions( + this.workerChoiceStrategiesContext?.setOptions( this.opts.workerChoiceStrategyOptions ) } @@ -882,6 +880,23 @@ export abstract class AbstractPool< return [] } + /** + * Gets task function strategy, if any. + * + * @param name - The task function name. + * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getTaskFunctionWorkerWorkerChoiceStrategy = ( + name?: string + ): WorkerChoiceStrategy | undefined => { + if (name != null) { + return this.listTaskFunctionsProperties().find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + } + /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { return await this.sendTaskFunctionOperationToWorkers({ @@ -940,7 +955,9 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() + const workerNodeKey = this.chooseWorkerNode( + this.getTaskFunctionWorkerWorkerChoiceStrategy(name) + ) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions @@ -1111,7 +1128,7 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing updateWaitTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, task ) @@ -1129,7 +1146,7 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing updateWaitTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, task ) @@ -1153,12 +1170,12 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage updateTaskStatisticsWorkerUsage(workerUsage, message) updateRunTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, message ) updateEluWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, workerUsage, message ) @@ -1178,19 +1195,19 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) updateRunTimeWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, message ) updateEluWorkerUsage( - this.workerChoiceStrategyContext, + this.workerChoiceStrategiesContext, taskFunctionWorkerUsage, message ) needWorkerChoiceStrategyUpdate = true } if (needWorkerChoiceStrategyUpdate) { - this.workerChoiceStrategyContext?.update(workerNodeKey) + this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1212,22 +1229,23 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the tasks. - * + * @param workerChoiceStrategy - The worker choice strategy. * @returns The chosen worker node key */ - private chooseWorkerNode (): number { + private chooseWorkerNode ( + workerChoiceStrategy?: WorkerChoiceStrategy + ): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext?.getStrategyPolicy() - .dynamicWorkerUsage === true + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { return workerNodeKey } } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategyContext!.execute() + return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy) } /** @@ -1364,10 +1382,10 @@ export abstract class AbstractPool< const workerNode = this.workerNodes[workerNodeKey] workerNode.info.dynamic = true if ( - this.workerChoiceStrategyContext?.getStrategyPolicy() - .dynamicWorkerReady === true || - this.workerChoiceStrategyContext?.getStrategyPolicy() - .dynamicWorkerUsage === true + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady === + true || + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { workerNode.info.ready = true } @@ -1462,11 +1480,11 @@ export abstract class AbstractPool< this.sendToWorker(workerNodeKey, { statistics: { runTime: - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() .runTime.aggregate ?? false, elu: - this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu - .aggregate ?? false + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate ?? false } }) } @@ -1927,7 +1945,7 @@ export abstract class AbstractPool< const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext?.remove(workerNodeKey) + this.workerChoiceStrategiesContext?.remove(workerNodeKey) } this.checkAndEmitEmptyEvent() } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 9eec76b9..82f3d8d9 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -77,7 +77,7 @@ export interface PoolInfo { readonly worker: WorkerType readonly started: boolean readonly ready: boolean - readonly strategy: WorkerChoiceStrategy + readonly defaultStrategy: WorkerChoiceStrategy readonly strategyRetries: number readonly minSize: number readonly maxSize: number @@ -185,7 +185,7 @@ export interface PoolOptions { */ startWorkers?: boolean /** - * The worker choice strategy to use in this pool. + * The default worker choice strategy to use in this pool. * * @defaultValue WorkerChoiceStrategies.ROUND_ROBIN */ @@ -335,9 +335,9 @@ export interface IPool< */ readonly setDefaultTaskFunction: (name: string) => Promise /** - * Sets the worker choice strategy in this pool. + * Sets the default worker choice strategy in this pool. * - * @param workerChoiceStrategy - The worker choice strategy. + * @param workerChoiceStrategy - The default worker choice strategy. * @param workerChoiceStrategyOptions - The worker choice strategy options. */ readonly setWorkerChoiceStrategy: ( diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 3618944c..eaa03f3d 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,8 +1,5 @@ import type { IPool } from '../pool.js' -import { - buildWorkerChoiceStrategyOptions, - DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS -} from '../utils.js' +import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js' import type { IWorker } from '../worker.js' import type { IWorkerChoiceStrategy, @@ -11,6 +8,7 @@ import type { TaskStatisticsRequirements, WorkerChoiceStrategyOptions } from './selection-strategies-types.js' +import { buildWorkerChoiceStrategyOptions } from './selection-strategies-utils.js' /** * Worker choice strategy abstract base class. diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts new file mode 100644 index 00000000..bed0d624 --- /dev/null +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -0,0 +1,137 @@ +import { cpus } from 'node:os' + +import type { IPool } from '../pool.js' +import type { IWorker } from '../worker.js' +import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js' +import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js' +import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js' +import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js' +import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js' +import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js' +import { + type IWorkerChoiceStrategy, + WorkerChoiceStrategies, + type WorkerChoiceStrategy, + type WorkerChoiceStrategyOptions +} from './selection-strategies-types.js' +import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js' +import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js' + +const clone = (object: T): T => { + return structuredClone(object) +} + +const estimatedCpuSpeed = (): number => { + const runs = 150000000 + const begin = performance.now() + // eslint-disable-next-line no-empty + for (let i = runs; i > 0; i--) {} + const end = performance.now() + const duration = end - begin + return Math.trunc(runs / duration / 1000) // in MHz +} + +const getDefaultWorkerWeight = (): number => { + const currentCpus = cpus() + let estCpuSpeed: number | undefined + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) { + estCpuSpeed = estimatedCpuSpeed() + } + let cpusCycleTimeWeight = 0 + for (const cpu of currentCpus) { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (cpu.speed == null || cpu.speed === 0) { + cpu.speed = + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? + estCpuSpeed ?? + 2000 + } + // CPU estimated cycle time + const numberOfDigits = cpu.speed.toString().length - 1 + const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) + cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) + } + return Math.round(cpusCycleTimeWeight / currentCpus.length) +} + +const getDefaultWeights = ( + poolMaxSize: number, + defaultWorkerWeight?: number +): Record => { + defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() + const weights: Record = {} + for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { + weights[workerNodeKey] = defaultWorkerWeight + } + return weights +} + +export const getWorkerChoiceStrategiesRetries = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): number => { + return ( + pool.info.maxSize + + Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length + ) +} + +export const buildWorkerChoiceStrategyOptions = < + Worker extends IWorker, + Data, + Response +>( + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): WorkerChoiceStrategyOptions => { + opts = clone(opts ?? {}) + opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) + return { + ...{ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false } + }, + ...opts + } +} + +export const getWorkerChoiceStrategy = ( + workerChoiceStrategy: WorkerChoiceStrategy, + pool: IPool, + context: ThisType>, + opts?: WorkerChoiceStrategyOptions +): IWorkerChoiceStrategy => { + switch (workerChoiceStrategy) { + case WorkerChoiceStrategies.ROUND_ROBIN: + return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts) + case WorkerChoiceStrategies.LEAST_USED: + return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts) + case WorkerChoiceStrategies.LEAST_BUSY: + return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts) + case WorkerChoiceStrategies.LEAST_ELU: + return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts) + case WorkerChoiceStrategies.FAIR_SHARE: + return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts) + case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN: + return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))( + pool, + opts + ) + case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN: + return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind( + context + ))(pool, opts) + default: + throw new Error( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Worker choice strategy '${workerChoiceStrategy}' is not valid` + ) + } +} diff --git a/src/pools/selection-strategies/worker-choice-strategies-context.ts b/src/pools/selection-strategies/worker-choice-strategies-context.ts new file mode 100644 index 00000000..2650dd37 --- /dev/null +++ b/src/pools/selection-strategies/worker-choice-strategies-context.ts @@ -0,0 +1,245 @@ +import type { IPool } from '../pool.js' +import type { IWorker } from '../worker.js' +import type { + IWorkerChoiceStrategy, + StrategyPolicy, + TaskStatisticsRequirements, + WorkerChoiceStrategy, + WorkerChoiceStrategyOptions +} from './selection-strategies-types.js' +import { WorkerChoiceStrategies } from './selection-strategies-types.js' +import { + getWorkerChoiceStrategiesRetries, + getWorkerChoiceStrategy +} from './selection-strategies-utils.js' + +/** + * The worker choice strategies context. + * + * @typeParam Worker - Type of worker. + * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. + * @typeParam Response - Type of execution response. This can only be structured-cloneable data. + */ +export class WorkerChoiceStrategiesContext< + Worker extends IWorker, + Data = unknown, + Response = unknown +> { + /** + * The number of worker choice strategies execution retries. + */ + public retriesCount: number + + /** + * The default worker choice strategy in the context. + */ + private defaultWorkerChoiceStrategy: WorkerChoiceStrategy + + /** + * The worker choice strategies registered in the context. + */ + private readonly workerChoiceStrategies: Map< + WorkerChoiceStrategy, + IWorkerChoiceStrategy + > + + /** + * The maximum number of worker choice strategies execution retries. + */ + private readonly retries: number + + /** + * Worker choice strategies context constructor. + * + * @param pool - The pool instance. + * @param workerChoiceStrategies - The worker choice strategies. @defaultValue [WorkerChoiceStrategies.ROUND_ROBIN] + * @param opts - The worker choice strategy options. + */ + public constructor ( + private readonly pool: IPool, + workerChoiceStrategies: WorkerChoiceStrategy[] = [ + WorkerChoiceStrategies.ROUND_ROBIN + ], + opts?: WorkerChoiceStrategyOptions + ) { + this.execute = this.execute.bind(this) + this.defaultWorkerChoiceStrategy = workerChoiceStrategies[0] + this.workerChoiceStrategies = new Map< + WorkerChoiceStrategy, + IWorkerChoiceStrategy + >() + for (const workerChoiceStrategy of workerChoiceStrategies) { + this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) + } + this.retriesCount = 0 + this.retries = getWorkerChoiceStrategiesRetries(this.pool, opts) + } + + /** + * Gets the active worker choice strategies policy in the context. + * + * @returns The strategies policy. + */ + public getPolicy (): StrategyPolicy { + const policies: StrategyPolicy[] = [] + for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { + policies.push(workerChoiceStrategy.strategyPolicy) + } + return { + dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage), + dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady) + } + } + + /** + * Gets the active worker choice strategies in the context task statistics requirements. + * + * @returns The task statistics requirements. + */ + public getTaskStatisticsRequirements (): TaskStatisticsRequirements { + const taskStatisticsRequirements: TaskStatisticsRequirements[] = [] + for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { + taskStatisticsRequirements.push( + workerChoiceStrategy.taskStatisticsRequirements + ) + } + return { + runTime: { + aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate), + average: taskStatisticsRequirements.some(r => r.runTime.average), + median: taskStatisticsRequirements.some(r => r.runTime.median) + }, + waitTime: { + aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate), + average: taskStatisticsRequirements.some(r => r.waitTime.average), + median: taskStatisticsRequirements.some(r => r.waitTime.median) + }, + elu: { + aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate), + average: taskStatisticsRequirements.some(r => r.elu.average), + median: taskStatisticsRequirements.some(r => r.elu.median) + } + } + } + + /** + * Sets the default worker choice strategy to use in the context. + * + * @param workerChoiceStrategy - The default worker choice strategy to set. + * @param opts - The worker choice strategy options. + */ + public setDefaultWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy, + opts?: WorkerChoiceStrategyOptions + ): void { + this.defaultWorkerChoiceStrategy = workerChoiceStrategy + this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts) + } + + /** + * Updates the worker node key in the active worker choice strategies in the context internals. + * + * @returns `true` if the update is successful, `false` otherwise. + */ + public update (workerNodeKey: number): boolean { + const res: boolean[] = [] + for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { + res.push(workerChoiceStrategy.update(workerNodeKey)) + } + return res.every(r => r) + } + + /** + * Executes the worker choice strategy in the context algorithm. + * + * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy + * @returns The key of the worker node. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. + */ + public execute ( + workerChoiceStrategy: WorkerChoiceStrategy = this + .defaultWorkerChoiceStrategy + ): number { + return this.executeStrategy( + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerChoiceStrategies.get(workerChoiceStrategy)! + ) + } + + /** + * Executes the given worker choice strategy. + * + * @param workerChoiceStrategy - The worker choice strategy. + * @returns The key of the worker node. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. + */ + private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number { + let workerNodeKey: number | undefined + let chooseCount = 0 + let retriesCount = 0 + do { + workerNodeKey = workerChoiceStrategy.choose() + if (workerNodeKey == null && chooseCount > 0) { + ++retriesCount + ++this.retriesCount + } + ++chooseCount + } while (workerNodeKey == null && retriesCount < this.retries) + if (workerNodeKey == null) { + throw new Error( + `Worker node key chosen is null or undefined after ${retriesCount} retries` + ) + } + return workerNodeKey + } + + /** + * Removes the worker node key from the active worker choice strategies in the context. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the removal is successful, `false` otherwise. + */ + public remove (workerNodeKey: number): boolean { + const res: boolean[] = [] + for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { + res.push(workerChoiceStrategy.remove(workerNodeKey)) + } + return res.every(r => r) + } + + /** + * Sets the active worker choice strategies in the context options. + * + * @param opts - The worker choice strategy options. + */ + public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void { + for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { + workerChoiceStrategy.setOptions(opts) + } + } + + private addWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy, + pool: IPool, + opts?: WorkerChoiceStrategyOptions + ): Map { + if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) { + return this.workerChoiceStrategies.set( + workerChoiceStrategy, + getWorkerChoiceStrategy( + workerChoiceStrategy, + pool, + this, + opts + ) + ) + } + return this.workerChoiceStrategies + } + + // private removeWorkerChoiceStrategy ( + // workerChoiceStrategy: WorkerChoiceStrategy + // ): boolean { + // return this.workerChoiceStrategies.delete(workerChoiceStrategy) + // } +} diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts deleted file mode 100644 index f9ddb3a1..00000000 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ /dev/null @@ -1,234 +0,0 @@ -import type { IPool } from '../pool.js' -import { getWorkerChoiceStrategyRetries } from '../utils.js' -import type { IWorker } from '../worker.js' -import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js' -import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js' -import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js' -import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js' -import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js' -import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js' -import type { - IWorkerChoiceStrategy, - StrategyPolicy, - TaskStatisticsRequirements, - WorkerChoiceStrategy, - WorkerChoiceStrategyOptions -} from './selection-strategies-types.js' -import { WorkerChoiceStrategies } from './selection-strategies-types.js' -import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js' - -/** - * The worker choice strategy context. - * - * @typeParam Worker - Type of worker. - * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data. - * @typeParam Response - Type of execution response. This can only be structured-cloneable data. - */ -export class WorkerChoiceStrategyContext< - Worker extends IWorker, - Data = unknown, - Response = unknown -> { - /** - * The number of worker choice strategy execution retries. - */ - public retriesCount: number - - /** - * The worker choice strategy instances registered in the context. - */ - private readonly workerChoiceStrategies: Map< - WorkerChoiceStrategy, - IWorkerChoiceStrategy - > - - /** - * The maximum number of worker choice strategy execution retries. - */ - private readonly retries: number - - /** - * Worker choice strategy context constructor. - * - * @param pool - The pool instance. - * @param workerChoiceStrategy - The worker choice strategy. - * @param opts - The worker choice strategy options. - */ - public constructor ( - pool: IPool, - private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN, - opts?: WorkerChoiceStrategyOptions - ) { - this.execute = this.execute.bind(this) - this.workerChoiceStrategies = new Map< - WorkerChoiceStrategy, - IWorkerChoiceStrategy - >([ - [ - WorkerChoiceStrategies.ROUND_ROBIN, - new (RoundRobinWorkerChoiceStrategy.bind(this))( - pool, - opts - ) - ], - [ - WorkerChoiceStrategies.LEAST_USED, - new (LeastUsedWorkerChoiceStrategy.bind(this))( - pool, - opts - ) - ], - [ - WorkerChoiceStrategies.LEAST_BUSY, - new (LeastBusyWorkerChoiceStrategy.bind(this))( - pool, - opts - ) - ], - [ - WorkerChoiceStrategies.LEAST_ELU, - new (LeastEluWorkerChoiceStrategy.bind(this))( - pool, - opts - ) - ], - [ - WorkerChoiceStrategies.FAIR_SHARE, - new (FairShareWorkerChoiceStrategy.bind(this))( - pool, - opts - ) - ], - [ - WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN, - new (WeightedRoundRobinWorkerChoiceStrategy.bind(this))< - Worker, - Data, - Response - >(pool, opts) - ], - [ - WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN, - new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))< - Worker, - Data, - Response - >(pool, opts) - ] - ]) - this.retriesCount = 0 - this.retries = getWorkerChoiceStrategyRetries(pool, opts) - } - - /** - * Gets the strategy policy in the context. - * - * @returns The strategy policy. - */ - public getStrategyPolicy (): StrategyPolicy { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategies.get(this.workerChoiceStrategy)! - .strategyPolicy - } - - /** - * Gets the worker choice strategy in the context task statistics requirements. - * - * @returns The task statistics requirements. - */ - public getTaskStatisticsRequirements (): TaskStatisticsRequirements { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategies.get(this.workerChoiceStrategy)! - .taskStatisticsRequirements - } - - /** - * Sets the worker choice strategy to use in the context. - * - * @param workerChoiceStrategy - The worker choice strategy to set. - */ - public setWorkerChoiceStrategy ( - workerChoiceStrategy: WorkerChoiceStrategy - ): void { - if (this.workerChoiceStrategy !== workerChoiceStrategy) { - this.workerChoiceStrategy = workerChoiceStrategy - } - this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset() - } - - /** - * Updates the worker node key in the worker choice strategy in the context internals. - * - * @returns `true` if the update is successful, `false` otherwise. - */ - public update (workerNodeKey: number): boolean { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategies - .get(this.workerChoiceStrategy)! - .update(workerNodeKey) - } - - /** - * Executes the worker choice strategy in the context algorithm. - * - * @returns The key of the worker node. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. - */ - public execute (): number { - return this.executeStrategy( - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.workerChoiceStrategies.get(this.workerChoiceStrategy)! - ) - } - - /** - * Executes the given worker choice strategy. - * - * @param workerChoiceStrategy - The worker choice strategy. - * @returns The key of the worker node. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined. - */ - private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number { - let workerNodeKey: number | undefined - let chooseCount = 0 - let retriesCount = 0 - do { - workerNodeKey = workerChoiceStrategy.choose() - if (workerNodeKey == null && chooseCount > 0) { - ++retriesCount - ++this.retriesCount - } - ++chooseCount - } while (workerNodeKey == null && retriesCount < this.retries) - if (workerNodeKey == null) { - throw new Error( - `Worker node key chosen is null or undefined after ${retriesCount} retries` - ) - } - return workerNodeKey - } - - /** - * Removes the worker node key from the worker choice strategy in the context. - * - * @param workerNodeKey - The worker node key. - * @returns `true` if the removal is successful, `false` otherwise. - */ - public remove (workerNodeKey: number): boolean { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return this.workerChoiceStrategies - .get(this.workerChoiceStrategy)! - .remove(workerNodeKey) - } - - /** - * Sets the worker choice strategies in the context options. - * - * @param opts - The worker choice strategy options. - */ - public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void { - for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) { - workerChoiceStrategy.setOptions(opts) - } - } -} diff --git a/src/pools/utils.ts b/src/pools/utils.ts index 724c4ab9..6a6c1fa9 100644 --- a/src/pools/utils.ts +++ b/src/pools/utils.ts @@ -1,6 +1,5 @@ import cluster, { Worker as ClusterWorker } from 'node:cluster' import { existsSync } from 'node:fs' -import { cpus } from 'node:os' import { env } from 'node:process' import { SHARE_ENV, @@ -10,14 +9,13 @@ import { import type { MessageValue, Task } from '../utility-types.js' import { average, isPlainObject, max, median, min } from '../utils.js' -import type { IPool, TasksQueueOptions } from './pool.js' +import type { TasksQueueOptions } from './pool.js' import { type MeasurementStatisticsRequirements, WorkerChoiceStrategies, - type WorkerChoiceStrategy, - type WorkerChoiceStrategyOptions + type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types.js' -import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { type IWorker, type IWorkerNode, @@ -50,91 +48,6 @@ export const getDefaultTasksQueueOptions = ( } } -export const getWorkerChoiceStrategyRetries = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): number => { - return ( - pool.info.maxSize + - Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length - ) -} - -export const buildWorkerChoiceStrategyOptions = < - Worker extends IWorker, - Data, - Response ->( - pool: IPool, - opts?: WorkerChoiceStrategyOptions - ): WorkerChoiceStrategyOptions => { - opts = clone(opts ?? {}) - opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) - return { - ...{ - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false } - }, - ...opts - } -} - -const clone = (object: T): T => { - return structuredClone(object) -} - -const getDefaultWeights = ( - poolMaxSize: number, - defaultWorkerWeight?: number -): Record => { - defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() - const weights: Record = {} - for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { - weights[workerNodeKey] = defaultWorkerWeight - } - return weights -} - -const estimatedCpuSpeed = (): number => { - const runs = 150000000 - const begin = performance.now() - // eslint-disable-next-line no-empty - for (let i = runs; i > 0; i--) {} - const end = performance.now() - const duration = end - begin - return Math.trunc(runs / duration / 1000) // in MHz -} - -const getDefaultWorkerWeight = (): number => { - const currentCpus = cpus() - let estCpuSpeed: number | undefined - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) { - estCpuSpeed = estimatedCpuSpeed() - } - let cpusCycleTimeWeight = 0 - for (const cpu of currentCpus) { - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (cpu.speed == null || cpu.speed === 0) { - cpu.speed = - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? - estCpuSpeed ?? - 2000 - } - // CPU estimated cycle time - const numberOfDigits = cpu.speed.toString().length - 1 - const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) - cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) - } - return Math.round(cpusCycleTimeWeight / currentCpus.length) -} - export const checkFilePath = (filePath: string | undefined): void => { if (filePath == null) { throw new TypeError('The worker file path must be specified') @@ -317,7 +230,7 @@ export const updateWaitTimeWorkerUsage = < Response = unknown >( workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, task: Task @@ -356,7 +269,7 @@ export const updateRunTimeWorkerUsage = < Response = unknown >( workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue @@ -377,7 +290,7 @@ export const updateEluWorkerUsage = < Response = unknown >( workerChoiceStrategyContext: - | WorkerChoiceStrategyContext + | WorkerChoiceStrategiesContext | undefined, workerUsage: WorkerUsage, message: MessageValue diff --git a/src/priority-queue.ts b/src/priority-queue.ts new file mode 100644 index 00000000..55a97169 --- /dev/null +++ b/src/priority-queue.ts @@ -0,0 +1,98 @@ +/** + * @internal + */ +interface PriorityQueueNode { + data: T + priority: number +} + +/** + * Priority queue. + * + * @typeParam T - Type of priority queue data. + * @internal + */ +export class PriorityQueue { + private nodeArray!: Array> + /** The size of the priority queue. */ + public size!: number + /** The maximum size of the priority queue. */ + public maxSize!: number + + public constructor () { + this.clear() + } + + /** + * Enqueue data into the priority queue. + * + * @param data - Data to enqueue. + * @param priority - Priority of the data. Lower values have higher priority. + * @returns The new size of the priority queue. + */ + public enqueue (data: T, priority?: number): number { + priority = priority ?? 0 + let inserted = false + for (const [index, node] of this.nodeArray.entries()) { + if (node.priority > priority) { + this.nodeArray.splice(index, 0, { data, priority }) + inserted = true + break + } + } + if (!inserted) { + this.nodeArray.push({ data, priority }) + } + return this.incrementSize() + } + + /** + * Dequeue data from the priority queue. + * + * @returns The dequeued data or `undefined` if the priority queue is empty. + */ + public dequeue (): T | undefined { + if (this.size > 0) { + --this.size + } + return this.nodeArray.shift()?.data + } + + /** + * Peeks at the first data. + * @returns The first data or `undefined` if the priority queue is empty. + */ + public peekFirst (): T | undefined { + return this.nodeArray[0]?.data + } + + /** + * Peeks at the last data. + * @returns The last data or `undefined` if the priority queue is empty. + */ + public peekLast (): T | undefined { + return this.nodeArray[this.nodeArray.length - 1]?.data + } + + /** + * Clears the priority queue. + */ + public clear (): void { + this.nodeArray = [] + this.size = 0 + this.maxSize = 0 + } + + /** + * Increments the size of the deque. + * + * @returns The new size of the deque. + */ + private incrementSize (): number { + ++this.size + if (this.size > this.maxSize) { + this.maxSize = this.size + } + return this.size + } +} diff --git a/src/utils.ts b/src/utils.ts index 7e1c8ef8..dd839bc4 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -207,7 +207,7 @@ export const max = (...args: number[]): number => * @internal */ // eslint-disable-next-line @typescript-eslint/no-explicit-any -export const once = ( +export const once = >( fn: (...args: A) => R, context: C ): ((...args: A) => R) => { diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index f328bcfb..2a5f1121 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -26,7 +26,7 @@ import type { } from './task-functions.js' import { checkTaskFunctionName, - checkValidTaskFunctionEntry, + checkValidTaskFunctionObjectEntry, checkValidWorkerOptions } from './utils.js' import { KillBehaviors, type WorkerOptions } from './worker-options.js' @@ -145,7 +145,7 @@ export abstract class AbstractWorker< Response > } - checkValidTaskFunctionEntry(name, fnObj) + checkValidTaskFunctionObjectEntry(name, fnObj) fnObj.taskFunction = fnObj.taskFunction.bind(this) if (firstEntry) { this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj) @@ -200,7 +200,7 @@ export abstract class AbstractWorker< if (typeof fn === 'function') { fn = { taskFunction: fn } satisfies TaskFunctionObject } - checkValidTaskFunctionEntry(name, fn) + checkValidTaskFunctionObjectEntry(name, fn) fn.taskFunction = fn.taskFunction.bind(this) if ( this.taskFunctions.get(name) === diff --git a/src/worker/utils.ts b/src/worker/utils.ts index 486d92e7..b5c1725a 100644 --- a/src/worker/utils.ts +++ b/src/worker/utils.ts @@ -33,10 +33,13 @@ export const checkValidWorkerOptions = ( } } -export const checkValidTaskFunctionEntry = ( - name: string, - fnObj: TaskFunctionObject -): void => { +export const checkValidTaskFunctionObjectEntry = < + Data = unknown, + Response = unknown +>( + name: string, + fnObj: TaskFunctionObject + ): void => { if (typeof name !== 'string') { throw new TypeError('A taskFunctions parameter object key is not a string') } diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index 50656e85..ba328b68 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -232,7 +232,7 @@ describe('Abstract pool test suite', () => { enableTasksQueue: false, workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -288,7 +288,7 @@ describe('Abstract pool test suite', () => { errorHandler: testHandler, exitHandler: testHandler }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -447,7 +447,7 @@ describe('Abstract pool test suite', () => { { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE } ) expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined() - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -460,7 +460,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -486,7 +486,7 @@ describe('Abstract pool test suite', () => { runTime: { median: true }, elu: { median: true } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: true }, @@ -499,7 +499,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -525,7 +525,7 @@ describe('Abstract pool test suite', () => { runTime: { median: false }, elu: { median: false } }) - for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext + for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext .workerChoiceStrategies) { expect(workerChoiceStrategy.opts).toStrictEqual({ runTime: { median: false }, @@ -538,7 +538,7 @@ describe('Abstract pool test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -706,7 +706,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: 0, minSize: numberOfWorkers, maxSize: numberOfWorkers, @@ -729,7 +729,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: 0, minSize: Math.floor(numberOfWorkers / 2), maxSize: numberOfWorkers, @@ -975,7 +975,7 @@ describe('Abstract pool test suite', () => { await pool.destroy() }) - it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => { + it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => { const pool = new DynamicThreadPool( Math.floor(numberOfWorkers / 2), numberOfWorkers, @@ -1026,7 +1026,7 @@ describe('Abstract pool test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.usage).toStrictEqual({ tasks: { - executed: 0, + executed: expect.any(Number), executing: 0, queued: 0, maxQueued: 0, @@ -1049,6 +1049,10 @@ describe('Abstract pool test suite', () => { } } }) + expect(workerNode.usage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual( + numberOfWorkers * maxMultiplier + ) expect(workerNode.usage.runTime.history.length).toBe(0) expect(workerNode.usage.waitTime.history.length).toBe(0) expect(workerNode.usage.elu.idle.history.length).toBe(0) @@ -1079,7 +1083,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.cluster, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1120,7 +1124,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1160,7 +1164,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), @@ -1203,7 +1207,7 @@ describe('Abstract pool test suite', () => { worker: WorkerTypes.thread, started: true, ready: true, - strategy: WorkerChoiceStrategies.ROUND_ROBIN, + defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN, strategyRetries: expect.any(Number), minSize: expect.any(Number), maxSize: expect.any(Number), diff --git a/tests/pools/cluster/dynamic.test.mjs b/tests/pools/cluster/dynamic.test.mjs index a60d95fa..166498f9 100644 --- a/tests/pools/cluster/dynamic.test.mjs +++ b/tests/pools/cluster/dynamic.test.mjs @@ -119,8 +119,9 @@ describe('Dynamic cluster pool test suite', () => { await waitWorkerEvents(longRunningPool, 'exit', max - min) expect(longRunningPool.workerNodes.length).toBe(min) expect( - longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get( - longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy + longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + longRunningPool.workerChoiceStrategiesContext + .defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBeLessThan(longRunningPool.workerNodes.length) // We need to clean up the resources after our test diff --git a/tests/pools/selection-strategies/selection-strategies.test.mjs b/tests/pools/selection-strategies/selection-strategies.test.mjs index 5241168c..0e7f19fb 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.mjs +++ b/tests/pools/selection-strategies/selection-strategies.test.mjs @@ -1,3 +1,5 @@ +import { randomInt } from 'node:crypto' + import { expect } from 'expect' import { CircularArray } from '../../../lib/circular-array.cjs' @@ -36,6 +38,9 @@ describe('Selection strategies test suite', () => { expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) + expect(pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe( + WorkerChoiceStrategies.ROUND_ROBIN + ) // We need to clean up the resources after our test await pool.destroy() }) @@ -48,9 +53,9 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) + expect( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).toBe(workerChoiceStrategy) await pool.destroy() } }) @@ -64,9 +69,9 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) + expect( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).toBe(workerChoiceStrategy) await pool.destroy() } for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { @@ -77,26 +82,27 @@ describe('Selection strategies test suite', () => { ) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy) - expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) + expect( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).toBe(workerChoiceStrategy) await pool.destroy() } }) it('Verify available strategies default internals at pool creation', async () => { - const pool = new FixedThreadPool( - max, - './tests/worker-files/thread/testWorker.mjs' - ) for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.mjs', + { workerChoiceStrategy } + ) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).previousWorkerNodeKey ).toBe(0) @@ -104,7 +110,7 @@ describe('Selection strategies test suite', () => { workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ) { expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).workerNodeVirtualTaskRunTime ).toBe(0) @@ -113,35 +119,35 @@ describe('Selection strategies test suite', () => { WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN ) { expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).workerNodeVirtualTaskRunTime ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).roundId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).workerNodeId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).roundWeights.length ).toBe(1) expect( Number.isSafeInteger( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( workerChoiceStrategy ).roundWeights[0] ) ).toBe(true) } + await pool.destroy() } - await pool.destroy() }) it('Verify ROUND_ROBIN strategy default policy', async () => { @@ -151,7 +157,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -162,7 +168,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -178,7 +184,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -204,7 +210,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -268,13 +274,13 @@ describe('Selection strategies test suite', () => { }) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test @@ -328,13 +334,13 @@ describe('Selection strategies test suite', () => { ) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toBe(pool.workerNodes.length - 1) // We need to clean up the resources after our test @@ -367,62 +373,54 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => { + it("Verify ROUND_ROBIN strategy internals aren't reset after setting it", async () => { const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, './tests/worker-files/thread/testWorker.mjs', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) await pool.destroy() pool = new DynamicThreadPool( min, max, './tests/worker-files/thread/testWorker.mjs', - { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN } + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) // We need to clean up the resources after our test await pool.destroy() }) @@ -434,7 +432,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -445,7 +443,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -461,7 +459,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -487,7 +485,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -554,13 +552,13 @@ describe('Selection strategies test suite', () => { ) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -613,13 +611,13 @@ describe('Selection strategies test suite', () => { ) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -633,7 +631,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -644,7 +642,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -660,7 +658,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -686,7 +684,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -763,13 +761,13 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -832,13 +830,13 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -852,7 +850,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -863,7 +861,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -879,7 +877,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -905,7 +903,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: false, @@ -988,13 +986,13 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -1063,13 +1061,13 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -1083,7 +1081,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1094,7 +1092,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1110,7 +1108,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1136,7 +1134,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1230,13 +1228,13 @@ describe('Selection strategies test suite', () => { expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -1316,13 +1314,13 @@ describe('Selection strategies test suite', () => { expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test @@ -1407,20 +1405,20 @@ describe('Selection strategies test suite', () => { expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(expect.any(Number)) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(expect.any(Number)) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => { + it("Verify FAIR_SHARE strategy internals aren't reset after setting it", async () => { const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, @@ -1433,7 +1431,7 @@ describe('Selection strategies test suite', () => { } pool.setWorkerChoiceStrategy(workerChoiceStrategy) for (const workerNode of pool.workerNodes) { - expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined() + expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0) } await pool.destroy() pool = new DynamicThreadPool( @@ -1448,7 +1446,7 @@ describe('Selection strategies test suite', () => { } pool.setWorkerChoiceStrategy(workerChoiceStrategy) for (const workerNode of pool.workerNodes) { - expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined() + expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0) } // We need to clean up the resources after our test await pool.destroy() @@ -1461,7 +1459,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1472,7 +1470,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1488,7 +1486,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1514,7 +1512,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1591,18 +1589,18 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeVirtualTaskRunTime ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test @@ -1665,18 +1663,18 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeVirtualTaskRunTime ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test @@ -1744,98 +1742,88 @@ describe('Selection strategies test suite', () => { } } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeVirtualTaskRunTime ).toBeGreaterThanOrEqual(0) // We need to clean up the resources after our test await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + it("Verify WEIGHTED_ROUND_ROBIN strategy internals aren't reset after setting it", async () => { const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, - './tests/worker-files/thread/testWorker.mjs' + './tests/worker-files/thread/testWorker.mjs', + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).workerNodeVirtualTaskRunTime - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeVirtualTaskRunTime = randomInt(100, 1000) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeVirtualTaskRunTime - ).toBe(0) + ).toBeGreaterThan(99) await pool.destroy() pool = new DynamicThreadPool( min, max, - './tests/worker-files/thread/testWorker.mjs' + './tests/worker-files/thread/testWorker.mjs', + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).workerNodeVirtualTaskRunTime - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeVirtualTaskRunTime = randomInt(100, 1000) pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeVirtualTaskRunTime - ).toBe(0) + ).toBeGreaterThan(99) // We need to clean up the resources after our test await pool.destroy() }) @@ -1848,7 +1836,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1859,7 +1847,7 @@ describe('Selection strategies test suite', () => { './tests/worker-files/thread/testWorker.mjs', { workerChoiceStrategy } ) - expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({ dynamicWorkerUsage: false, dynamicWorkerReady: true }) @@ -1876,7 +1864,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1902,7 +1890,7 @@ describe('Selection strategies test suite', () => { { workerChoiceStrategy } ) expect( - pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements() ).toStrictEqual({ runTime: { aggregate: true, @@ -1972,34 +1960,34 @@ describe('Selection strategies test suite', () => { ) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights.length ).toBe(1) expect( Number.isSafeInteger( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights[0] ) ).toBe(true) @@ -2056,34 +2044,34 @@ describe('Selection strategies test suite', () => { ) } expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeId ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBe(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey ).toEqual(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights.length ).toBe(1) expect( Number.isSafeInteger( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights[0] ) ).toBe(true) @@ -2091,68 +2079,59 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => { + it("Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals aren't resets after setting it", async () => { const workerChoiceStrategy = WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, - './tests/worker-files/thread/testWorker.mjs' + './tests/worker-files/thread/testWorker.mjs', + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).roundId - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).workerNodeId - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).roundWeights - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundId = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)] pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundId - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeId - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights.length - ).toBe(1) + ).toBeGreaterThan(1) expect( Number.isSafeInteger( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights[0] ) ).toBe(true) @@ -2160,63 +2139,54 @@ describe('Selection strategies test suite', () => { pool = new DynamicThreadPool( min, max, - './tests/worker-files/thread/testWorker.mjs' + './tests/worker-files/thread/testWorker.mjs', + { workerChoiceStrategy } ) - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).roundId - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).workerNodeId - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).nextWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).previousWorkerNodeKey - ).toBeDefined() - expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy - ).roundWeights - ).toBeDefined() + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundId = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).workerNodeId = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).nextWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).previousWorkerNodeKey = randomInt(1, max - 1) + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategy + ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)] pool.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundId - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).workerNodeId - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).nextWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).previousWorkerNodeKey - ).toBe(0) + ).toBeGreaterThan(0) expect( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights.length - ).toBe(1) + ).toBeGreaterThan(1) expect( Number.isSafeInteger( - pool.workerChoiceStrategyContext.workerChoiceStrategies.get( - pool.workerChoiceStrategyContext.workerChoiceStrategy + pool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).roundWeights[0] ) ).toBe(true) diff --git a/tests/pools/selection-strategies/strategies-utils.test.mjs b/tests/pools/selection-strategies/strategies-utils.test.mjs new file mode 100644 index 00000000..dc6fc71f --- /dev/null +++ b/tests/pools/selection-strategies/strategies-utils.test.mjs @@ -0,0 +1,64 @@ +import { expect } from 'expect' + +import { FixedClusterPool, FixedThreadPool } from '../../../lib/index.cjs' +import { + buildWorkerChoiceStrategyOptions, + getWorkerChoiceStrategiesRetries +} from '../../../lib/pools/selection-strategies/selection-strategies-utils.cjs' + +describe('Selection strategies utils test suite', () => { + it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => { + const numberOfWorkers = 4 + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.cjs' + ) + expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({ + runTime: { median: false }, + waitTime: { median: false }, + elu: { median: false }, + weights: expect.objectContaining({ + 0: expect.any(Number), + [pool.info.maxSize - 1]: expect.any(Number) + }) + }) + const workerChoiceStrategyOptions = { + runTime: { median: true }, + waitTime: { median: true }, + elu: { median: true }, + weights: { + 0: 100, + 1: 100 + } + } + expect( + buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions) + ).toStrictEqual(workerChoiceStrategyOptions) + await pool.destroy() + }) + + it('Verify getWorkerChoiceStrategyRetries() behavior', async () => { + const numberOfThreads = 4 + const pool = new FixedThreadPool( + numberOfThreads, + './tests/worker-files/thread/testWorker.mjs' + ) + expect(getWorkerChoiceStrategiesRetries(pool)).toBe(pool.info.maxSize * 2) + const workerChoiceStrategyOptions = { + runTime: { median: true }, + waitTime: { median: true }, + elu: { median: true }, + weights: { + 0: 100, + 1: 100 + } + } + expect( + getWorkerChoiceStrategiesRetries(pool, workerChoiceStrategyOptions) + ).toBe( + pool.info.maxSize + + Object.keys(workerChoiceStrategyOptions.weights).length + ) + await pool.destroy() + }) +}) diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs index c591235c..83649cc7 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.mjs @@ -13,7 +13,7 @@ import { LeastEluWorkerChoiceStrategy } from '../../../lib/pools/selection-strat import { LeastUsedWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-used-worker-choice-strategy.cjs' import { RoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/round-robin-worker-choice-strategy.cjs' import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.cjs' -import { WorkerChoiceStrategyContext } from '../../../lib/pools/selection-strategies/worker-choice-strategy-context.cjs' +import { WorkerChoiceStrategiesContext } from '../../../lib/pools/selection-strategies/worker-choice-strategies-context.cjs' describe('Worker choice strategy context test suite', () => { const min = 1 @@ -41,31 +41,47 @@ describe('Worker choice strategy context test suite', () => { await dynamicPool.destroy() }) - it('Verify that constructor() initializes the context with all the available worker choice strategies', () => { - let workerChoiceStrategyContext = new WorkerChoiceStrategyContext(fixedPool) - expect(workerChoiceStrategyContext.workerChoiceStrategies.size).toBe( - Object.keys(WorkerChoiceStrategies).length + it('Verify that constructor() initializes the context with the default choice strategy', () => { + let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( + fixedPool ) - workerChoiceStrategyContext = new WorkerChoiceStrategyContext(dynamicPool) - expect(workerChoiceStrategyContext.workerChoiceStrategies.size).toBe( - Object.keys(WorkerChoiceStrategies).length + expect(workerChoiceStrategiesContext.workerChoiceStrategies.size).toBe(1) + expect( + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ) + ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) + workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( + dynamicPool ) + expect(workerChoiceStrategiesContext.workerChoiceStrategies.size).toBe(1) + expect( + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy + ) + ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) }) it('Verify that constructor() initializes the context with retries attribute properly set', () => { - let workerChoiceStrategyContext = new WorkerChoiceStrategyContext(fixedPool) - expect(workerChoiceStrategyContext.retries).toBe(fixedPool.info.maxSize * 2) - workerChoiceStrategyContext = new WorkerChoiceStrategyContext(dynamicPool) - expect(workerChoiceStrategyContext.retries).toBe( + let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( + fixedPool + ) + expect(workerChoiceStrategiesContext.retries).toBe( + fixedPool.info.maxSize * 2 + ) + workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( + dynamicPool + ) + expect(workerChoiceStrategiesContext.retries).toBe( dynamicPool.info.maxSize * 2 ) }) it('Verify that execute() throws error if null or undefined is returned after retries', () => { - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) const workerChoiceStrategyUndefinedStub = createStubInstance( @@ -74,13 +90,13 @@ describe('Worker choice strategy context test suite', () => { choose: stub().returns(undefined) } ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategiesContext.workerChoiceStrategies.set( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, workerChoiceStrategyUndefinedStub ) - expect(() => workerChoiceStrategyContext.execute()).toThrow( + expect(() => workerChoiceStrategiesContext.execute()).toThrow( new Error( - `Worker node key chosen is null or undefined after ${workerChoiceStrategyContext.retries} retries` + `Worker node key chosen is null or undefined after ${workerChoiceStrategiesContext.retries} retries` ) ) const workerChoiceStrategyNullStub = createStubInstance( @@ -89,19 +105,19 @@ describe('Worker choice strategy context test suite', () => { choose: stub().returns(null) } ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategiesContext.workerChoiceStrategies.set( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, workerChoiceStrategyNullStub ) - expect(() => workerChoiceStrategyContext.execute()).toThrow( + expect(() => workerChoiceStrategiesContext.execute()).toThrow( new Error( - `Worker node key chosen is null or undefined after ${workerChoiceStrategyContext.retries} retries` + `Worker node key chosen is null or undefined after ${workerChoiceStrategiesContext.retries} retries` ) ) }) it('Verify that execute() retry until a worker node is chosen', () => { - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) const workerChoiceStrategyStub = createStubInstance( @@ -121,24 +137,24 @@ describe('Worker choice strategy context test suite', () => { .returns(1) } ) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategiesContext.workerChoiceStrategies.set( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, workerChoiceStrategyStub ) - const chosenWorkerKey = workerChoiceStrategyContext.execute() + const chosenWorkerKey = workerChoiceStrategiesContext.execute() expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategyContext.workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).choose.callCount ).toBe(6) expect(chosenWorkerKey).toBe(1) }) it('Verify that execute() return the worker node key chosen by the strategy with fixed pool', () => { - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) const workerChoiceStrategyStub = createStubInstance( @@ -147,24 +163,24 @@ describe('Worker choice strategy context test suite', () => { choose: stub().returns(0) } ) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategiesContext.workerChoiceStrategies.set( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, workerChoiceStrategyStub ) - const chosenWorkerKey = workerChoiceStrategyContext.execute() + const chosenWorkerKey = workerChoiceStrategiesContext.execute() expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategyContext.workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).choose.calledOnce ).toBe(true) expect(chosenWorkerKey).toBe(0) }) it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => { - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) const workerChoiceStrategyStub = createStubInstance( @@ -173,322 +189,290 @@ describe('Worker choice strategy context test suite', () => { choose: stub().returns(0) } ) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe( WorkerChoiceStrategies.ROUND_ROBIN ) - workerChoiceStrategyContext.workerChoiceStrategies.set( - workerChoiceStrategyContext.workerChoiceStrategy, + workerChoiceStrategiesContext.workerChoiceStrategies.set( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy, workerChoiceStrategyStub ) - const chosenWorkerKey = workerChoiceStrategyContext.execute() + const chosenWorkerKey = workerChoiceStrategiesContext.execute() expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategyContext.workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ).choose.calledOnce ).toBe(true) expect(chosenWorkerKey).toBe(0) }) - it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.ROUND_ROBIN ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.ROUND_ROBIN ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_USED and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_USED and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_USED + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastUsedWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_USED and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_USED and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_USED + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastUsedWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_BUSY and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_BUSY and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_BUSY + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastBusyWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_BUSY and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_BUSY and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_BUSY + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastBusyWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_ELU + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastEluWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.LEAST_ELU + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(LeastEluWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.FAIR_SHARE + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(FairShareWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.FAIR_SHARE + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(FairShareWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => { - const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and fixed pool', () => { - const workerChoiceStrategy = - WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and fixed pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(InterleavedWeightedRoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) - it('Verify that setWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and dynamic pool', () => { - const workerChoiceStrategy = - WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN - const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + it('Verify that setDefaultWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and dynamic pool', () => { + const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool ) - workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy( + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + ) expect( - workerChoiceStrategyContext.workerChoiceStrategies.get( - workerChoiceStrategy + workerChoiceStrategiesContext.workerChoiceStrategies.get( + workerChoiceStrategiesContext.defaultWorkerChoiceStrategy ) ).toBeInstanceOf(InterleavedWeightedRoundRobinWorkerChoiceStrategy) - expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( - workerChoiceStrategy - ) }) it('Verify that worker choice strategy options enable median runtime pool statistics', () => { const wwrWorkerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - let workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool, - wwrWorkerChoiceStrategy, + [wwrWorkerChoiceStrategy], { runTime: { median: true } } ) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime .average ).toBe(false) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime + .median ).toBe(true) - workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool, - wwrWorkerChoiceStrategy, + [wwrWorkerChoiceStrategy], { runTime: { median: true } } ) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime .average ).toBe(false) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime + .median ).toBe(true) const fsWorkerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE - workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( fixedPool, - fsWorkerChoiceStrategy, + [fsWorkerChoiceStrategy], { runTime: { median: true } } ) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime .average ).toBe(false) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime + .median ).toBe(true) - workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext( dynamicPool, - fsWorkerChoiceStrategy, + [fsWorkerChoiceStrategy], { runTime: { median: true } } ) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime .average ).toBe(false) expect( - workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median + workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime + .median ).toBe(true) }) }) diff --git a/tests/pools/thread/dynamic.test.mjs b/tests/pools/thread/dynamic.test.mjs index 6cefa0aa..22d89223 100644 --- a/tests/pools/thread/dynamic.test.mjs +++ b/tests/pools/thread/dynamic.test.mjs @@ -119,8 +119,9 @@ describe('Dynamic thread pool test suite', () => { await waitWorkerEvents(longRunningPool, 'exit', max - min) expect(longRunningPool.workerNodes.length).toBe(min) expect( - longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get( - longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy + longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get( + longRunningPool.workerChoiceStrategiesContext + .defaultWorkerChoiceStrategy ).nextWorkerNodeKey ).toBeLessThan(longRunningPool.workerNodes.length) // We need to clean up the resources after our test diff --git a/tests/pools/utils.test.mjs b/tests/pools/utils.test.mjs index c6cc06d3..cfa378e1 100644 --- a/tests/pools/utils.test.mjs +++ b/tests/pools/utils.test.mjs @@ -7,17 +7,11 @@ import { CircularArray, DEFAULT_CIRCULAR_ARRAY_SIZE } from '../../lib/circular-array.cjs' +import { WorkerTypes } from '../../lib/index.cjs' import { - FixedClusterPool, - FixedThreadPool, - WorkerTypes -} from '../../lib/index.cjs' -import { - buildWorkerChoiceStrategyOptions, createWorker, DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS, getDefaultTasksQueueOptions, - getWorkerChoiceStrategyRetries, getWorkerId, getWorkerType, updateMeasurementStatistics @@ -43,61 +37,6 @@ describe('Pool utils test suite', () => { }) }) - it('Verify getWorkerChoiceStrategyRetries() behavior', async () => { - const numberOfThreads = 4 - const pool = new FixedThreadPool( - numberOfThreads, - './tests/worker-files/thread/testWorker.mjs' - ) - expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2) - const workerChoiceStrategyOptions = { - runTime: { median: true }, - waitTime: { median: true }, - elu: { median: true }, - weights: { - 0: 100, - 1: 100 - } - } - expect( - getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions) - ).toBe( - pool.info.maxSize + - Object.keys(workerChoiceStrategyOptions.weights).length - ) - await pool.destroy() - }) - - it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => { - const numberOfWorkers = 4 - const pool = new FixedClusterPool( - numberOfWorkers, - './tests/worker-files/cluster/testWorker.cjs' - ) - expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({ - runTime: { median: false }, - waitTime: { median: false }, - elu: { median: false }, - weights: expect.objectContaining({ - 0: expect.any(Number), - [pool.info.maxSize - 1]: expect.any(Number) - }) - }) - const workerChoiceStrategyOptions = { - runTime: { median: true }, - waitTime: { median: true }, - elu: { median: true }, - weights: { - 0: 100, - 1: 100 - } - } - expect( - buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions) - ).toStrictEqual(workerChoiceStrategyOptions) - await pool.destroy() - }) - it('Verify updateMeasurementStatistics() behavior', () => { const measurementStatistics = { history: new CircularArray() -- 2.34.1