From: Jérôme Benoit Date: Fri, 9 Jun 2023 11:06:38 +0000 (+0200) Subject: Merge branch 'master' into elu-strategy X-Git-Tag: v2.6.0~7^2~4 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=4eb9e75a6d47a08541d38b70505e5ba1126aa2f1;hp=477f48e752d4d4284f92d2ab114422fcad99d5c2;p=poolifier.git Merge branch 'master' into elu-strategy --- diff --git a/README.md b/README.md index ca6d7aff..e0f665fe 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,7 @@ An object with these properties: - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time + - `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU) (experimental) - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental) - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts new file mode 100644 index 00000000..5cc02bff --- /dev/null +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -0,0 +1,80 @@ +import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' +import type { IPool } from '../pool' +import type { IWorker } from '../worker' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + TaskStatisticsRequirements, + WorkerChoiceStrategyOptions +} from './selection-strategies-types' + +/** + * Selects the worker with the least ELU. + * + * @typeParam Worker - Type of worker which manages the strategy. + * @typeParam Data - Type of data sent to the worker. This can only be serializable data. + * @typeParam Response - Type of execution response. This can only be serializable data. + */ +export class LeastEluWorkerChoiceStrategy< + Worker extends IWorker, + Data = unknown, + Response = unknown + > + extends AbstractWorkerChoiceStrategy + implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly taskStatisticsRequirements: TaskStatisticsRequirements = { + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: true + } + + /** @inheritDoc */ + public constructor ( + pool: IPool, + opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + ) { + super(pool, opts) + this.setTaskStatisticsRequirements(this.opts) + } + + /** @inheritDoc */ + public reset (): boolean { + return true + } + + /** @inheritDoc */ + public update (): boolean { + return true + } + + /** @inheritDoc */ + public choose (): number { + let minWorkerElu = Infinity + let leastEluWorkerNodeKey!: number + for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) { + const workerUsage = workerNode.workerUsage + const workerElu = workerUsage.elu?.utilization ?? 0 + if (workerElu === 0) { + return workerNodeKey + } else if (workerElu < minWorkerElu) { + minWorkerElu = workerElu + leastEluWorkerNodeKey = workerNodeKey + } + } + return leastEluWorkerNodeKey + } + + /** @inheritDoc */ + public remove (): boolean { + return true + } +} diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 26fe436d..0fbb4d4d 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -14,6 +14,12 @@ export const WorkerChoiceStrategies = Object.freeze({ * Least busy worker selection strategy. */ LEAST_BUSY: 'LEAST_BUSY', + /** + * Least ELU worker selection strategy. + * + * @experimental + */ + LEAST_ELU: 'LEAST_ELU', /** * Fair share worker selection strategy. */ diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index f6e132c8..a0ba2583 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -5,6 +5,7 @@ import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strate import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy' import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy' import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy' +import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy' import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -70,6 +71,13 @@ export class WorkerChoiceStrategyContext< opts ) ], + [ + WorkerChoiceStrategies.LEAST_ELU, + new (LeastEluWorkerChoiceStrategy.bind(this))( + pool, + opts + ) + ], [ WorkerChoiceStrategies.FAIR_SHARE, new (FairShareWorkerChoiceStrategy.bind(this))( diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index d43023b3..8e226860 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -6,6 +6,7 @@ const { FixedClusterPool } = require('../../../lib') const { CircularArray } = require('../../../lib/circular-array') +const TestUtils = require('../../test-utils') describe('Selection strategies test suite', () => { const min = 0 @@ -15,6 +16,7 @@ describe('Selection strategies test suite', () => { expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED') expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY') + expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU') expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE') expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe( 'WEIGHTED_ROUND_ROBIN' @@ -592,6 +594,111 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect( + pool.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ).toStrictEqual({ + runTime: { + aggregate: false, + average: false, + median: false + }, + waitTime: { + aggregate: false, + average: false, + median: false + }, + elu: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => { + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU } + ) + // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose` + const maxMultiplier = 2 + for (let i = 0; i < max * maxMultiplier; i++) { + await pool.execute() + if (i !== max * maxMultiplier - 1) await TestUtils.sleep(500) + } + for (const workerNode of pool.workerNodes) { + const expectedWorkerUsage = { + tasks: { + executed: expect.any(Number), + executing: 0, + queued: 0, + failed: 0 + }, + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: expect.any(CircularArray) + } + } + if (workerNode.workerUsage.elu === undefined) { + expect(workerNode.workerUsage).toStrictEqual({ + ...expectedWorkerUsage, + elu: undefined + }) + } else { + expect(workerNode.workerUsage).toStrictEqual({ + ...expectedWorkerUsage, + elu: { + active: expect.any(Number), + idle: 0, + utilization: 1 + } + }) + } + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) + } + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( diff --git a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js index e39b1484..2070f8e0 100644 --- a/tests/pools/selection-strategies/worker-choice-strategy-context.test.js +++ b/tests/pools/selection-strategies/worker-choice-strategy-context.test.js @@ -17,6 +17,9 @@ const { const { LeastBusyWorkerChoiceStrategy } = require('../../../lib/pools/selection-strategies/least-busy-worker-choice-strategy') +const { + LeastEluWorkerChoiceStrategy +} = require('../../../lib/pools/selection-strategies/least-elu-worker-choice-strategy') const { FairShareWorkerChoiceStrategy } = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy') @@ -261,6 +264,38 @@ describe('Worker choice strategy context test suite', () => { ) }) + it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + fixedPool + ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ) + ).toBeInstanceOf(LeastEluWorkerChoiceStrategy) + expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + }) + + it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + const workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + dynamicPool + ) + workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy) + expect( + workerChoiceStrategyContext.workerChoiceStrategies.get( + workerChoiceStrategy + ) + ).toBeInstanceOf(LeastEluWorkerChoiceStrategy) + expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe( + workerChoiceStrategy + ) + }) + it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => { const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(