'warn',
{
skipWords: [
- 'poolifier',
'christopher',
'ecma',
'enum',
+ 'inheritdoc',
'jsdoc',
'pioardi',
+ 'poolifier',
'readonly',
'serializable',
'unregister',
files: ['src/**/*.ts'],
extends: 'plugin:jsdoc/recommended',
rules: {
+ 'no-useless-constructor': 'off',
+
'jsdoc/match-description': [
'warn',
{
"Dependabot",
"Gitter",
"Shinigami",
+ "inheritdoc",
"lcov",
"loglevel",
"markdownlint",
}
},
"array-includes": {
- "version": "3.1.2",
- "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.2.tgz",
- "integrity": "sha512-w2GspexNQpx+PutG3QpT437/BenZBj0M/MZGn5mzv/MofYqo0xmRHzn4lFsoDlWJ+THYsGJmFlW68WlDFx7VRw==",
+ "version": "3.1.3",
+ "resolved": "https://registry.npmjs.org/array-includes/-/array-includes-3.1.3.tgz",
+ "integrity": "sha512-gcem1KlBU7c9rB+Rq8/3PPKsK2kjqeEBa3bD5kkQo4nYlOHQCJqIJFqBXDEfwaRuYTT4E+FxA9xez7Gf/e3Q7A==",
"dev": true,
"requires": {
- "call-bind": "^1.0.0",
+ "call-bind": "^1.0.2",
"define-properties": "^1.1.3",
- "es-abstract": "^1.18.0-next.1",
- "get-intrinsic": "^1.0.1",
+ "es-abstract": "^1.18.0-next.2",
+ "get-intrinsic": "^1.1.1",
"is-string": "^1.0.5"
}
},
}
},
"file-entry-cache": {
- "version": "6.0.0",
- "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.0.tgz",
- "integrity": "sha512-fqoO76jZ3ZnYrXLDRxBR1YvOvc0k844kcOg40bgsPrE25LAb/PDqTY+ho64Xh2c8ZXgIKldchCFHczG2UVRcWA==",
+ "version": "6.0.1",
+ "resolved": "https://registry.npmjs.org/file-entry-cache/-/file-entry-cache-6.0.1.tgz",
+ "integrity": "sha512-7Gps/XWymbLk2QLYK4NzpMOrYjMhdIxXuIvy2QBsLE6ljuodKvdkWs/cpyJJ3CVIVpH0Oi1Hvg1ovbMzLdFBBg==",
"dev": true,
"requires": {
"flat-cache": "^3.0.4"
}
},
"lodash": {
- "version": "4.17.20",
- "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",
- "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==",
+ "version": "4.17.21",
+ "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
+ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==",
"dev": true
},
"lodash.flattendeep": {
"integrity": "sha512-cyFDKrqc/YdcWFniJhzI42+AzS+gNwmUzOSFcRCQYwySuBBBy/KjuxWLZ/FHEH6Moq1NizMOBWyTcv8O4OZIMg==",
"dev": true
},
+ "lodash": {
+ "version": "4.17.20",
+ "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",
+ "integrity": "sha512-PlhdFcillOINfeV7Ni6oF1TAEayyZBoZ8bcshTHqOYJYlrqzRK5hagpagky5o4HfCzzd1TRkXPMFq6cKk9rGmA==",
+ "dev": true
+ },
"resolve": {
"version": "1.19.0",
"resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz",
export { FixedClusterPool } from './pools/cluster/fixed'
export type { ClusterPoolOptions } from './pools/cluster/fixed'
export type { IPool } from './pools/pool'
+export { WorkerChoiceStrategies } from './pools/selection-strategies'
+export type { WorkerChoiceStrategy } from './pools/selection-strategies'
export { DynamicThreadPool } from './pools/thread/dynamic'
export { FixedThreadPool } from './pools/thread/fixed'
export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
-import EventEmitter from 'events'
import type { MessageValue } from '../utility-types'
-import type { IPool } from './pool'
+import type { IPoolInternal } from './pool-internal'
+import { PoolEmitter } from './pool-internal'
+import type { WorkerChoiceStrategy } from './selection-strategies'
+import {
+ WorkerChoiceStrategies,
+ WorkerChoiceStrategyContext
+} from './selection-strategies'
/**
* An intentional empty function.
*/
-function emptyFunction () {
- // intentionally left blank
+const EMPTY_FUNCTION: () => void = () => {
+ /* Intentionally empty */
}
/**
* @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
*/
maxTasks?: number
+ /**
+ * The work choice strategy to use in this pool.
+ */
+ workerChoiceStrategy?: WorkerChoiceStrategy
}
-/**
- * Internal poolifier pool emitter.
- */
-class PoolEmitter extends EventEmitter {}
-
/**
* Base class containing some shared logic for all poolifier pools.
*
Worker extends IWorker,
Data = unknown,
Response = unknown
-> implements IPool<Data, Response> {
- /**
- * List of currently available workers.
- */
+> implements IPoolInternal<Worker, Data, Response> {
+ /** @inheritdoc */
public readonly workers: Worker[] = []
- /**
- * Index for the next worker.
- */
- public nextWorkerIndex: number = 0
-
- /**
- * The tasks map.
- *
- * - `key`: The `Worker`
- * - `value`: Number of tasks currently in progress on the worker.
- */
+ /** @inheritdoc */
public readonly tasks: Map<Worker, number> = new Map<Worker, number>()
- /**
- * Emitter on which events can be listened to.
- *
- * Events that can currently be listened to:
- *
- * - `'FullPool'`
- */
+ /** @inheritdoc */
public readonly emitter: PoolEmitter
/**
*/
protected nextMessageId: number = 0
+ /**
+ * Worker choice strategy instance implementing the worker choice algorithm.
+ *
+ * Default to a strategy implementing a round robin algorithm.
+ */
+ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
+ Worker,
+ Data,
+ Response
+ >
+
/**
* Constructs a new poolifier pool.
*
}
this.emitter = new PoolEmitter()
+ this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+ this,
+ opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
+ )
}
- private checkFilePath (filePath: string) {
+ private checkFilePath (filePath: string): void {
if (!filePath) {
throw new Error('Please specify a file with a worker implementation')
}
}
- /**
- * Perform the task specified in the constructor with the data parameter.
- *
- * @param data The input for the specified task. This can only be serializable data.
- * @returns Promise that will be resolved when the task is successfully completed.
- */
+ /** @inheritdoc */
+ public isDynamic (): boolean {
+ return false
+ }
+
+ /** @inheritdoc */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /** @inheritdoc */
public execute (data: Data): Promise<Response> {
// Configure worker to handle message with the specified task
const worker = this.chooseWorker()
return res
}
- /**
- * Shut down every current worker in this pool.
- */
+ /** @inheritdoc */
public async destroy (): Promise<void> {
await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
}
- /**
- * Shut down given worker.
- *
- * @param worker A worker within `workers`.
- */
- protected abstract destroyWorker (worker: Worker): void | Promise<void>
+ /** @inheritdoc */
+ public abstract destroyWorker (worker: Worker): void | Promise<void>
/**
* Setup hook that can be overridden by a Poolifier pool implementation
* @param worker Worker whose tasks are set.
* @param step Worker number of tasks step.
*/
- private stepWorkerNumberOfTasks (worker: Worker, step: number) {
+ private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
const numberOfTasksInProgress = this.tasks.get(worker)
if (numberOfTasksInProgress !== undefined) {
this.tasks.set(worker, numberOfTasksInProgress + step)
* @returns Worker.
*/
protected chooseWorker (): Worker {
- const chosenWorker = this.workers[this.nextWorkerIndex]
- this.nextWorkerIndex =
- this.workers.length - 1 === this.nextWorkerIndex
- ? 0
- : this.nextWorkerIndex + 1
- return chosenWorker
+ return this.workerChoiceStrategyContext.execute()
}
/**
message: MessageValue<Data>
): void
- protected abstract registerWorkerMessageListener<
+ /** @inheritdoc */
+ public abstract registerWorkerMessageListener<
Message extends Data | Response
> (worker: Worker, listener: (message: MessageValue<Message>) => void): void
*/
protected abstract afterWorkerSetup (worker: Worker): void
- /**
- * Creates a new worker for this pool and sets it up completely.
- *
- * @returns New, completely set up worker.
- */
- protected createAndSetupWorker (): Worker {
+ /** @inheritdoc */
+ public createAndSetupWorker (): Worker {
const worker: Worker = this.createWorker()
- worker.on('error', this.opts.errorHandler ?? emptyFunction)
- worker.on('online', this.opts.onlineHandler ?? emptyFunction)
- worker.on('exit', this.opts.exitHandler ?? emptyFunction)
+ worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
+ worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
+ worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => this.removeWorker(worker))
this.workers.push(worker)
-import type { Worker } from 'cluster'
-import { isKillBehavior, KillBehaviors } from '../../worker/worker-options'
import type { ClusterPoolOptions } from './fixed'
import { FixedClusterPool } from './fixed'
super(min, filePath, opts)
}
- /**
- * Choose a worker for the next task.
- *
- * It will first check for and return an idle worker.
- * If all workers are busy, then it will try to create a new one up to the `max` worker count.
- * If the max worker count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
- *
- * @returns Cluster worker.
- */
- protected chooseWorker (): Worker {
- for (const [worker, numberOfTasks] of this.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, use it
- return worker
- }
- }
-
- if (this.workers.length === this.max) {
- this.emitter.emit('FullPool')
- return super.chooseWorker()
- }
-
- // All workers are busy, create a new worker
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener<Data>(workerCreated, message => {
- const tasksInProgress = this.tasks.get(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- tasksInProgress === 0
- ) {
- // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.destroyWorker(workerCreated)
- }
- })
- return workerCreated
+ /** @inheritdoc */
+ public isDynamic (): boolean {
+ return true
}
}
return isMaster
}
- protected destroyWorker (worker: Worker): void {
+ /** @inheritdoc */
+ public destroyWorker (worker: Worker): void {
this.sendToWorker(worker, { kill: 1 })
worker.kill()
}
worker.send(message)
}
- protected registerWorkerMessageListener<Message extends Data | Response> (
+ /** @inheritdoc */
+ public registerWorkerMessageListener<Message extends Data | Response> (
worker: Worker,
listener: (message: MessageValue<Message>) => void
): void {
}
protected afterWorkerSetup (worker: Worker): void {
- // we will attach a listener for every task,
+ // We will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
worker.setMaxListeners(this.opts.maxTasks ?? 1000)
}
--- /dev/null
+import EventEmitter from 'events'
+import type { MessageValue } from '../utility-types'
+import type { IWorker } from './abstract-pool'
+import type { IPool } from './pool'
+
+/**
+ * Internal poolifier pool emitter.
+ */
+export class PoolEmitter extends EventEmitter {}
+
+/**
+ * Internal contract definition for a poolifier pool.
+ *
+ * @template Worker Type of worker which manages this pool.
+ * @template Data Type of data sent to the worker.
+ * @template Response Type of response of execution.
+ */
+export interface IPoolInternal<
+ Worker extends IWorker,
+ Data = unknown,
+ Response = unknown
+> extends IPool<Data, Response> {
+ /**
+ * List of currently available workers.
+ */
+ readonly workers: Worker[]
+
+ /**
+ * The tasks map.
+ *
+ * - `key`: The `Worker`
+ * - `value`: Number of tasks currently in progress on the worker.
+ */
+ readonly tasks: Map<Worker, number>
+
+ /**
+ * Emitter on which events can be listened to.
+ *
+ * Events that can currently be listened to:
+ *
+ * - `'FullPool'`
+ */
+ readonly emitter: PoolEmitter
+
+ /**
+ * Maximum number of workers that can be created by this pool.
+ */
+ readonly max?: number
+
+ /**
+ * Whether the pool is dynamic or not.
+ *
+ * If it is dynamic, it provides the `max` property.
+ */
+ isDynamic(): boolean
+
+ /**
+ * Creates a new worker for this pool and sets it up completely.
+ *
+ * @returns New, completely set up worker.
+ */
+ createAndSetupWorker(): Worker
+
+ /**
+ * Shut down given worker.
+ *
+ * @param worker A worker within `workers`.
+ */
+ destroyWorker(worker: Worker): void | Promise<void>
+
+ /**
+ * Register a listener callback on a given worker.
+ *
+ * @param worker A worker.
+ * @param listener A message listener callback.
+ */
+ registerWorkerMessageListener<Message extends Data | Response>(
+ worker: Worker,
+ listener: (message: MessageValue<Message>) => void
+ ): void
+}
+import type { WorkerChoiceStrategy } from './selection-strategies'
+
/**
* Contract definition for a poolifier pool.
*
* Shut down every current worker in this pool.
*/
destroy(): Promise<void>
+ /**
+ * Set the worker choice strategy in this pool.
+ *
+ * @param workerChoiceStrategy The worker choice strategy.
+ */
+ setWorkerChoiceStrategy(workerChoiceStrategy: WorkerChoiceStrategy): void
}
--- /dev/null
+import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { IWorker } from './abstract-pool'
+import type { IPoolInternal } from './pool-internal'
+
+/**
+ * Enumeration of worker choice strategies.
+ */
+export const WorkerChoiceStrategies = Object.freeze({
+ /**
+ * Round robin worker selection strategy.
+ */
+ ROUND_ROBIN: 'ROUND_ROBIN',
+ /**
+ * Less recently used worker selection strategy.
+ */
+ LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
+} as const)
+
+/**
+ * Worker choice strategy.
+ */
+export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+
+/**
+ * Worker choice strategy interface.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ */
+interface IWorkerChoiceStrategy<Worker extends IWorker> {
+ /**
+ * Choose a worker in the pool.
+ */
+ choose(): Worker
+}
+
+/**
+ * Selects the next worker in a round robin fashion.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
+ implements IWorkerChoiceStrategy<Worker> {
+ /**
+ * Index for the next worker.
+ */
+ private nextWorkerIndex: number = 0
+
+ /**
+ * Constructs a worker choice strategy that selects in a round robin fashion.
+ *
+ * @param pool The pool instance.
+ */
+ public constructor (
+ private readonly pool: IPoolInternal<Worker, Data, Response>
+ ) {}
+
+ /** @inheritdoc */
+ public choose (): Worker {
+ const chosenWorker = this.pool.workers[this.nextWorkerIndex]
+ this.nextWorkerIndex =
+ this.pool.workers.length - 1 === this.nextWorkerIndex
+ ? 0
+ : this.nextWorkerIndex + 1
+ return chosenWorker
+ }
+}
+
+/**
+ * Selects the less recently used worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class LessRecentlyUsedWorkerChoiceStrategy<
+ Worker extends IWorker,
+ Data,
+ Response
+> implements IWorkerChoiceStrategy<Worker> {
+ /**
+ * Constructs a worker choice strategy that selects based on less recently used.
+ *
+ * @param pool The pool instance.
+ */
+ public constructor (
+ private readonly pool: IPoolInternal<Worker, Data, Response>
+ ) {}
+
+ /** @inheritdoc */
+ public choose (): Worker {
+ let minNumberOfTasks = Infinity
+ // A worker is always found because it picks the one with fewer tasks
+ let lessRecentlyUsedWorker!: Worker
+ for (const [worker, numberOfTasks] of this.pool.tasks) {
+ if (numberOfTasks === 0) {
+ return worker
+ } else if (numberOfTasks < minNumberOfTasks) {
+ minNumberOfTasks = numberOfTasks
+ lessRecentlyUsedWorker = worker
+ }
+ }
+ return lessRecentlyUsedWorker
+ }
+}
+
+/**
+ * Get the worker choice strategy instance.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance.
+ */
+function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
+ pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+): IWorkerChoiceStrategy<Worker> {
+ switch (workerChoiceStrategy) {
+ case WorkerChoiceStrategies.ROUND_ROBIN:
+ return new RoundRobinWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+ return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+ default:
+ throw new Error(
+ `Worker choice strategy '${workerChoiceStrategy}' not found`
+ )
+ }
+}
+
+/**
+ * Dynamically choose a worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
+ implements IWorkerChoiceStrategy<Worker> {
+ private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+
+ /**
+ * Constructs a worker choice strategy for dynamical pools.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy when the pull is full.
+ */
+ public constructor (
+ private readonly pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ) {
+ this.workerChoiceStrategy = getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
+ }
+
+ /**
+ * Find a free worker based on number of tasks the worker has applied.
+ *
+ * If a worker was found that has `0` tasks, it is detected as free and will be returned.
+ *
+ * If no free worker was found, `null` will be returned.
+ *
+ * @returns A free worker if there was one, otherwise `null`.
+ */
+ private findFreeWorkerBasedOnTasks (): Worker | null {
+ for (const [worker, numberOfTasks] of this.pool.tasks) {
+ if (numberOfTasks === 0) {
+ // A worker is free, use it
+ return worker
+ }
+ }
+ return null
+ }
+
+ /** @inheritdoc */
+ public choose (): Worker {
+ const freeWorker = this.findFreeWorkerBasedOnTasks()
+ if (freeWorker) {
+ return freeWorker
+ }
+
+ if (this.pool.workers.length === this.pool.max) {
+ this.pool.emitter.emit('FullPool')
+ return this.workerChoiceStrategy.choose()
+ }
+
+ // All workers are busy, create a new worker
+ const workerCreated = this.pool.createAndSetupWorker()
+ this.pool.registerWorkerMessageListener(workerCreated, message => {
+ const tasksInProgress = this.pool.tasks.get(workerCreated)
+ if (
+ isKillBehavior(KillBehaviors.HARD, message.kill) ||
+ tasksInProgress === 0
+ ) {
+ // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
+ void this.pool.destroyWorker(workerCreated)
+ }
+ })
+ return workerCreated
+ }
+}
+
+/**
+ * The worker choice strategy context.
+ *
+ * @template Worker Type of worker.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class WorkerChoiceStrategyContext<
+ Worker extends IWorker,
+ Data,
+ Response
+> {
+ // Will be set by setter in constructor
+ private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+
+ /**
+ * Worker choice strategy context constructor.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ */
+ public constructor (
+ private readonly pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ) {
+ this.setWorkerChoiceStrategy(workerChoiceStrategy)
+ }
+
+ /**
+ * Get the worker choice strategy instance specific to the pool type.
+ *
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance for the pool type.
+ */
+ private getPoolWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ): IWorkerChoiceStrategy<Worker> {
+ if (this.pool.isDynamic()) {
+ return new DynamicPoolWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
+ }
+ return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
+ }
+
+ /**
+ * Set the worker choice strategy to use in the context.
+ *
+ * @param workerChoiceStrategy The worker choice strategy to set.
+ */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /**
+ * Choose a worker with the underlying selection strategy.
+ *
+ * @returns The chosen one.
+ */
+ public execute (): Worker {
+ return this.workerChoiceStrategy.choose()
+ }
+}
-import { isKillBehavior, KillBehaviors } from '../../worker/worker-options'
import type { PoolOptions } from '../abstract-pool'
import type { ThreadWorkerWithMessageChannel } from './fixed'
import { FixedThreadPool } from './fixed'
super(min, filePath, opts)
}
- /**
- * Choose a thread for the next task.
- *
- * It will first check for and return an idle thread.
- * If all threads are busy, then it will try to create a new one up to the `max` thread count.
- * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load.
- *
- * @returns Thread worker.
- */
- protected chooseWorker (): ThreadWorkerWithMessageChannel {
- for (const [worker, numberOfTasks] of this.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, use it
- return worker
- }
- }
-
- if (this.workers.length === this.max) {
- this.emitter.emit('FullPool')
- return super.chooseWorker()
- }
-
- // All workers are busy, create a new worker
- const workerCreated = this.createAndSetupWorker()
- this.registerWorkerMessageListener<Data>(workerCreated, message => {
- const tasksInProgress = this.tasks.get(workerCreated)
- if (
- isKillBehavior(KillBehaviors.HARD, message.kill) ||
- tasksInProgress === 0
- ) {
- // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
- void this.destroyWorker(workerCreated)
- }
- })
- return workerCreated
+ /** @inheritdoc */
+ public isDynamic (): boolean {
+ return true
}
}
return isMainThread
}
- protected async destroyWorker (
+ /** @inheritdoc */
+ public async destroyWorker (
worker: ThreadWorkerWithMessageChannel
): Promise<void> {
this.sendToWorker(worker, { kill: 1 })
worker.postMessage(message)
}
- protected registerWorkerMessageListener<Message extends Data | Response> (
+ /** @inheritdoc */
+ public registerWorkerMessageListener<Message extends Data | Response> (
messageChannel: ThreadWorkerWithMessageChannel,
listener: (message: MessageValue<Message>) => void
): void {
worker.postMessage({ parent: port1 }, [port1])
worker.port1 = port1
worker.port2 = port2
- // we will attach a listener for every task,
+ // We will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
}
*
* @param fn The function that should be defined.
*/
- private checkFunctionInput (fn: (data: Data) => Response) {
+ private checkFunctionInput (fn: (data: Data) => Response): void {
if (!fn) throw new Error('fn parameter is mandatory')
}
}
}
-describe('Abstract pool test suite ', () => {
+describe('Abstract pool test suite', () => {
it('Simulate worker not found during increaseWorkersTask', () => {
const pool = new StubPoolWithTasksMapClear(
1,
}
)
-describe('Dynamic cluster pool test suite ', () => {
+describe('Dynamic cluster pool test suite', () => {
it('Verify that the function is executed in a worker cluster', async () => {
const result = await pool.execute({ test: 'test' })
expect(result).toBeDefined()
}
)
-describe('Fixed cluster pool test suite ', () => {
+describe('Fixed cluster pool test suite', () => {
after('Destroy all pools', async () => {
// We need to clean up the resources after our test
await echoPool.destroy()
--- /dev/null
+const expect = require('expect')
+const {
+ WorkerChoiceStrategies,
+ DynamicThreadPool,
+ FixedThreadPool
+} = require('../../lib/index')
+const TestUtils = require('../test-utils')
+
+describe('Selection strategies test suite', () => {
+ it('Verify that WorkerChoiceStrategies enumeration provides string values', () => {
+ expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
+ expect(WorkerChoiceStrategies.LESS_RECENTLY_USED).toBe('LESS_RECENTLY_USED')
+ })
+
+ it('Verify LESS_RECENTLY_USED is taken', async () => {
+ const max = 3
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+ )
+
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.LESS_RECENTLY_USED
+ )
+
+ // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify unknown strategies throw error', () => {
+ const min = 1
+ const max = 3
+ expect(
+ () =>
+ new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ {
+ maxTasks: 1000,
+ workerChoiceStrategy: 'UNKNOWN_STRATEGY'
+ }
+ )
+ ).toThrowError(
+ new Error("Worker choice strategy 'UNKNOWN_STRATEGY' not found")
+ )
+ })
+})
}
)
-describe('Dynamic thread pool test suite ', () => {
+describe('Dynamic thread pool test suite', () => {
it('Verify that the function is executed in a worker thread', async () => {
const result = await pool.execute({ test: 'test' })
expect(result).toBeDefined()
{ maxTasks: maxTasks }
)
-describe('Fixed thread pool test suite ', () => {
+describe('Fixed thread pool test suite', () => {
it('Choose worker round robin test', async () => {
const results = new Set()
for (let i = 0; i < numberOfThreads; i++) {