From 78099a150dc54d7adab495195fa5f133fd54e114 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 8 Apr 2023 14:06:56 +0200 Subject: [PATCH] feat: add median task run time statistic MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/circular-array.ts | 94 ++++++++++++ src/pools/abstract-pool.ts | 11 +- src/pools/pool-internal.ts | 3 + .../abstract-worker-choice-strategy.ts | 3 +- .../fair-share-worker-choice-strategy.ts | 3 +- .../less-busy-worker-choice-strategy.ts | 3 +- .../selection-strategies-types.ts | 1 + ...hted-round-robin-worker-choice-strategy.ts | 3 +- src/utils.ts | 18 +++ tests/circular-array.test.js | 144 ++++++++++++++++++ tests/pools/abstract/abstract-pool.test.js | 11 ++ .../selection-strategies.test.js | 30 ++++ 12 files changed, 319 insertions(+), 5 deletions(-) create mode 100644 src/circular-array.ts create mode 100644 tests/circular-array.test.js diff --git a/src/circular-array.ts b/src/circular-array.ts new file mode 100644 index 00000000..8adf4b5f --- /dev/null +++ b/src/circular-array.ts @@ -0,0 +1,94 @@ +// Copyright Jerome Benoit. 2021-2023. All Rights Reserved. + +const DEFAULT_CIRCULAR_ARRAY_SIZE = 1024 + +/** + * Array with a maximum length shifting items when full. + */ +export class CircularArray extends Array { + public size: number + + constructor (size: number = DEFAULT_CIRCULAR_ARRAY_SIZE, ...items: T[]) { + super() + this.checkSize(size) + this.size = size + if (arguments.length > 1) { + this.push(...items) + } + } + + public push (...items: T[]): number { + const length = super.push(...items) + if (length > this.size) { + super.splice(0, length - this.size) + } + return this.length + } + + public unshift (...items: T[]): number { + const length = super.unshift(...items) + if (length > this.size) { + super.splice(this.size, items.length) + } + return this.length + } + + public concat (...items: Array>): CircularArray { + const concatenatedCircularArray = super.concat( + items as T[] + ) as CircularArray + concatenatedCircularArray.size = this.size + if (concatenatedCircularArray.length > concatenatedCircularArray.size) { + concatenatedCircularArray.splice( + 0, + concatenatedCircularArray.length - concatenatedCircularArray.size + ) + } + return concatenatedCircularArray + } + + public splice (start: number, deleteCount?: number, ...items: T[]): T[] { + let itemsRemoved: T[] + if (arguments.length >= 3 && deleteCount !== undefined) { + itemsRemoved = super.splice(start, deleteCount) + // FIXME: that makes the items insert not in place + this.push(...items) + } else if (arguments.length === 2) { + itemsRemoved = super.splice(start, deleteCount) + } else { + itemsRemoved = super.splice(start) + } + return itemsRemoved + } + + public resize (size: number): void { + this.checkSize(size) + if (size === 0) { + this.length = 0 + } else if (size < this.size) { + for (let i = size; i < this.size; i++) { + super.pop() + } + } + this.size = size + } + + public empty (): boolean { + return this.length === 0 + } + + public full (): boolean { + return this.length === this.size + } + + private checkSize (size: number): void { + if (!Number.isSafeInteger(size)) { + throw new TypeError( + `Invalid circular array size: ${size} is not a safe integer` + ) + } + if (size < 0) { + throw new RangeError(`Invalid circular array size: ${size} < 0`) + } + } +} diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9ac81ce5..75b5cf79 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto' import type { MessageValue, PromiseResponseWrapper } from '../utility-types' -import { EMPTY_FUNCTION } from '../utils' +import { EMPTY_FUNCTION, median } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' import { PoolEvents, type PoolOptions } from './pool' import { PoolEmitter } from './pool' @@ -12,6 +12,7 @@ import { type WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' +import { CircularArray } from '../circular-array' /** * Base class that implements some shared logic for all poolifier pools. @@ -171,7 +172,9 @@ export abstract class AbstractPool< run: 0, running: 0, runTime: 0, + runTimeHistory: new CircularArray(), avgRunTime: 0, + medRunTime: 0, error: 0 }) } @@ -284,6 +287,10 @@ export abstract class AbstractPool< workerTasksUsage.avgRunTime = workerTasksUsage.runTime / workerTasksUsage.run } + if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) { + workerTasksUsage.runTimeHistory.push(message.runTime ?? 0) + workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) + } } } @@ -375,7 +382,9 @@ export abstract class AbstractPool< run: 0, running: 0, runTime: 0, + runTimeHistory: new CircularArray(), avgRunTime: 0, + medRunTime: 0, error: 0 }) diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index c3ec39be..bf799581 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -1,3 +1,4 @@ +import type { CircularArray } from '../circular-array' import type { IPool } from './pool' import type { IPoolWorker } from './pool-worker' @@ -18,7 +19,9 @@ export interface TasksUsage { run: number running: number runTime: number + runTimeHistory: CircularArray avgRunTime: number + medRunTime: number error: number } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index b0a624da..8ab3d360 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -23,7 +23,8 @@ export abstract class AbstractWorkerChoiceStrategy< /** @inheritDoc */ public requiredStatistics: RequiredStatistics = { runTime: false, - avgRunTime: false + avgRunTime: false, + medRunTime: false } /** diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index 16d5a7be..fc4f0f69 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -31,7 +31,8 @@ export class FairShareWorkerChoiceStrategy< /** @inheritDoc */ public readonly requiredStatistics: RequiredStatistics = { runTime: true, - avgRunTime: true + avgRunTime: true, + medRunTime: false } /** diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index 87ef804d..c03c9da0 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -22,7 +22,8 @@ export class LessBusyWorkerChoiceStrategy< /** @inheritDoc */ public readonly requiredStatistics: RequiredStatistics = { runTime: true, - avgRunTime: false + avgRunTime: false, + medRunTime: false } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 2c6c2e16..22071734 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -35,6 +35,7 @@ export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies export interface RequiredStatistics { runTime: boolean avgRunTime: boolean + medRunTime: boolean } /** diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index ea598b20..96b5867b 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -33,7 +33,8 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public readonly requiredStatistics: RequiredStatistics = { runTime: true, - avgRunTime: true + avgRunTime: true, + medRunTime: false } /** diff --git a/src/utils.ts b/src/utils.ts index 809ca38b..d85c5ba5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -4,3 +4,21 @@ export const EMPTY_FUNCTION: () => void = Object.freeze(() => { /* Intentionally empty */ }) + +/** + * Returns the median of the given data set. + * + * @param dataSet - Data set. + * @returns The median of the given data set. + */ +export const median = (dataSet: number[]): number => { + if (Array.isArray(dataSet) && dataSet.length === 1) { + return dataSet[0] + } + const sortedDataSet = dataSet.slice().sort((a, b) => a - b) + const middleIndex = Math.floor(sortedDataSet.length / 2) + if (sortedDataSet.length % 2 === 0) { + return sortedDataSet[middleIndex / 2] + } + return (sortedDataSet[middleIndex - 1] + sortedDataSet[middleIndex]) / 2 +} diff --git a/tests/circular-array.test.js b/tests/circular-array.test.js new file mode 100644 index 00000000..d821b6cd --- /dev/null +++ b/tests/circular-array.test.js @@ -0,0 +1,144 @@ +const { expect } = require('expect') +const { CircularArray } = require('../lib/circular-array') + +describe('Circular array test suite', () => { + it('Verify that circular array can be instantiated', () => { + const circularArray = new CircularArray() + expect(circularArray).toBeInstanceOf(CircularArray) + }) + + it('Verify circular array default size at instance creation', () => { + const circularArray = new CircularArray() + expect(circularArray.size).toBe(1024) + }) + + it('Verify that circular array size can be set at instance creation', () => { + const circularArray = new CircularArray(1000) + expect(circularArray.size).toBe(1000) + }) + + it('Verify that circular array size and items can be set at instance creation', () => { + let circularArray = new CircularArray(1000, 1, 2, 3, 4, 5) + expect(circularArray.size).toBe(1000) + expect(circularArray.length).toBe(5) + circularArray = new CircularArray(4, 1, 2, 3, 4, 5) + expect(circularArray.size).toBe(4) + expect(circularArray.length).toBe(4) + }) + + it('Verify that circular array size is valid at instance creation', () => { + expect(() => new CircularArray(0.25)).toThrowError( + new TypeError('Invalid circular array size: 0.25 is not a safe integer') + ) + expect(() => new CircularArray(-1)).toThrowError( + new RangeError('Invalid circular array size: -1 < 0') + ) + expect(() => new CircularArray(Number.MAX_SAFE_INTEGER + 1)).toThrowError( + new TypeError( + `Invalid circular array size: ${ + Number.MAX_SAFE_INTEGER + 1 + } is not a safe integer` + ) + ) + }) + + it('Verify that circular array empty works as intended', () => { + const circularArray = new CircularArray() + expect(circularArray.empty()).toBe(true) + }) + + it('Verify that circular array full works as intended', () => { + const circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + expect(circularArray.full()).toBe(true) + }) + + it('Verify that circular array push works as intended', () => { + let circularArray = new CircularArray(4) + let arrayLength = circularArray.push(1, 2, 3, 4, 5) + expect(arrayLength).toBe(circularArray.size) + expect(circularArray.length).toBe(circularArray.size) + expect(circularArray).toStrictEqual(new CircularArray(4, 2, 3, 4, 5)) + arrayLength = circularArray.push(6, 7) + expect(arrayLength).toBe(circularArray.size) + expect(circularArray.length).toBe(circularArray.size) + expect(circularArray).toStrictEqual(new CircularArray(4, 4, 5, 6, 7)) + circularArray = new CircularArray(100) + arrayLength = circularArray.push(1, 2, 3, 4, 5) + expect(arrayLength).toBe(5) + expect(circularArray.size).toBe(100) + expect(circularArray.length).toBe(5) + expect(circularArray).toStrictEqual(new CircularArray(100, 1, 2, 3, 4, 5)) + }) + + it('Verify that circular array splice works as intended', () => { + let circularArray = new CircularArray(1000, 1, 2, 3, 4, 5) + let deletedItems = circularArray.splice(2) + expect(deletedItems).toStrictEqual(new CircularArray(3, 3, 4, 5)) + expect(circularArray.length).toBe(2) + expect(circularArray).toStrictEqual(new CircularArray(1000, 1, 2)) + circularArray = new CircularArray(1000, 1, 2, 3, 4, 5) + deletedItems = circularArray.splice(2, 1) + expect(deletedItems).toStrictEqual(new CircularArray(1, 3)) + expect(circularArray.length).toBe(4) + expect(circularArray).toStrictEqual(new CircularArray(1000, 1, 2, 4, 5)) + circularArray = new CircularArray(4, 1, 2, 3, 4) + deletedItems = circularArray.splice(2, 1, 5, 6) + expect(deletedItems).toStrictEqual(new CircularArray(1, 3)) + expect(circularArray.length).toBe(4) + expect(circularArray).toStrictEqual(new CircularArray(4, 2, 4, 5, 6)) + }) + + it('Verify that circular array concat works as intended', () => { + let circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + circularArray = circularArray.concat(6, 7) + expect(circularArray.length).toBe(5) + expect(circularArray).toStrictEqual(new CircularArray(5, 3, 4, 5, 6, 7)) + circularArray = new CircularArray(1) + circularArray = circularArray.concat(6, 7) + expect(circularArray.length).toBe(1) + expect(circularArray).toStrictEqual(new CircularArray(1, 7)) + }) + + it('Verify that circular array unshift works as intended', () => { + let circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + let arrayLength = circularArray.unshift(6, 7) + expect(arrayLength).toBe(5) + expect(circularArray.length).toBe(5) + expect(circularArray).toStrictEqual(new CircularArray(5, 6, 7, 1, 2, 3)) + circularArray = new CircularArray(1) + arrayLength = circularArray.unshift(6, 7) + expect(arrayLength).toBe(1) + expect(circularArray.length).toBe(1) + expect(circularArray).toStrictEqual(new CircularArray(1, 6)) + }) + + it('Verify that circular array resize works as intended', () => { + expect(() => new CircularArray().resize(0.25)).toThrowError( + new TypeError('Invalid circular array size: 0.25 is not a safe integer') + ) + expect(() => new CircularArray().resize(-1)).toThrowError( + new RangeError('Invalid circular array size: -1 < 0') + ) + expect(() => + new CircularArray().resize(Number.MAX_SAFE_INTEGER + 1) + ).toThrowError( + new TypeError( + `Invalid circular array size: ${ + Number.MAX_SAFE_INTEGER + 1 + } is not a safe integer` + ) + ) + let circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + circularArray.resize(0) + expect(circularArray.size).toBe(0) + expect(circularArray).toStrictEqual(new CircularArray(0)) + circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + circularArray.resize(3) + expect(circularArray.size).toBe(3) + expect(circularArray).toStrictEqual(new CircularArray(3, 1, 2, 3)) + circularArray = new CircularArray(5, 1, 2, 3, 4, 5) + circularArray.resize(8) + expect(circularArray.size).toBe(8) + expect(circularArray).toStrictEqual(new CircularArray(8, 1, 2, 3, 4, 5)) + }) +}) diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 658be6a9..5063c7fd 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -6,6 +6,7 @@ const { PoolEvents, WorkerChoiceStrategies } = require('../../../lib/index') +const { CircularArray } = require('../../../lib/circular-array') describe('Abstract pool test suite', () => { const numberOfWorkers = 1 @@ -145,7 +146,9 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.run).toBe(0) expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.medRunTime).toBe(0) expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() @@ -165,7 +168,9 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.run).toBe(0) expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2) expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.medRunTime).toBe(0) expect(workerItem.tasksUsage.error).toBe(0) } await Promise.all(promises) @@ -174,7 +179,9 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.medRunTime).toBe(0) expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() @@ -196,7 +203,9 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerItem.tasksUsage.medRunTime).toBe(0) expect(workerItem.tasksUsage.error).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) @@ -205,7 +214,9 @@ describe('Abstract pool test suite', () => { expect(workerItem.tasksUsage.run).toBe(0) expect(workerItem.tasksUsage.running).toBe(0) expect(workerItem.tasksUsage.runTime).toBe(0) + expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) expect(workerItem.tasksUsage.avgRunTime).toBe(0) + expect(workerItem.tasksUsage.medRunTime).toBe(0) expect(workerItem.tasksUsage.error).toBe(0) } await pool.destroy() diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index b1599a99..5a7394a5 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -77,6 +77,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( min, @@ -90,6 +93,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) // We need to clean up the resources after our test await pool.destroy() }) @@ -230,6 +236,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( min, @@ -243,6 +252,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) // We need to clean up the resources after our test await pool.destroy() }) @@ -318,6 +330,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( min, @@ -331,6 +346,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(false) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) // We need to clean up the resources after our test await pool.destroy() }) @@ -420,6 +438,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(true) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( min, @@ -433,6 +454,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(true) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) // We need to clean up the resources after our test await pool.destroy() }) @@ -600,6 +624,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(true) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) await pool.destroy() pool = new DynamicThreadPool( min, @@ -613,6 +640,9 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime ).toBe(true) + expect( + pool.workerChoiceStrategyContext.getRequiredStatistics().medRunTime + ).toBe(false) // We need to clean up the resources after our test await pool.destroy() }) -- 2.34.1