'comparator',
'ecma',
'enum',
+ 'fibonacci',
'inheritdoc',
'jsdoc',
'poolifier',
## [2.2.1] - 2022-05-01
--
+### Added
+
+- Dynamic worker choice strategy change at runtime.
## [2.2.0] - 2022-05-01
/**
* Intentionally inefficient implementation.
*
- * @param {*} n
+ * @param {number} n
* @returns {number}
*/
function fibonacci (n) {
-export type {
- ErrorHandler,
- ExitHandler,
- IWorker,
- OnlineHandler,
- PoolOptions
-} from './pools/abstract-pool'
export { DynamicClusterPool } from './pools/cluster/dynamic'
export { FixedClusterPool } from './pools/cluster/fixed'
export type { ClusterPoolOptions } from './pools/cluster/fixed'
-export type { IPool } from './pools/pool'
-export { WorkerChoiceStrategies } from './pools/selection-strategies'
-export type { WorkerChoiceStrategy } from './pools/selection-strategies'
+export type { IPool, PoolOptions } from './pools/pool'
+export type {
+ ErrorHandler,
+ ExitHandler,
+ IPoolWorker,
+ OnlineHandler
+} from './pools/pool-worker'
+export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
+export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types'
export { DynamicThreadPool } from './pools/thread/dynamic'
export { FixedThreadPool } from './pools/thread/fixed'
export type { ThreadWorkerWithMessageChannel } from './pools/thread/fixed'
--- /dev/null
+import type {
+ ErrorHandler,
+ ExitHandler,
+ IPoolWorker,
+ MessageHandler,
+ OnlineHandler
+} from './pool-worker'
+
+/**
+ * Basic class that implement the minimum required for a pool worker.
+ */
+export abstract class AbstractPoolWorker implements IPoolWorker {
+ /** @inheritdoc */
+ abstract on (event: 'message', handler: MessageHandler<this>): void
+ /** @inheritdoc */
+ abstract on (event: 'error', handler: ErrorHandler<this>): void
+ /** @inheritdoc */
+ abstract on (event: 'online', handler: OnlineHandler<this>): void
+ /** @inheritdoc */
+ abstract on (event: 'exit', handler: ExitHandler<this>): void
+ /** @inheritdoc */
+ abstract once (event: 'exit', handler: ExitHandler<this>): void
+}
} from '../utility-types'
import { EMPTY_FUNCTION } from '../utils'
import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
+import type { PoolOptions } from './pool'
import type { IPoolInternal } from './pool-internal'
import { PoolEmitter, PoolType } from './pool-internal'
-import type { WorkerChoiceStrategy } from './selection-strategies'
import {
WorkerChoiceStrategies,
- WorkerChoiceStrategyContext
-} from './selection-strategies'
-
-/**
- * Callback invoked if the worker has received a message.
- */
-export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
-
-/**
- * Callback invoked if the worker raised an error.
- */
-export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
-
-/**
- * Callback invoked when the worker has started successfully.
- */
-export type OnlineHandler<Worker> = (this: Worker) => void
-
-/**
- * Callback invoked when the worker exits successfully.
- */
-export type ExitHandler<Worker> = (this: Worker, code: number) => void
-
-/**
- * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
- */
-export interface IWorker {
- /**
- * Register a listener to the message event.
- *
- * @param event `'message'`.
- * @param handler The message handler.
- */
- on(event: 'message', handler: MessageHandler<this>): void
- /**
- * Register a listener to the error event.
- *
- * @param event `'error'`.
- * @param handler The error handler.
- */
- on(event: 'error', handler: ErrorHandler<this>): void
- /**
- * Register a listener to the online event.
- *
- * @param event `'online'`.
- * @param handler The online handler.
- */
- on(event: 'online', handler: OnlineHandler<this>): void
- /**
- * Register a listener to the exit event.
- *
- * @param event `'exit'`.
- * @param handler The exit handler.
- */
- on(event: 'exit', handler: ExitHandler<this>): void
- /**
- * Register a listener to the exit event that will only performed once.
- *
- * @param event `'exit'`.
- * @param handler The exit handler.
- */
- once(event: 'exit', handler: ExitHandler<this>): void
-}
-
-/**
- * Options for a poolifier pool.
- */
-export interface PoolOptions<Worker> {
- /**
- * A function that will listen for message event on each worker.
- */
- messageHandler?: MessageHandler<Worker>
- /**
- * A function that will listen for error event on each worker.
- */
- errorHandler?: ErrorHandler<Worker>
- /**
- * A function that will listen for online event on each worker.
- */
- onlineHandler?: OnlineHandler<Worker>
- /**
- * A function that will listen for exit event on each worker.
- */
- exitHandler?: ExitHandler<Worker>
- /**
- * The work choice strategy to use in this pool.
- */
- workerChoiceStrategy?: WorkerChoiceStrategy
- /**
- * Pool events emission.
- *
- * @default true
- */
- enableEvents?: boolean
-}
+ WorkerChoiceStrategy
+} from './selection-strategies/selection-strategies-types'
+import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
/**
* Base class containing some shared logic for all poolifier pools.
* @template Response Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractPool<
- Worker extends IWorker,
+ Worker extends AbstractPoolWorker,
Data = unknown,
Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
this.registerWorkerMessageListener(workerCreated, message => {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
- this.tasks.get(workerCreated) === 0
+ this.getWorkerRunningTasks(workerCreated) === 0
) {
// Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
this.destroyWorker(workerCreated) as void
return this.promiseMap.size
}
+ /** @inheritdoc */
+ public getWorkerRunningTasks (worker: Worker): number | undefined {
+ return this.tasks.get(worker)
+ }
+
+ /** @inheritdoc */
+ public getWorkerIndex (worker: Worker): number {
+ return this.workers.indexOf(worker)
+ }
+
/** @inheritdoc */
public setWorkerChoiceStrategy (
workerChoiceStrategy: WorkerChoiceStrategy
protected internalGetBusyStatus (): boolean {
return (
this.numberOfRunningTasks >= this.numberOfWorkers &&
- this.findFreeTasksMapEntry() === false
+ this.findFreeWorker() === false
)
}
/** @inheritdoc */
- public findFreeTasksMapEntry (): [Worker, number] | false {
- for (const [worker, numberOfTasks] of this.tasks) {
- if (numberOfTasks === 0) {
- // A worker is free, return the matching tasks map entry
- return [worker, numberOfTasks]
+ public findFreeWorker (): Worker | false {
+ for (const worker of this.workers) {
+ if (this.getWorkerRunningTasks(worker) === 0) {
+ // A worker is free, return the matching worker
+ return worker
}
}
return false
*/
protected removeWorker (worker: Worker): void {
// Clean worker from data structure
- const workerIndex = this.workers.indexOf(worker)
- this.workers.splice(workerIndex, 1)
+ this.workers.splice(this.getWorkerIndex(worker), 1)
this.tasks.delete(worker)
}
import type { Worker } from 'cluster'
import cluster from 'cluster'
import type { MessageValue } from '../../utility-types'
-import type { PoolOptions } from '../abstract-pool'
import { AbstractPool } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
import { PoolType } from '../pool-internal'
/**
import EventEmitter from 'events'
-import type { IWorker } from './abstract-pool'
+import type { AbstractPoolWorker } from './abstract-pool-worker'
import type { IPool } from './pool'
/**
* @template Response Type of response of execution.
*/
export interface IPoolInternal<
- Worker extends IWorker,
+ Worker extends AbstractPoolWorker,
Data = unknown,
Response = unknown
> extends IPool<Data, Response> {
readonly numberOfRunningTasks: number
/**
- * Find a tasks map entry with a free worker based on the number of tasks the worker has applied.
+ * Find a free worker based on the number of tasks the worker has applied.
*
- * If an entry is found with a worker that has `0` tasks, it is detected as free.
+ * If a worker is found with `0` running tasks, it is detected as free and returned.
*
- * If no tasks map entry with a free worker was found, `false` will be returned.
+ * If no free worker is found, `false` is returned.
*
- * @returns A tasks map entry with a free worker if there was one, otherwise `false`.
+ * @returns A free worker if there is one, otherwise `false`.
*/
- findFreeTasksMapEntry(): [Worker, number] | false
+ findFreeWorker(): Worker | false
+
+ /**
+ * Get worker index.
+ *
+ * @param worker The worker.
+ * @returns The worker index.
+ */
+ getWorkerIndex(worker: Worker): number
+
+ /**
+ * Get worker running tasks.
+ *
+ * @param worker The worker.
+ * @returns The number of tasks currently running on the worker.
+ */
+ getWorkerRunningTasks(worker: Worker): number | undefined
}
--- /dev/null
+import type { Worker as ClusterWorker } from 'cluster'
+import type { Worker as WorkerThread } from 'worker_threads'
+import type { Draft } from '../utility-types'
+
+/**
+ * Poolifier supported worker type.
+ */
+export type WorkerType = WorkerThread & ClusterWorker & Draft<MessageChannel>
+
+/**
+ * Callback invoked if the worker has received a message.
+ */
+export type MessageHandler<Worker> = (this: Worker, m: unknown) => void
+
+/**
+ * Callback invoked if the worker raised an error.
+ */
+export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
+
+/**
+ * Callback invoked when the worker has started successfully.
+ */
+export type OnlineHandler<Worker> = (this: Worker) => void
+
+/**
+ * Callback invoked when the worker exits successfully.
+ */
+export type ExitHandler<Worker> = (this: Worker, code: number) => void
+
+/**
+ * Basic interface that describes the minimum required implementation of listener events for a pool worker.
+ */
+export interface IPoolWorker {
+ /**
+ * Worker identifier.
+ */
+ readonly id?: number
+ /**
+ * Register a listener to the message event.
+ *
+ * @param event `'message'`.
+ * @param handler The message handler.
+ */
+ on(event: 'message', handler: MessageHandler<this>): void
+ /**
+ * Register a listener to the error event.
+ *
+ * @param event `'error'`.
+ * @param handler The error handler.
+ */
+ on(event: 'error', handler: ErrorHandler<this>): void
+ /**
+ * Register a listener to the online event.
+ *
+ * @param event `'online'`.
+ * @param handler The online handler.
+ */
+ on(event: 'online', handler: OnlineHandler<this>): void
+ /**
+ * Register a listener to the exit event.
+ *
+ * @param event `'exit'`.
+ * @param handler The exit handler.
+ */
+ on(event: 'exit', handler: ExitHandler<this>): void
+ /**
+ * Register a listener to the exit event that will only performed once.
+ *
+ * @param event `'exit'`.
+ * @param handler The exit handler.
+ */
+ once(event: 'exit', handler: ExitHandler<this>): void
+}
-import type { WorkerChoiceStrategy } from './selection-strategies'
+import type {
+ ErrorHandler,
+ ExitHandler,
+ MessageHandler,
+ OnlineHandler
+} from './pool-worker'
+import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types'
+
+/**
+ * Options for a poolifier pool.
+ */
+export interface PoolOptions<Worker> {
+ /**
+ * A function that will listen for message event on each worker.
+ */
+ messageHandler?: MessageHandler<Worker>
+ /**
+ * A function that will listen for error event on each worker.
+ */
+ errorHandler?: ErrorHandler<Worker>
+ /**
+ * A function that will listen for online event on each worker.
+ */
+ onlineHandler?: OnlineHandler<Worker>
+ /**
+ * A function that will listen for exit event on each worker.
+ */
+ exitHandler?: ExitHandler<Worker>
+ /**
+ * The work choice strategy to use in this pool.
+ */
+ workerChoiceStrategy?: WorkerChoiceStrategy
+ /**
+ * Pool events emission.
+ *
+ * @default true
+ */
+ enableEvents?: boolean
+}
/**
* Contract definition for a poolifier pool.
+++ /dev/null
-import type { IWorker } from './abstract-pool'
-import type { IPoolInternal } from './pool-internal'
-import { PoolType } from './pool-internal'
-
-/**
- * Enumeration of worker choice strategies.
- */
-export const WorkerChoiceStrategies = Object.freeze({
- /**
- * Round robin worker selection strategy.
- */
- ROUND_ROBIN: 'ROUND_ROBIN',
- /**
- * Less recently used worker selection strategy.
- */
- LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
-} as const)
-
-/**
- * Worker choice strategy.
- */
-export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
-
-/**
- * Worker choice strategy interface.
- *
- * @template Worker Type of worker which manages the strategy.
- */
-interface IWorkerChoiceStrategy<Worker extends IWorker> {
- /**
- * Choose a worker in the pool.
- */
- choose(): Worker
-}
-
-/**
- * Selects the next worker in a round robin fashion.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class RoundRobinWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
- implements IWorkerChoiceStrategy<Worker> {
- /**
- * Index for the next worker.
- */
- private nextWorkerIndex: number = 0
-
- /**
- * Constructs a worker choice strategy that selects in a round robin fashion.
- *
- * @param pool The pool instance.
- */
- public constructor (
- private readonly pool: IPoolInternal<Worker, Data, Response>
- ) {}
-
- /** @inheritdoc */
- public choose (): Worker {
- const chosenWorker = this.pool.workers[this.nextWorkerIndex]
- this.nextWorkerIndex =
- this.nextWorkerIndex === this.pool.workers.length - 1
- ? 0
- : this.nextWorkerIndex + 1
- return chosenWorker
- }
-}
-
-/**
- * Selects the less recently used worker.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class LessRecentlyUsedWorkerChoiceStrategy<
- Worker extends IWorker,
- Data,
- Response
-> implements IWorkerChoiceStrategy<Worker> {
- /**
- * Constructs a worker choice strategy that selects based on less recently used.
- *
- * @param pool The pool instance.
- */
- public constructor (
- private readonly pool: IPoolInternal<Worker, Data, Response>
- ) {}
-
- /** @inheritdoc */
- public choose (): Worker {
- const isPoolDynamic = this.pool.type === PoolType.DYNAMIC
- let minNumberOfTasks = Infinity
- // A worker is always found because it picks the one with fewer tasks
- let lessRecentlyUsedWorker!: Worker
- for (const [worker, numberOfTasks] of this.pool.tasks) {
- if (!isPoolDynamic && numberOfTasks === 0) {
- return worker
- } else if (numberOfTasks < minNumberOfTasks) {
- lessRecentlyUsedWorker = worker
- minNumberOfTasks = numberOfTasks
- }
- }
- return lessRecentlyUsedWorker
- }
-}
-
-/**
- * Dynamically choose a worker.
- *
- * @template Worker Type of worker which manages the strategy.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
- implements IWorkerChoiceStrategy<Worker> {
- private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
-
- /**
- * Constructs a worker choice strategy for dynamical pools.
- *
- * @param pool The pool instance.
- * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
- * @param workerChoiceStrategy The worker choice strategy when the pull is busy.
- */
- public constructor (
- private readonly pool: IPoolInternal<Worker, Data, Response>,
- private createDynamicallyWorkerCallback: () => Worker,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ) {
- this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
- this.pool,
- workerChoiceStrategy
- )
- }
-
- /** @inheritdoc */
- public choose (): Worker {
- const freeTaskMapEntry = this.pool.findFreeTasksMapEntry()
- if (freeTaskMapEntry) {
- return freeTaskMapEntry[0]
- }
-
- if (this.pool.busy) {
- return this.workerChoiceStrategy.choose()
- }
-
- // All workers are busy, create a new worker
- return this.createDynamicallyWorkerCallback()
- }
-}
-
-/**
- * The worker choice strategy context.
- *
- * @template Worker Type of worker.
- * @template Data Type of data sent to the worker. This can only be serializable data.
- * @template Response Type of response of execution. This can only be serializable data.
- */
-export class WorkerChoiceStrategyContext<
- Worker extends IWorker,
- Data,
- Response
-> {
- // Will be set by setter in constructor
- private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
-
- /**
- * Worker choice strategy context constructor.
- *
- * @param pool The pool instance.
- * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
- * @param workerChoiceStrategy The worker choice strategy.
- */
- public constructor (
- private readonly pool: IPoolInternal<Worker, Data, Response>,
- private createDynamicallyWorkerCallback: () => Worker,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ) {
- this.setWorkerChoiceStrategy(workerChoiceStrategy)
- }
-
- /**
- * Get the worker choice strategy instance specific to the pool type.
- *
- * @param workerChoiceStrategy The worker choice strategy.
- * @returns The worker choice strategy instance for the pool type.
- */
- private getPoolWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ): IWorkerChoiceStrategy<Worker> {
- if (this.pool.type === PoolType.DYNAMIC) {
- return new DynamicPoolWorkerChoiceStrategy(
- this.pool,
- this.createDynamicallyWorkerCallback,
- workerChoiceStrategy
- )
- }
- return SelectionStrategiesUtils.getWorkerChoiceStrategy(
- this.pool,
- workerChoiceStrategy
- )
- }
-
- /**
- * Set the worker choice strategy to use in the context.
- *
- * @param workerChoiceStrategy The worker choice strategy to set.
- */
- public setWorkerChoiceStrategy (
- workerChoiceStrategy: WorkerChoiceStrategy
- ): void {
- this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
- workerChoiceStrategy
- )
- }
-
- /**
- * Choose a worker with the underlying selection strategy.
- *
- * @returns The chosen one.
- */
- public execute (): Worker {
- return this.workerChoiceStrategy.choose()
- }
-}
-
-/**
- * Worker selection strategies helpers class.
- */
-class SelectionStrategiesUtils {
- /**
- * Get the worker choice strategy instance.
- *
- * @param pool The pool instance.
- * @param workerChoiceStrategy The worker choice strategy.
- * @returns The worker choice strategy instance.
- */
- public static getWorkerChoiceStrategy<
- Worker extends IWorker,
- Data,
- Response
- > (
- pool: IPoolInternal<Worker, Data, Response>,
- workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
- ): IWorkerChoiceStrategy<Worker> {
- switch (workerChoiceStrategy) {
- case WorkerChoiceStrategies.ROUND_ROBIN:
- return new RoundRobinWorkerChoiceStrategy(pool)
- case WorkerChoiceStrategies.LESS_RECENTLY_USED:
- return new LessRecentlyUsedWorkerChoiceStrategy(pool)
- default:
- throw new Error(
- // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
- `Worker choice strategy '${workerChoiceStrategy}' not found`
- )
- }
- }
-}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { PoolType } from '../pool-internal'
+import type { IWorkerChoiceStrategy } from './selection-strategies-types'
+
+/**
+ * Abstract worker choice strategy class.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export abstract class AbstractWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> implements IWorkerChoiceStrategy<Worker> {
+ /** @inheritdoc */
+ public isDynamicPool: boolean = this.pool.type === PoolType.DYNAMIC
+
+ /**
+ * Constructs a worker choice strategy attached to the pool.
+ *
+ * @param pool The pool instance.
+ */
+ public constructor (
+ protected readonly pool: IPoolInternal<Worker, Data, Response>
+ ) {}
+
+ /** @inheritdoc */
+ public abstract choose (): Worker
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type {
+ IWorkerChoiceStrategy,
+ WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+import { SelectionStrategiesUtils } from './selection-strategies-utils'
+
+/**
+ * Dynamically choose a worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class DynamicPoolWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ private workerChoiceStrategy: IWorkerChoiceStrategy<Worker>
+
+ /**
+ * Constructs a worker choice strategy for dynamical pool.
+ *
+ * @param pool The pool instance.
+ * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
+ * @param workerChoiceStrategy The worker choice strategy when the pull is busy.
+ */
+ public constructor (
+ pool: IPoolInternal<Worker, Data, Response>,
+ private createDynamicallyWorkerCallback: () => Worker,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ) {
+ super(pool)
+ this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
+ }
+
+ /** @inheritdoc */
+ public choose (): Worker {
+ const freeWorker = this.pool.findFreeWorker()
+ if (freeWorker) {
+ return freeWorker
+ }
+
+ if (this.pool.busy) {
+ return this.workerChoiceStrategy.choose()
+ }
+
+ // All workers are busy, create a new worker
+ return this.createDynamicallyWorkerCallback()
+ }
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Selects the less recently used worker.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class LessRecentlyUsedWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /** @inheritdoc */
+ public choose (): Worker {
+ let minNumberOfRunningTasks = Infinity
+ // A worker is always found because it picks the one with fewer tasks
+ let lessRecentlyUsedWorker!: Worker
+ for (const worker of this.pool.workers) {
+ const workerRunningTasks = this.pool.getWorkerRunningTasks(worker)
+ if (!this.isDynamicPool && workerRunningTasks === 0) {
+ return worker
+ } else if (
+ workerRunningTasks !== undefined &&
+ workerRunningTasks < minNumberOfRunningTasks
+ ) {
+ lessRecentlyUsedWorker = worker
+ minNumberOfRunningTasks = workerRunningTasks
+ }
+ }
+ return lessRecentlyUsedWorker
+ }
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+
+/**
+ * Selects the next worker in a round robin fashion.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class RoundRobinWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
+ /**
+ * Index for the next worker.
+ */
+ private nextWorkerIndex: number = 0
+
+ /** @inheritdoc */
+ public choose (): Worker {
+ const chosenWorker = this.pool.workers[this.nextWorkerIndex]
+ this.nextWorkerIndex =
+ this.nextWorkerIndex === this.pool.workers.length - 1
+ ? 0
+ : this.nextWorkerIndex + 1
+ return chosenWorker
+ }
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+
+/**
+ * Enumeration of worker choice strategies.
+ */
+export const WorkerChoiceStrategies = Object.freeze({
+ /**
+ * Round robin worker selection strategy.
+ */
+ ROUND_ROBIN: 'ROUND_ROBIN',
+ /**
+ * Less recently used worker selection strategy.
+ */
+ LESS_RECENTLY_USED: 'LESS_RECENTLY_USED'
+} as const)
+
+/**
+ * Worker choice strategy.
+ */
+export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+
+/**
+ * Worker choice strategy interface.
+ *
+ * @template Worker Type of worker which manages the strategy.
+ */
+export interface IWorkerChoiceStrategy<Worker extends AbstractPoolWorker> {
+ /**
+ * Is the pool attached to the strategy dynamic?.
+ */
+ isDynamicPool: boolean
+ /**
+ * Choose a worker in the pool.
+ */
+ choose(): Worker
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { LessRecentlyUsedWorkerChoiceStrategy } from './less-recently-used-worker-choice-strategy'
+import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
+import type {
+ IWorkerChoiceStrategy,
+ WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+
+/**
+ * Worker selection strategies helpers class.
+ */
+export class SelectionStrategiesUtils {
+ /**
+ * Get the worker choice strategy instance.
+ *
+ * @param pool The pool instance.
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance.
+ */
+ public static getWorkerChoiceStrategy<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+ > (
+ pool: IPoolInternal<Worker, Data, Response>,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ): IWorkerChoiceStrategy<Worker> {
+ switch (workerChoiceStrategy) {
+ case WorkerChoiceStrategies.ROUND_ROBIN:
+ return new RoundRobinWorkerChoiceStrategy(pool)
+ case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+ return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+ default:
+ throw new Error(
+ // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+ `Worker choice strategy '${workerChoiceStrategy}' not found`
+ )
+ }
+ }
+}
--- /dev/null
+import type { AbstractPoolWorker } from '../abstract-pool-worker'
+import type { IPoolInternal } from '../pool-internal'
+import { PoolType } from '../pool-internal'
+import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
+import type {
+ IWorkerChoiceStrategy,
+ WorkerChoiceStrategy
+} from './selection-strategies-types'
+import { WorkerChoiceStrategies } from './selection-strategies-types'
+import { SelectionStrategiesUtils } from './selection-strategies-utils'
+
+/**
+ * The worker choice strategy context.
+ *
+ * @template Worker Type of worker.
+ * @template Data Type of data sent to the worker. This can only be serializable data.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export class WorkerChoiceStrategyContext<
+ Worker extends AbstractPoolWorker,
+ Data,
+ Response
+> {
+ // Will be set by setter in constructor
+ private workerChoiceStrategy!: IWorkerChoiceStrategy<Worker>
+
+ /**
+ * Worker choice strategy context constructor.
+ *
+ * @param pool The pool instance.
+ * @param createDynamicallyWorkerCallback The worker creation callback for dynamic pool.
+ * @param workerChoiceStrategy The worker choice strategy.
+ */
+ public constructor (
+ private readonly pool: IPoolInternal<Worker, Data, Response>,
+ private createDynamicallyWorkerCallback: () => Worker,
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ) {
+ this.setWorkerChoiceStrategy(workerChoiceStrategy)
+ }
+
+ /**
+ * Get the worker choice strategy instance specific to the pool type.
+ *
+ * @param workerChoiceStrategy The worker choice strategy.
+ * @returns The worker choice strategy instance for the pool type.
+ */
+ private getPoolWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+ ): IWorkerChoiceStrategy<Worker> {
+ if (this.pool.type === PoolType.DYNAMIC) {
+ return new DynamicPoolWorkerChoiceStrategy(
+ this.pool,
+ this.createDynamicallyWorkerCallback,
+ workerChoiceStrategy
+ )
+ }
+ return SelectionStrategiesUtils.getWorkerChoiceStrategy(
+ this.pool,
+ workerChoiceStrategy
+ )
+ }
+
+ /**
+ * Set the worker choice strategy to use in the context.
+ *
+ * @param workerChoiceStrategy The worker choice strategy to set.
+ */
+ public setWorkerChoiceStrategy (
+ workerChoiceStrategy: WorkerChoiceStrategy
+ ): void {
+ this.workerChoiceStrategy = this.getPoolWorkerChoiceStrategy(
+ workerChoiceStrategy
+ )
+ }
+
+ /**
+ * Choose a worker with the underlying selection strategy.
+ *
+ * @returns The chosen one.
+ */
+ public execute (): Worker {
+ return this.workerChoiceStrategy.choose()
+ }
+}
-import type { PoolOptions } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
import { PoolType } from '../pool-internal'
import type { ThreadWorkerWithMessageChannel } from './fixed'
import { FixedThreadPool } from './fixed'
import { isMainThread, MessageChannel, SHARE_ENV, Worker } from 'worker_threads'
import type { Draft, MessageValue } from '../../utility-types'
-import type { PoolOptions } from '../abstract-pool'
import { AbstractPool } from '../abstract-pool'
+import type { PoolOptions } from '../pool'
import { PoolType } from '../pool-internal'
/**
import type { Worker as ClusterWorker } from 'cluster'
import type { MessagePort } from 'worker_threads'
-import type { IWorker } from './pools/abstract-pool'
+import type { AbstractPoolWorker } from './pools/abstract-pool-worker'
import type { KillBehavior } from './worker/worker-options'
/**
* @template Response Type of response of execution. This can only be serializable data.
*/
export interface PromiseWorkerResponseWrapper<
- Worker extends IWorker,
+ Worker extends AbstractPoolWorker,
Response = unknown
> {
/**
await pool.destroy()
})
+ it('Verify ROUND_ROBIN strategy can be run in a fixed pool', async () => {
+ const max = 3
+ const pool = new FixedThreadPool(
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+ )
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.ROUND_ROBIN
+ )
+ // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
+ it('Verify ROUND_ROBIN strategy can be run in a dynamic pool', async () => {
+ const min = 0
+ const max = 3
+ const pool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker-files/thread/testWorker.js',
+ { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+ )
+ expect(pool.opts.workerChoiceStrategy).toBe(
+ WorkerChoiceStrategies.ROUND_ROBIN
+ )
+ // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
+ const promises = []
+ for (let i = 0; i < max * 2; i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ await Promise.all(promises)
+ // We need to clean up the resources after our test
+ await pool.destroy()
+ })
+
it('Verify LESS_RECENTLY_USED strategy is taken at pool creation', async () => {
const max = 3
const pool = new FixedThreadPool(
for (let i = 0; i < max * 2; i++) {
promises.push(pool.execute({ test: 'test' }))
}
- expect(pool.workers.length).toBe(max)
+ expect(pool.workers.length).toBeLessThanOrEqual(max)
+ expect(pool.workers.length).toBeGreaterThan(min)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
/**
* Intentionally inefficient implementation.
*
- * @param {*} n
+ * @param {number} n
* @returns {number}
*/
static fibonacci (n) {
/**
* Intentionally inefficient implementation.
*
- * @param {*} n
+ * @param {number} n
* @returns {number}
*/
static factorial (n) {
"outDir": "lib",
"esModuleInterop": true,
"declaration": true,
- "strict": true
+ "strict": true,
+ "importsNotUsedAsValues": "error"
},
"include": ["src/**/*.ts"],
"exclude": ["node_modules"]