From: Jérôme Benoit Date: Thu, 24 Aug 2023 17:32:08 +0000 (+0200) Subject: feat: use SMA and SMM for worker tasks usage X-Git-Tag: v2.6.35~25 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=dc021bcca72f8b4d185d96a301579612fba2793b;p=poolifier.git feat: use SMA and SMM for worker tasks usage Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 93289b8d..811b8397 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Don't account worker usage statistics for tasks that have failed. + +### Changed + +- Update simple moving average implementation to use a circular buffer. +- Update simple moving median implementation to use a circular buffer. + ## [2.6.34] - 2023-08-24 ### Fixes diff --git a/docs/api.md b/docs/api.md index e50ebf0d..5ffe92c0 100644 --- a/docs/api.md +++ b/docs/api.md @@ -76,9 +76,9 @@ An object with these properties: - `choiceRetries` (optional) - The number of retries to perform if no worker is eligible. - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`. - - `runTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) runtime instead of the tasks average runtime in worker choice strategies. - - `waitTime` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) wait time instead of the tasks average wait time in worker choice strategies. - - `elu` (optional) - Use the tasks [median](./../docs/worker-choice-strategies.md#median) ELU instead of the tasks average ELU in worker choice strategies. + - `runTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) runtime instead of the tasks simple moving average runtime in worker choice strategies. + - `waitTime` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) wait time instead of the tasks simple moving average wait time in worker choice strategies. + - `elu` (optional) - Use the tasks simple moving [median](./../docs/worker-choice-strategies.md#simple-moving-median) ELU instead of the tasks simple moving average ELU in worker choice strategies. - `weights` (optional) - The worker weights to use in weighted round robin worker choice strategies: `{ 0: 200, 1: 300, ..., n: 100 }`. Default: `{ choiceRetries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }` diff --git a/docs/worker-choice-strategies.md b/docs/worker-choice-strategies.md index 147a500e..296d8c3b 100644 --- a/docs/worker-choice-strategies.md +++ b/docs/worker-choice-strategies.md @@ -9,14 +9,14 @@ All duration or timestamp are expressed in milliseconds. - [Weighted round robin](#weighted-round-robin) - [Interleaved weighted round robin](#interleaved-weighted-round-robin) - [Statistics](#statistics) - - [Median](#median) + - [Simple moving median](#simple-moving-median) ## Strategies ### Fair share -Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + average_task_execution_time`. -By default, the strategy uses the average task execution time for each worker but it can be configured to use the average task event loop utilization (ELU) active time instead. +Its goal is to distribute the load evenly across all workers. To achieve this, the strategy keeps track of the simple moving average task execution time for each worker and assigns the next task to the worker with the lowest task end prediction time: `task_end_prediction = max(current_time, task_end_prediction) + simple_moving_average_task_execution_time`. +By default, the strategy uses the simple moving average task execution time for each worker but it can be configured to use the simple moving average task event loop utilization (ELU) active time instead. ### Weighted round robin @@ -32,6 +32,6 @@ The worker default weights are the same for all workers and is computed given th Worker choice strategies enable only the statistics that are needed to choose the next worker to avoid unnecessary overhead. -### Median +### Simple moving median -Strategies using the average task execution time for each worker can use the median instead. Median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Median usage introduces a small overhead: measurement history must be kept for each worker and the median must be recomputed each time a task has finished. +Strategies using the average task execution time for each worker can use the simple moving median instead. Simple moving median is more robust to outliers and can be used to avoid assigning tasks to workers that are currently overloaded. Simple moving median usage introduces a small overhead: measurement history must be kept for each worker and the simple moving median must be recomputed each time a task has finished. diff --git a/src/circular-array.ts b/src/circular-array.ts index 9cea08bc..8f545179 100644 --- a/src/circular-array.ts +++ b/src/circular-array.ts @@ -1,6 +1,6 @@ // Copyright Jerome Benoit. 2021-2023. All Rights Reserved. -const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024 +export const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024 /** * Array with a maximum length and shifting items when full. diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 507fcc10..a11a5741 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -12,6 +12,7 @@ import { DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, + average, isKillBehavior, isPlainObject, median, @@ -427,16 +428,13 @@ export abstract class AbstractPool< ) ), average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( + average( + this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + accumulator.concat(workerNode.usage.runTime.history), + [] ) + ) ), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .runTime.median && { @@ -468,16 +466,13 @@ export abstract class AbstractPool< ) ), average: round( - this.workerNodes.reduce( - (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), - 0 - ) / - this.workerNodes.reduce( + average( + this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.executed ?? 0), - 0 + accumulator.concat(workerNode.usage.waitTime.history), + [] ) + ) ), ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .waitTime.median && { @@ -940,11 +935,13 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } updateMeasurementStatistics( workerUsage.runTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.runTime ?? 0 ) } @@ -957,8 +954,7 @@ export abstract class AbstractPool< updateMeasurementStatistics( workerUsage.waitTime, this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime, - workerUsage.tasks.executed + taskWaitTime ) } @@ -966,19 +962,20 @@ export abstract class AbstractPool< workerUsage: WorkerUsage, message: MessageValue ): void { + if (message.taskError != null) { + return + } const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu updateMeasurementStatistics( workerUsage.elu.active, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.active ?? 0 ) updateMeasurementStatistics( workerUsage.elu.idle, eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0, - workerUsage.tasks.executed + message.taskPerformance?.elu?.idle ?? 0 ) if (eluTaskStatisticsRequirements.aggregate) { if (message.taskPerformance?.elu != null) { diff --git a/src/utils.ts b/src/utils.ts index 77f889c9..d39bfa02 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -76,6 +76,26 @@ export const availableParallelism = (): number => { // return delay + randomSum // } +/** + * Computes the average of the given data set. + * + * @param dataSet - Data set. + * @returns The average of the given data set. + * @internal + */ +export const average = (dataSet: number[]): number => { + if (Array.isArray(dataSet) && dataSet.length === 0) { + return 0 + } + if (Array.isArray(dataSet) && dataSet.length === 1) { + return dataSet[0] + } + return ( + dataSet.reduce((accumulator, number) => accumulator + number, 0) / + dataSet.length + ) +} + /** * Computes the median of the given data set. * @@ -163,8 +183,7 @@ export const isAsyncFunction = ( export const updateMeasurementStatistics = ( measurementStatistics: MeasurementStatistics, measurementRequirements: MeasurementStatisticsRequirements, - measurementValue: number, - numberOfMeasurements: number + measurementValue: number ): void => { if (measurementRequirements.aggregate) { measurementStatistics.aggregate = @@ -177,13 +196,17 @@ export const updateMeasurementStatistics = ( measurementValue, measurementStatistics.maximum ?? -Infinity ) - if (measurementRequirements.average && numberOfMeasurements !== 0) { - measurementStatistics.average = - measurementStatistics.aggregate / numberOfMeasurements - } - if (measurementRequirements.median && measurementValue != null) { + if ( + (measurementRequirements.average || measurementRequirements.median) && + measurementValue != null + ) { measurementStatistics.history.push(measurementValue) - measurementStatistics.median = median(measurementStatistics.history) + if (measurementRequirements.average) { + measurementStatistics.average = average(measurementStatistics.history) + } + if (measurementRequirements.median) { + measurementStatistics.median = median(measurementStatistics.history) + } } } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 81250951..a5976d99 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -23,6 +23,10 @@ describe('Abstract pool test suite', () => { } } + afterEach(() => { + sinon.restore() + }) + it('Simulate pool creation from a non main thread/process', () => { expect( () => diff --git a/tests/utils.test.js b/tests/utils.test.js index f5a5366d..97b5e8a9 100644 --- a/tests/utils.test.js +++ b/tests/utils.test.js @@ -1,7 +1,11 @@ const { expect } = require('expect') -const { CircularArray } = require('../lib/circular-array') +const { + CircularArray, + DEFAULT_CIRCULAR_ARRAY_SIZE +} = require('../lib/circular-array') const { availableParallelism, + average, isAsyncFunction, isKillBehavior, isPlainObject, @@ -18,6 +22,17 @@ describe('Utils test suite', () => { expect(Number.isSafeInteger(availableParallelism())).toBe(true) }) + it('Verify average() computation', () => { + expect(average([])).toBe(0) + expect(average([0.08])).toBe(0.08) + expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02, 5.03])).toBe( + 3.1642857142857146 + ) + expect(average([0.25, 4.75, 3.05, 6.04, 1.01, 2.02])).toBe( + 2.8533333333333335 + ) + }) + it('Verify median() computation', () => { expect(median([])).toBe(0) expect(median([0.08])).toBe(0.08) @@ -137,39 +152,48 @@ describe('Utils test suite', () => { updateMeasurementStatistics( measurementStatistics, { aggregate: true, average: false, median: false }, - 0.01, - 1 + 0.01 ) expect(measurementStatistics).toStrictEqual({ aggregate: 0.01, maximum: 0.01, minimum: 0.01, - history: expect.any(CircularArray) + history: new CircularArray() }) updateMeasurementStatistics( measurementStatistics, { aggregate: true, average: false, median: false }, - 0.02, - 2 + 0.02 ) expect(measurementStatistics).toStrictEqual({ aggregate: 0.03, maximum: 0.02, minimum: 0.01, - history: expect.any(CircularArray) + history: new CircularArray() }) updateMeasurementStatistics( measurementStatistics, { aggregate: true, average: true, median: false }, - 0.001, - 3 + 0.001 ) expect(measurementStatistics).toStrictEqual({ aggregate: 0.031, maximum: 0.02, minimum: 0.001, - average: 0.010333333333333333, - history: expect.any(CircularArray) + average: 0.001, + history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001) + }) + updateMeasurementStatistics( + measurementStatistics, + { aggregate: true, average: true, median: false }, + 0.003 + ) + expect(measurementStatistics).toStrictEqual({ + aggregate: 0.034, + maximum: 0.02, + minimum: 0.001, + average: 0.002, + history: new CircularArray(DEFAULT_CIRCULAR_ARRAY_SIZE, 0.001, 0.003) }) }) }) diff --git a/tests/worker/abstract-worker.test.js b/tests/worker/abstract-worker.test.js index 68665bc4..6fd0ea2c 100644 --- a/tests/worker/abstract-worker.test.js +++ b/tests/worker/abstract-worker.test.js @@ -11,6 +11,10 @@ describe('Abstract worker test suite', () => { } } + afterEach(() => { + sinon.restore() + }) + it('Verify worker options default values', () => { const worker = new ThreadWorker(() => {}) expect(worker.opts.maxInactiveTime).toStrictEqual(60000)