From: Jérôme Benoit Date: Sun, 11 Jun 2023 22:07:22 +0000 (+0200) Subject: fix: ensure newly created worker is used only if needed X-Git-Tag: v2.6.2~7 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=6c6afb8463782af0689101b7c67cea80df83018f;p=poolifier.git fix: ensure newly created worker is used only if needed Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index c3565446..2b59abf2 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -379,6 +379,11 @@ export abstract class AbstractPool< */ protected abstract get busy (): boolean + /** + * Whether worker nodes are executing at least one task. + * + * @returns Worker nodes busyness boolean status. + */ protected internalBusy (): boolean { return ( this.workerNodes.findIndex(workerNode => { @@ -434,7 +439,7 @@ export abstract class AbstractPool< } /** - * Shutdowns the given worker. + * Terminates the given worker. * * @param worker - A worker within `workerNodes`. */ @@ -609,18 +614,28 @@ export abstract class AbstractPool< /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the tasks. * * @returns The worker node key */ - protected chooseWorkerNode (): number { + private chooseWorkerNode (): number { if (this.shallCreateDynamicWorker()) { - return this.getWorkerNodeKey(this.createAndSetupDynamicWorker()) + const worker = this.createAndSetupDynamicWorker() + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + ) { + return this.getWorkerNodeKey(worker) + } } return this.workerChoiceStrategyContext.execute() } - protected shallCreateDynamicWorker (): boolean { + /** + * Conditions for dynamic worker creation. + * + * @returns Whether to create a dynamic worker or not. + */ + private shallCreateDynamicWorker (): boolean { return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() } @@ -646,7 +661,9 @@ export abstract class AbstractPool< >(worker: Worker, listener: (message: MessageValue) => void): void /** - * Returns a newly created worker. + * Creates a newly worker. + * + * @returns Newly created worker. */ protected abstract createWorker (): Worker diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 33c3ff3f..257b0d64 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -188,7 +188,7 @@ export interface IPool< */ execute: (data?: Data, name?: string) => Promise /** - * Shutdowns every current worker in this pool. + * Terminate every current worker in this pool. */ destroy: () => Promise /** diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index d9a57341..a060cea3 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -4,6 +4,7 @@ import type { IPool } from '../pool' import type { IWorker } from '../worker' import type { IWorkerChoiceStrategy, + StrategyPolicy, TaskStatisticsRequirements, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -24,6 +25,12 @@ export abstract class AbstractWorkerChoiceStrategy< * Toggles finding the last free worker node key. */ private toggleFindLastFreeWorkerNodeKey: boolean = false + + /** @inheritDoc */ + public readonly strategyPolicy: StrategyPolicy = { + useDynamicWorker: false + } + /** @inheritDoc */ public readonly taskStatisticsRequirements: TaskStatisticsRequirements = { runTime: { diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index 7d77a21d..deaa4881 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -4,6 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, + StrategyPolicy, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -21,6 +22,11 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< > extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly strategyPolicy: StrategyPolicy = { + useDynamicWorker: true + } + /** * Worker node id where the current task will be submitted. */ diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index ea174552..93692531 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -4,6 +4,7 @@ import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, + StrategyPolicy, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -21,6 +22,11 @@ export class RoundRobinWorkerChoiceStrategy< > extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly strategyPolicy: StrategyPolicy = { + useDynamicWorker: true + } + /** * Id of the next worker node. */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index b0b036ee..52e287e7 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -140,10 +140,24 @@ export interface TaskStatisticsRequirements { elu: MeasurementStatisticsRequirements } +/** + * Strategy policy. + */ +export interface StrategyPolicy { + /** + * Expect direct usage of dynamic worker. + */ + useDynamicWorker: boolean +} + /** * Worker choice strategy interface. */ export interface IWorkerChoiceStrategy { + /** + * Strategy policy. + */ + readonly strategyPolicy: StrategyPolicy /** * Tasks statistics requirements. */ diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 1ce60419..618b5065 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -4,6 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, + StrategyPolicy, TaskStatisticsRequirements, WorkerChoiceStrategyOptions } from './selection-strategies-types' @@ -23,6 +24,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy< > extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { + /** @inheritDoc */ + public readonly strategyPolicy: StrategyPolicy = { + useDynamicWorker: true + } + /** @inheritDoc */ public readonly taskStatisticsRequirements: TaskStatisticsRequirements = { runTime: { diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index a0ba2583..e1332b85 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -9,6 +9,7 @@ import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' import type { IWorkerChoiceStrategy, + StrategyPolicy, TaskStatisticsRequirements, WorkerChoiceStrategy, WorkerChoiceStrategyOptions @@ -104,6 +105,19 @@ export class WorkerChoiceStrategyContext< ]) } + /** + * Gets the strategy policy in the context. + * + * @returns The strategy policy. + */ + public getStrategyPolicy (): StrategyPolicy { + return ( + this.workerChoiceStrategies.get( + this.workerChoiceStrategy + ) as IWorkerChoiceStrategy + ).strategyPolicy + } + /** * Gets the worker choice strategy task statistics requirements in the context. * diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index e9376281..0df558de 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -115,7 +115,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN let pool = new FixedThreadPool( max, @@ -356,7 +380,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_USED strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_USED strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED let pool = new FixedThreadPool( max, @@ -521,7 +569,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_BUSY strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY let pool = new FixedThreadPool( max, @@ -626,7 +698,7 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) @@ -700,7 +772,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => { + it('Verify LEAST_ELU strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU let pool = new FixedThreadPool( max, @@ -877,7 +973,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + it('Verify FAIR_SHARE strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: false + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE let pool = new FixedThreadPool( max, @@ -1013,7 +1133,7 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.workerUsage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, failed: 0 @@ -1046,6 +1166,10 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0) expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) @@ -1082,7 +1206,7 @@ describe('Selection strategies test suite', () => { for (const workerNode of pool.workerNodes) { expect(workerNode.workerUsage).toStrictEqual({ tasks: { - executed: maxMultiplier, + executed: expect.any(Number), executing: 0, queued: 0, failed: 0 @@ -1115,6 +1239,10 @@ describe('Selection strategies test suite', () => { utilization: expect.any(Number) } }) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( + max * maxMultiplier + ) expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0) expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0) expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0) @@ -1203,7 +1331,31 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool( max, @@ -1379,7 +1531,7 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) @@ -1455,7 +1607,7 @@ describe('Selection strategies test suite', () => { utilization: 0 } }) - expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0) expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( max * maxMultiplier ) @@ -1554,7 +1706,32 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) - it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => { + const workerChoiceStrategy = + WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + await pool.destroy() + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy } + ) + expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({ + useDynamicWorker: true + }) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => { const workerChoiceStrategy = WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN let pool = new FixedThreadPool(