- `exitHandler` (optional) - A function that will listen for exit event on each worker.
Default: `() => {}`
-- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
+- `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
- `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
Measurements,
WorkerChoiceStrategies
} from './pools/selection-strategies/selection-strategies-types.js'
-export type { WorkerChoiceStrategyContext } from './pools/selection-strategies/worker-choice-strategy-context.js'
+export type { WorkerChoiceStrategiesContext } from './pools/selection-strategies/worker-choice-strategies-context.js'
export { DynamicThreadPool } from './pools/thread/dynamic.js'
export type { ThreadPoolOptions } from './pools/thread/fixed.js'
export { FixedThreadPool } from './pools/thread/fixed.js'
type WorkerChoiceStrategy,
type WorkerChoiceStrategyOptions
} from './selection-strategies/selection-strategies-types.js'
-import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
checkValidTasksQueueOptions,
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
- * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
+ * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
*
* When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
*/
new Map<string, PromiseResponseWrapper<Response>>()
/**
- * Worker choice strategy context referencing a worker choice algorithm implementation.
+ * Worker choice strategies context referencing worker choice algorithms implementation.
*/
- protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
+ protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
Worker,
Data,
Response
if (this.opts.enableEvents === true) {
this.initializeEventEmitter()
}
- this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
+ this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
Worker,
Data,
Response
>(
this,
- this.opts.workerChoiceStrategy,
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ [this.opts.workerChoiceStrategy!],
this.opts.workerChoiceStrategyOptions
)
started: this.started,
ready: this.ready,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- strategy: this.opts.workerChoiceStrategy!,
- strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
+ defaultStrategy: this.opts.workerChoiceStrategy!,
+ strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
minSize: this.minimumNumberOfWorkers,
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate === true &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.aggregate && {
utilization: round(this.utilization)
}),
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate === true && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.waitTime.aggregate === true && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
): void {
checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
- this.opts.workerChoiceStrategy
+ this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
+ this.opts.workerChoiceStrategy,
+ workerChoiceStrategyOptions
)
- if (workerChoiceStrategyOptions != null) {
- this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- }
- for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
- workerNode.resetUsage()
+ for (const [workerNodeKey] of this.workerNodes.entries()) {
this.sendStatisticsMessageToWorker(workerNodeKey)
}
}
if (workerChoiceStrategyOptions != null) {
this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
- this.workerChoiceStrategyContext?.setOptions(
+ this.workerChoiceStrategiesContext?.setOptions(
this.opts.workerChoiceStrategyOptions
)
}
return []
}
+ /**
+ * Gets task function strategy, if any.
+ *
+ * @param name - The task function name.
+ * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
+ */
+ private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
+ name?: string
+ ): WorkerChoiceStrategy | undefined => {
+ if (name != null) {
+ return this.listTaskFunctionsProperties().find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.strategy
+ }
+ }
+
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
return await this.sendTaskFunctionOperationToWorkers({
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode()
+ const workerNodeKey = this.chooseWorkerNode(
+ this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
+ )
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const workerUsage = this.workerNodes[workerNodeKey].usage
++workerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
task
)
].getTaskFunctionWorkerUsage(task.name!)!
++taskFunctionWorkerUsage.tasks.executing
updateWaitTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
task
)
const workerUsage = this.workerNodes[workerNodeKey].usage
updateTaskStatisticsWorkerUsage(workerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
workerUsage,
message
)
].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
updateRunTimeWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
updateEluWorkerUsage(
- this.workerChoiceStrategyContext,
+ this.workerChoiceStrategiesContext,
taskFunctionWorkerUsage,
message
)
needWorkerChoiceStrategyUpdate = true
}
if (needWorkerChoiceStrategyUpdate) {
- this.workerChoiceStrategyContext?.update(workerNodeKey)
+ this.workerChoiceStrategiesContext?.update(workerNodeKey)
}
}
/**
* Chooses a worker node for the next task.
*
- * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
- *
+ * @param workerChoiceStrategy - The worker choice strategy.
* @returns The chosen worker node key
*/
- private chooseWorkerNode (): number {
+ private chooseWorkerNode (
+ workerChoiceStrategy?: WorkerChoiceStrategy
+ ): number {
if (this.shallCreateDynamicWorker()) {
const workerNodeKey = this.createAndSetupDynamicWorkerNode()
if (
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerUsage === true
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
return workerNodeKey
}
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategyContext!.execute()
+ return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
/**
const workerNode = this.workerNodes[workerNodeKey]
workerNode.info.dynamic = true
if (
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerReady === true ||
- this.workerChoiceStrategyContext?.getStrategyPolicy()
- .dynamicWorkerUsage === true
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
+ true ||
+ this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
+ true
) {
workerNode.info.ready = true
}
this.sendToWorker(workerNodeKey, {
statistics: {
runTime:
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
.runTime.aggregate ?? false,
elu:
- this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
- .aggregate ?? false
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .elu.aggregate ?? false
}
})
}
const workerNodeKey = this.workerNodes.indexOf(workerNode)
if (workerNodeKey !== -1) {
this.workerNodes.splice(workerNodeKey, 1)
- this.workerChoiceStrategyContext?.remove(workerNodeKey)
+ this.workerChoiceStrategiesContext?.remove(workerNodeKey)
}
this.checkAndEmitEmptyEvent()
}
readonly worker: WorkerType
readonly started: boolean
readonly ready: boolean
- readonly strategy: WorkerChoiceStrategy
+ readonly defaultStrategy: WorkerChoiceStrategy
readonly strategyRetries: number
readonly minSize: number
readonly maxSize: number
*/
startWorkers?: boolean
/**
- * The worker choice strategy to use in this pool.
+ * The default worker choice strategy to use in this pool.
*
* @defaultValue WorkerChoiceStrategies.ROUND_ROBIN
*/
*/
readonly setDefaultTaskFunction: (name: string) => Promise<boolean>
/**
- * Sets the worker choice strategy in this pool.
+ * Sets the default worker choice strategy in this pool.
*
- * @param workerChoiceStrategy - The worker choice strategy.
+ * @param workerChoiceStrategy - The default worker choice strategy.
* @param workerChoiceStrategyOptions - The worker choice strategy options.
*/
readonly setWorkerChoiceStrategy: (
import type { IPool } from '../pool.js'
-import {
- buildWorkerChoiceStrategyOptions,
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
-} from '../utils.js'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import type { IWorker } from '../worker.js'
import type {
IWorkerChoiceStrategy,
TaskStatisticsRequirements,
WorkerChoiceStrategyOptions
} from './selection-strategies-types.js'
+import { buildWorkerChoiceStrategyOptions } from './selection-strategies-utils.js'
/**
* Worker choice strategy abstract base class.
--- /dev/null
+import { cpus } from 'node:os'
+
+import type { IPool } from '../pool.js'
+import type { IWorker } from '../worker.js'
+import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
+import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
+import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
+import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
+import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
+import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
+import {
+ type IWorkerChoiceStrategy,
+ WorkerChoiceStrategies,
+ type WorkerChoiceStrategy,
+ type WorkerChoiceStrategyOptions
+} from './selection-strategies-types.js'
+import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
+import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
+
+const clone = <T>(object: T): T => {
+ return structuredClone<T>(object)
+}
+
+const estimatedCpuSpeed = (): number => {
+ const runs = 150000000
+ const begin = performance.now()
+ // eslint-disable-next-line no-empty
+ for (let i = runs; i > 0; i--) {}
+ const end = performance.now()
+ const duration = end - begin
+ return Math.trunc(runs / duration / 1000) // in MHz
+}
+
+const getDefaultWorkerWeight = (): number => {
+ const currentCpus = cpus()
+ let estCpuSpeed: number | undefined
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
+ estCpuSpeed = estimatedCpuSpeed()
+ }
+ let cpusCycleTimeWeight = 0
+ for (const cpu of currentCpus) {
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (cpu.speed == null || cpu.speed === 0) {
+ cpu.speed =
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
+ estCpuSpeed ??
+ 2000
+ }
+ // CPU estimated cycle time
+ const numberOfDigits = cpu.speed.toString().length - 1
+ const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+ cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+ }
+ return Math.round(cpusCycleTimeWeight / currentCpus.length)
+}
+
+const getDefaultWeights = (
+ poolMaxSize: number,
+ defaultWorkerWeight?: number
+): Record<number, number> => {
+ defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
+ const weights: Record<number, number> = {}
+ for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
+ weights[workerNodeKey] = defaultWorkerWeight
+ }
+ return weights
+}
+
+export const getWorkerChoiceStrategiesRetries = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): number => {
+ return (
+ pool.info.maxSize +
+ Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
+ )
+}
+
+export const buildWorkerChoiceStrategyOptions = <
+ Worker extends IWorker,
+ Data,
+ Response
+>(
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): WorkerChoiceStrategyOptions => {
+ opts = clone(opts ?? {})
+ opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
+ return {
+ ...{
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ },
+ ...opts
+ }
+}
+
+export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
+ workerChoiceStrategy: WorkerChoiceStrategy,
+ pool: IPool<Worker, Data, Response>,
+ context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
+ opts?: WorkerChoiceStrategyOptions
+): IWorkerChoiceStrategy => {
+ switch (workerChoiceStrategy) {
+ case WorkerChoiceStrategies.ROUND_ROBIN:
+ return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts)
+ case WorkerChoiceStrategies.LEAST_USED:
+ return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts)
+ case WorkerChoiceStrategies.LEAST_BUSY:
+ return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts)
+ case WorkerChoiceStrategies.LEAST_ELU:
+ return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts)
+ case WorkerChoiceStrategies.FAIR_SHARE:
+ return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts)
+ case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
+ return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))(
+ pool,
+ opts
+ )
+ case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
+ return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(
+ context
+ ))(pool, opts)
+ default:
+ throw new Error(
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Worker choice strategy '${workerChoiceStrategy}' is not valid`
+ )
+ }
+}
--- /dev/null
+import type { IPool } from '../pool.js'
+import type { IWorker } from '../worker.js'
+import type {
+ IWorkerChoiceStrategy,
+ StrategyPolicy,
+ TaskStatisticsRequirements,
+ WorkerChoiceStrategy,
+ WorkerChoiceStrategyOptions
+} from './selection-strategies-types.js'
+import { WorkerChoiceStrategies } from './selection-strategies-types.js'
+import {
+ getWorkerChoiceStrategiesRetries,
+ getWorkerChoiceStrategy
+} from './selection-strategies-utils.js'
+
+/**
+ * The worker choice strategies context.
+ *
+ * @typeParam Worker - Type of worker.
+ * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
+ * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
+ */
+export class WorkerChoiceStrategiesContext<
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+> {
+ /**
+ * The number of worker choice strategies execution retries.
+ */
+ public retriesCount: number
+
+ /**
+ * The default worker choice strategy in the context.
+ */
+ private defaultWorkerChoiceStrategy: WorkerChoiceStrategy
+
+ /**
+ * The worker choice strategies registered in the context.
+ */
+ private readonly workerChoiceStrategies: Map<
+ WorkerChoiceStrategy,
+ IWorkerChoiceStrategy
+ >
+
+ /**
+ * The maximum number of worker choice strategies execution retries.
+ */
+ private readonly retries: number
+
+ /**
+ * Worker choice strategies context constructor.
+ *
+ * @param pool - The pool instance.
+ * @param workerChoiceStrategies - The worker choice strategies. @defaultValue [WorkerChoiceStrategies.ROUND_ROBIN]
+ * @param opts - The worker choice strategy options.
+ */
+ public constructor (
+ private readonly pool: IPool<Worker, Data, Response>,
+ workerChoiceStrategies: WorkerChoiceStrategy[] = [
+ WorkerChoiceStrategies.ROUND_ROBIN
+ ],
+ opts?: WorkerChoiceStrategyOptions
+ ) {
+ this.execute = this.execute.bind(this)
+ this.defaultWorkerChoiceStrategy = workerChoiceStrategies[0]
+ this.workerChoiceStrategies = new Map<
+ WorkerChoiceStrategy,
+ IWorkerChoiceStrategy
+ >()
+ for (const workerChoiceStrategy of workerChoiceStrategies) {
+ this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+ }
+ this.retriesCount = 0
+ this.retries = getWorkerChoiceStrategiesRetries(this.pool, opts)
+ }
+
+ /**
+ * Gets the active worker choice strategies policy in the context.
+ *
+ * @returns The strategies policy.
+ */
+ public getPolicy (): StrategyPolicy {
+ const policies: StrategyPolicy[] = []
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
+ policies.push(workerChoiceStrategy.strategyPolicy)
+ }
+ return {
+ dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
+ dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
+ }
+ }
+
+ /**
+ * Gets the active worker choice strategies in the context task statistics requirements.
+ *
+ * @returns The task statistics requirements.
+ */
+ public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
+ const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
+ taskStatisticsRequirements.push(
+ workerChoiceStrategy.taskStatisticsRequirements
+ )
+ }
+ return {
+ runTime: {
+ aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
+ average: taskStatisticsRequirements.some(r => r.runTime.average),
+ median: taskStatisticsRequirements.some(r => r.runTime.median)
+ },
+ waitTime: {
+ aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
+ average: taskStatisticsRequirements.some(r => r.waitTime.average),
+ median: taskStatisticsRequirements.some(r => r.waitTime.median)
+ },
+ elu: {
+ aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
+ average: taskStatisticsRequirements.some(r => r.elu.average),
+ median: taskStatisticsRequirements.some(r => r.elu.median)
+ }
+ }
+ }
+
+ /**
+ * Sets the default worker choice strategy to use in the context.
+ *
+ * @param workerChoiceStrategy - The default worker choice strategy to set.
+ * @param opts - The worker choice strategy options.
+ */
+ public setDefaultWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy,
+ opts?: WorkerChoiceStrategyOptions
+ ): void {
+ this.defaultWorkerChoiceStrategy = workerChoiceStrategy
+ this.addWorkerChoiceStrategy(workerChoiceStrategy, this.pool, opts)
+ }
+
+ /**
+ * Updates the worker node key in the active worker choice strategies in the context internals.
+ *
+ * @returns `true` if the update is successful, `false` otherwise.
+ */
+ public update (workerNodeKey: number): boolean {
+ const res: boolean[] = []
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
+ res.push(workerChoiceStrategy.update(workerNodeKey))
+ }
+ return res.every(r => r)
+ }
+
+ /**
+ * Executes the worker choice strategy in the context algorithm.
+ *
+ * @param workerChoiceStrategy - The worker choice strategy algorithm to execute. @defaultValue this.defaultWorkerChoiceStrategy
+ * @returns The key of the worker node.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
+ */
+ public execute (
+ workerChoiceStrategy: WorkerChoiceStrategy = this
+ .defaultWorkerChoiceStrategy
+ ): number {
+ return this.executeStrategy(
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ this.workerChoiceStrategies.get(workerChoiceStrategy)!
+ )
+ }
+
+ /**
+ * Executes the given worker choice strategy.
+ *
+ * @param workerChoiceStrategy - The worker choice strategy.
+ * @returns The key of the worker node.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
+ */
+ private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
+ let workerNodeKey: number | undefined
+ let chooseCount = 0
+ let retriesCount = 0
+ do {
+ workerNodeKey = workerChoiceStrategy.choose()
+ if (workerNodeKey == null && chooseCount > 0) {
+ ++retriesCount
+ ++this.retriesCount
+ }
+ ++chooseCount
+ } while (workerNodeKey == null && retriesCount < this.retries)
+ if (workerNodeKey == null) {
+ throw new Error(
+ `Worker node key chosen is null or undefined after ${retriesCount} retries`
+ )
+ }
+ return workerNodeKey
+ }
+
+ /**
+ * Removes the worker node key from the active worker choice strategies in the context.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @returns `true` if the removal is successful, `false` otherwise.
+ */
+ public remove (workerNodeKey: number): boolean {
+ const res: boolean[] = []
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
+ res.push(workerChoiceStrategy.remove(workerNodeKey))
+ }
+ return res.every(r => r)
+ }
+
+ /**
+ * Sets the active worker choice strategies in the context options.
+ *
+ * @param opts - The worker choice strategy options.
+ */
+ public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void {
+ for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
+ workerChoiceStrategy.setOptions(opts)
+ }
+ }
+
+ private addWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy,
+ pool: IPool<Worker, Data, Response>,
+ opts?: WorkerChoiceStrategyOptions
+ ): Map<WorkerChoiceStrategy, IWorkerChoiceStrategy> {
+ if (!this.workerChoiceStrategies.has(workerChoiceStrategy)) {
+ return this.workerChoiceStrategies.set(
+ workerChoiceStrategy,
+ getWorkerChoiceStrategy<Worker, Data, Response>(
+ workerChoiceStrategy,
+ pool,
+ this,
+ opts
+ )
+ )
+ }
+ return this.workerChoiceStrategies
+ }
+
+ // private removeWorkerChoiceStrategy (
+ // workerChoiceStrategy: WorkerChoiceStrategy
+ // ): boolean {
+ // return this.workerChoiceStrategies.delete(workerChoiceStrategy)
+ // }
+}
+++ /dev/null
-import type { IPool } from '../pool.js'
-import { getWorkerChoiceStrategyRetries } from '../utils.js'
-import type { IWorker } from '../worker.js'
-import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
-import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
-import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
-import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
-import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
-import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
-import type {
- IWorkerChoiceStrategy,
- StrategyPolicy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
-} from './selection-strategies-types.js'
-import { WorkerChoiceStrategies } from './selection-strategies-types.js'
-import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
-
-/**
- * The worker choice strategy context.
- *
- * @typeParam Worker - Type of worker.
- * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
- * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
- */
-export class WorkerChoiceStrategyContext<
- Worker extends IWorker,
- Data = unknown,
- Response = unknown
-> {
- /**
- * The number of worker choice strategy execution retries.
- */
- public retriesCount: number
-
- /**
- * The worker choice strategy instances registered in the context.
- */
- private readonly workerChoiceStrategies: Map<
- WorkerChoiceStrategy,
- IWorkerChoiceStrategy
- >
-
- /**
- * The maximum number of worker choice strategy execution retries.
- */
- private readonly retries: number
-
- /**
- * Worker choice strategy context constructor.
- *
- * @param pool - The pool instance.
- * @param workerChoiceStrategy - The worker choice strategy.
- * @param opts - The worker choice strategy options.
- */
- public constructor (
- pool: IPool<Worker, Data, Response>,
- private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
- opts?: WorkerChoiceStrategyOptions
- ) {
- this.execute = this.execute.bind(this)
- this.workerChoiceStrategies = new Map<
- WorkerChoiceStrategy,
- IWorkerChoiceStrategy
- >([
- [
- WorkerChoiceStrategies.ROUND_ROBIN,
- new (RoundRobinWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
- pool,
- opts
- )
- ],
- [
- WorkerChoiceStrategies.LEAST_USED,
- new (LeastUsedWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
- pool,
- opts
- )
- ],
- [
- WorkerChoiceStrategies.LEAST_BUSY,
- new (LeastBusyWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
- pool,
- opts
- )
- ],
- [
- WorkerChoiceStrategies.LEAST_ELU,
- new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
- pool,
- opts
- )
- ],
- [
- WorkerChoiceStrategies.FAIR_SHARE,
- new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
- pool,
- opts
- )
- ],
- [
- WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
- new (WeightedRoundRobinWorkerChoiceStrategy.bind(this))<
- Worker,
- Data,
- Response
- >(pool, opts)
- ],
- [
- WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
- new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(this))<
- Worker,
- Data,
- Response
- >(pool, opts)
- ]
- ])
- this.retriesCount = 0
- this.retries = getWorkerChoiceStrategyRetries(pool, opts)
- }
-
- /**
- * Gets the strategy policy in the context.
- *
- * @returns The strategy policy.
- */
- public getStrategyPolicy (): StrategyPolicy {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategies.get(this.workerChoiceStrategy)!
- .strategyPolicy
- }
-
- /**
- * Gets the worker choice strategy in the context task statistics requirements.
- *
- * @returns The task statistics requirements.
- */
- public getTaskStatisticsRequirements (): TaskStatisticsRequirements {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategies.get(this.workerChoiceStrategy)!
- .taskStatisticsRequirements
- }
-
- /**
- * Sets the worker choice strategy to use in the context.
- *
- * @param workerChoiceStrategy - The worker choice strategy to set.
- */
- public setWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy
- ): void {
- if (this.workerChoiceStrategy !== workerChoiceStrategy) {
- this.workerChoiceStrategy = workerChoiceStrategy
- }
- this.workerChoiceStrategies.get(this.workerChoiceStrategy)?.reset()
- }
-
- /**
- * Updates the worker node key in the worker choice strategy in the context internals.
- *
- * @returns `true` if the update is successful, `false` otherwise.
- */
- public update (workerNodeKey: number): boolean {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategies
- .get(this.workerChoiceStrategy)!
- .update(workerNodeKey)
- }
-
- /**
- * Executes the worker choice strategy in the context algorithm.
- *
- * @returns The key of the worker node.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
- */
- public execute (): number {
- return this.executeStrategy(
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.workerChoiceStrategies.get(this.workerChoiceStrategy)!
- )
- }
-
- /**
- * Executes the given worker choice strategy.
- *
- * @param workerChoiceStrategy - The worker choice strategy.
- * @returns The key of the worker node.
- * @throws {@link https://nodejs.org/api/errors.html#class-error} If after computed retries the worker node key is null or undefined.
- */
- private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
- let workerNodeKey: number | undefined
- let chooseCount = 0
- let retriesCount = 0
- do {
- workerNodeKey = workerChoiceStrategy.choose()
- if (workerNodeKey == null && chooseCount > 0) {
- ++retriesCount
- ++this.retriesCount
- }
- ++chooseCount
- } while (workerNodeKey == null && retriesCount < this.retries)
- if (workerNodeKey == null) {
- throw new Error(
- `Worker node key chosen is null or undefined after ${retriesCount} retries`
- )
- }
- return workerNodeKey
- }
-
- /**
- * Removes the worker node key from the worker choice strategy in the context.
- *
- * @param workerNodeKey - The worker node key.
- * @returns `true` if the removal is successful, `false` otherwise.
- */
- public remove (workerNodeKey: number): boolean {
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- return this.workerChoiceStrategies
- .get(this.workerChoiceStrategy)!
- .remove(workerNodeKey)
- }
-
- /**
- * Sets the worker choice strategies in the context options.
- *
- * @param opts - The worker choice strategy options.
- */
- public setOptions (opts: WorkerChoiceStrategyOptions | undefined): void {
- for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
- workerChoiceStrategy.setOptions(opts)
- }
- }
-}
import cluster, { Worker as ClusterWorker } from 'node:cluster'
import { existsSync } from 'node:fs'
-import { cpus } from 'node:os'
import { env } from 'node:process'
import {
SHARE_ENV,
import type { MessageValue, Task } from '../utility-types.js'
import { average, isPlainObject, max, median, min } from '../utils.js'
-import type { IPool, TasksQueueOptions } from './pool.js'
+import type { TasksQueueOptions } from './pool.js'
import {
type MeasurementStatisticsRequirements,
WorkerChoiceStrategies,
- type WorkerChoiceStrategy,
- type WorkerChoiceStrategyOptions
+ type WorkerChoiceStrategy
} from './selection-strategies/selection-strategies-types.js'
-import type { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
+import type { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
type IWorker,
type IWorkerNode,
}
}
-export const getWorkerChoiceStrategyRetries = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): number => {
- return (
- pool.info.maxSize +
- Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
- )
-}
-
-export const buildWorkerChoiceStrategyOptions = <
- Worker extends IWorker,
- Data,
- Response
->(
- pool: IPool<Worker, Data, Response>,
- opts?: WorkerChoiceStrategyOptions
- ): WorkerChoiceStrategyOptions => {
- opts = clone(opts ?? {})
- opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
- return {
- ...{
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- },
- ...opts
- }
-}
-
-const clone = <T>(object: T): T => {
- return structuredClone<T>(object)
-}
-
-const getDefaultWeights = (
- poolMaxSize: number,
- defaultWorkerWeight?: number
-): Record<number, number> => {
- defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
- const weights: Record<number, number> = {}
- for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
- weights[workerNodeKey] = defaultWorkerWeight
- }
- return weights
-}
-
-const estimatedCpuSpeed = (): number => {
- const runs = 150000000
- const begin = performance.now()
- // eslint-disable-next-line no-empty
- for (let i = runs; i > 0; i--) {}
- const end = performance.now()
- const duration = end - begin
- return Math.trunc(runs / duration / 1000) // in MHz
-}
-
-const getDefaultWorkerWeight = (): number => {
- const currentCpus = cpus()
- let estCpuSpeed: number | undefined
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
- estCpuSpeed = estimatedCpuSpeed()
- }
- let cpusCycleTimeWeight = 0
- for (const cpu of currentCpus) {
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- if (cpu.speed == null || cpu.speed === 0) {
- cpu.speed =
- // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
- currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
- estCpuSpeed ??
- 2000
- }
- // CPU estimated cycle time
- const numberOfDigits = cpu.speed.toString().length - 1
- const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
- cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
- }
- return Math.round(cpusCycleTimeWeight / currentCpus.length)
-}
-
export const checkFilePath = (filePath: string | undefined): void => {
if (filePath == null) {
throw new TypeError('The worker file path must be specified')
Response = unknown
>(
workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
task: Task<Data>
Response = unknown
>(
workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
Response = unknown
>(
workerChoiceStrategyContext:
- | WorkerChoiceStrategyContext<Worker, Data, Response>
+ | WorkerChoiceStrategiesContext<Worker, Data, Response>
| undefined,
workerUsage: WorkerUsage,
message: MessageValue<Response>
--- /dev/null
+/**
+ * @internal
+ */
+interface PriorityQueueNode<T> {
+ data: T
+ priority: number
+}
+
+/**
+ * Priority queue.
+ *
+ * @typeParam T - Type of priority queue data.
+ * @internal
+ */
+export class PriorityQueue<T> {
+ private nodeArray!: Array<PriorityQueueNode<T>>
+ /** The size of the priority queue. */
+ public size!: number
+ /** The maximum size of the priority queue. */
+ public maxSize!: number
+
+ public constructor () {
+ this.clear()
+ }
+
+ /**
+ * Enqueue data into the priority queue.
+ *
+ * @param data - Data to enqueue.
+ * @param priority - Priority of the data. Lower values have higher priority.
+ * @returns The new size of the priority queue.
+ */
+ public enqueue (data: T, priority?: number): number {
+ priority = priority ?? 0
+ let inserted = false
+ for (const [index, node] of this.nodeArray.entries()) {
+ if (node.priority > priority) {
+ this.nodeArray.splice(index, 0, { data, priority })
+ inserted = true
+ break
+ }
+ }
+ if (!inserted) {
+ this.nodeArray.push({ data, priority })
+ }
+ return this.incrementSize()
+ }
+
+ /**
+ * Dequeue data from the priority queue.
+ *
+ * @returns The dequeued data or `undefined` if the priority queue is empty.
+ */
+ public dequeue (): T | undefined {
+ if (this.size > 0) {
+ --this.size
+ }
+ return this.nodeArray.shift()?.data
+ }
+
+ /**
+ * Peeks at the first data.
+ * @returns The first data or `undefined` if the priority queue is empty.
+ */
+ public peekFirst (): T | undefined {
+ return this.nodeArray[0]?.data
+ }
+
+ /**
+ * Peeks at the last data.
+ * @returns The last data or `undefined` if the priority queue is empty.
+ */
+ public peekLast (): T | undefined {
+ return this.nodeArray[this.nodeArray.length - 1]?.data
+ }
+
+ /**
+ * Clears the priority queue.
+ */
+ public clear (): void {
+ this.nodeArray = []
+ this.size = 0
+ this.maxSize = 0
+ }
+
+ /**
+ * Increments the size of the deque.
+ *
+ * @returns The new size of the deque.
+ */
+ private incrementSize (): number {
+ ++this.size
+ if (this.size > this.maxSize) {
+ this.maxSize = this.size
+ }
+ return this.size
+ }
+}
* @internal
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export const once = <A extends any[], R, C>(
+export const once = <A extends any[], R, C extends ThisType<any>>(
fn: (...args: A) => R,
context: C
): ((...args: A) => R) => {
} from './task-functions.js'
import {
checkTaskFunctionName,
- checkValidTaskFunctionEntry,
+ checkValidTaskFunctionObjectEntry,
checkValidWorkerOptions
} from './utils.js'
import { KillBehaviors, type WorkerOptions } from './worker-options.js'
Response
>
}
- checkValidTaskFunctionEntry<Data, Response>(name, fnObj)
+ checkValidTaskFunctionObjectEntry<Data, Response>(name, fnObj)
fnObj.taskFunction = fnObj.taskFunction.bind(this)
if (firstEntry) {
this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
if (typeof fn === 'function') {
fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
}
- checkValidTaskFunctionEntry<Data, Response>(name, fn)
+ checkValidTaskFunctionObjectEntry<Data, Response>(name, fn)
fn.taskFunction = fn.taskFunction.bind(this)
if (
this.taskFunctions.get(name) ===
}
}
-export const checkValidTaskFunctionEntry = <Data = unknown, Response = unknown>(
- name: string,
- fnObj: TaskFunctionObject<Data, Response>
-): void => {
+export const checkValidTaskFunctionObjectEntry = <
+ Data = unknown,
+ Response = unknown
+>(
+ name: string,
+ fnObj: TaskFunctionObject<Data, Response>
+ ): void => {
if (typeof name !== 'string') {
throw new TypeError('A taskFunctions parameter object key is not a string')
}
enableTasksQueue: false,
workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
errorHandler: testHandler,
exitHandler: testHandler
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
runTime: { median: true },
elu: { median: true }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: true },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
runTime: { median: false },
elu: { median: false }
})
- for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
+ for (const [, workerChoiceStrategy] of pool.workerChoiceStrategiesContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
})
}
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: numberOfWorkers,
maxSize: numberOfWorkers,
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: 0,
minSize: Math.floor(numberOfWorkers / 2),
maxSize: numberOfWorkers,
await pool.destroy()
})
- it('Verify that pool worker tasks usage are reset at worker choice strategy change', async () => {
+ it("Verify that pool worker tasks usage aren't reset at worker choice strategy change", async () => {
const pool = new DynamicThreadPool(
Math.floor(numberOfWorkers / 2),
numberOfWorkers,
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: 0,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,
}
}
})
+ expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ numberOfWorkers * maxMultiplier
+ )
expect(workerNode.usage.runTime.history.length).toBe(0)
expect(workerNode.usage.waitTime.history.length).toBe(0)
expect(workerNode.usage.elu.idle.history.length).toBe(0)
worker: WorkerTypes.cluster,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
worker: WorkerTypes.thread,
started: true,
ready: true,
- strategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ defaultStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
strategyRetries: expect.any(Number),
minSize: expect.any(Number),
maxSize: expect.any(Number),
await waitWorkerEvents(longRunningPool, 'exit', max - min)
expect(longRunningPool.workerNodes.length).toBe(min)
expect(
- longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
+ longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ longRunningPool.workerChoiceStrategiesContext
+ .defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBeLessThan(longRunningPool.workerNodes.length)
// We need to clean up the resources after our test
+import { randomInt } from 'node:crypto'
+
import { expect } from 'expect'
import { CircularArray } from '../../../lib/circular-array.cjs'
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
+ expect(pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.ROUND_ROBIN
+ )
// We need to clean up the resources after our test
await pool.destroy()
})
{ workerChoiceStrategy }
)
expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
- expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
+ expect(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).toBe(workerChoiceStrategy)
await pool.destroy()
}
})
)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
- expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
+ expect(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).toBe(workerChoiceStrategy)
await pool.destroy()
}
for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
- expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
+ expect(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).toBe(workerChoiceStrategy)
await pool.destroy()
}
})
it('Verify available strategies default internals at pool creation', async () => {
- const pool = new FixedThreadPool(
- max,
- './tests/worker-files/thread/testWorker.mjs'
- )
for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.mjs',
+ { workerChoiceStrategy }
+ )
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).previousWorkerNodeKey
).toBe(0)
workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
) {
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).workerNodeVirtualTaskRunTime
).toBe(0)
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
) {
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).workerNodeVirtualTaskRunTime
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).roundId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).workerNodeId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).roundWeights.length
).toBe(1)
expect(
Number.isSafeInteger(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
).roundWeights[0]
)
).toBe(true)
}
+ await pool.destroy()
}
- await pool.destroy()
})
it('Verify ROUND_ROBIN strategy default policy', async () => {
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
})
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toBe(pool.workerNodes.length - 1)
// We need to clean up the resources after our test
)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toBe(pool.workerNodes.length - 1)
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
+ it("Verify ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
let pool = new FixedThreadPool(
max,
'./tests/worker-files/thread/testWorker.mjs',
- { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
await pool.destroy()
pool = new DynamicThreadPool(
min,
max,
'./tests/worker-files/thread/testWorker.mjs',
- { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
// We need to clean up the resources after our test
await pool.destroy()
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: false,
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(expect.any(Number))
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(expect.any(Number))
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
+ it("Verify FAIR_SHARE strategy internals aren't reset after setting it", async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
let pool = new FixedThreadPool(
max,
}
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
+ expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
}
await pool.destroy()
pool = new DynamicThreadPool(
}
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeUndefined()
+ expect(workerNode.strategyData.virtualTaskEndTimestamp).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
await pool.destroy()
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeVirtualTaskRunTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeVirtualTaskRunTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
}
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeVirtualTaskRunTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
+ it("Verify WEIGHTED_ROUND_ROBIN strategy internals aren't reset after setting it", async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
let pool = new FixedThreadPool(
max,
- './tests/worker-files/thread/testWorker.mjs'
+ './tests/worker-files/thread/testWorker.mjs',
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeVirtualTaskRunTime
- ).toBe(0)
+ ).toBeGreaterThan(99)
await pool.destroy()
pool = new DynamicThreadPool(
min,
max,
- './tests/worker-files/thread/testWorker.mjs'
+ './tests/worker-files/thread/testWorker.mjs',
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).workerNodeVirtualTaskRunTime = randomInt(100, 1000)
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeVirtualTaskRunTime
- ).toBe(0)
+ ).toBeGreaterThan(99)
// We need to clean up the resources after our test
await pool.destroy()
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy }
)
- expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+ expect(pool.workerChoiceStrategiesContext.getPolicy()).toStrictEqual({
dynamicWorkerUsage: false,
dynamicWorkerReady: true
})
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
{ workerChoiceStrategy }
)
expect(
- pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ pool.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
aggregate: true,
)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights.length
).toBe(1)
expect(
Number.isSafeInteger(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights[0]
)
).toBe(true)
)
}
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeId
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBe(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
).toEqual(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights.length
).toBe(1)
expect(
Number.isSafeInteger(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights[0]
)
).toBe(true)
await pool.destroy()
})
- it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
+ it("Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy internals aren't resets after setting it", async () => {
const workerChoiceStrategy =
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
let pool = new FixedThreadPool(
max,
- './tests/worker-files/thread/testWorker.mjs'
+ './tests/worker-files/thread/testWorker.mjs',
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).roundId
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).workerNodeId
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).roundWeights
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundId = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).workerNodeId = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundId
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeId
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights.length
- ).toBe(1)
+ ).toBeGreaterThan(1)
expect(
Number.isSafeInteger(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights[0]
)
).toBe(true)
pool = new DynamicThreadPool(
min,
max,
- './tests/worker-files/thread/testWorker.mjs'
+ './tests/worker-files/thread/testWorker.mjs',
+ { workerChoiceStrategy }
)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).roundId
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).workerNodeId
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).nextWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).previousWorkerNodeKey
- ).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).roundWeights
- ).toBeDefined()
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundId = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).workerNodeId = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).nextWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).previousWorkerNodeKey = randomInt(1, max - 1)
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategy
+ ).roundWeights = [randomInt(1, max - 1), randomInt(1, max - 1)]
pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundId
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).workerNodeId
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
+ ).toBeGreaterThan(0)
expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights.length
- ).toBe(1)
+ ).toBeGreaterThan(1)
expect(
Number.isSafeInteger(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
+ pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).roundWeights[0]
)
).toBe(true)
--- /dev/null
+import { expect } from 'expect'
+
+import { FixedClusterPool, FixedThreadPool } from '../../../lib/index.cjs'
+import {
+ buildWorkerChoiceStrategyOptions,
+ getWorkerChoiceStrategiesRetries
+} from '../../../lib/pools/selection-strategies/selection-strategies-utils.cjs'
+
+describe('Selection strategies utils test suite', () => {
+ it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => {
+ const numberOfWorkers = 4
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.cjs'
+ )
+ expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
+ })
+ const workerChoiceStrategyOptions = {
+ runTime: { median: true },
+ waitTime: { median: true },
+ elu: { median: true },
+ weights: {
+ 0: 100,
+ 1: 100
+ }
+ }
+ expect(
+ buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions)
+ ).toStrictEqual(workerChoiceStrategyOptions)
+ await pool.destroy()
+ })
+
+ it('Verify getWorkerChoiceStrategyRetries() behavior', async () => {
+ const numberOfThreads = 4
+ const pool = new FixedThreadPool(
+ numberOfThreads,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(getWorkerChoiceStrategiesRetries(pool)).toBe(pool.info.maxSize * 2)
+ const workerChoiceStrategyOptions = {
+ runTime: { median: true },
+ waitTime: { median: true },
+ elu: { median: true },
+ weights: {
+ 0: 100,
+ 1: 100
+ }
+ }
+ expect(
+ getWorkerChoiceStrategiesRetries(pool, workerChoiceStrategyOptions)
+ ).toBe(
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategyOptions.weights).length
+ )
+ await pool.destroy()
+ })
+})
import { LeastUsedWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/least-used-worker-choice-strategy.cjs'
import { RoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/round-robin-worker-choice-strategy.cjs'
import { WeightedRoundRobinWorkerChoiceStrategy } from '../../../lib/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.cjs'
-import { WorkerChoiceStrategyContext } from '../../../lib/pools/selection-strategies/worker-choice-strategy-context.cjs'
+import { WorkerChoiceStrategiesContext } from '../../../lib/pools/selection-strategies/worker-choice-strategies-context.cjs'
describe('Worker choice strategy context test suite', () => {
const min = 1
await dynamicPool.destroy()
})
- it('Verify that constructor() initializes the context with all the available worker choice strategies', () => {
- let workerChoiceStrategyContext = new WorkerChoiceStrategyContext(fixedPool)
- expect(workerChoiceStrategyContext.workerChoiceStrategies.size).toBe(
- Object.keys(WorkerChoiceStrategies).length
+ it('Verify that constructor() initializes the context with the default choice strategy', () => {
+ let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ fixedPool
)
- workerChoiceStrategyContext = new WorkerChoiceStrategyContext(dynamicPool)
- expect(workerChoiceStrategyContext.workerChoiceStrategies.size).toBe(
- Object.keys(WorkerChoiceStrategies).length
+ expect(workerChoiceStrategiesContext.workerChoiceStrategies.size).toBe(1)
+ expect(
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ )
+ ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
+ workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ dynamicPool
)
+ expect(workerChoiceStrategiesContext.workerChoiceStrategies.size).toBe(1)
+ expect(
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
+ )
+ ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
})
it('Verify that constructor() initializes the context with retries attribute properly set', () => {
- let workerChoiceStrategyContext = new WorkerChoiceStrategyContext(fixedPool)
- expect(workerChoiceStrategyContext.retries).toBe(fixedPool.info.maxSize * 2)
- workerChoiceStrategyContext = new WorkerChoiceStrategyContext(dynamicPool)
- expect(workerChoiceStrategyContext.retries).toBe(
+ let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ fixedPool
+ )
+ expect(workerChoiceStrategiesContext.retries).toBe(
+ fixedPool.info.maxSize * 2
+ )
+ workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
+ dynamicPool
+ )
+ expect(workerChoiceStrategiesContext.retries).toBe(
dynamicPool.info.maxSize * 2
)
})
it('Verify that execute() throws error if null or undefined is returned after retries', () => {
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
const workerChoiceStrategyUndefinedStub = createStubInstance(
choose: stub().returns(undefined)
}
)
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
workerChoiceStrategyUndefinedStub
)
- expect(() => workerChoiceStrategyContext.execute()).toThrow(
+ expect(() => workerChoiceStrategiesContext.execute()).toThrow(
new Error(
- `Worker node key chosen is null or undefined after ${workerChoiceStrategyContext.retries} retries`
+ `Worker node key chosen is null or undefined after ${workerChoiceStrategiesContext.retries} retries`
)
)
const workerChoiceStrategyNullStub = createStubInstance(
choose: stub().returns(null)
}
)
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
workerChoiceStrategyNullStub
)
- expect(() => workerChoiceStrategyContext.execute()).toThrow(
+ expect(() => workerChoiceStrategiesContext.execute()).toThrow(
new Error(
- `Worker node key chosen is null or undefined after ${workerChoiceStrategyContext.retries} retries`
+ `Worker node key chosen is null or undefined after ${workerChoiceStrategiesContext.retries} retries`
)
)
})
it('Verify that execute() retry until a worker node is chosen', () => {
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
const workerChoiceStrategyStub = createStubInstance(
.returns(1)
}
)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
workerChoiceStrategyStub
)
- const chosenWorkerKey = workerChoiceStrategyContext.execute()
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute()
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategyContext.workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).choose.callCount
).toBe(6)
expect(chosenWorkerKey).toBe(1)
})
it('Verify that execute() return the worker node key chosen by the strategy with fixed pool', () => {
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
const workerChoiceStrategyStub = createStubInstance(
choose: stub().returns(0)
}
)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
workerChoiceStrategyStub
)
- const chosenWorkerKey = workerChoiceStrategyContext.execute()
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute()
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategyContext.workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).choose.calledOnce
).toBe(true)
expect(chosenWorkerKey).toBe(0)
})
it('Verify that execute() return the worker node key chosen by the strategy with dynamic pool', () => {
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
const workerChoiceStrategyStub = createStubInstance(
choose: stub().returns(0)
}
)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+ expect(workerChoiceStrategiesContext.defaultWorkerChoiceStrategy).toBe(
WorkerChoiceStrategies.ROUND_ROBIN
)
- workerChoiceStrategyContext.workerChoiceStrategies.set(
- workerChoiceStrategyContext.workerChoiceStrategy,
+ workerChoiceStrategiesContext.workerChoiceStrategies.set(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy,
workerChoiceStrategyStub
)
- const chosenWorkerKey = workerChoiceStrategyContext.execute()
+ const chosenWorkerKey = workerChoiceStrategiesContext.execute()
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategyContext.workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
).choose.calledOnce
).toBe(true)
expect(chosenWorkerKey).toBe(0)
})
- it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.ROUND_ROBIN
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.ROUND_ROBIN
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_USED and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_USED and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_USED
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastUsedWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_USED and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_USED and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_USED
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastUsedWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_BUSY and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_BUSY and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_BUSY
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastBusyWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_BUSY and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_BUSY and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_BUSY
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastBusyWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_ELU
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.LEAST_ELU
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(FairShareWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.FAIR_SHARE
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(FairShareWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
- const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and fixed pool', () => {
- const workerChoiceStrategy =
- WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and fixed pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(InterleavedWeightedRoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
- it('Verify that setWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
- const workerChoiceStrategy =
- WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
- const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ it('Verify that setDefaultWorkerChoiceStrategy() works with INTERLEAVED_WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
+ const workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool
)
- workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+ workerChoiceStrategiesContext.setDefaultWorkerChoiceStrategy(
+ WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+ )
expect(
- workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
+ workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
)
).toBeInstanceOf(InterleavedWeightedRoundRobinWorkerChoiceStrategy)
- expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
- workerChoiceStrategy
- )
})
it('Verify that worker choice strategy options enable median runtime pool statistics', () => {
const wwrWorkerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- let workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ let workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool,
- wwrWorkerChoiceStrategy,
+ [wwrWorkerChoiceStrategy],
{
runTime: { median: true }
}
)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
.average
).toBe(false)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
+ .median
).toBe(true)
- workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool,
- wwrWorkerChoiceStrategy,
+ [wwrWorkerChoiceStrategy],
{
runTime: { median: true }
}
)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
.average
).toBe(false)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
+ .median
).toBe(true)
const fsWorkerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
- workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
fixedPool,
- fsWorkerChoiceStrategy,
+ [fsWorkerChoiceStrategy],
{
runTime: { median: true }
}
)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
.average
).toBe(false)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
+ .median
).toBe(true)
- workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext(
dynamicPool,
- fsWorkerChoiceStrategy,
+ [fsWorkerChoiceStrategy],
{
runTime: { median: true }
}
)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
.average
).toBe(false)
expect(
- workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime.median
+ workerChoiceStrategiesContext.getTaskStatisticsRequirements().runTime
+ .median
).toBe(true)
})
})
await waitWorkerEvents(longRunningPool, 'exit', max - min)
expect(longRunningPool.workerNodes.length).toBe(min)
expect(
- longRunningPool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- longRunningPool.workerChoiceStrategyContext.workerChoiceStrategy
+ longRunningPool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
+ longRunningPool.workerChoiceStrategiesContext
+ .defaultWorkerChoiceStrategy
).nextWorkerNodeKey
).toBeLessThan(longRunningPool.workerNodes.length)
// We need to clean up the resources after our test
CircularArray,
DEFAULT_CIRCULAR_ARRAY_SIZE
} from '../../lib/circular-array.cjs'
+import { WorkerTypes } from '../../lib/index.cjs'
import {
- FixedClusterPool,
- FixedThreadPool,
- WorkerTypes
-} from '../../lib/index.cjs'
-import {
- buildWorkerChoiceStrategyOptions,
createWorker,
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
getDefaultTasksQueueOptions,
- getWorkerChoiceStrategyRetries,
getWorkerId,
getWorkerType,
updateMeasurementStatistics
})
})
- it('Verify getWorkerChoiceStrategyRetries() behavior', async () => {
- const numberOfThreads = 4
- const pool = new FixedThreadPool(
- numberOfThreads,
- './tests/worker-files/thread/testWorker.mjs'
- )
- expect(getWorkerChoiceStrategyRetries(pool)).toBe(pool.info.maxSize * 2)
- const workerChoiceStrategyOptions = {
- runTime: { median: true },
- waitTime: { median: true },
- elu: { median: true },
- weights: {
- 0: 100,
- 1: 100
- }
- }
- expect(
- getWorkerChoiceStrategyRetries(pool, workerChoiceStrategyOptions)
- ).toBe(
- pool.info.maxSize +
- Object.keys(workerChoiceStrategyOptions.weights).length
- )
- await pool.destroy()
- })
-
- it('Verify buildWorkerChoiceStrategyOptions() behavior', async () => {
- const numberOfWorkers = 4
- const pool = new FixedClusterPool(
- numberOfWorkers,
- './tests/worker-files/cluster/testWorker.cjs'
- )
- expect(buildWorkerChoiceStrategyOptions(pool)).toStrictEqual({
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false },
- weights: expect.objectContaining({
- 0: expect.any(Number),
- [pool.info.maxSize - 1]: expect.any(Number)
- })
- })
- const workerChoiceStrategyOptions = {
- runTime: { median: true },
- waitTime: { median: true },
- elu: { median: true },
- weights: {
- 0: 100,
- 1: 100
- }
- }
- expect(
- buildWorkerChoiceStrategyOptions(pool, workerChoiceStrategyOptions)
- ).toStrictEqual(workerChoiceStrategyOptions)
- await pool.destroy()
- })
-
it('Verify updateMeasurementStatistics() behavior', () => {
const measurementStatistics = {
history: new CircularArray()