- Fix race condition between ready and task functions worker message handling at startup.
- Fix duplicate task usage statistics computation per task function.
+### Added
+
+- Add back pressure detection on the worker node queue. Event `backPressure` is emitted when the worker node queue is full (size > poolMaxSize^2).
+- Use back pressure detection in worker choice strategies.
+- Add worker choice strategies retries mechanism if no worker is eligible.
+
## [2.6.28] - 2023-08-16
### Fixed
- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.
Properties:
+ - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies.
- `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies.
- `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies.
- `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
- Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+ Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
Default: `true`
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
- this.opts.workerChoiceStrategyOptions =
- opts.workerChoiceStrategyOptions ??
- DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.opts.workerChoiceStrategyOptions = {
+ ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+ ...opts.workerChoiceStrategyOptions
+ }
this.checkValidWorkerChoiceStrategyOptions(
this.opts.workerChoiceStrategyOptions
)
'Invalid worker choice strategy options: must be a plain object'
)
}
+ if (
+ workerChoiceStrategyOptions.choiceRetries != null &&
+ !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
+ ) {
+ throw new TypeError(
+ 'Invalid worker choice strategy options: choice retries must be an integer'
+ )
+ }
+ if (
+ workerChoiceStrategyOptions.choiceRetries != null &&
+ workerChoiceStrategyOptions.choiceRetries <= 0
+ ) {
+ throw new RangeError(
+ `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
+ )
+ }
if (
workerChoiceStrategyOptions.weights != null &&
Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
- this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
+ this.opts.workerChoiceStrategyOptions = {
+ ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
+ ...workerChoiceStrategyOptions
+ }
this.workerChoiceStrategyContext.setOptions(
this.opts.workerChoiceStrategyOptions
)
protected readonly pool: IPool<Worker, Data, Response>,
protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
+ this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
this.choose = this.choose.bind(this)
}
/** @inheritDoc */
public setOptions (opts: WorkerChoiceStrategyOptions): void {
- this.opts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
this.setTaskStatisticsRequirements(this.opts)
}
* @param workerNodeKey - The worker node key.
* @returns Whether the worker node is ready or not.
*/
- protected isWorkerNodeReady (workerNodeKey: number): boolean {
+ private isWorkerNodeReady (workerNodeKey: number): boolean {
return this.pool.workerNodes[workerNodeKey].info.ready
}
* @param workerNodeKey - The worker node key.
* @returns `true` if the worker node has back pressure, `false` otherwise.
*/
- protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
+ private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
}
+ /**
+ * Whether the worker node is eligible or not.
+ * A worker node is eligible if it is ready and does not have back pressure.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @returns `true` if the worker node is eligible, `false` otherwise.
+ * @see {@link isWorkerNodeReady}
+ * @see {@link hasWorkerNodeBackPressure}
+ */
+ protected isWorkerNodeEligible (workerNodeKey: number): boolean {
+ return (
+ this.isWorkerNodeReady(workerNodeKey) &&
+ !this.hasWorkerNodeBackPressure(workerNodeKey)
+ )
+ }
+
/**
* Gets the worker task runtime.
* If the task statistics require the average runtime, the average runtime is returned.
const workerVirtualTaskEndTimestamp =
this.workersVirtualTaskEndTimestamp[workerNodeKey]
if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey) &&
workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
) {
minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
const workerWeight =
this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey) &&
workerWeight >= this.roundWeights[roundIndex]
) {
roundId = roundIndex
const workerTime =
(workerNode.usage.runTime?.aggregate ?? 0) +
(workerNode.usage.waitTime?.aggregate ?? 0)
- if (this.isWorkerNodeReady(workerNodeKey) && workerTime === 0) {
+ if (this.isWorkerNodeEligible(workerNodeKey) && workerTime === 0) {
this.nextWorkerNodeKey = workerNodeKey
break
} else if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey) &&
workerTime < minTime
) {
minTime = workerTime
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerUsage = workerNode.usage
const workerElu = workerUsage.elu?.active?.aggregate ?? 0
- if (this.isWorkerNodeReady(workerNodeKey) && workerElu === 0) {
+ if (this.isWorkerNodeEligible(workerNodeKey) && workerElu === 0) {
this.nextWorkerNodeKey = workerNodeKey
break
} else if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey) &&
workerElu < minWorkerElu
) {
minWorkerElu = workerElu
workerTaskStatistics.executed +
workerTaskStatistics.executing +
workerTaskStatistics.queued
- if (this.isWorkerNodeReady(workerNodeKey) && workerTasks === 0) {
+ if (this.isWorkerNodeEligible(workerNodeKey) && workerTasks === 0) {
this.nextWorkerNodeKey = workerNodeKey
break
} else if (
- this.isWorkerNodeReady(workerNodeKey) &&
+ this.isWorkerNodeEligible(workerNodeKey) &&
workerTasks < minNumberOfTasks
) {
minNumberOfTasks = workerTasks
const chosenWorkerNodeKey = this.nextWorkerNodeKey
do {
this.roundRobinNextWorkerNodeKey()
- } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
+ } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
return chosenWorkerNodeKey
}
*/
export interface WorkerChoiceStrategyOptions {
/**
- * Measurement to use for worker choice strategy.
+ * Number of worker choice retries to perform if no worker is eligible.
+ *
+ * @defaultValue 6
+ */
+ readonly choiceRetries?: number
+ /**
+ * Measurement to use in worker choice strategy supporting it.
*/
readonly measurement?: Measurement
/**
const chosenWorkerNodeKey = this.nextWorkerNodeKey
do {
this.weightedRoundRobinNextWorkerNodeKey()
- } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey))
+ } while (!this.isWorkerNodeEligible(this.nextWorkerNodeKey))
return chosenWorkerNodeKey
}
IWorkerChoiceStrategy
>
+ /**
+ * The number of times the worker choice strategy in the context has been retried.
+ */
+ private choiceRetriesCount = 0
+
/**
* Worker choice strategy context constructor.
*
public constructor (
pool: IPool<Worker, Data, Response>,
private workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN,
- opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+ private opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
+ this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
this.execute = this.execute.bind(this)
this.workerChoiceStrategies = new Map<
WorkerChoiceStrategy,
}
/**
- * Gets the worker choice strategy task statistics requirements in the context.
+ * Gets the worker choice strategy in the context task statistics requirements.
*
* @returns The task statistics requirements.
*/
}
/**
- * Updates the worker node key in the worker choice strategy internals in the context.
+ * Updates the worker node key in the worker choice strategy in the context internals.
*
* @returns `true` if the update is successful, `false` otherwise.
*/
}
/**
- * Executes the worker choice strategy algorithm in the context.
+ * Executes the worker choice strategy in the context algorithm.
*
* @returns The key of the worker node.
* @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker node key is null or undefined.
this.workerChoiceStrategy
) as IWorkerChoiceStrategy
).choose()
- if (workerNodeKey == null) {
+ if (
+ workerNodeKey == null &&
+ this.choiceRetriesCount < (this.opts.choiceRetries as number)
+ ) {
+ this.choiceRetriesCount++
+ return this.execute()
+ } else if (workerNodeKey == null) {
throw new TypeError('Worker node key chosen is null or undefined')
}
return workerNodeKey
* @param opts - The worker choice strategy options.
*/
public setOptions (opts: WorkerChoiceStrategyOptions): void {
+ this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
for (const workerChoiceStrategy of this.workerChoiceStrategies.values()) {
workerChoiceStrategy.setOptions(opts)
}
*/
export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions =
{
+ choiceRetries: 6,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
return availableParallelism
}
+/**
+ * Computes the retry delay in milliseconds using an exponential back off algorithm.
+ *
+ * @param retryNumber - The number of retries that have already been attempted
+ * @param maxDelayRatio - The maximum ratio of the delay that can be randomized
+ * @returns Delay in milliseconds
+ */
+export const exponentialDelay = (
+ retryNumber = 0,
+ maxDelayRatio = 0.2
+): number => {
+ const delay = Math.pow(2, retryNumber) * 100
+ const randomSum = delay * maxDelayRatio * Math.random() // 0-(maxDelayRatio*100)% of the delay
+ return delay + randomSum
+}
+
/**
* Computes the median of the given data set.
*
WorkerChoiceStrategies.ROUND_ROBIN
)
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ choiceRetries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
WorkerChoiceStrategies.LEAST_USED
)
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: { 0: 300, 1: 200 }
+ })
+ expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+ choiceRetries: 6,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: false },
weights: { 0: 300, 1: 200 }
})
expect(pool.opts.messageHandler).toStrictEqual(testHandler)
}
)
).toThrowError("Invalid worker choice strategy 'invalidStrategy'")
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.js',
- {
- workerChoiceStrategyOptions: 'invalidOptions'
- }
- )
- ).toThrowError(
- 'Invalid worker choice strategy options: must be a plain object'
- )
expect(
() =>
new FixedThreadPool(
{ workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
)
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ choiceRetries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: false },
waitTime: { median: false },
elu: { median: false }
elu: { median: true }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ choiceRetries: 6,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: true }
+ })
+ expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: true },
+ waitTime: { median: false },
elu: { median: true }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: true },
+ waitTime: { median: false },
elu: { median: true }
})
}
elu: { median: false }
})
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ })
+ expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
+ choiceRetries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
elu: { median: false }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
+ choiceRetries: 6,
runTime: { median: false },
+ waitTime: { median: false },
elu: { median: false }
})
}