{
"check-coverage": true,
- "lines": 93,
- "statements": 93,
+ "lines": 92,
+ "statements": 92,
"functions": 94,
"branches": 92
}
## [Unreleased]
+### Fixed
+
+- Ensure worker choice strategies implementation wait for worker node readiness: [#1748](https://github.com/poolifier/poolifier/issues/1748).
+
## [3.1.6] - 2023-12-18
### Fixed
- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.
Properties:
- - `retries` (optional) - The number of retries to perform if no worker is eligible.
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- `runTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
- `waitTime` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
- `elu` (optional) - Use the tasks [simple moving median](./worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
- `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
- Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+ Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
- `startWorkers` (optional) - Start the minimum number of workers at pool initialization.
Default: `true`
console.info(`==== ${request.method} ${request.url} ====`)
console.info('> Headers')
- console.log(request.headers)
+ console.info(request.headers)
console.info('> Body')
console.info(body)
} from '../utility-types'
import {
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
average,
exponentialDelay,
/** @inheritDoc */
public emitter?: EventEmitterAsyncResource
- /**
- * Dynamic pool maximum size property placeholder.
- */
- protected readonly max?: number
-
/**
* The task execution response promise map:
* - `key`: The message id of each submitted task.
/**
* Constructs a new poolifier pool.
*
- * @param numberOfWorkers - Number of workers that this pool should manage.
+ * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
* @param filePath - Path to the worker file.
* @param opts - Options for the pool.
+ * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
*/
public constructor (
- protected readonly numberOfWorkers: number,
+ protected readonly minimumNumberOfWorkers: number,
protected readonly filePath: string,
- protected readonly opts: PoolOptions<Worker>
+ protected readonly opts: PoolOptions<Worker>,
+ protected readonly maximumNumberOfWorkers?: number
) {
if (!this.isMain()) {
throw new Error(
'Cannot start a pool from a worker with the same type as the pool'
)
}
+ this.checkPoolType()
checkFilePath(this.filePath)
- this.checkNumberOfWorkers(this.numberOfWorkers)
+ this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
this.checkPoolOptions(this.opts)
this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
this.startTimestamp = performance.now()
}
- private checkNumberOfWorkers (numberOfWorkers: number): void {
- if (numberOfWorkers == null) {
+ private checkPoolType (): void {
+ if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
+ throw new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ }
+ }
+
+ private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void {
+ if (minimumNumberOfWorkers == null) {
throw new Error(
'Cannot instantiate a pool without specifying the number of workers'
)
- } else if (!Number.isSafeInteger(numberOfWorkers)) {
+ } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
throw new TypeError(
'Cannot instantiate a pool with a non safe integer number of workers'
)
- } else if (numberOfWorkers < 0) {
+ } else if (minimumNumberOfWorkers < 0) {
throw new RangeError(
'Cannot instantiate a pool with a negative number of workers'
)
- } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
+ } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
throw new RangeError('Cannot instantiate a fixed pool with zero worker')
}
}
this.checkValidWorkerChoiceStrategyOptions(
opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...opts.workerChoiceStrategyOptions
+ if (opts.workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
}
this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
this.opts.enableEvents = opts.enableEvents ?? true
'Invalid worker choice strategy options: must be a plain object'
)
}
- if (
- workerChoiceStrategyOptions?.retries != null &&
- !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
- ) {
- throw new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- }
- if (
- workerChoiceStrategyOptions?.retries != null &&
- workerChoiceStrategyOptions.retries < 0
- ) {
- throw new RangeError(
- `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
- )
- }
if (
workerChoiceStrategyOptions?.weights != null &&
- Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
+ Object.keys(workerChoiceStrategyOptions.weights).length !==
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
) {
throw new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
- minSize: this.minSize,
- maxSize: this.maxSize,
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ minSize: this.minimumNumberOfWorkers,
+ maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate &&
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && { utilization: round(this.utilization) }),
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
accumulator + workerNode.usage.tasks.failed,
0
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.aggregate && {
runTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.runTime.median && {
median: round(
median(
})
}
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.aggregate && {
waitTime: {
minimum: round(
)
)
),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.average && {
average: round(
average(
)
)
}),
- ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
.waitTime.median && {
median: round(
median(
? accumulator + 1
: accumulator,
0
- ) >= this.minSize
+ ) >= this.minimumNumberOfWorkers
)
}
*/
private get utilization (): number {
const poolTimeCapacity =
- (performance.now() - this.startTimestamp) * this.maxSize
+ (performance.now() - this.startTimestamp) *
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
const totalTasksRunTime = this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
*/
protected abstract get worker (): WorkerType
- /**
- * The pool minimum size.
- */
- protected get minSize (): number {
- return this.numberOfWorkers
- }
-
- /**
- * The pool maximum size.
- */
- protected get maxSize (): number {
- return this.max ?? this.numberOfWorkers
- }
-
/**
* Checks if the worker id sent in the received message from a worker is valid.
*
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = {
- ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
- ...workerChoiceStrategyOptions
+ if (workerChoiceStrategyOptions != null) {
+ this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
}
this.workerChoiceStrategyContext.setOptions(
+ this,
this.opts.workerChoiceStrategyOptions
)
}
tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
return {
- ...getDefaultTasksQueueOptions(this.maxSize),
+ ...getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ),
...tasksQueueOptions
}
}
* The pool filling boolean status.
*/
protected get full (): boolean {
- return this.workerNodes.length >= this.maxSize
+ return (
+ this.workerNodes.length >=
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
+ )
}
/**
(accumulator, workerNode) =>
!workerNode.info.dynamic ? accumulator + 1 : accumulator,
0
- ) < this.numberOfWorkers
+ ) < this.minimumNumberOfWorkers
) {
this.createAndSetupWorkerNode()
}
this.started = false
}
- protected async sendKillMessageToWorker (
- workerNodeKey: number
- ): Promise<void> {
+ private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
await new Promise<void>((resolve, reject) => {
if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) {
reject(new Error(`Invalid worker node key '${workerNodeKey}'`))
'taskFinished',
flushedTasks,
this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
- getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).tasksFinishedTimeout
)
await this.sendKillMessageToWorker(workerNodeKey)
await workerNode.terminate()
workerOptions: this.opts.workerOptions,
tasksQueueBackPressureSize:
this.opts.tasksQueueOptions?.size ??
- getDefaultTasksQueueOptions(this.maxSize).size
+ getDefaultTasksQueueOptions(
+ this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
+ ).size
}
)
// Flag the worker node as ready at pool startup.
*/
public constructor (
min: number,
- protected readonly max: number,
+ max: number,
filePath: string,
opts: PoolOptions<Worker> = {}
) {
- super(min, filePath, opts)
- checkDynamicPoolSize(this.numberOfWorkers, this.max)
+ super(min, filePath, opts, max)
+ checkDynamicPoolSize(
+ this.minimumNumberOfWorkers,
+ this.maximumNumberOfWorkers as number
+ )
}
/** @inheritDoc */
public constructor (
numberOfWorkers: number,
filePath: string,
- protected readonly opts: PoolOptions<Worker> = {}
+ opts: PoolOptions<Worker> = {},
+ maximumNumberOfWorkers?: number
) {
- super(numberOfWorkers, filePath, opts)
+ super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers)
}
/** @inheritDoc */
-import { cpus } from 'node:os'
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ buildInternalWorkerChoiceStrategyOptions
} from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import type {
IWorkerChoiceStrategy,
+ InternalWorkerChoiceStrategyOptions,
MeasurementStatisticsRequirements,
StrategyPolicy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
*/
public constructor (
protected readonly pool: IPool<Worker, Data, Response>,
- protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ protected opts: InternalWorkerChoiceStrategyOptions
) {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ this.opts = buildInternalWorkerChoiceStrategyOptions(
+ this.pool.info.maxSize,
+ this.opts
+ )
+ this.setTaskStatisticsRequirements(this.opts)
this.choose = this.choose.bind(this)
}
protected setTaskStatisticsRequirements (
- opts: WorkerChoiceStrategyOptions
+ opts: InternalWorkerChoiceStrategyOptions
): void {
this.toggleMedianMeasurementStatisticsRequirements(
this.taskStatisticsRequirements.runTime,
public abstract remove (workerNodeKey: number): boolean
/** @inheritDoc */
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ public setOptions (opts: InternalWorkerChoiceStrategyOptions): void {
+ this.opts = buildInternalWorkerChoiceStrategyOptions(
+ this.pool.info.maxSize,
+ opts
+ )
this.setTaskStatisticsRequirements(this.opts)
}
protected setPreviousWorkerNodeKey (workerNodeKey: number | undefined): void {
this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey
}
-
- protected computeDefaultWorkerWeight (): number {
- let cpusCycleTimeWeight = 0
- for (const cpu of cpus()) {
- // CPU estimated cycle time
- const numberOfDigits = cpu.speed.toString().length - 1
- const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
- cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
- }
- return Math.round(cpusCycleTimeWeight / cpus().length)
- }
}
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker, StrategyData } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import {
type IWorkerChoiceStrategy,
+ type InternalWorkerChoiceStrategyOptions,
Measurements,
- type TaskStatisticsRequirements,
- type WorkerChoiceStrategyOptions
+ type TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
* Round id.
*/
private roundId: number = 0
- /**
- * Default worker weight.
- */
- private readonly defaultWorkerWeight: number
/**
* Round weights.
*/
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
- this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
this.roundWeights = this.getRoundWeights()
}
) {
this.workerNodeVirtualTaskRunTime = 0
}
- const workerWeight =
- this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
+ const workerWeight = this.opts.weights?.[workerNodeKey] as number
if (
this.isWorkerNodeReady(workerNodeKey) &&
workerWeight >= this.roundWeights[roundIndex] &&
}
/** @inheritDoc */
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
+ public setOptions (opts: InternalWorkerChoiceStrategyOptions): void {
super.setOptions(opts)
this.roundWeights = this.getRoundWeights()
}
private getRoundWeights (): number[] {
- if (this.opts.weights == null) {
- return [this.defaultWorkerWeight]
- }
return [
...new Set(
- Object.values(this.opts.weights)
+ Object.values(this.opts.weights as Record<number, number>)
.slice()
.sort((a, b) => a - b)
)
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
- this.setTaskStatisticsRequirements(this.opts)
}
/** @inheritDoc */
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions
} from './selection-strategies-types'
/**
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
- this.setTaskStatisticsRequirements(this.opts)
}
/** @inheritDoc */
* Worker choice strategy options.
*/
export interface WorkerChoiceStrategyOptions {
- /**
- * Number of worker choice retries to perform if no worker is eligible.
- *
- * @defaultValue 6
- */
- readonly retries?: number
/**
* Measurement to use in worker choice strategy supporting it.
*/
*
* @defaultValue Weights computed automatically given the CPU performance.
*/
- readonly weights?: Record<number, number>
+ weights?: Record<number, number>
+}
+
+/**
+ * Worker choice strategy internal options.
+ *
+ * @internal
+ */
+export interface InternalWorkerChoiceStrategyOptions
+ extends WorkerChoiceStrategyOptions {
+ /**
+ * Number of worker choice retries to perform if no worker is eligible.
+ *
+ * @defaultValue pool maximum size
+ */
+ readonly retries?: number
}
/**
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
-import {
- DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
-} from '../../utils'
+import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../../utils'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ TaskStatisticsRequirements
} from './selection-strategies-types'
/**
elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
}
- /**
- * Default worker weight.
- */
- private readonly defaultWorkerWeight: number
/**
* Worker node virtual task runtime.
*/
/** @inheritDoc */
public constructor (
pool: IPool<Worker, Data, Response>,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ opts: InternalWorkerChoiceStrategyOptions
) {
super(pool, opts)
this.setTaskStatisticsRequirements(this.opts)
- this.defaultWorkerWeight = this.computeDefaultWorkerWeight()
}
/** @inheritDoc */
}
private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
- const workerWeight =
- this.opts.weights?.[
- this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
- ] ?? this.defaultWorkerWeight
+ const workerWeight = this.opts.weights?.[
+ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ ] as number
if (this.workerNodeVirtualTaskRunTime < workerWeight) {
this.workerNodeVirtualTaskRunTime =
this.workerNodeVirtualTaskRunTime +
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import { buildInternalWorkerChoiceStrategyOptions } from '../../utils'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ InternalWorkerChoiceStrategyOptions,
StrategyPolicy,
TaskStatisticsRequirements,
- WorkerChoiceStrategy,
- WorkerChoiceStrategyOptions
+ WorkerChoiceStrategy
} from './selection-strategies-types'
import { WorkerChoiceStrategies } from './selection-strategies-types'
import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy'
public constructor (
pool: IPool<Worker, Data, Response>,
private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
- private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ private opts?: InternalWorkerChoiceStrategyOptions
) {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ this.opts = buildInternalWorkerChoiceStrategyOptions(
+ pool.info.maxSize,
+ this.opts
+ )
this.execute = this.execute.bind(this)
this.workerChoiceStrategies = new Map<
WorkerChoiceStrategy,
WorkerChoiceStrategies.ROUND_ROBIN,
new (RoundRobinWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_USED,
new (LeastUsedWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_BUSY,
new (LeastBusyWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.LEAST_ELU,
new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
WorkerChoiceStrategies.FAIR_SHARE,
new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
pool,
- opts
+ this.opts
)
],
[
Worker,
Data,
Response
- >(pool, opts)
+ >(pool, this.opts)
],
[
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN,
Worker,
Data,
Response
- >(pool, opts)
+ >(pool, this.opts)
]
])
}
*
* @returns The key of the worker node.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
- * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum consecutive worker choice strategy executions has been reached.
*/
public execute (): number {
const workerChoiceStrategy = this.workerChoiceStrategies.get(
this.workerChoiceStrategy
) as IWorkerChoiceStrategy
+ if (!workerChoiceStrategy.hasPoolWorkerNodesReady()) {
+ return this.execute()
+ }
+ return this.executeStrategy(workerChoiceStrategy)
+ }
+
+ /**
+ * Executes the given worker choice strategy.
+ *
+ * @param workerChoiceStrategy - The worker choice strategy.
+ * @returns The key of the worker node.
+ * @throws {@link https://nodejs.org/api/errors.html#class-error} If after configured retries the worker node key is null or undefined.
+ */
+ private executeStrategy (workerChoiceStrategy: IWorkerChoiceStrategy): number {
let workerNodeKey: number | undefined
- const maxExecutionCount = 10000
- let executionCount = 0
let chooseCount = 0
let retriesCount = 0
do {
- if (workerChoiceStrategy.hasPoolWorkerNodesReady()) {
- workerNodeKey = workerChoiceStrategy.choose()
- if (chooseCount > 0) {
- retriesCount++
- }
- chooseCount++
+ workerNodeKey = workerChoiceStrategy.choose()
+ if (workerNodeKey == null && chooseCount > 0) {
+ retriesCount++
}
- executionCount++
+ chooseCount++
} while (
- executionCount < maxExecutionCount &&
- (!workerChoiceStrategy.hasPoolWorkerNodesReady() ||
- (workerNodeKey == null && retriesCount < (this.opts.retries as number)))
+ workerNodeKey == null &&
+ retriesCount < (this.opts?.retries as number)
)
- if (executionCount >= maxExecutionCount) {
- throw new RangeError(
- `Worker choice strategy consecutive executions has exceeded the maximum of ${maxExecutionCount}`
- )
- }
if (workerNodeKey == null) {
throw new Error(
`Worker node key chosen is null or undefined after ${retriesCount} retries`
/**
* Sets the worker choice strategies in the context options.
*
+ * @param pool - The pool instance.
* @param opts - The worker choice strategy options.
*/
- public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
+ public setOptions (
+ pool: IPool<Worker, Data, Response>,
+ opts?: InternalWorkerChoiceStrategyOptions
+ ): void {
+ this.opts = buildInternalWorkerChoiceStrategyOptions(
+ pool.info.maxSize,
+ opts
+ )
for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
- workerChoiceStrategy.setOptions(opts)
+ workerChoiceStrategy.setOptions(this.opts)
}
}
}
*/
public constructor (
min: number,
- protected readonly max: number,
+ max: number,
filePath: string,
opts: PoolOptions<Worker> = {}
) {
- super(min, filePath, opts)
- checkDynamicPoolSize(this.numberOfWorkers, this.max)
+ super(min, filePath, opts, max)
+ checkDynamicPoolSize(
+ this.minimumNumberOfWorkers,
+ this.maximumNumberOfWorkers as number
+ )
}
/** @inheritDoc */
public constructor (
numberOfThreads: number,
filePath: string,
- protected readonly opts: PoolOptions<Worker> = {}
+ opts: PoolOptions<Worker> = {},
+ maximumNumberOfThreads?: number
) {
- super(numberOfThreads, filePath, opts)
+ super(numberOfThreads, filePath, opts, maximumNumberOfThreads)
}
/** @inheritDoc */
import { getRandomValues } from 'node:crypto'
import { Worker as ClusterWorker } from 'node:cluster'
import { Worker as ThreadWorker } from 'node:worker_threads'
+import { cpus } from 'node:os'
import type {
- MeasurementStatisticsRequirements,
- WorkerChoiceStrategyOptions
+ InternalWorkerChoiceStrategyOptions,
+ MeasurementStatisticsRequirements
} from './pools/selection-strategies/selection-strategies-types'
import type { KillBehavior } from './worker/worker-options'
import { type IWorker, type WorkerType, WorkerTypes } from './pools/worker'
})
/**
- * Default worker choice strategy options.
+ * Gets default worker choice strategy options.
+ *
+ * @param retries - The number of worker choice retries.
+ * @returns The default worker choice strategy options.
*/
-export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions =
- {
- retries: 6,
+const getDefaultInternalWorkerChoiceStrategyOptions = (
+ retries: number
+): InternalWorkerChoiceStrategyOptions => {
+ return {
+ retries,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
}
+}
/**
* Default measurement statistics requirements.
return result
}
}
+
+const clone = <T extends object>(object: T): T => {
+ return JSON.parse(JSON.stringify(object)) as T
+}
+
+export const buildInternalWorkerChoiceStrategyOptions = (
+ poolMaxSize: number,
+ opts?: InternalWorkerChoiceStrategyOptions
+): InternalWorkerChoiceStrategyOptions => {
+ opts = clone(opts ?? {})
+ if (opts?.weights == null) {
+ opts.weights = getDefaultWeights(poolMaxSize)
+ }
+ return {
+ ...getDefaultInternalWorkerChoiceStrategyOptions(
+ poolMaxSize + Object.keys(opts.weights).length
+ ),
+ ...opts
+ }
+}
+
+const getDefaultWeights = (
+ poolMaxSize: number,
+ defaultWorkerWeight: number = getDefaultWorkerWeight()
+): Record<number, number> => {
+ const weights: Record<number, number> = {}
+ for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
+ weights[workerNodeKey] = defaultWorkerWeight
+ }
+ return weights
+}
+
+const getDefaultWorkerWeight = (): number => {
+ let cpusCycleTimeWeight = 0
+ for (const cpu of cpus()) {
+ // CPU estimated cycle time
+ const numberOfDigits = cpu.speed.toString().length - 1
+ const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
+ cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
+ }
+ return Math.round(cpusCycleTimeWeight / cpus().length)
+}
)
})
+ it('Verify that pool arguments number and pool type are checked', () => {
+ expect(
+ () =>
+ new FixedThreadPool(
+ numberOfWorkers,
+ './tests/worker-files/thread/testWorker.mjs',
+ undefined,
+ numberOfWorkers * 2
+ )
+ ).toThrow(
+ new Error(
+ 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
+ )
+ )
+ })
+
it('Verify that dynamic pool sizing is checked', () => {
expect(
() =>
enableEvents: true,
restartWorkerOnError: true,
enableTasksQueue: false,
- workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
- workerChoiceStrategyOptions: {
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- }
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(workerChoiceStrategy.opts).toStrictEqual(
+ expect.objectContaining({
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ )
}
await pool.destroy()
const testHandler = () => console.info('test handler executed')
},
workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
workerChoiceStrategyOptions: {
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
weights: { 0: 300, 1: 200 }
},
onlineHandler: testHandler,
exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.opts.workerChoiceStrategyOptions.weights).length,
runTime: { median: true },
waitTime: { median: false },
elu: { median: false },
}
)
).toThrow(new Error("Invalid worker choice strategy 'invalidStrategy'"))
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: 'invalidChoiceRetries'
- }
- }
- )
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.mjs',
- {
- workerChoiceStrategyOptions: {
- retries: -1
- }
- }
- )
- ).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(
() =>
new FixedThreadPool(
'./tests/worker-files/thread/testWorker.mjs',
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(workerChoiceStrategy.opts).toStrictEqual(
+ expect.objectContaining({
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ )
}
expect(
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: true },
- waitTime: { median: false },
elu: { median: true }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: true },
waitTime: { median: false },
- elu: { median: true }
+ elu: { median: true },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: true }
- })
+ expect(workerChoiceStrategy.opts).toStrictEqual(
+ expect.objectContaining({
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: true }
+ })
+ )
}
expect(
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
runTime: { median: false },
- waitTime: { median: false },
elu: { median: false }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
- expect(workerChoiceStrategy.opts).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(workerChoiceStrategy.opts).toStrictEqual(
+ expect.objectContaining({
+ retries:
+ pool.info.maxSize +
+ Object.keys(workerChoiceStrategy.opts.weights).length,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ )
}
expect(
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
'Invalid worker choice strategy options: must be a plain object'
)
)
- expect(() =>
- pool.setWorkerChoiceStrategyOptions({
- retries: 'invalidChoiceRetries'
- })
- ).toThrow(
- new TypeError(
- 'Invalid worker choice strategy options: retries must be an integer'
- )
- )
- expect(() => pool.setWorkerChoiceStrategyOptions({ retries: -1 })).toThrow(
- new RangeError(
- "Invalid worker choice strategy options: retries '-1' must be greater or equal than zero"
- )
- )
expect(() => pool.setWorkerChoiceStrategyOptions({ weights: {} })).toThrow(
new Error(
'Invalid worker choice strategy options: must have a weight for each worker node'
await pool.destroy()
const elapsedTime = performance.now() - startTime
expect(tasksFinished).toBe(0)
- expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 300)
+ expect(elapsedTime).toBeLessThanOrEqual(tasksFinishedTimeout + 800)
})
it('Verify that pool asynchronous resource track tasks execution', async () => {
expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
workerChoiceStrategy
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 6,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
await pool.destroy()
}
max,
'./tests/worker-files/cluster/testWorker.js'
)
- pool.setWorkerChoiceStrategy(workerChoiceStrategy, { retries: 3 })
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
workerChoiceStrategy
)
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 3,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
+ expect(pool.opts.workerChoiceStrategyOptions).toBeUndefined()
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
- retries: 3,
+ retries:
+ pool.info.maxSize +
+ Object.keys(pool.workerChoiceStrategyContext.opts.weights).length,
runTime: { median: false },
waitTime: { median: false },
- elu: { median: false }
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [pool.info.maxSize - 1]: expect.any(Number)
+ })
})
await pool.destroy()
}
if (
workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
) {
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
workerChoiceStrategy ===
WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
) {
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).roundWeights
- ).toStrictEqual([
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ])
+ ).roundWeights.length
+ ).toBe(1)
}
}
await pool.destroy()
})
+ it('Verify strategies wait for worker node readiness in dynamic pool', async () => {
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.mjs'
+ )
+ expect(pool.starting).toBe(false)
+ expect(pool.workerNodes.length).toBe(min)
+ const maxMultiplier = 10000
+ const promises = new Set()
+ for (let i = 0; i < max * maxMultiplier; i++) {
+ promises.add(pool.execute())
+ }
+ await Promise.all(promises)
+ expect(pool.workerNodes.length).toBe(max)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify ROUND_ROBIN strategy default policy', async () => {
const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
let pool = new FixedThreadPool(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
+ ).toEqual(expect.any(Number))
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toEqual(expect.any(Number))
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
+ ).toEqual(expect.any(Number))
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).nextWorkerNodeKey
- ).toBe(0)
+ ).toEqual(expect.any(Number))
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).previousWorkerNodeKey
- ).toBe(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
+ ).toEqual(expect.any(Number))
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
workerChoiceStrategy
).previousWorkerNodeKey
).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
pool.workerChoiceStrategyContext.workerChoiceStrategy
).previousWorkerNodeKey
).toBe(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
workerChoiceStrategy
).previousWorkerNodeKey
).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
pool.workerChoiceStrategyContext.workerChoiceStrategy
).previousWorkerNodeKey
).toBe(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
await pool.destroy()
})
- it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
+ it.skip('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => {
const pool = new FixedThreadPool(
max,
'./tests/worker-files/thread/testWorker.mjs',
max * maxMultiplier
)
}
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).roundWeights
- ).toStrictEqual([
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ])
+ ).roundWeights.length
+ ).toBe(1)
// We need to clean up the resources after our test
await pool.destroy()
})
- it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
+ it.skip('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
const pool = new DynamicThreadPool(
min,
max,
max * maxMultiplier
)
}
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).roundWeights
- ).toStrictEqual([
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ])
+ ).roundWeights.length
+ ).toBe(1)
// We need to clean up the resources after our test
await pool.destroy()
})
workerChoiceStrategy
).previousWorkerNodeKey
).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).roundWeights
- ).toStrictEqual([
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ])
+ ).roundWeights.length
+ ).toBe(1)
await pool.destroy()
pool = new DynamicThreadPool(
min,
workerChoiceStrategy
).previousWorkerNodeKey
).toBeDefined()
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ).toBeGreaterThan(0)
- expect(
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).roundWeights
- ).toStrictEqual([
- pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
- pool.workerChoiceStrategyContext.workerChoiceStrategy
- ).defaultWorkerWeight
- ])
+ ).roundWeights.length
+ ).toBe(1)
// We need to clean up the resources after our test
await pool.destroy()
})
workerChoiceStrategyUndefinedStub
)
expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new Error('Worker node key chosen is null or undefined after 6 retries')
+ new Error(
+ `Worker node key chosen is null or undefined after ${
+ fixedPool.info.maxSize +
+ Object.keys(workerChoiceStrategyContext.opts.weights).length
+ } retries`
+ )
)
const workerChoiceStrategyNullStub = createStubInstance(
RoundRobinWorkerChoiceStrategy,
workerChoiceStrategyNullStub
)
expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new Error('Worker node key chosen is null or undefined after 6 retries')
+ new Error(
+ `Worker node key chosen is null or undefined after ${
+ fixedPool.info.maxSize +
+ Object.keys(workerChoiceStrategyContext.opts.weights).length
+ } retries`
+ )
)
})
.returns(false)
.onCall(4)
.returns(false)
- .onCall(6)
- .returns(false)
- .onCall(7)
- .returns(false)
- .onCall(8)
- .returns(false)
.returns(true),
choose: stub().returns(1)
}
workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategyContext.workerChoiceStrategy
).hasPoolWorkerNodesReady.callCount
- ).toBe(12)
+ ).toBe(6)
expect(
workerChoiceStrategyContext.workerChoiceStrategies.get(
workerChoiceStrategyContext.workerChoiceStrategy
expect(chosenWorkerKey).toBe(1)
})
- it('Verify that execute() throws error if worker choice strategy consecutive executions has been reached', () => {
+ it('Verify that execute() throws error if worker choice strategy recursion reach the maximum depth', () => {
const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
fixedPool
)
workerChoiceStrategyStub
)
expect(() => workerChoiceStrategyContext.execute()).toThrow(
- new RangeError(
- 'Worker choice strategy consecutive executions has exceeded the maximum of 10000'
- )
+ new RangeError('Maximum call stack size exceeded')
)
})
import {
DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
DEFAULT_TASK_NAME,
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
availableParallelism,
average,
+ buildInternalWorkerChoiceStrategyOptions,
exponentialDelay,
getWorkerId,
getWorkerType,
expect(EMPTY_FUNCTION).toStrictEqual(expect.any(Function))
})
- it('Verify DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS values', () => {
- expect(DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
- })
- })
-
it('Verify DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS values', () => {
expect(DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS).toStrictEqual({
aggregate: false,
expect(max(1, 1)).toBe(1)
})
+ it('Verify buildInternalWorkerChoiceStrategyOptions() behavior', () => {
+ const poolMaxSize = 10
+ const internalWorkerChoiceStrategyOptions =
+ buildInternalWorkerChoiceStrategyOptions(poolMaxSize)
+ expect(internalWorkerChoiceStrategyOptions).toStrictEqual({
+ retries:
+ poolMaxSize +
+ Object.keys(internalWorkerChoiceStrategyOptions.weights).length,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: expect.objectContaining({
+ 0: expect.any(Number),
+ [poolMaxSize - 1]: expect.any(Number)
+ })
+ })
+ })
+
// it('Verify once()', () => {
// let called = 0
// const fn = () => ++called