From a35560bac09e829e1e19f88f8fd1d71a64c9d50b Mon Sep 17 00:00:00 2001 From: Shinigami Date: Sun, 21 Feb 2021 15:01:33 +0100 Subject: [PATCH] Extract selection strategies to classes (#176) MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Co-authored-by: Shinigami92 Co-authored-by: Jérôme Benoit --- .eslintrc.js | 5 +- .vscode/settings.json | 1 + package-lock.json | 30 ++- src/index.ts | 2 + src/pools/abstract-pool.ts | 124 +++++----- src/pools/cluster/dynamic.ts | 40 +-- src/pools/cluster/fixed.ts | 8 +- src/pools/pool-internal.ts | 81 ++++++ src/pools/pool.ts | 8 + src/pools/selection-strategies.ts | 272 +++++++++++++++++++++ src/pools/thread/dynamic.ts | 39 +-- src/pools/thread/fixed.ts | 8 +- src/worker/abstract-worker.ts | 2 +- tests/pools/abstract/abstract-pool.test.js | 2 +- tests/pools/cluster/dynamic.test.js | 2 +- tests/pools/cluster/fixed.test.js | 2 +- tests/pools/selection-strategies.test.js | 56 +++++ tests/pools/thread/dynamic.test.js | 2 +- tests/pools/thread/fixed.test.js | 2 +- 19 files changed, 524 insertions(+), 162 deletions(-) create mode 100644 src/pools/pool-internal.ts create mode 100644 src/pools/selection-strategies.ts create mode 100644 tests/pools/selection-strategies.test.js diff --git a/.eslintrc.js b/.eslintrc.js index c3c51fe7..a5a0075e 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -50,12 +50,13 @@ module.exports = { 'warn', { skipWords: [ - 'poolifier', 'christopher', 'ecma', 'enum', + 'inheritdoc', 'jsdoc', 'pioardi', + 'poolifier', 'readonly', 'serializable', 'unregister', @@ -70,6 +71,8 @@ module.exports = { files: ['src/**/*.ts'], extends: 'plugin:jsdoc/recommended', rules: { + 'no-useless-constructor': 'off', + 'jsdoc/match-description': [ 'warn', { diff --git a/.vscode/settings.json b/.vscode/settings.json index cf389d79..886a5af3 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,6 +6,7 @@ "Dependabot", "Gitter", "Shinigami", + "inheritdoc", "lcov", "loglevel", "markdownlint", diff --git a/package-lock.json b/package-lock.json index 9713230d..24bd1309 100644 --- a/package-lock.json +++ b/package-lock.json @@ -824,15 +824,15 @@ } }, "array-includes": { - "version": "3.1.2", - "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.2.tgz", - "integrity": "sha512-w2GspexNQpx+PutG3QpT437/BenZBj0M/MZGn5mzv/MofYqo0xmRHzn4lFsoDlWJ+THYsGJmFlW68WlDFx7VRw==", + "version": "3.1.3", + "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.3.tgz", + "integrity": "sha512-gcem1KlBU7c9rB+Rq8/3PPKsK2kjqeEBa3bD5kkQo4nYlOHQCJqIJFqBXDEfwaRuYTT4E+FxA9xez7Gf/e3Q7A==", "dev": true, "requires": { - "call-bind": "^1.0.0", + "call-bind": "^1.0.2", "define-properties": "^1.1.3", - "es-abstract": "^1.18.0-next.1", - "get-intrinsic": "^1.0.1", + "es-abstract": "^1.18.0-next.2", + "get-intrinsic": "^1.1.1", "is-string": "^1.0.5" } }, @@ -1896,9 +1896,9 @@ } }, "file-entry-cache": { - "version": "6.0.0", - "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.0.tgz", - "integrity": "sha512-fqoO76jZ3ZnYrXLDRxBR1YvOvc0k844kcOg40bgsPrE25LAb/PDqTY+ho64Xh2c8ZXgIKldchCFHczG2UVRcWA==", + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz", + "integrity": "sha512-7Gps/XWymbLk2QLYK4NzpMOrYjMhdIxXuIvy2QBsLE6ljuodKvdkWs/cpyJJ3CVIVpH0Oi1Hvg1ovbMzLdFBBg==", "dev": true, "requires": { "flat-cache": "^3.0.4" @@ -2833,9 +2833,9 @@ } }, "lodash": { - "version": "4.17.20", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", - "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==", + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, "lodash.flattendeep": { @@ -3754,6 +3754,12 @@ "integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==", "dev": true }, + "lodash": { + "version": "4.17.20", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", + "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==", + "dev": true + }, "resolve": { "version": "1.19.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz", diff --git a/src/index.ts b/src/index.ts index 6cc41500..2bae50e7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,8 @@ export { DynamicClusterPool } from './pools/cluster/dynamic' export { FixedClusterPool } from './pools/cluster/fixed' export type { ClusterPoolOptions } from './pools/cluster/fixed' export type { IPool } from './pools/pool' +export { WorkerChoiceStrategies } from './pools/selection-strategies' +export type { WorkerChoiceStrategy } from './pools/selection-strategies' export { DynamicThreadPool } from './pools/thread/dynamic' export { FixedThreadPool } from './pools/thread/fixed' export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 0435a32c..7944606d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,12 +1,17 @@ -import EventEmitter from 'events' import type { MessageValue } from '../utility-types' -import type { IPool } from './pool' +import type { IPoolInternal } from './pool-internal' +import { PoolEmitter } from './pool-internal' +import type { WorkerChoiceStrategy } from './selection-strategies' +import { + WorkerChoiceStrategies, + WorkerChoiceStrategyContext +} from './selection-strategies' /** * An intentional empty function. */ -function emptyFunction () { - // intentionally left blank +const EMPTY_FUNCTION: () => void = () => { + /* Intentionally empty */ } /** @@ -83,13 +88,12 @@ export interface PoolOptions { * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n) */ maxTasks?: number + /** + * The work choice strategy to use in this pool. + */ + workerChoiceStrategy?: WorkerChoiceStrategy } -/** - * Internal poolifier pool emitter. - */ -class PoolEmitter extends EventEmitter {} - /** * Base class containing some shared logic for all poolifier pools. * @@ -101,32 +105,14 @@ export abstract class AbstractPool< Worker extends IWorker, Data = unknown, Response = unknown -> implements IPool { - /** - * List of currently available workers. - */ +> implements IPoolInternal { + /** @inheritdoc */ public readonly workers: Worker[] = [] - /** - * Index for the next worker. - */ - public nextWorkerIndex: number = 0 - - /** - * The tasks map. - * - * - `key`: The `Worker` - * - `value`: Number of tasks currently in progress on the worker. - */ + /** @inheritdoc */ public readonly tasks: Map = new Map() - /** - * Emitter on which events can be listened to. - * - * Events that can currently be listened to: - * - * - `'FullPool'` - */ + /** @inheritdoc */ public readonly emitter: PoolEmitter /** @@ -134,6 +120,17 @@ export abstract class AbstractPool< */ protected nextMessageId: number = 0 + /** + * Worker choice strategy instance implementing the worker choice algorithm. + * + * Default to a strategy implementing a round robin algorithm. + */ + protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< + Worker, + Data, + Response + > + /** * Constructs a new poolifier pool. * @@ -157,20 +154,33 @@ export abstract class AbstractPool< } this.emitter = new PoolEmitter() + this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext( + this, + opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN + ) } - private checkFilePath (filePath: string) { + private checkFilePath (filePath: string): void { if (!filePath) { throw new Error('Please specify a file with a worker implementation') } } - /** - * Perform the task specified in the constructor with the data parameter. - * - * @param data The input for the specified task. This can only be serializable data. - * @returns Promise that will be resolved when the task is successfully completed. - */ + /** @inheritdoc */ + public isDynamic (): boolean { + return false + } + + /** @inheritdoc */ + public setWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy + ): void { + this.workerChoiceStrategyContext.setWorkerChoiceStrategy( + workerChoiceStrategy + ) + } + + /** @inheritdoc */ public execute (data: Data): Promise { // Configure worker to handle message with the specified task const worker = this.chooseWorker() @@ -181,19 +191,13 @@ export abstract class AbstractPool< return res } - /** - * Shut down every current worker in this pool. - */ + /** @inheritdoc */ public async destroy (): Promise { await Promise.all(this.workers.map(worker => this.destroyWorker(worker))) } - /** - * Shut down given worker. - * - * @param worker A worker within `workers`. - */ - protected abstract destroyWorker (worker: Worker): void | Promise + /** @inheritdoc */ + public abstract destroyWorker (worker: Worker): void | Promise /** * Setup hook that can be overridden by a Poolifier pool implementation @@ -232,7 +236,7 @@ export abstract class AbstractPool< * @param worker Worker whose tasks are set. * @param step Worker number of tasks step. */ - private stepWorkerNumberOfTasks (worker: Worker, step: number) { + private stepWorkerNumberOfTasks (worker: Worker, step: number): void { const numberOfTasksInProgress = this.tasks.get(worker) if (numberOfTasksInProgress !== undefined) { this.tasks.set(worker, numberOfTasksInProgress + step) @@ -261,12 +265,7 @@ export abstract class AbstractPool< * @returns Worker. */ protected chooseWorker (): Worker { - const chosenWorker = this.workers[this.nextWorkerIndex] - this.nextWorkerIndex = - this.workers.length - 1 === this.nextWorkerIndex - ? 0 - : this.nextWorkerIndex + 1 - return chosenWorker + return this.workerChoiceStrategyContext.execute() } /** @@ -280,7 +279,8 @@ export abstract class AbstractPool< message: MessageValue ): void - protected abstract registerWorkerMessageListener< + /** @inheritdoc */ + public abstract registerWorkerMessageListener< Message extends Data | Response > (worker: Worker, listener: (message: MessageValue) => void): void @@ -319,17 +319,13 @@ export abstract class AbstractPool< */ protected abstract afterWorkerSetup (worker: Worker): void - /** - * Creates a new worker for this pool and sets it up completely. - * - * @returns New, completely set up worker. - */ - protected createAndSetupWorker (): Worker { + /** @inheritdoc */ + public createAndSetupWorker (): Worker { const worker: Worker = this.createWorker() - worker.on('error', this.opts.errorHandler ?? emptyFunction) - worker.on('online', this.opts.onlineHandler ?? emptyFunction) - worker.on('exit', this.opts.exitHandler ?? emptyFunction) + worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) + worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) + worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => this.removeWorker(worker)) this.workers.push(worker) diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index 2a25f7b3..5445bd03 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -1,5 +1,3 @@ -import type { Worker } from 'cluster' -import { isKillBehavior, KillBehaviors } from '../../worker/worker-options' import type { ClusterPoolOptions } from './fixed' import { FixedClusterPool } from './fixed' @@ -36,40 +34,8 @@ export class DynamicClusterPool< super(min, filePath, opts) } - /** - * Choose a worker for the next task. - * - * It will first check for and return an idle worker. - * If all workers are busy, then it will try to create a new one up to the `max` worker count. - * If the max worker count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load. - * - * @returns Cluster worker. - */ - protected chooseWorker (): Worker { - for (const [worker, numberOfTasks] of this.tasks) { - if (numberOfTasks === 0) { - // A worker is free, use it - return worker - } - } - - if (this.workers.length === this.max) { - this.emitter.emit('FullPool') - return super.chooseWorker() - } - - // All workers are busy, create a new worker - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const tasksInProgress = this.tasks.get(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - tasksInProgress === 0 - ) { - // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(workerCreated) - } - }) - return workerCreated + /** @inheritdoc */ + public isDynamic (): boolean { + return true } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 616523cb..6246320e 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -58,7 +58,8 @@ export class FixedClusterPool< return isMaster } - protected destroyWorker (worker: Worker): void { + /** @inheritdoc */ + public destroyWorker (worker: Worker): void { this.sendToWorker(worker, { kill: 1 }) worker.kill() } @@ -67,7 +68,8 @@ export class FixedClusterPool< worker.send(message) } - protected registerWorkerMessageListener ( + /** @inheritdoc */ + public registerWorkerMessageListener ( worker: Worker, listener: (message: MessageValue) => void ): void { @@ -86,7 +88,7 @@ export class FixedClusterPool< } protected afterWorkerSetup (worker: Worker): void { - // we will attach a listener for every task, + // We will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.setMaxListeners(this.opts.maxTasks ?? 1000) } diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts new file mode 100644 index 00000000..ddee27ea --- /dev/null +++ b/src/pools/pool-internal.ts @@ -0,0 +1,81 @@ +import EventEmitter from 'events' +import type { MessageValue } from '../utility-types' +import type { IWorker } from './abstract-pool' +import type { IPool } from './pool' + +/** + * Internal poolifier pool emitter. + */ +export class PoolEmitter extends EventEmitter {} + +/** + * Internal contract definition for a poolifier pool. + * + * @template Worker Type of worker which manages this pool. + * @template Data Type of data sent to the worker. + * @template Response Type of response of execution. + */ +export interface IPoolInternal< + Worker extends IWorker, + Data = unknown, + Response = unknown +> extends IPool { + /** + * List of currently available workers. + */ + readonly workers: Worker[] + + /** + * The tasks map. + * + * - `key`: The `Worker` + * - `value`: Number of tasks currently in progress on the worker. + */ + readonly tasks: Map + + /** + * Emitter on which events can be listened to. + * + * Events that can currently be listened to: + * + * - `'FullPool'` + */ + readonly emitter: PoolEmitter + + /** + * Maximum number of workers that can be created by this pool. + */ + readonly max?: number + + /** + * Whether the pool is dynamic or not. + * + * If it is dynamic, it provides the `max` property. + */ + isDynamic(): boolean + + /** + * Creates a new worker for this pool and sets it up completely. + * + * @returns New, completely set up worker. + */ + createAndSetupWorker(): Worker + + /** + * Shut down given worker. + * + * @param worker A worker within `workers`. + */ + destroyWorker(worker: Worker): void | Promise + + /** + * Register a listener callback on a given worker. + * + * @param worker A worker. + * @param listener A message listener callback. + */ + registerWorkerMessageListener( + worker: Worker, + listener: (message: MessageValue) => void + ): void +} diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f48d8843..75204582 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -1,3 +1,5 @@ +import type { WorkerChoiceStrategy } from './selection-strategies' + /** * Contract definition for a poolifier pool. * @@ -16,4 +18,10 @@ export interface IPool { * Shut down every current worker in this pool. */ destroy(): Promise + /** + * Set the worker choice strategy in this pool. + * + * @param workerChoiceStrategy The worker choice strategy. + */ + setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void } diff --git a/src/pools/selection-strategies.ts b/src/pools/selection-strategies.ts new file mode 100644 index 00000000..5c40a71b --- /dev/null +++ b/src/pools/selection-strategies.ts @@ -0,0 +1,272 @@ +import { isKillBehavior, KillBehaviors } from '../worker/worker-options' +import type { IWorker } from './abstract-pool' +import type { IPoolInternal } from './pool-internal' + +/** + * Enumeration of worker choice strategies. + */ +export const WorkerChoiceStrategies = Object.freeze({ + /** + * Round robin worker selection strategy. + */ + ROUND_ROBIN: 'ROUND_ROBIN', + /** + * Less recently used worker selection strategy. + */ + LESS_RECENTLY_USED: 'LESS_RECENTLY_USED' +} as const) + +/** + * Worker choice strategy. + */ +export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies + +/** + * Worker choice strategy interface. + * + * @template Worker Type of worker which manages the strategy. + */ +interface IWorkerChoiceStrategy { + /** + * Choose a worker in the pool. + */ + choose(): Worker +} + +/** + * Selects the next worker in a round robin fashion. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +class RoundRobinWorkerChoiceStrategy + implements IWorkerChoiceStrategy { + /** + * Index for the next worker. + */ + private nextWorkerIndex: number = 0 + + /** + * Constructs a worker choice strategy that selects in a round robin fashion. + * + * @param pool The pool instance. + */ + public constructor ( + private readonly pool: IPoolInternal + ) {} + + /** @inheritdoc */ + public choose (): Worker { + const chosenWorker = this.pool.workers[this.nextWorkerIndex] + this.nextWorkerIndex = + this.pool.workers.length - 1 === this.nextWorkerIndex + ? 0 + : this.nextWorkerIndex + 1 + return chosenWorker + } +} + +/** + * Selects the less recently used worker. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +class LessRecentlyUsedWorkerChoiceStrategy< + Worker extends IWorker, + Data, + Response +> implements IWorkerChoiceStrategy { + /** + * Constructs a worker choice strategy that selects based on less recently used. + * + * @param pool The pool instance. + */ + public constructor ( + private readonly pool: IPoolInternal + ) {} + + /** @inheritdoc */ + public choose (): Worker { + let minNumberOfTasks = Infinity + // A worker is always found because it picks the one with fewer tasks + let lessRecentlyUsedWorker!: Worker + for (const [worker, numberOfTasks] of this.pool.tasks) { + if (numberOfTasks === 0) { + return worker + } else if (numberOfTasks < minNumberOfTasks) { + minNumberOfTasks = numberOfTasks + lessRecentlyUsedWorker = worker + } + } + return lessRecentlyUsedWorker + } +} + +/** + * Get the worker choice strategy instance. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance. + */ +function getWorkerChoiceStrategy ( + pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN +): IWorkerChoiceStrategy { + switch (workerChoiceStrategy) { + case WorkerChoiceStrategies.ROUND_ROBIN: + return new RoundRobinWorkerChoiceStrategy(pool) + case WorkerChoiceStrategies.LESS_RECENTLY_USED: + return new LessRecentlyUsedWorkerChoiceStrategy(pool) + default: + throw new Error( + `Worker choice strategy '${workerChoiceStrategy}' not found` + ) + } +} + +/** + * Dynamically choose a worker. + * + * @template Worker Type of worker which manages the strategy. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +class DynamicPoolWorkerChoiceStrategy + implements IWorkerChoiceStrategy { + private workerChoiceStrategy: IWorkerChoiceStrategy + + /** + * Constructs a worker choice strategy for dynamical pools. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy when the pull is full. + */ + public constructor ( + private readonly pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ) { + this.workerChoiceStrategy = getWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) + } + + /** + * Find a free worker based on number of tasks the worker has applied. + * + * If a worker was found that has `0` tasks, it is detected as free and will be returned. + * + * If no free worker was found, `null` will be returned. + * + * @returns A free worker if there was one, otherwise `null`. + */ + private findFreeWorkerBasedOnTasks (): Worker | null { + for (const [worker, numberOfTasks] of this.pool.tasks) { + if (numberOfTasks === 0) { + // A worker is free, use it + return worker + } + } + return null + } + + /** @inheritdoc */ + public choose (): Worker { + const freeWorker = this.findFreeWorkerBasedOnTasks() + if (freeWorker) { + return freeWorker + } + + if (this.pool.workers.length === this.pool.max) { + this.pool.emitter.emit('FullPool') + return this.workerChoiceStrategy.choose() + } + + // All workers are busy, create a new worker + const workerCreated = this.pool.createAndSetupWorker() + this.pool.registerWorkerMessageListener(workerCreated, message => { + const tasksInProgress = this.pool.tasks.get(workerCreated) + if ( + isKillBehavior(KillBehaviors.HARD, message.kill) || + tasksInProgress === 0 + ) { + // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) + void this.pool.destroyWorker(workerCreated) + } + }) + return workerCreated + } +} + +/** + * The worker choice strategy context. + * + * @template Worker Type of worker. + * @template Data Type of data sent to the worker. This can only be serializable data. + * @template Response Type of response of execution. This can only be serializable data. + */ +export class WorkerChoiceStrategyContext< + Worker extends IWorker, + Data, + Response +> { + // Will be set by setter in constructor + private workerChoiceStrategy!: IWorkerChoiceStrategy + + /** + * Worker choice strategy context constructor. + * + * @param pool The pool instance. + * @param workerChoiceStrategy The worker choice strategy. + */ + public constructor ( + private readonly pool: IPoolInternal, + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ) { + this.setWorkerChoiceStrategy(workerChoiceStrategy) + } + + /** + * Get the worker choice strategy instance specific to the pool type. + * + * @param workerChoiceStrategy The worker choice strategy. + * @returns The worker choice strategy instance for the pool type. + */ + private getPoolWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN + ): IWorkerChoiceStrategy { + if (this.pool.isDynamic()) { + return new DynamicPoolWorkerChoiceStrategy( + this.pool, + workerChoiceStrategy + ) + } + return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy) + } + + /** + * Set the worker choice strategy to use in the context. + * + * @param workerChoiceStrategy The worker choice strategy to set. + */ + public setWorkerChoiceStrategy ( + workerChoiceStrategy: WorkerChoiceStrategy + ): void { + this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy( + workerChoiceStrategy + ) + } + + /** + * Choose a worker with the underlying selection strategy. + * + * @returns The chosen one. + */ + public execute (): Worker { + return this.workerChoiceStrategy.choose() + } +} diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index 3e74a4ea..fcf4b2ff 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -1,4 +1,3 @@ -import { isKillBehavior, KillBehaviors } from '../../worker/worker-options' import type { PoolOptions } from '../abstract-pool' import type { ThreadWorkerWithMessageChannel } from './fixed' import { FixedThreadPool } from './fixed' @@ -36,40 +35,8 @@ export class DynamicThreadPool< super(min, filePath, opts) } - /** - * Choose a thread for the next task. - * - * It will first check for and return an idle thread. - * If all threads are busy, then it will try to create a new one up to the `max` thread count. - * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load. - * - * @returns Thread worker. - */ - protected chooseWorker (): ThreadWorkerWithMessageChannel { - for (const [worker, numberOfTasks] of this.tasks) { - if (numberOfTasks === 0) { - // A worker is free, use it - return worker - } - } - - if (this.workers.length === this.max) { - this.emitter.emit('FullPool') - return super.chooseWorker() - } - - // All workers are busy, create a new worker - const workerCreated = this.createAndSetupWorker() - this.registerWorkerMessageListener(workerCreated, message => { - const tasksInProgress = this.tasks.get(workerCreated) - if ( - isKillBehavior(KillBehaviors.HARD, message.kill) || - tasksInProgress === 0 - ) { - // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) - void this.destroyWorker(workerCreated) - } - }) - return workerCreated + /** @inheritdoc */ + public isDynamic (): boolean { + return true } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index a61c10b2..021de492 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -44,7 +44,8 @@ export class FixedThreadPool< return isMainThread } - protected async destroyWorker ( + /** @inheritdoc */ + public async destroyWorker ( worker: ThreadWorkerWithMessageChannel ): Promise { this.sendToWorker(worker, { kill: 1 }) @@ -58,7 +59,8 @@ export class FixedThreadPool< worker.postMessage(message) } - protected registerWorkerMessageListener ( + /** @inheritdoc */ + public registerWorkerMessageListener ( messageChannel: ThreadWorkerWithMessageChannel, listener: (message: MessageValue) => void ): void { @@ -83,7 +85,7 @@ export class FixedThreadPool< worker.postMessage({ parent: port1 }, [port1]) worker.port1 = port1 worker.port2 = port2 - // we will attach a listener for every task, + // We will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) } diff --git a/src/worker/abstract-worker.ts b/src/worker/abstract-worker.ts index d41115bc..4f741b47 100644 --- a/src/worker/abstract-worker.ts +++ b/src/worker/abstract-worker.ts @@ -101,7 +101,7 @@ export abstract class AbstractWorker< * * @param fn The function that should be defined. */ - private checkFunctionInput (fn: (data: Data) => Response) { + private checkFunctionInput (fn: (data: Data) => Response): void { if (!fn) throw new Error('fn parameter is mandatory') } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index bc2385f4..fe6aad35 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -14,7 +14,7 @@ class StubPoolWithIsMainMethod extends FixedThreadPool { } } -describe('Abstract pool test suite ', () => { +describe('Abstract pool test suite', () => { it('Simulate worker not found during increaseWorkersTask', () => { const pool = new StubPoolWithTasksMapClear( 1, diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 2cff475f..53a5f8d0 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -12,7 +12,7 @@ const pool = new DynamicClusterPool( } ) -describe('Dynamic cluster pool test suite ', () => { +describe('Dynamic cluster pool test suite', () => { it('Verify that the function is executed in a worker cluster', async () => { const result = await pool.execute({ test: 'test' }) expect(result).toBeDefined() diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index 63399c6e..ef44e6f1 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -41,7 +41,7 @@ const asyncPool = new FixedClusterPool( } ) -describe('Fixed cluster pool test suite ', () => { +describe('Fixed cluster pool test suite', () => { after('Destroy all pools', async () => { // We need to clean up the resources after our test await echoPool.destroy() diff --git a/tests/pools/selection-strategies.test.js b/tests/pools/selection-strategies.test.js new file mode 100644 index 00000000..186dee7b --- /dev/null +++ b/tests/pools/selection-strategies.test.js @@ -0,0 +1,56 @@ +const expect = require('expect') +const { + WorkerChoiceStrategies, + DynamicThreadPool, + FixedThreadPool +} = require('../../lib/index') +const TestUtils = require('../test-utils') + +describe('Selection strategies test suite', () => { + it('Verify that WorkerChoiceStrategies enumeration provides string values', () => { + expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN') + expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED') + }) + + it('Verify LESS_RECENTLY_USED is taken', async () => { + const max = 3 + const pool = new FixedThreadPool( + max, + './tests/worker-files/thread/testWorker.js', + { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED } + ) + + expect(pool.opts.workerChoiceStrategy).toBe( + WorkerChoiceStrategies.LESS_RECENTLY_USED + ) + + // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose` + const promises = [] + for (let i = 0; i < max * 2; i++) { + promises.push(pool.execute({ test: 'test' })) + } + await Promise.all(promises) + + // We need to clean up the resources after our test + await pool.destroy() + }) + + it('Verify unknown strategies throw error', () => { + const min = 1 + const max = 3 + expect( + () => + new DynamicThreadPool( + min, + max, + './tests/worker-files/thread/testWorker.js', + { + maxTasks: 1000, + workerChoiceStrategy: 'UNKNOWN_STRATEGY' + } + ) + ).toThrowError( + new Error("Worker choice strategy 'UNKNOWN_STRATEGY' not found") + ) + }) +}) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 66aeaa3a..3a162267 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -12,7 +12,7 @@ const pool = new DynamicThreadPool( } ) -describe('Dynamic thread pool test suite ', () => { +describe('Dynamic thread pool test suite', () => { it('Verify that the function is executed in a worker thread', async () => { const result = await pool.execute({ test: 'test' }) expect(result).toBeDefined() diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 2a7d005b..881813c0 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -32,7 +32,7 @@ const asyncPool = new FixedThreadPool( { maxTasks: maxTasks } ) -describe('Fixed thread pool test suite ', () => { +describe('Fixed thread pool test suite', () => { it('Choose worker round robin test', async () => { const results = new Set() for (let i = 0; i < numberOfThreads; i++) { -- 2.34.1