From: Jérôme Benoit Date: Mon, 10 Oct 2022 08:14:02 +0000 (+0200) Subject: Allow worker choice strategy to specify their statistics requirements X-Git-Tag: v2.3.1~45 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=10fcfaf49c0b6d6f0e1d137eeba1e9d805c9a815;p=poolifier.git Allow worker choice strategy to specify their statistics requirements Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0921f456..e204e124 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -279,7 +279,7 @@ export abstract class AbstractPool< protected removeWorker (worker: Worker): void { // Clean worker from data structure this.workers.splice(this.getWorkerIndex(worker), 1) - this.resetWorkerTasksUsage(worker) + this.removeWorkerTasksUsage(worker) } /** @@ -393,7 +393,7 @@ export abstract class AbstractPool< } /** - * Increase the number of tasks that the given worker has applied. + * Increases the number of tasks that the given worker has applied. * * @param worker Worker which running tasks is increased. */ @@ -402,7 +402,7 @@ export abstract class AbstractPool< } /** - * Decrease the number of tasks that the given worker has applied. + * Decreases the number of tasks that the given worker has applied. * * @param worker Worker which running tasks is decreased. */ @@ -411,7 +411,7 @@ export abstract class AbstractPool< } /** - * Step the number of tasks that the given worker has applied. + * Steps the number of tasks that the given worker has applied. * * @param worker Worker which running tasks are stepped. * @param step Number of running tasks step. @@ -427,12 +427,12 @@ export abstract class AbstractPool< } /** - * Step the number of tasks that the given worker has run. + * Steps the number of tasks that the given worker has run. * * @param worker Worker which has run tasks. * @param step Number of run tasks step. */ - private stepWorkerRunTasks (worker: Worker, step: number) { + private stepWorkerRunTasks (worker: Worker, step: number): void { const tasksUsage = this.workersTasksUsage.get(worker) if (tasksUsage !== undefined) { tasksUsage.run = tasksUsage.run + step @@ -443,7 +443,7 @@ export abstract class AbstractPool< } /** - * Update tasks run time for the given worker. + * Updates tasks run time for the given worker. * * @param worker Worker which run the task. * @param taskRunTime Worker task run time. @@ -451,23 +451,28 @@ export abstract class AbstractPool< private updateWorkerTasksRunTime ( worker: Worker, taskRunTime: number | undefined - ) { - const tasksUsage = this.workersTasksUsage.get(worker) - if (tasksUsage !== undefined && tasksUsage.run !== 0) { - tasksUsage.runTime += taskRunTime ?? 0 - tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run - this.workersTasksUsage.set(worker, tasksUsage) - } else { - throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + ): void { + if ( + this.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime === true + ) { + const tasksUsage = this.workersTasksUsage.get(worker) + if (tasksUsage !== undefined && tasksUsage.run !== 0) { + tasksUsage.runTime += taskRunTime ?? 0 + tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run + this.workersTasksUsage.set(worker, tasksUsage) + } else { + throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP) + } } } /** - * Reset worker tasks usage statistics. + * Removes worker tasks usage statistics. * * @param worker The worker. */ - private resetWorkerTasksUsage (worker: Worker): void { + private removeWorkerTasksUsage (worker: Worker): void { this.workersTasksUsage.delete(worker) } } diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index fc9074da..cd50f700 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,7 +1,10 @@ import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' import { PoolType } from '../pool-internal' -import type { IWorkerChoiceStrategy } from './selection-strategies-types' +import type { + IWorkerChoiceStrategy, + RequiredStatistics +} from './selection-strategies-types' /** * Abstract worker choice strategy class. @@ -17,6 +20,10 @@ export abstract class AbstractWorkerChoiceStrategy< > implements IWorkerChoiceStrategy { /** @inheritDoc */ public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC + /** @inheritDoc */ + public requiredStatistics: RequiredStatistics = { + runTime: false + } /** * Constructs a worker choice strategy attached to the pool. diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts index c2053af4..68b48e32 100644 --- a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts @@ -39,6 +39,7 @@ export class DynamicPoolWorkerChoiceStrategy< this.pool, workerChoiceStrategy ) + this.requiredStatistics = this.workerChoiceStrategy.requiredStatistics } /** @inheritDoc */ diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index f500343f..f8f18212 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -1,5 +1,6 @@ import type { AbstractPoolWorker } from '../abstract-pool-worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { RequiredStatistics } from './selection-strategies-types' /** * Worker virtual task timestamp. @@ -22,6 +23,11 @@ export class FairShareWorkerChoiceStrategy< Data, Response > extends AbstractWorkerChoiceStrategy { + /** @inheritDoc */ + public requiredStatistics: RequiredStatistics = { + runTime: true + } + /** * Worker last virtual task execution timestamp. */ diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index dd545d41..4d302f3d 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -27,6 +27,13 @@ export const WorkerChoiceStrategies = Object.freeze({ */ export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies +/** + * Tasks usage statistics requirements. + */ +export type RequiredStatistics = { + runTime: boolean +} + /** * Worker choice strategy interface. * @@ -37,6 +44,10 @@ export interface IWorkerChoiceStrategy { * Is the pool attached to the strategy dynamic?. */ isDynamicPool: boolean + /** + * Required tasks usage statistics. + */ + requiredStatistics: RequiredStatistics /** * Choose a worker in the pool. */ diff --git a/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts index db8fa235..6e48f82f 100644 --- a/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-choice-strategy.ts @@ -2,6 +2,7 @@ import { cpus } from 'os' import type { AbstractPoolWorker } from '../abstract-pool-worker' import type { IPoolInternal } from '../pool-internal' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { RequiredStatistics } from './selection-strategies-types' /** * Task run time. @@ -24,6 +25,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy< Data, Response > extends AbstractWorkerChoiceStrategy { + /** @inheritDoc */ + public requiredStatistics: RequiredStatistics = { + runTime: true + } + /** * Worker index where the previous task was submitted. */ diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index b84a8632..8290f89e 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -60,6 +60,15 @@ export class WorkerChoiceStrategyContext< ) } + /** + * Get the worker choice strategy used in the context. + * + * @returns The worker choice strategy. + */ + public getWorkerChoiceStrategy (): IWorkerChoiceStrategy { + return this.workerChoiceStrategy + } + /** * Set the worker choice strategy to use in the context. * diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 1e337ff9..6a4da63d 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -158,7 +158,7 @@ describe('Abstract pool test suite', () => { pool.destroy() }) - it('Simulate worker not found during updateWorkerTasksRunTime', () => { + it('Simulate worker not found during updateWorkerTasksRunTime with strategy not requiring it', () => { const pool = new StubPoolWithWorkerTasksUsageMapClear( numberOfWorkers, './tests/worker-files/cluster/testWorker.js', @@ -168,6 +168,21 @@ describe('Abstract pool test suite', () => { ) // Simulate worker not found. pool.removeAllWorker() + expect(() => pool.updateWorkerTasksRunTime()).not.toThrowError() + pool.destroy() + }) + + it('Simulate worker not found during updateWorkerTasksRunTime with strategy requiring it', () => { + const pool = new StubPoolWithWorkerTasksUsageMapClear( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js', + { + workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE, + errorHandler: e => console.error(e) + } + ) + // Simulate worker not found. + pool.removeAllWorker() expect(() => pool.updateWorkerTasksRunTime()).toThrowError( workerNotFoundInTasksUsageMapError ) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index b12a8df4..39895b21 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -46,6 +46,32 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const min = 0 + const max = 3 + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(false) + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(false) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => { const max = 3 const pool = new FixedThreadPool( @@ -116,6 +142,32 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify LESS_RECENTLY_USED strategy default tasks usage statistics requirements', async () => { + const min = 0 + const max = 3 + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(false) + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_RECENTLY_USED) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(false) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => { const max = 3 const pool = new FixedThreadPool( @@ -180,6 +232,32 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => { + const min = 0 + const max = 3 + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(true) + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(true) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify FAIR_SHARE strategy can be run in a fixed pool', async () => { const max = 3 const pool = new FixedThreadPool( @@ -244,6 +322,32 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => { + const min = 0 + const max = 3 + let pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(true) + pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js' + ) + pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) + expect( + pool.workerChoiceStrategyContext.getWorkerChoiceStrategy() + .requiredStatistics.runTime + ).toBe(true) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a fixed pool', async () => { const max = 3 const pool = new FixedThreadPool(