## [Unreleased]
+### Fixed
+
+- Don't account worker usage statistics for tasks that have failed.
+
+### Changed
+
+- Update simple moving average implementation to use a circular buffer.
+- Update simple moving median implementation to use a circular buffer.
+
## [2.6.34] - 2023-08-24
### Fixes
- `choiceRetries` (optional) - The number of retries to perform if no worker is eligible.
- `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- - `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies.
- - `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies.
- - `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies.
+ - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies.
+ - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies.
+ - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies.
- `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`.
Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
- [Weighted round robin](#weighted-round-robin)
- [Interleaved weighted round robin](#interleaved-weighted-round-robin)
- [Statistics](#statistics)
- - [Median](#median)
+ - [Simple moving median](#simple-moving-median)
## Strategies
### Fair share
-Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + average_task_execution_time`.
-By default, the strategy uses the average task execution time for each worker but it can be configured to use the average task event loop utilization (ELU) active time instead.
+Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the simple moving average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + simple_moving_average_task_execution_time`.
+By default, the strategy uses the simple moving average task execution time for each worker but it can be configured to use the simple moving average task event loop utilization (ELU) active time instead.
### Weighted round robin
Worker choice strategies enable only the statistics that are needed to choose the next worker to avoid unnecessary overhead.
-### Median
+### Simple moving median
-Strategies using the average task execution time for each worker can use the median instead. Median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Median usage introduces a small overhead: measurement history must be kept for each worker and the median must be recomputed each time a task has finished.
+Strategies using the average task execution time for each worker can use the simple moving median instead. Simple moving median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Simple moving median usage introduces a small overhead: measurement history must be kept for each worker and the simple moving median must be recomputed each time a task has finished.
// Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
-const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024
+export const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024
/**
* Array with a maximum length and shifting items when full.
DEFAULT_TASK_NAME,
DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
EMPTY_FUNCTION,
+ average,
isKillBehavior,
isPlainObject,
median,
)
),
average: round(
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
- 0
- ) /
- this.workerNodes.reduce(
+ average(
+ this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.executed ?? 0),
- 0
+ accumulator.concat(workerNode.usage.runTime.history),
+ []
)
+ )
),
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.median && {
)
),
average: round(
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
- 0
- ) /
- this.workerNodes.reduce(
+ average(
+ this.workerNodes.reduce<number[]>(
(accumulator, workerNode) =>
- accumulator + (workerNode.usage.tasks?.executed ?? 0),
- 0
+ accumulator.concat(workerNode.usage.waitTime.history),
+ []
)
+ )
),
...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.waitTime.median && {
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
+ if (message.taskError != null) {
+ return
+ }
updateMeasurementStatistics(
workerUsage.runTime,
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
- message.taskPerformance?.runTime ?? 0,
- workerUsage.tasks.executed
+ message.taskPerformance?.runTime ?? 0
)
}
updateMeasurementStatistics(
workerUsage.waitTime,
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
- taskWaitTime,
- workerUsage.tasks.executed
+ taskWaitTime
)
}
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
+ if (message.taskError != null) {
+ return
+ }
const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
updateMeasurementStatistics(
workerUsage.elu.active,
eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.active ?? 0,
- workerUsage.tasks.executed
+ message.taskPerformance?.elu?.active ?? 0
)
updateMeasurementStatistics(
workerUsage.elu.idle,
eluTaskStatisticsRequirements,
- message.taskPerformance?.elu?.idle ?? 0,
- workerUsage.tasks.executed
+ message.taskPerformance?.elu?.idle ?? 0
)
if (eluTaskStatisticsRequirements.aggregate) {
if (message.taskPerformance?.elu != null) {
// return delay + randomSum
// }
+/**
+ * Computes the average of the given data set.
+ *
+ * @param dataSet - Data set.
+ * @returns The average of the given data set.
+ * @internal
+ */
+export const average = (dataSet: number[]): number => {
+ if (Array.isArray(dataSet) && dataSet.length === 0) {
+ return 0
+ }
+ if (Array.isArray(dataSet) && dataSet.length === 1) {
+ return dataSet[0]
+ }
+ return (
+ dataSet.reduce((accumulator, number) => accumulator + number, 0) /
+ dataSet.length
+ )
+}
+
/**
* Computes the median of the given data set.
*
export const updateMeasurementStatistics = (
measurementStatistics: MeasurementStatistics,
measurementRequirements: MeasurementStatisticsRequirements,
- measurementValue: number,
- numberOfMeasurements: number
+ measurementValue: number
): void => {
if (measurementRequirements.aggregate) {
measurementStatistics.aggregate =
measurementValue,
measurementStatistics.maximum ?? -Infinity
)
- if (measurementRequirements.average && numberOfMeasurements !== 0) {
- measurementStatistics.average =
- measurementStatistics.aggregate / numberOfMeasurements
- }
- if (measurementRequirements.median && measurementValue != null) {
+ if (
+ (measurementRequirements.average || measurementRequirements.median) &&
+ measurementValue != null
+ ) {
measurementStatistics.history.push(measurementValue)
- measurementStatistics.median = median(measurementStatistics.history)
+ if (measurementRequirements.average) {
+ measurementStatistics.average = average(measurementStatistics.history)
+ }
+ if (measurementRequirements.median) {
+ measurementStatistics.median = median(measurementStatistics.history)
+ }
}
}
}
}
}
+ afterEach(() => {
+ sinon.restore()
+ })
+
it('Simulate pool creation from a non main thread/process', () => {
expect(
() =>
const { expect } = require('expect')
-const { CircularArray } = require('../lib/circular-array')
+const {
+ CircularArray,
+ DEFAULT_CIRCULAR_ARRAY_SIZE
+} = require('../lib/circular-array')
const {
availableParallelism,
+ average,
isAsyncFunction,
isKillBehavior,
isPlainObject,
expect(Number.isSafeInteger(availableParallelism())).toBe(true)
})
+ it('Verify average() computation', () => {
+ expect(average([])).toBe(0)
+ expect(average([0.08])).toBe(0.08)
+ expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02, 5.03])).toBe(
+ 3.1642857142857146
+ )
+ expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02])).toBe(
+ 2.8533333333333335
+ )
+ })
+
it('Verify median() computation', () => {
expect(median([])).toBe(0)
expect(median([0.08])).toBe(0.08)
updateMeasurementStatistics(
measurementStatistics,
{ aggregate: true, average: false, median: false },
- 0.01,
- 1
+ 0.01
)
expect(measurementStatistics).toStrictEqual({
aggregate: 0.01,
maximum: 0.01,
minimum: 0.01,
- history: expect.any(CircularArray)
+ history: new CircularArray()
})
updateMeasurementStatistics(
measurementStatistics,
{ aggregate: true, average: false, median: false },
- 0.02,
- 2
+ 0.02
)
expect(measurementStatistics).toStrictEqual({
aggregate: 0.03,
maximum: 0.02,
minimum: 0.01,
- history: expect.any(CircularArray)
+ history: new CircularArray()
})
updateMeasurementStatistics(
measurementStatistics,
{ aggregate: true, average: true, median: false },
- 0.001,
- 3
+ 0.001
)
expect(measurementStatistics).toStrictEqual({
aggregate: 0.031,
maximum: 0.02,
minimum: 0.001,
- average: 0.010333333333333333,
- history: expect.any(CircularArray)
+ average: 0.001,
+ history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001)
+ })
+ updateMeasurementStatistics(
+ measurementStatistics,
+ { aggregate: true, average: true, median: false },
+ 0.003
+ )
+ expect(measurementStatistics).toStrictEqual({
+ aggregate: 0.034,
+ maximum: 0.02,
+ minimum: 0.001,
+ average: 0.002,
+ history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001, 0.003)
})
})
})
}
}
+ afterEach(() => {
+ sinon.restore()
+ })
+
it('Verify worker options default values', () => {
const worker = new ThreadWorker(() => {})
expect(worker.opts.maxInactiveTime).toStrictEqual(60000)