From bdaf31cd0e637aa466c78d54a49f157899a2cb3f Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 8 Oct 2022 12:24:16 +0200 Subject: [PATCH] Add dynamic worker choice strategy change at runtime MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- .eslintrc.js | 1 + CHANGELOG.md | 4 +- benchmarks/internal/benchmark-utils.js | 2 +- src/index.ts | 19 +- src/pools/abstract-pool-worker.ts | 23 ++ src/pools/abstract-pool.ts | 130 ++------- src/pools/cluster/fixed.ts | 2 +- src/pools/pool-internal.ts | 30 +- src/pools/pool-worker.ts | 73 +++++ src/pools/pool.ts | 40 ++- src/pools/selection-strategies.ts | 260 ------------------ .../abstract-worker-choice-strategy.ts | 32 +++ .../dynamic-pool-worker-choice-strategy.ts | 58 ++++ ...ss-recently-used-worker-choice-strategy.ts | 35 +++ .../round-robin-worker-choice-strategy.ts | 30 ++ .../selection-strategies-types.ts | 36 +++ .../selection-strategies-utils.ts | 42 +++ .../worker-choice-strategy-context.ts | 85 ++++++ src/pools/thread/dynamic.ts | 2 +- src/pools/thread/fixed.ts | 2 +- src/utility-types.ts | 4 +- tests/pools/selection-strategies.test.js | 42 +++ tests/pools/thread/dynamic.test.js | 3 +- tests/test-utils.js | 4 +- tsconfig.json | 3 +- 25 files changed, 567 insertions(+), 395 deletions(-) create mode 100644 src/pools/abstract-pool-worker.ts create mode 100644 src/pools/pool-worker.ts delete mode 100644 src/pools/selection-strategies.ts create mode 100644 src/pools/selection-strategies/abstract-worker-choice-strategy.ts create mode 100644 src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts create mode 100644 src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts create mode 100644 src/pools/selection-strategies/round-robin-worker-choice-strategy.ts create mode 100644 src/pools/selection-strategies/selection-strategies-types.ts create mode 100644 src/pools/selection-strategies/selection-strategies-utils.ts create mode 100644 src/pools/selection-strategies/worker-choice-strategy-context.ts diff --git a/.eslintrc.js b/.eslintrc.js index fe04926f..ca01b306 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -35,6 +35,7 @@ module.exports = defineConfig({ 'comparator', 'ecma', 'enum', + 'fibonacci', 'inheritdoc', 'jsdoc', 'poolifier', diff --git a/CHANGELOG.md b/CHANGELOG.md index cbaf1909..da16390f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [2.2.1] - 2022-05-01 -- +### Added + +- Dynamic worker choice strategy change at runtime. ## [2.2.0] - 2022-05-01 diff --git a/benchmarks/internal/benchmark-utils.js b/benchmarks/internal/benchmark-utils.js index 5f0c643b..debe0b1c 100644 --- a/benchmarks/internal/benchmark-utils.js +++ b/benchmarks/internal/benchmark-utils.js @@ -37,7 +37,7 @@ function generateRandomInteger (max, min = 0) { /** * Intentionally inefficient implementation. * - * @param {*} n + * @param {number} n * @returns {number} */ function fibonacci (n) { diff --git a/src/index.ts b/src/index.ts index 2bae50e7..b0c419ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,16 +1,15 @@ -export type { - ErrorHandler, - ExitHandler, - IWorker, - OnlineHandler, - PoolOptions -} from './pools/abstract-pool' export { DynamicClusterPool } from './pools/cluster/dynamic' export { FixedClusterPool } from './pools/cluster/fixed' export type { ClusterPoolOptions } from './pools/cluster/fixed' -export type { IPool } from './pools/pool' -export { WorkerChoiceStrategies } from './pools/selection-strategies' -export type { WorkerChoiceStrategy } from './pools/selection-strategies' +export type { IPool, PoolOptions } from './pools/pool' +export type { + ErrorHandler, + ExitHandler, + IPoolWorker, + OnlineHandler +} from './pools/pool-worker' +export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types' +export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types' export { DynamicThreadPool } from './pools/thread/dynamic' export { FixedThreadPool } from './pools/thread/fixed' export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed' diff --git a/src/pools/abstract-pool-worker.ts b/src/pools/abstract-pool-worker.ts new file mode 100644 index 00000000..f591b24e --- /dev/null +++ b/src/pools/abstract-pool-worker.ts @@ -0,0 +1,23 @@ +import type { + ErrorHandler, + ExitHandler, + IPoolWorker, + MessageHandler, + OnlineHandler +} from './pool-worker' + +/** + * Basic class that implement the minimum required for a pool worker. + */ +export abstract class AbstractPoolWorker implements IPoolWorker { + /** @inheritdoc */ + abstract on (event: 'message', handler: MessageHandler): void + /** @inheritdoc */ + abstract on (event: 'error', handler: ErrorHandler): void + /** @inheritdoc */ + abstract on (event: 'online', handler: OnlineHandler): void + /** @inheritdoc */ + abstract on (event: 'exit', handler: ExitHandler): void + /** @inheritdoc */ + abstract once (event: 'exit', handler: ExitHandler): void +} diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 71046d49..928b24c6 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,106 +4,15 @@ import type { } from '../utility-types' import { EMPTY_FUNCTION } from '../utils' import { isKillBehavior, KillBehaviors } from '../worker/worker-options' +import type { AbstractPoolWorker } from './abstract-pool-worker' +import type { PoolOptions } from './pool' import type { IPoolInternal } from './pool-internal' import { PoolEmitter, PoolType } from './pool-internal' -import type { WorkerChoiceStrategy } from './selection-strategies' import { WorkerChoiceStrategies, - WorkerChoiceStrategyContext -} from './selection-strategies' - -/** - * Callback invoked if the worker has received a message. - */ -export type MessageHandler = (this: Worker, m: unknown) => void - -/** - * Callback invoked if the worker raised an error. - */ -export type ErrorHandler = (this: Worker, e: Error) => void - -/** - * Callback invoked when the worker has started successfully. - */ -export type OnlineHandler = (this: Worker) => void - -/** - * Callback invoked when the worker exits successfully. - */ -export type ExitHandler = (this: Worker, code: number) => void - -/** - * Basic interface that describes the minimum required implementation of listener events for a pool-worker. - */ -export interface IWorker { - /** - * Register a listener to the message event. - * - * @param event `'message'`. - * @param handler The message handler. - */ - on(event: 'message', handler: MessageHandler): void - /** - * Register a listener to the error event. - * - * @param event `'error'`. - * @param handler The error handler. - */ - on(event: 'error', handler: ErrorHandler): void - /** - * Register a listener to the online event. - * - * @param event `'online'`. - * @param handler The online handler. - */ - on(event: 'online', handler: OnlineHandler): void - /** - * Register a listener to the exit event. - * - * @param event `'exit'`. - * @param handler The exit handler. - */ - on(event: 'exit', handler: ExitHandler): void - /** - * Register a listener to the exit event that will only performed once. - * - * @param event `'exit'`. - * @param handler The exit handler. - */ - once(event: 'exit', handler: ExitHandler): void -} - -/** - * Options for a poolifier pool. - */ -export interface PoolOptions { - /** - * A function that will listen for message event on each worker. - */ - messageHandler?: MessageHandler - /** - * A function that will listen for error event on each worker. - */ - errorHandler?: ErrorHandler - /** - * A function that will listen for online event on each worker. - */ - onlineHandler?: OnlineHandler - /** - * A function that will listen for exit event on each worker. - */ - exitHandler?: ExitHandler - /** - * The work choice strategy to use in this pool. - */ - workerChoiceStrategy?: WorkerChoiceStrategy - /** - * Pool events emission. - * - * @default true - */ - enableEvents?: boolean -} + WorkerChoiceStrategy +} from './selection-strategies/selection-strategies-types' +import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' /** * Base class containing some shared logic for all poolifier pools. @@ -113,7 +22,7 @@ export interface PoolOptions { * @template Response Type of response of execution. This can only be serializable data. */ export abstract class AbstractPool< - Worker extends IWorker, + Worker extends AbstractPoolWorker, Data = unknown, Response = unknown > implements IPoolInternal { @@ -192,7 +101,7 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerCreated, message => { if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - this.tasks.get(workerCreated) === 0 + this.getWorkerRunningTasks(workerCreated) === 0 ) { // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) this.destroyWorker(workerCreated) as void @@ -242,6 +151,16 @@ export abstract class AbstractPool< return this.promiseMap.size } + /** @inheritdoc */ + public getWorkerRunningTasks (worker: Worker): number | undefined { + return this.tasks.get(worker) + } + + /** @inheritdoc */ + public getWorkerIndex (worker: Worker): number { + return this.workers.indexOf(worker) + } + /** @inheritdoc */ public setWorkerChoiceStrategy ( workerChoiceStrategy: WorkerChoiceStrategy @@ -258,16 +177,16 @@ export abstract class AbstractPool< protected internalGetBusyStatus (): boolean { return ( this.numberOfRunningTasks >= this.numberOfWorkers && - this.findFreeTasksMapEntry() === false + this.findFreeWorker() === false ) } /** @inheritdoc */ - public findFreeTasksMapEntry (): [Worker, number] | false { - for (const [worker, numberOfTasks] of this.tasks) { - if (numberOfTasks === 0) { - // A worker is free, return the matching tasks map entry - return [worker, numberOfTasks] + public findFreeWorker (): Worker | false { + for (const worker of this.workers) { + if (this.getWorkerRunningTasks(worker) === 0) { + // A worker is free, return the matching worker + return worker } } return false @@ -350,8 +269,7 @@ export abstract class AbstractPool< */ protected removeWorker (worker: Worker): void { // Clean worker from data structure - const workerIndex = this.workers.indexOf(worker) - this.workers.splice(workerIndex, 1) + this.workers.splice(this.getWorkerIndex(worker), 1) this.tasks.delete(worker) } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 9d162d89..971116bc 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -1,8 +1,8 @@ import type { Worker } from 'cluster' import cluster from 'cluster' import type { MessageValue } from '../../utility-types' -import type { PoolOptions } from '../abstract-pool' import { AbstractPool } from '../abstract-pool' +import type { PoolOptions } from '../pool' import { PoolType } from '../pool-internal' /** diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index dd71ae71..71120110 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -1,5 +1,5 @@ import EventEmitter from 'events' -import type { IWorker } from './abstract-pool' +import type { AbstractPoolWorker } from './abstract-pool-worker' import type { IPool } from './pool' /** @@ -23,7 +23,7 @@ export class PoolEmitter extends EventEmitter {} * @template Response Type of response of execution. */ export interface IPoolInternal< - Worker extends IWorker, + Worker extends AbstractPoolWorker, Data = unknown, Response = unknown > extends IPool { @@ -74,13 +74,29 @@ export interface IPoolInternal< readonly numberOfRunningTasks: number /** - * Find a tasks map entry with a free worker based on the number of tasks the worker has applied. + * Find a free worker based on the number of tasks the worker has applied. * - * If an entry is found with a worker that has `0` tasks, it is detected as free. + * If a worker is found with `0` running tasks, it is detected as free and returned. * - * If no tasks map entry with a free worker was found, `false` will be returned. + * If no free worker is found, `false` is returned. * - * @returns A tasks map entry with a free worker if there was one, otherwise `false`. + * @returns A free worker if there is one, otherwise `false`. */ - findFreeTasksMapEntry(): [Worker, number] | false + findFreeWorker(): Worker | false + + /** + * Get worker index. + * + * @param worker The worker. + * @returns The worker index. + */ + getWorkerIndex(worker: Worker): number + + /** + * Get worker running tasks. + * + * @param worker The worker. + * @returns The number of tasks currently running on the worker. + */ + getWorkerRunningTasks(worker: Worker): number | undefined } diff --git a/src/pools/pool-worker.ts b/src/pools/pool-worker.ts new file mode 100644 index 00000000..d46eda19 --- /dev/null +++ b/src/pools/pool-worker.ts @@ -0,0 +1,73 @@ +import type { Worker as ClusterWorker } from 'cluster' +import type { Worker as WorkerThread } from 'worker_threads' +import type { Draft } from '../utility-types' + +/** + * Poolifier supported worker type. + */ +export type WorkerType = WorkerThread & ClusterWorker & Draft + +/** + * Callback invoked if the worker has received a message. + */ +export type MessageHandler = (this: Worker, m: unknown) => void + +/** + * Callback invoked if the worker raised an error. + */ +export type ErrorHandler = (this: Worker, e: Error) => void + +/** + * Callback invoked when the worker has started successfully. + */ +export type OnlineHandler = (this: Worker) => void + +/** + * Callback invoked when the worker exits successfully. + */ +export type ExitHandler = (this: Worker, code: number) => void + +/** + * Basic interface that describes the minimum required implementation of listener events for a pool worker. + */ +export interface IPoolWorker { + /** + * Worker identifier. + */ + readonly id?: number + /** + * Register a listener to the message event. + * + * @param event `'message'`. + * @param handler The message handler. + */ + on(event: 'message', handler: MessageHandler): void + /** + * Register a listener to the error event. + * + * @param event `'error'`. + * @param handler The error handler. + */ + on(event: 'error', handler: ErrorHandler): void + /** + * Register a listener to the online event. + * + * @param event `'online'`. + * @param handler The online handler. + */ + on(event: 'online', handler: OnlineHandler): void + /** + * Register a listener to the exit event. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ + on(event: 'exit', handler: ExitHandler): void + /** + * Register a listener to the exit event that will only performed once. + * + * @param event `'exit'`. + * @param handler The exit handler. + */ + once(event: 'exit', handler: ExitHandler): void +} diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 75204582..8da89156 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,4 +1,42 @@ -import type { WorkerChoiceStrategy } from './selection-strategies' +import type { + ErrorHandler, + ExitHandler, + MessageHandler, + OnlineHandler +} from './pool-worker' +import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' + +/** + * Options for a poolifier pool. + */ +export interface PoolOptions { + /** + * A function that will listen for message event on each worker. + */ + messageHandler?: MessageHandler + /** + * A function that will listen for error event on each worker. + */ + errorHandler?: ErrorHandler + /** + * A function that will listen for online event on each worker. + */ + onlineHandler?: OnlineHandler + /** + * A function that will listen for exit event on each worker. + */ + exitHandler?: ExitHandler + /** + * The work choice strategy to use in this pool. + */ + workerChoiceStrategy?: WorkerChoiceStrategy + /** + * Pool events emission. + * + * @default true + */ + enableEvents?: boolean +} /** * Contract definition for a poolifier pool. diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts deleted file mode 100644 index f1c20bef..00000000 --- a/src/pools/selection-strategies.ts +++ /dev/null @@ -1,260 +0,0 @@ -import type { IWorker } from './abstract-pool' -import type { IPoolInternal } from './pool-internal' -import { PoolType } from './pool-internal' - -/** - * Enumeration of worker choice strategies. - */ -export const WorkerChoiceStrategies = Object.freeze({ - /** - * Round robin worker selection strategy. - */ - ROUND_ROBIN: 'ROUND_ROBIN', - /** - * Less recently used worker selection strategy. - */ - LESS_RECENTLY_USED: 'LESS_RECENTLY_USED' -} as const) - -/** - * Worker choice strategy. - */ -export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies - -/** - * Worker choice strategy interface. - * - * @template Worker Type of worker which manages the strategy. - */ -interface IWorkerChoiceStrategy { - /** - * Choose a worker in the pool. - */ - choose(): Worker -} - -/** - * Selects the next worker in a round robin fashion. - * - * @template Worker Type of worker which manages the strategy. - * @template Data Type of data sent to the worker. This can only be serializable data. - * @template Response Type of response of execution. This can only be serializable data. - */ -class RoundRobinWorkerChoiceStrategy - implements IWorkerChoiceStrategy { - /** - * Index for the next worker. - */ - private nextWorkerIndex: number = 0 - - /** - * Constructs a worker choice strategy that selects in a round robin fashion. - * - * @param pool The pool instance. - */ - public constructor ( - private readonly pool: IPoolInternal - ) {} - - /** @inheritdoc */ - public choose (): Worker { - const chosenWorker = this.pool.workers[this.nextWorkerIndex] - this.nextWorkerIndex = - this.nextWorkerIndex === this.pool.workers.length - 1 - ? 0 - : this.nextWorkerIndex + 1 - return chosenWorker - } -} - -/** - * Selects the less recently used worker. - * - * @template Worker Type of worker which manages the strategy. - * @template Data Type of data sent to the worker. This can only be serializable data. - * @template Response Type of response of execution. This can only be serializable data. - */ -class LessRecentlyUsedWorkerChoiceStrategy< - Worker extends IWorker, - Data, - Response -> implements IWorkerChoiceStrategy { - /** - * Constructs a worker choice strategy that selects based on less recently used. - * - * @param pool The pool instance. - */ - public constructor ( - private readonly pool: IPoolInternal - ) {} - - /** @inheritdoc */ - public choose (): Worker { - const isPoolDynamic = this.pool.type === PoolType.DYNAMIC - let minNumberOfTasks = Infinity - // A worker is always found because it picks the one with fewer tasks - let lessRecentlyUsedWorker!: Worker - for (const [worker, numberOfTasks] of this.pool.tasks) { - if (!isPoolDynamic && numberOfTasks === 0) { - return worker - } else if (numberOfTasks < minNumberOfTasks) { - lessRecentlyUsedWorker = worker - minNumberOfTasks = numberOfTasks - } - } - return lessRecentlyUsedWorker - } -} - -/** - * Dynamically choose a worker. - * - * @template Worker Type of worker which manages the strategy. - * @template Data Type of data sent to the worker. This can only be serializable data. - * @template Response Type of response of execution. This can only be serializable data. - */ -class DynamicPoolWorkerChoiceStrategy - implements IWorkerChoiceStrategy { - private workerChoiceStrategy: IWorkerChoiceStrategy - - /** - * Constructs a worker choice strategy for dynamical pools. - * - * @param pool The pool instance. - * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. - * @param workerChoiceStrategy The worker choice strategy when the pull is busy. - */ - public constructor ( - private readonly pool: IPoolInternal, - private createDynamicallyWorkerCallback: () => Worker, - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - ) { - this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy( - this.pool, - workerChoiceStrategy - ) - } - - /** @inheritdoc */ - public choose (): Worker { - const freeTaskMapEntry = this.pool.findFreeTasksMapEntry() - if (freeTaskMapEntry) { - return freeTaskMapEntry[0] - } - - if (this.pool.busy) { - return this.workerChoiceStrategy.choose() - } - - // All workers are busy, create a new worker - return this.createDynamicallyWorkerCallback() - } -} - -/** - * The worker choice strategy context. - * - * @template Worker Type of worker. - * @template Data Type of data sent to the worker. This can only be serializable data. - * @template Response Type of response of execution. This can only be serializable data. - */ -export class WorkerChoiceStrategyContext< - Worker extends IWorker, - Data, - Response -> { - // Will be set by setter in constructor - private workerChoiceStrategy!: IWorkerChoiceStrategy - - /** - * Worker choice strategy context constructor. - * - * @param pool The pool instance. - * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. - * @param workerChoiceStrategy The worker choice strategy. - */ - public constructor ( - private readonly pool: IPoolInternal, - private createDynamicallyWorkerCallback: () => Worker, - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - ) { - this.setWorkerChoiceStrategy(workerChoiceStrategy) - } - - /** - * Get the worker choice strategy instance specific to the pool type. - * - * @param workerChoiceStrategy The worker choice strategy. - * @returns The worker choice strategy instance for the pool type. - */ - private getPoolWorkerChoiceStrategy ( - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - ): IWorkerChoiceStrategy { - if (this.pool.type === PoolType.DYNAMIC) { - return new DynamicPoolWorkerChoiceStrategy( - this.pool, - this.createDynamicallyWorkerCallback, - workerChoiceStrategy - ) - } - return SelectionStrategiesUtils.getWorkerChoiceStrategy( - this.pool, - workerChoiceStrategy - ) - } - - /** - * Set the worker choice strategy to use in the context. - * - * @param workerChoiceStrategy The worker choice strategy to set. - */ - public setWorkerChoiceStrategy ( - workerChoiceStrategy: WorkerChoiceStrategy - ): void { - this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( - workerChoiceStrategy - ) - } - - /** - * Choose a worker with the underlying selection strategy. - * - * @returns The chosen one. - */ - public execute (): Worker { - return this.workerChoiceStrategy.choose() - } -} - -/** - * Worker selection strategies helpers class. - */ -class SelectionStrategiesUtils { - /** - * Get the worker choice strategy instance. - * - * @param pool The pool instance. - * @param workerChoiceStrategy The worker choice strategy. - * @returns The worker choice strategy instance. - */ - public static getWorkerChoiceStrategy< - Worker extends IWorker, - Data, - Response - > ( - pool: IPoolInternal, - workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN - ): IWorkerChoiceStrategy { - switch (workerChoiceStrategy) { - case WorkerChoiceStrategies.ROUND_ROBIN: - return new RoundRobinWorkerChoiceStrategy(pool) - case WorkerChoiceStrategies.LESS_RECENTLY_USED: - return new LessRecentlyUsedWorkerChoiceStrategy(pool) - default: - throw new Error( - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - `Worker choice strategy '${workerChoiceStrategy}' not found` - ) - } - } -} diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts new file mode 100644 index 00000000..9cbebc3b --- /dev/null +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -0,0 +1,32 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolInternal } from '../pool-internal' +import { PoolType } from '../pool-internal' +import type { IWorkerChoiceStrategy } from './selection-strategies-types' + +/** + * Abstract worker choice strategy class. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export abstract class AbstractWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response +> implements IWorkerChoiceStrategy { + /** @inheritdoc */ + public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC + + /** + * Constructs a worker choice strategy attached to the pool. + * + * @param pool The pool instance. + */ + public constructor ( + protected readonly pool: IPoolInternal + ) {} + + /** @inheritdoc */ + public abstract choose (): Worker +} diff --git a/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts new file mode 100644 index 00000000..f98ec5ef --- /dev/null +++ b/src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts @@ -0,0 +1,58 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolInternal } from '../pool-internal' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + WorkerChoiceStrategy +} from './selection-strategies-types' +import { WorkerChoiceStrategies } from './selection-strategies-types' +import { SelectionStrategiesUtils } from './selection-strategies-utils' + +/** + * Dynamically choose a worker. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class DynamicPoolWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response +> extends AbstractWorkerChoiceStrategy { + private workerChoiceStrategy: IWorkerChoiceStrategy + + /** + * Constructs a worker choice strategy for dynamical pool. + * + * @param pool The pool instance. + * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. + * @param workerChoiceStrategy The worker choice strategy when the pull is busy. + */ + public constructor ( + pool: IPoolInternal, + private createDynamicallyWorkerCallback: () => Worker, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ) { + super(pool) + this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) + } + + /** @inheritdoc */ + public choose (): Worker { + const freeWorker = this.pool.findFreeWorker() + if (freeWorker) { + return freeWorker + } + + if (this.pool.busy) { + return this.workerChoiceStrategy.choose() + } + + // All workers are busy, create a new worker + return this.createDynamicallyWorkerCallback() + } +} diff --git a/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts new file mode 100644 index 00000000..a7892c40 --- /dev/null +++ b/src/pools/selection-strategies/less-recently-used-worker-choice-strategy.ts @@ -0,0 +1,35 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' + +/** + * Selects the less recently used worker. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class LessRecentlyUsedWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response +> extends AbstractWorkerChoiceStrategy { + /** @inheritdoc */ + public choose (): Worker { + let minNumberOfRunningTasks = Infinity + // A worker is always found because it picks the one with fewer tasks + let lessRecentlyUsedWorker!: Worker + for (const worker of this.pool.workers) { + const workerRunningTasks = this.pool.getWorkerRunningTasks(worker) + if (!this.isDynamicPool && workerRunningTasks === 0) { + return worker + } else if ( + workerRunningTasks !== undefined && + workerRunningTasks < minNumberOfRunningTasks + ) { + lessRecentlyUsedWorker = worker + minNumberOfRunningTasks = workerRunningTasks + } + } + return lessRecentlyUsedWorker + } +} diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts new file mode 100644 index 00000000..ea1ad567 --- /dev/null +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -0,0 +1,30 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' + +/** + * Selects the next worker in a round robin fashion. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class RoundRobinWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response +> extends AbstractWorkerChoiceStrategy { + /** + * Index for the next worker. + */ + private nextWorkerIndex: number = 0 + + /** @inheritdoc */ + public choose (): Worker { + const chosenWorker = this.pool.workers[this.nextWorkerIndex] + this.nextWorkerIndex = + this.nextWorkerIndex === this.pool.workers.length - 1 + ? 0 + : this.nextWorkerIndex + 1 + return chosenWorker + } +} diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts new file mode 100644 index 00000000..edefcfaa --- /dev/null +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -0,0 +1,36 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' + +/** + * Enumeration of worker choice strategies. + */ +export const WorkerChoiceStrategies = Object.freeze({ + /** + * Round robin worker selection strategy. + */ + ROUND_ROBIN: 'ROUND_ROBIN', + /** + * Less recently used worker selection strategy. + */ + LESS_RECENTLY_USED: 'LESS_RECENTLY_USED' +} as const) + +/** + * Worker choice strategy. + */ +export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies + +/** + * Worker choice strategy interface. + * + * @template Worker Type of worker which manages the strategy. + */ +export interface IWorkerChoiceStrategy { + /** + * Is the pool attached to the strategy dynamic?. + */ + isDynamicPool: boolean + /** + * Choose a worker in the pool. + */ + choose(): Worker +} diff --git a/src/pools/selection-strategies/selection-strategies-utils.ts b/src/pools/selection-strategies/selection-strategies-utils.ts new file mode 100644 index 00000000..e76a6998 --- /dev/null +++ b/src/pools/selection-strategies/selection-strategies-utils.ts @@ -0,0 +1,42 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolInternal } from '../pool-internal' +import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy' +import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + WorkerChoiceStrategy +} from './selection-strategies-types' +import { WorkerChoiceStrategies } from './selection-strategies-types' + +/** + * Worker selection strategies helpers class. + */ +export class SelectionStrategiesUtils { + /** + * Get the worker choice strategy instance. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance. + */ + public static getWorkerChoiceStrategy< + Worker extends AbstractPoolWorker, + Data, + Response + > ( + pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ): IWorkerChoiceStrategy { + switch (workerChoiceStrategy) { + case WorkerChoiceStrategies.ROUND_ROBIN: + return new RoundRobinWorkerChoiceStrategy(pool) + case WorkerChoiceStrategies.LESS_RECENTLY_USED: + return new LessRecentlyUsedWorkerChoiceStrategy(pool) + default: + throw new Error( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + `Worker choice strategy '${workerChoiceStrategy}' not found` + ) + } + } +} diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts new file mode 100644 index 00000000..3de73532 --- /dev/null +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -0,0 +1,85 @@ +import type { AbstractPoolWorker } from '../abstract-pool-worker' +import type { IPoolInternal } from '../pool-internal' +import { PoolType } from '../pool-internal' +import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy' +import type { + IWorkerChoiceStrategy, + WorkerChoiceStrategy +} from './selection-strategies-types' +import { WorkerChoiceStrategies } from './selection-strategies-types' +import { SelectionStrategiesUtils } from './selection-strategies-utils' + +/** + * The worker choice strategy context. + * + * @template Worker Type of worker. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class WorkerChoiceStrategyContext< + Worker extends AbstractPoolWorker, + Data, + Response +> { + // Will be set by setter in constructor + private workerChoiceStrategy!: IWorkerChoiceStrategy + + /** + * Worker choice strategy context constructor. + * + * @param pool The pool instance. + * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool. + * @param workerChoiceStrategy The worker choice strategy. + */ + public constructor ( + private readonly pool: IPoolInternal, + private createDynamicallyWorkerCallback: () => Worker, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ) { + this.setWorkerChoiceStrategy(workerChoiceStrategy) + } + + /** + * Get the worker choice strategy instance specific to the pool type. + * + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance for the pool type. + */ + private getPoolWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ): IWorkerChoiceStrategy { + if (this.pool.type === PoolType.DYNAMIC) { + return new DynamicPoolWorkerChoiceStrategy( + this.pool, + this.createDynamicallyWorkerCallback, + workerChoiceStrategy + ) + } + return SelectionStrategiesUtils.getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) + } + + /** + * Set the worker choice strategy to use in the context. + * + * @param workerChoiceStrategy The worker choice strategy to set. + */ + public setWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy + ): void { + this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( + workerChoiceStrategy + ) + } + + /** + * Choose a worker with the underlying selection strategy. + * + * @returns The chosen one. + */ + public execute (): Worker { + return this.workerChoiceStrategy.choose() + } +} diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 361c615c..a5b9eb7c 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,4 +1,4 @@ -import type { PoolOptions } from '../abstract-pool' +import type { PoolOptions } from '../pool' import { PoolType } from '../pool-internal' import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index e3f9602c..da3c0138 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -1,7 +1,7 @@ import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads' import type { Draft, MessageValue } from '../../utility-types' -import type { PoolOptions } from '../abstract-pool' import { AbstractPool } from '../abstract-pool' +import type { PoolOptions } from '../pool' import { PoolType } from '../pool-internal' /** diff --git a/src/utility-types.ts b/src/utility-types.ts index 4e2f3de4..c9c5492c 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,6 +1,6 @@ import type { Worker as ClusterWorker } from 'cluster' import type { MessagePort } from 'worker_threads' -import type { IWorker } from './pools/abstract-pool' +import type { AbstractPoolWorker } from './pools/abstract-pool-worker' import type { KillBehavior } from './worker/worker-options' /** @@ -46,7 +46,7 @@ export interface MessageValue< * @template Response Type of response of execution. This can only be serializable data. */ export interface PromiseWorkerResponseWrapper< - Worker extends IWorker, + Worker extends AbstractPoolWorker, Response = unknown > { /** diff --git a/tests/pools/selection-strategies.test.js b/tests/pools/selection-strategies.test.js index 9071b571..c554ab14 100644 --- a/tests/pools/selection-strategies.test.js +++ b/tests/pools/selection-strategies.test.js @@ -42,6 +42,48 @@ describe('Selection strategies test suite', () => { await pool.destroy() }) + it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => { + const max = 3 + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + ) + expect(pool.opts.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.ROUND_ROBIN + ) + // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => { + const min = 0 + const max = 3 + const pool = new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN } + ) + expect(pool.opts.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.ROUND_ROBIN + ) + // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + // We need to clean up the resources after our test + await pool.destroy() + }) + it('Verify LESS_RECENTLY_USED strategy is taken at pool creation', async () => { const max = 3 const pool = new FixedThreadPool( diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 92fbd9b8..94320429 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -26,7 +26,8 @@ describe('Dynamic thread pool test suite', () => { for (let i = 0; i < max * 2; i++) { promises.push(pool.execute({ test: 'test' })) } - expect(pool.workers.length).toBe(max) + expect(pool.workers.length).toBeLessThanOrEqual(max) + expect(pool.workers.length).toBeGreaterThan(min) // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) diff --git a/tests/test-utils.js b/tests/test-utils.js index 8565c750..a34cd788 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -35,7 +35,7 @@ class TestUtils { /** * Intentionally inefficient implementation. * - * @param {*} n + * @param {number} n * @returns {number} */ static fibonacci (n) { @@ -46,7 +46,7 @@ class TestUtils { /** * Intentionally inefficient implementation. * - * @param {*} n + * @param {number} n * @returns {number} */ static factorial (n) { diff --git a/tsconfig.json b/tsconfig.json index 2ac8fa7d..e3c2919e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -5,7 +5,8 @@ "outDir": "lib", "esModuleInterop": true, "declaration": true, - "strict": true + "strict": true, + "importsNotUsedAsValues": "error" }, "include": ["src/**/*.ts"], "exclude": ["node_modules"] -- 2.34.1