From: Jérôme Benoit Date: Sat, 8 Apr 2023 19:26:07 +0000 (+0200) Subject: feat: add tasks queue to pool data structure X-Git-Tag: v2.4.5~13 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=f06e48d8e14dcfe3277bd16b1bd2463136af13e6;p=poolifier.git feat: add tasks queue to pool data structure Signed-off-by: Jérôme Benoit --- diff --git a/CHANGELOG.md b/CHANGELOG.md index b2e14ab3..9b515d21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Use monotonic high resolution timer for worker tasks run time. - Add worker tasks median run time to statistics. +- Add worker tasks queue. ## [2.4.4] - 2023-04-07 diff --git a/src/index.ts b/src/index.ts index 349af12c..ebab6452 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,7 +8,7 @@ export type { ExitHandler, MessageHandler, OnlineHandler -} from './pools/pool-worker' +} from './pools/worker' export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types' export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types' export { DynamicThreadPool } from './pools/thread/dynamic' diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 75b5cf79..0fb695fb 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -4,9 +4,9 @@ import { EMPTY_FUNCTION, median } from '../utils' import { KillBehaviors, isKillBehavior } from '../worker/worker-options' import { PoolEvents, type PoolOptions } from './pool' import { PoolEmitter } from './pool' -import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal' +import type { IPoolInternal } from './pool-internal' import { PoolType } from './pool-internal' -import type { IPoolWorker } from './pool-worker' +import type { IWorker, Task, TasksUsage, WorkerNode } from './worker' import { WorkerChoiceStrategies, type WorkerChoiceStrategy @@ -22,12 +22,12 @@ import { CircularArray } from '../circular-array' * @typeParam Response - Type of response of execution. This can only be serializable data. */ export abstract class AbstractPool< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > implements IPoolInternal { /** @inheritDoc */ - public readonly workers: Array> = [] + public readonly workerNodes: Array> = [] /** @inheritDoc */ public readonly emitter?: PoolEmitter @@ -152,13 +152,15 @@ export abstract class AbstractPool< } /** - * Gets the given worker key. + * Gets the given worker its worker node key. * * @param worker - The worker. - * @returns The worker key if the worker is found in the pool, `-1` otherwise. + * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise. */ - private getWorkerKey (worker: Worker): number { - return this.workers.findIndex(workerItem => workerItem.worker === worker) + private getWorkerNodeKey (worker: Worker): number { + return this.workerNodes.findIndex( + workerNode => workerNode.worker === worker + ) } /** @inheritDoc */ @@ -167,16 +169,21 @@ export abstract class AbstractPool< ): void { this.checkValidWorkerChoiceStrategy(workerChoiceStrategy) this.opts.workerChoiceStrategy = workerChoiceStrategy - for (const [index, workerItem] of this.workers.entries()) { - this.setWorker(index, workerItem.worker, { - run: 0, - running: 0, - runTime: 0, - runTimeHistory: new CircularArray(), - avgRunTime: 0, - medRunTime: 0, - error: 0 - }) + for (const [index, workerNode] of this.workerNodes.entries()) { + this.setWorkerNode( + index, + workerNode.worker, + { + run: 0, + running: 0, + runTime: 0, + runTimeHistory: new CircularArray(), + avgRunTime: 0, + medRunTime: 0, + error: 0 + }, + workerNode.tasksQueue + ) } this.workerChoiceStrategyContext.setWorkerChoiceStrategy( workerChoiceStrategy @@ -192,22 +199,22 @@ export abstract class AbstractPool< protected internalBusy (): boolean { return ( this.numberOfRunningTasks >= this.numberOfWorkers && - this.findFreeWorkerKey() === -1 + this.findFreeWorkerNodeKey() === -1 ) } /** @inheritDoc */ - public findFreeWorkerKey (): number { - return this.workers.findIndex(workerItem => { - return workerItem.tasksUsage.running === 0 + public findFreeWorkerNodeKey (): number { + return this.workerNodes.findIndex(workerNode => { + return workerNode.tasksUsage?.running === 0 }) } /** @inheritDoc */ public async execute (data: Data): Promise { - const [workerKey, worker] = this.chooseWorker() + const [workerNodeKey, worker] = this.chooseWorker() const messageId = crypto.randomUUID() - const res = this.internalExecute(workerKey, worker, messageId) + const res = this.internalExecute(workerNodeKey, worker, messageId) this.checkAndEmitFull() this.checkAndEmitBusy() this.sendToWorker(worker, { @@ -222,22 +229,21 @@ export abstract class AbstractPool< /** @inheritDoc */ public async destroy (): Promise { await Promise.all( - this.workers.map(async workerItem => { - await this.destroyWorker(workerItem.worker) + this.workerNodes.map(async workerNode => { + await this.destroyWorker(workerNode.worker) }) ) } /** - * Shutdowns given worker in the pool. + * Shutdowns the given worker. * - * @param worker - A worker within `workers`. + * @param worker - A worker within `workerNodes`. */ protected abstract destroyWorker (worker: Worker): void | Promise /** - * Setup hook that can be overridden by a Poolifier pool implementation - * to run code before workers are created in the abstract constructor. + * Setup hook to run code before worker node are created in the abstract constructor. * Can be overridden * * @virtual @@ -255,10 +261,10 @@ export abstract class AbstractPool< * Hook executed before the worker task promise resolution. * Can be overridden. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. */ - protected beforePromiseResponseHook (workerKey: number): void { - ++this.workers[workerKey].tasksUsage.running + protected beforePromiseResponseHook (workerNodeKey: number): void { + ++this.workerNodes[workerNodeKey].tasksUsage.running } /** @@ -295,18 +301,18 @@ export abstract class AbstractPool< } /** - * Chooses a worker for the next task. + * Chooses a worker node for the next task. * * The default uses a round robin algorithm to distribute the load. * - * @returns [worker key, worker]. + * @returns [worker node key, worker]. */ protected chooseWorker (): [number, Worker] { - let workerKey: number + let workerNodeKey: number if ( this.type === PoolType.DYNAMIC && !this.full && - this.findFreeWorkerKey() === -1 + this.findFreeWorkerNodeKey() === -1 ) { const createdWorker = this.createAndSetupWorker() this.registerWorkerMessageListener(createdWorker, message => { @@ -319,11 +325,11 @@ export abstract class AbstractPool< void this.destroyWorker(createdWorker) } }) - workerKey = this.getWorkerKey(createdWorker) + workerNodeKey = this.getWorkerNodeKey(createdWorker) } else { - workerKey = this.workerChoiceStrategyContext.execute() + workerNodeKey = this.workerChoiceStrategyContext.execute() } - return [workerKey, this.workers[workerKey].worker] + return [workerNodeKey, this.workerNodes[workerNodeKey].worker] } /** @@ -338,7 +344,7 @@ export abstract class AbstractPool< ): void /** - * Registers a listener callback on a given worker. + * Registers a listener callback on the given worker. * * @param worker - The worker which should register a listener. * @param listener - The message listener callback. @@ -353,17 +359,16 @@ export abstract class AbstractPool< protected abstract createWorker (): Worker /** - * Function that can be hooked up when a worker has been newly created and moved to the workers registry. + * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes. * * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default. * * @param worker - The newly created worker. - * @virtual */ protected abstract afterWorkerSetup (worker: Worker): void /** - * Creates a new worker for this pool and sets it up completely. + * Creates a new worker and sets it up completely in the pool worker nodes. * * @returns New, completely set up worker. */ @@ -375,18 +380,10 @@ export abstract class AbstractPool< worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { - this.removeWorker(worker) + this.removeWorkerNode(worker) }) - this.pushWorker(worker, { - run: 0, - running: 0, - runTime: 0, - runTimeHistory: new CircularArray(), - avgRunTime: 0, - medRunTime: 0, - error: 0 - }) + this.pushWorkerNode(worker) this.afterWorkerSetup(worker) @@ -417,11 +414,11 @@ export abstract class AbstractPool< } private async internalExecute ( - workerKey: number, + workerNodeKey: number, worker: Worker, messageId: string ): Promise { - this.beforePromiseResponseHook(workerKey) + this.beforePromiseResponseHook(workerNodeKey) return await new Promise((resolve, reject) => { this.promiseResponseMap.set(messageId, { resolve, reject, worker }) }) @@ -444,58 +441,70 @@ export abstract class AbstractPool< } /** - * Gets the given worker tasks usage in the pool. + * Gets the given worker its tasks usage in the pool. * * @param worker - The worker. * @returns The worker tasks usage. */ private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined { - const workerKey = this.getWorkerKey(worker) - if (workerKey !== -1) { - return this.workers[workerKey].tasksUsage + const workerNodeKey = this.getWorkerNodeKey(worker) + if (workerNodeKey !== -1) { + return this.workerNodes[workerNodeKey].tasksUsage } - throw new Error('Worker could not be found in the pool') + throw new Error('Worker could not be found in the pool worker nodes') } /** - * Pushes the given worker in the pool. + * Pushes the given worker in the pool worker nodes. * * @param worker - The worker. - * @param tasksUsage - The worker tasks usage. + * @returns The worker nodes length. */ - private pushWorker (worker: Worker, tasksUsage: TasksUsage): void { - this.workers.push({ + private pushWorkerNode (worker: Worker): number { + return this.workerNodes.push({ worker, - tasksUsage + tasksUsage: { + run: 0, + running: 0, + runTime: 0, + runTimeHistory: new CircularArray(), + avgRunTime: 0, + medRunTime: 0, + error: 0 + }, + tasksQueue: [] }) } /** - * Sets the given worker in the pool. + * Sets the given worker in the pool worker nodes. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. * @param worker - The worker. * @param tasksUsage - The worker tasks usage. + * @param tasksQueue - The worker task queue. */ - private setWorker ( - workerKey: number, + private setWorkerNode ( + workerNodeKey: number, worker: Worker, - tasksUsage: TasksUsage + tasksUsage: TasksUsage, + tasksQueue: Array> ): void { - this.workers[workerKey] = { + this.workerNodes[workerNodeKey] = { worker, - tasksUsage + tasksUsage, + tasksQueue } } /** - * Removes the given worker from the pool. + * Removes the given worker from the pool worker nodes. * - * @param worker - The worker that will be removed. + * @param worker - The worker. */ - protected removeWorker (worker: Worker): void { - const workerKey = this.getWorkerKey(worker) - this.workers.splice(workerKey, 1) - this.workerChoiceStrategyContext.remove(workerKey) + protected removeWorkerNode (worker: Worker): void { + const workerNodeKey = this.getWorkerNodeKey(worker) + this.workerNodes.splice(workerNodeKey, 1) + this.workerChoiceStrategyContext.remove(workerNodeKey) } } diff --git a/src/pools/cluster/dynamic.ts b/src/pools/cluster/dynamic.ts index b84a80ce..9f9abb01 100644 --- a/src/pools/cluster/dynamic.ts +++ b/src/pools/cluster/dynamic.ts @@ -41,11 +41,11 @@ export class DynamicClusterPool< /** @inheritDoc */ public get full (): boolean { - return this.workers.length === this.max + return this.workerNodes.length === this.max } /** @inheritDoc */ public get busy (): boolean { - return this.full && this.findFreeWorkerKey() === -1 + return this.full && this.findFreeWorkerNodeKey() === -1 } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 78de7073..eef394ec 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -102,7 +102,7 @@ export class FixedClusterPool< /** @inheritDoc */ public get full (): boolean { - return this.workers.length === this.numberOfWorkers + return this.workerNodes.length === this.numberOfWorkers } /** @inheritDoc */ diff --git a/src/pools/pool-internal.ts b/src/pools/pool-internal.ts index bf799581..8912dd2c 100644 --- a/src/pools/pool-internal.ts +++ b/src/pools/pool-internal.ts @@ -1,6 +1,5 @@ -import type { CircularArray } from '../circular-array' import type { IPool } from './pool' -import type { IPoolWorker } from './pool-worker' +import type { IWorker, WorkerNode } from './worker' /** * Internal pool types. @@ -12,29 +11,6 @@ export enum PoolType { DYNAMIC = 'dynamic' } -/** - * Internal tasks usage statistics. - */ -export interface TasksUsage { - run: number - running: number - runTime: number - runTimeHistory: CircularArray - avgRunTime: number - medRunTime: number - error: number -} - -/** - * Internal worker type. - * - * @typeParam Worker - Type of worker type items which manages this pool. - */ -export interface WorkerType { - worker: Worker - tasksUsage: TasksUsage -} - /** * Internal contract definition for a poolifier pool. * @@ -43,14 +19,14 @@ export interface WorkerType { * @typeParam Response - Type of response of execution. This can only be serializable data. */ export interface IPoolInternal< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > extends IPool { /** - * Pool worker type items array. + * Pool worker nodes. */ - readonly workers: Array> + readonly workerNodes: Array> /** * Pool type. @@ -74,13 +50,13 @@ export interface IPoolInternal< readonly busy: boolean /** - * Finds a free worker key based on the number of tasks the worker has applied. + * Finds a free worker node key based on the number of tasks the worker has applied. * - * If a worker is found with `0` running tasks, it is detected as free and its key is returned. + * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned. * * If no free worker is found, `-1` is returned. * - * @returns A worker key if there is one, `-1` otherwise. + * @returns A worker node key if there is one, `-1` otherwise. */ - findFreeWorkerKey: () => number + findFreeWorkerNodeKey: () => number } diff --git a/src/pools/pool.ts b/src/pools/pool.ts index f1da5356..60aceb78 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -4,7 +4,7 @@ import type { ExitHandler, MessageHandler, OnlineHandler -} from './pool-worker' +} from './worker' import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types' /** diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 8ab3d360..337b50b1 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -1,6 +1,6 @@ import type { IPoolInternal } from '../pool-internal' import { PoolType } from '../pool-internal' -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import type { IWorkerChoiceStrategy, RequiredStatistics @@ -14,7 +14,7 @@ import type { * @typeParam Response - Type of response of execution. This can only be serializable data. */ export abstract class AbstractWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > implements IWorkerChoiceStrategy { @@ -46,5 +46,5 @@ export abstract class AbstractWorkerChoiceStrategy< public abstract choose (): number /** @inheritDoc */ - public abstract remove (workerKey: number): boolean + public abstract remove (workerNodeKey: number): boolean } diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index e16c1ea2..c2f85f18 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -22,7 +22,7 @@ interface WorkerVirtualTaskTimestamp { * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class FairShareWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > @@ -36,7 +36,7 @@ export class FairShareWorkerChoiceStrategy< } /** - * Worker last virtual task execution timestamp. + * Worker last virtual task execution timestamp. */ private readonly workerLastVirtualTaskTimestamp: Map< number, @@ -52,8 +52,8 @@ export class FairShareWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { let minWorkerVirtualTaskEndTimestamp = Infinity - let chosenWorkerKey!: number - for (const [index] of this.pool.workers.entries()) { + let chosenWorkerNodeKey!: number + for (const [index] of this.pool.workerNodes.entries()) { this.computeWorkerLastVirtualTaskTimestamp(index) const workerLastVirtualTaskEndTimestamp = this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0 @@ -61,38 +61,38 @@ export class FairShareWorkerChoiceStrategy< workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp ) { minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp - chosenWorkerKey = index + chosenWorkerNodeKey = index } } - return chosenWorkerKey + return chosenWorkerNodeKey } /** @inheritDoc */ - public remove (workerKey: number): boolean { - const workerDeleted = this.workerLastVirtualTaskTimestamp.delete(workerKey) + public remove (workerNodeKey: number): boolean { + const deleted = this.workerLastVirtualTaskTimestamp.delete(workerNodeKey) for (const [key, value] of this.workerLastVirtualTaskTimestamp.entries()) { - if (key > workerKey) { + if (key > workerNodeKey) { this.workerLastVirtualTaskTimestamp.set(key - 1, value) } } - return workerDeleted + return deleted } /** * Computes worker last virtual task timestamp. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. */ - private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void { + private computeWorkerLastVirtualTaskTimestamp (workerNodeKey: number): void { const workerVirtualTaskStartTimestamp = Math.max( performance.now(), - this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity + this.workerLastVirtualTaskTimestamp.get(workerNodeKey)?.end ?? -Infinity ) - this.workerLastVirtualTaskTimestamp.set(workerKey, { + this.workerLastVirtualTaskTimestamp.set(workerNodeKey, { start: workerVirtualTaskStartTimestamp, end: workerVirtualTaskStartTimestamp + - (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0) + (this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime ?? 0) }) } } diff --git a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts index c03c9da0..d98732d6 100644 --- a/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-busy-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -13,7 +13,7 @@ import type { * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class LessBusyWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > @@ -33,26 +33,26 @@ export class LessBusyWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { - const freeWorkerKey = this.pool.findFreeWorkerKey() - if (freeWorkerKey !== -1) { - return freeWorkerKey + const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey() + if (freeWorkerNodeKey !== -1) { + return freeWorkerNodeKey } let minRunTime = Infinity - let lessBusyWorkerKey!: number - for (const [index, workerItem] of this.pool.workers.entries()) { - const workerRunTime = workerItem.tasksUsage.runTime + let lessBusyWorkerNodeKey!: number + for (const [index, workerNode] of this.pool.workerNodes.entries()) { + const workerRunTime = workerNode.tasksUsage.runTime if (workerRunTime === 0) { return index } else if (workerRunTime < minRunTime) { minRunTime = workerRunTime - lessBusyWorkerKey = index + lessBusyWorkerNodeKey = index } } - return lessBusyWorkerKey + return lessBusyWorkerNodeKey } /** @inheritDoc */ - public remove (workerKey: number): boolean { + public remove (workerNodeKey: number): boolean { return true } } diff --git a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts index 364c54a1..acf1e503 100644 --- a/src/pools/selection-strategies/less-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/less-used-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy } from './selection-strategies-types' @@ -10,7 +10,7 @@ import type { IWorkerChoiceStrategy } from './selection-strategies-types' * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class LessUsedWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > @@ -23,27 +23,27 @@ export class LessUsedWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { - const freeWorkerKey = this.pool.findFreeWorkerKey() - if (freeWorkerKey !== -1) { - return freeWorkerKey + const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey() + if (freeWorkerNodeKey !== -1) { + return freeWorkerNodeKey } let minNumberOfTasks = Infinity - let lessUsedWorkerKey!: number - for (const [index, workerItem] of this.pool.workers.entries()) { - const tasksUsage = workerItem.tasksUsage + let lessUsedWorkerNodeKey!: number + for (const [index, workerNode] of this.pool.workerNodes.entries()) { + const tasksUsage = workerNode.tasksUsage const workerTasks = tasksUsage.run + tasksUsage.running if (workerTasks === 0) { return index } else if (workerTasks < minNumberOfTasks) { minNumberOfTasks = workerTasks - lessUsedWorkerKey = index + lessUsedWorkerNodeKey = index } } - return lessUsedWorkerKey + return lessUsedWorkerNodeKey } /** @inheritDoc */ - public remove (workerKey: number): boolean { + public remove (workerNodeKey: number): boolean { return true } } diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index 29b05fa5..eda00c75 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -1,4 +1,4 @@ -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy } from './selection-strategies-types' @@ -10,43 +10,43 @@ import type { IWorkerChoiceStrategy } from './selection-strategies-types' * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class RoundRobinWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > extends AbstractWorkerChoiceStrategy implements IWorkerChoiceStrategy { /** - * Id of the next worker. + * Id of the next worker node. */ - private nextWorkerId: number = 0 + private nextWorkerNodeId: number = 0 /** @inheritDoc */ public reset (): boolean { - this.nextWorkerId = 0 + this.nextWorkerNodeId = 0 return true } /** @inheritDoc */ public choose (): number { - const chosenWorkerKey = this.nextWorkerId - this.nextWorkerId = - this.nextWorkerId === this.pool.workers.length - 1 + const chosenWorkerNodeKey = this.nextWorkerNodeId + this.nextWorkerNodeId = + this.nextWorkerNodeId === this.pool.workerNodes.length - 1 ? 0 - : this.nextWorkerId + 1 - return chosenWorkerKey + : this.nextWorkerNodeId + 1 + return chosenWorkerNodeKey } /** @inheritDoc */ - public remove (workerKey: number): boolean { - if (this.nextWorkerId === workerKey) { - if (this.pool.workers.length === 0) { - this.nextWorkerId = 0 + public remove (workerNodeKey: number): boolean { + if (this.nextWorkerNodeId === workerNodeKey) { + if (this.pool.workerNodes.length === 0) { + this.nextWorkerNodeId = 0 } else { - this.nextWorkerId = - this.nextWorkerId > this.pool.workers.length - 1 - ? this.pool.workers.length - 1 - : this.nextWorkerId + this.nextWorkerNodeId = + this.nextWorkerNodeId > this.pool.workerNodes.length - 1 + ? this.pool.workerNodes.length - 1 + : this.nextWorkerNodeId } } return true diff --git a/src/pools/selection-strategies/selection-strategies-types.ts b/src/pools/selection-strategies/selection-strategies-types.ts index 22071734..f5f06a6e 100644 --- a/src/pools/selection-strategies/selection-strategies-types.ts +++ b/src/pools/selection-strategies/selection-strategies-types.ts @@ -43,7 +43,7 @@ export interface RequiredStatistics { */ export interface IWorkerChoiceStrategy { /** - * Required pool tasks usage statistics. + * Required tasks usage statistics. */ readonly requiredStatistics: RequiredStatistics /** @@ -51,13 +51,13 @@ export interface IWorkerChoiceStrategy { */ reset: () => boolean /** - * Chooses a worker in the pool and returns its key. + * Chooses a worker node in the pool and returns its key. */ choose: () => number /** - * Removes a worker reference from strategy internals. + * Removes a worker node key from strategy internals. * - * @param workerKey - The worker key. + * @param workerNodeKey - The worker node key. */ - remove: (workerKey: number) => boolean + remove: (workerNodeKey: number) => boolean } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index 96b5867b..1bdc02fb 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -1,6 +1,6 @@ import { cpus } from 'node:os' import type { IPoolInternal } from '../pool-internal' -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy' import type { IWorkerChoiceStrategy, @@ -24,7 +24,7 @@ interface TaskRunTime { * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class WeightedRoundRobinWorkerChoiceStrategy< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > @@ -38,15 +38,15 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } /** - * Worker id where the current task will be submitted. + * Worker node id where the current task will be submitted. */ - private currentWorkerId: number = 0 + private currentWorkerNodeId: number = 0 /** * Default worker weight. */ private readonly defaultWorkerWeight: number /** - * Per worker virtual task runtime map. + * Workers' virtual task runtime. */ private readonly workersTaskRunTime: Map = new Map< number, @@ -66,7 +66,7 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public reset (): boolean { - this.currentWorkerId = 0 + this.currentWorkerNodeId = 0 this.workersTaskRunTime.clear() this.initWorkersTaskRunTime() return true @@ -74,76 +74,79 @@ export class WeightedRoundRobinWorkerChoiceStrategy< /** @inheritDoc */ public choose (): number { - const chosenWorkerKey = this.currentWorkerId - if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) { - this.initWorkerTaskRunTime(chosenWorkerKey) + const chosenWorkerNodeKey = this.currentWorkerNodeId + if ( + this.isDynamicPool && + !this.workersTaskRunTime.has(chosenWorkerNodeKey) + ) { + this.initWorkerTaskRunTime(chosenWorkerNodeKey) } const workerTaskRunTime = - this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0 + this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0 const workerTaskWeight = - this.workersTaskRunTime.get(chosenWorkerKey)?.weight ?? + this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ?? this.defaultWorkerWeight if (workerTaskRunTime < workerTaskWeight) { this.setWorkerTaskRunTime( - chosenWorkerKey, + chosenWorkerNodeKey, workerTaskWeight, workerTaskRunTime + - (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0) + (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0) ) } else { - this.currentWorkerId = - this.currentWorkerId === this.pool.workers.length - 1 + this.currentWorkerNodeId = + this.currentWorkerNodeId === this.pool.workerNodes.length - 1 ? 0 - : this.currentWorkerId + 1 - this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0) + : this.currentWorkerNodeId + 1 + this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0) } - return chosenWorkerKey + return chosenWorkerNodeKey } /** @inheritDoc */ - public remove (workerKey: number): boolean { - if (this.currentWorkerId === workerKey) { - if (this.pool.workers.length === 0) { - this.currentWorkerId = 0 + public remove (workerNodeKey: number): boolean { + if (this.currentWorkerNodeId === workerNodeKey) { + if (this.pool.workerNodes.length === 0) { + this.currentWorkerNodeId = 0 } else { - this.currentWorkerId = - this.currentWorkerId > this.pool.workers.length - 1 - ? this.pool.workers.length - 1 - : this.currentWorkerId + this.currentWorkerNodeId = + this.currentWorkerNodeId > this.pool.workerNodes.length - 1 + ? this.pool.workerNodes.length - 1 + : this.currentWorkerNodeId } } - const workerDeleted = this.workersTaskRunTime.delete(workerKey) + const deleted = this.workersTaskRunTime.delete(workerNodeKey) for (const [key, value] of this.workersTaskRunTime) { - if (key > workerKey) { + if (key > workerNodeKey) { this.workersTaskRunTime.set(key - 1, value) } } - return workerDeleted + return deleted } private initWorkersTaskRunTime (): void { - for (const [index] of this.pool.workers.entries()) { + for (const [index] of this.pool.workerNodes.entries()) { this.initWorkerTaskRunTime(index) } } - private initWorkerTaskRunTime (workerKey: number): void { - this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0) + private initWorkerTaskRunTime (workerNodeKey: number): void { + this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0) } private setWorkerTaskRunTime ( - workerKey: number, + workerNodeKey: number, weight: number, runTime: number ): void { - this.workersTaskRunTime.set(workerKey, { + this.workersTaskRunTime.set(workerNodeKey, { weight, runTime }) } - private getWorkerVirtualTaskRunTime (workerKey: number): number { - return this.pool.workers[workerKey].tasksUsage.avgRunTime + private getWorkerVirtualTaskRunTime (workerNodeKey: number): number { + return this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime } private computeWorkerWeight (): number { diff --git a/src/pools/selection-strategies/worker-choice-strategy-context.ts b/src/pools/selection-strategies/worker-choice-strategy-context.ts index 192cf005..0e5fd546 100644 --- a/src/pools/selection-strategies/worker-choice-strategy-context.ts +++ b/src/pools/selection-strategies/worker-choice-strategy-context.ts @@ -1,5 +1,5 @@ import type { IPoolInternal } from '../pool-internal' -import type { IPoolWorker } from '../pool-worker' +import type { IWorker } from '../worker' import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy' import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy' import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy' @@ -20,7 +20,7 @@ import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-w * @typeParam Response - Type of response of execution. This can only be serializable data. */ export class WorkerChoiceStrategyContext< - Worker extends IPoolWorker, + Worker extends IWorker, Data = unknown, Response = unknown > { @@ -97,7 +97,7 @@ export class WorkerChoiceStrategyContext< /** * Executes the worker choice strategy algorithm in the context. * - * @returns The key of the chosen one. + * @returns The key of the worker node. */ public execute (): number { return ( @@ -108,16 +108,16 @@ export class WorkerChoiceStrategyContext< } /** - * Removes a worker from the worker choice strategy in the context. + * Removes a worker node key from the worker choice strategy in the context. * - * @param workerKey - The key of the worker to remove. + * @param workerNodeKey - The key of the worker node. * @returns `true` if the removal is successful, `false` otherwise. */ - public remove (workerKey: number): boolean { + public remove (workerNodeKey: number): boolean { return ( this.workerChoiceStrategies.get( this.workerChoiceStrategyType ) as IWorkerChoiceStrategy - ).remove(workerKey) + ).remove(workerNodeKey) } } diff --git a/src/pools/thread/dynamic.ts b/src/pools/thread/dynamic.ts index d8cc3a74..e1565d00 100644 --- a/src/pools/thread/dynamic.ts +++ b/src/pools/thread/dynamic.ts @@ -42,11 +42,11 @@ export class DynamicThreadPool< /** @inheritDoc */ public get full (): boolean { - return this.workers.length === this.max + return this.workerNodes.length === this.max } /** @inheritDoc */ public get busy (): boolean { - return this.full && this.findFreeWorkerKey() === -1 + return this.full && this.findFreeWorkerNodeKey() === -1 } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index a95a8603..bc665bf1 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -98,7 +98,7 @@ export class FixedThreadPool< /** @inheritDoc */ public get full (): boolean { - return this.workers.length === this.numberOfWorkers + return this.workerNodes.length === this.numberOfWorkers } /** @inheritDoc */ diff --git a/src/pools/pool-worker.ts b/src/pools/worker.ts similarity index 64% rename from src/pools/pool-worker.ts rename to src/pools/worker.ts index e371e9c1..4ff2c494 100644 --- a/src/pools/pool-worker.ts +++ b/src/pools/worker.ts @@ -1,3 +1,5 @@ +import type { CircularArray } from '../circular-array' + /** * Callback invoked if the worker has received a message. */ @@ -19,9 +21,30 @@ export type OnlineHandler = (this: Worker) => void export type ExitHandler = (this: Worker, code: number) => void /** - * Interface that describes the minimum required implementation of listener events for a pool worker. + * Worker task interface. */ -export interface IPoolWorker { +export interface Task { + data: Data + id: string +} + +/** + * Worker tasks usage statistics. + */ +export interface TasksUsage { + run: number + running: number + runTime: number + runTimeHistory: CircularArray + avgRunTime: number + medRunTime: number + error: number +} + +/** + * Worker interface. + */ +export interface IWorker { /** * Register an event listener. * @@ -40,3 +63,12 @@ export interface IPoolWorker { */ once: (event: 'exit', handler: ExitHandler) => void } + +/** + * Worker node interface. + */ +export interface WorkerNode { + worker: Worker + tasksUsage: TasksUsage + tasksQueue: Array> +} diff --git a/src/utility-types.ts b/src/utility-types.ts index 05af4e3d..458b31a0 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,7 +1,7 @@ import type { Worker as ClusterWorker } from 'node:cluster' import type { MessagePort } from 'node:worker_threads' import type { KillBehavior } from './worker/worker-options' -import type { IPoolWorker } from './pools/pool-worker' +import type { IWorker } from './pools/worker' /** * Make all properties in T non-readonly. @@ -50,7 +50,7 @@ export interface MessageValue< * @typeParam Response - Type of execution response. This can only be serializable data. */ export interface PromiseResponseWrapper< - Worker extends IPoolWorker, + Worker extends IWorker, Response = unknown > { /** diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index 5063c7fd..54646e3a 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -11,7 +11,7 @@ const { CircularArray } = require('../../../lib/circular-array') describe('Abstract pool test suite', () => { const numberOfWorkers = 1 const workerNotFoundInPoolError = new Error( - 'Worker could not be found in the pool' + 'Worker could not be found in the pool worker nodes' ) class StubPoolWithRemoveAllWorker extends FixedThreadPool { removeAllWorker () { @@ -141,15 +141,29 @@ describe('Abstract pool test suite', () => { numberOfWorkers, './tests/worker-files/cluster/testWorker.js' ) - for (const workerItem of pool.workers) { - expect(workerItem.tasksUsage).toBeDefined() - expect(workerItem.tasksUsage.run).toBe(0) - expect(workerItem.tasksUsage.running).toBe(0) - expect(workerItem.tasksUsage.runTime).toBe(0) - expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) - expect(workerItem.tasksUsage.avgRunTime).toBe(0) - expect(workerItem.tasksUsage.medRunTime).toBe(0) - expect(workerItem.tasksUsage.error).toBe(0) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksUsage).toBeDefined() + expect(workerNode.tasksUsage.run).toBe(0) + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.runTime).toBe(0) + expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) + expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0) + expect(workerNode.tasksUsage.avgRunTime).toBe(0) + expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.tasksUsage.error).toBe(0) + } + await pool.destroy() + }) + + it('Verify that worker pool tasks queue are initialized', async () => { + const pool = new FixedClusterPool( + numberOfWorkers, + './tests/worker-files/cluster/testWorker.js' + ) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksQueue).toBeDefined() + expect(workerNode.tasksQueue).toBeInstanceOf(Array) + expect(workerNode.tasksQueue.length).toBe(0) } await pool.destroy() }) @@ -163,26 +177,28 @@ describe('Abstract pool test suite', () => { for (let i = 0; i < numberOfWorkers * 2; i++) { promises.push(pool.execute()) } - for (const workerItem of pool.workers) { - expect(workerItem.tasksUsage).toBeDefined() - expect(workerItem.tasksUsage.run).toBe(0) - expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2) - expect(workerItem.tasksUsage.runTime).toBe(0) - expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) - expect(workerItem.tasksUsage.avgRunTime).toBe(0) - expect(workerItem.tasksUsage.medRunTime).toBe(0) - expect(workerItem.tasksUsage.error).toBe(0) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksUsage).toBeDefined() + expect(workerNode.tasksUsage.run).toBe(0) + expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2) + expect(workerNode.tasksUsage.runTime).toBe(0) + expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) + expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0) + expect(workerNode.tasksUsage.avgRunTime).toBe(0) + expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.tasksUsage.error).toBe(0) } await Promise.all(promises) - for (const workerItem of pool.workers) { - expect(workerItem.tasksUsage).toBeDefined() - expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) - expect(workerItem.tasksUsage.running).toBe(0) - expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) - expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) - expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerItem.tasksUsage.medRunTime).toBe(0) - expect(workerItem.tasksUsage.error).toBe(0) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksUsage).toBeDefined() + expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2) + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) + expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0) + expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.tasksUsage.error).toBe(0) } await pool.destroy() }) @@ -198,26 +214,28 @@ describe('Abstract pool test suite', () => { promises.push(pool.execute()) } await Promise.all(promises) - for (const workerItem of pool.workers) { - expect(workerItem.tasksUsage).toBeDefined() - expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2) - expect(workerItem.tasksUsage.running).toBe(0) - expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0) - expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) - expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) - expect(workerItem.tasksUsage.medRunTime).toBe(0) - expect(workerItem.tasksUsage.error).toBe(0) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksUsage).toBeDefined() + expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2) + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0) + expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) + expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0) + expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0) + expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.tasksUsage.error).toBe(0) } pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const workerItem of pool.workers) { - expect(workerItem.tasksUsage).toBeDefined() - expect(workerItem.tasksUsage.run).toBe(0) - expect(workerItem.tasksUsage.running).toBe(0) - expect(workerItem.tasksUsage.runTime).toBe(0) - expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) - expect(workerItem.tasksUsage.avgRunTime).toBe(0) - expect(workerItem.tasksUsage.medRunTime).toBe(0) - expect(workerItem.tasksUsage.error).toBe(0) + for (const workerNode of pool.workerNodes) { + expect(workerNode.tasksUsage).toBeDefined() + expect(workerNode.tasksUsage.run).toBe(0) + expect(workerNode.tasksUsage.running).toBe(0) + expect(workerNode.tasksUsage.runTime).toBe(0) + expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray) + expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0) + expect(workerNode.tasksUsage.avgRunTime).toBe(0) + expect(workerNode.tasksUsage.medRunTime).toBe(0) + expect(workerNode.tasksUsage.error).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/cluster/dynamic.test.js b/tests/pools/cluster/dynamic.test.js index 74cb5d8f..33a5ab8c 100644 --- a/tests/pools/cluster/dynamic.test.js +++ b/tests/pools/cluster/dynamic.test.js @@ -32,8 +32,8 @@ describe('Dynamic cluster pool test suite', () => { for (let i = 0; i < max * 2; i++) { pool.execute() } - expect(pool.workers.length).toBeLessThanOrEqual(max) - expect(pool.workers.length).toBeGreaterThan(min) + expect(pool.workerNodes.length).toBeLessThanOrEqual(max) + expect(pool.workerNodes.length).toBeGreaterThan(min) // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) @@ -42,19 +42,19 @@ describe('Dynamic cluster pool test suite', () => { }) it('Verify scale worker up and down is working', async () => { - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.length).toBeGreaterThan(min) + expect(pool.workerNodes.length).toBeGreaterThan(min) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.length).toBeGreaterThan(min) + expect(pool.workerNodes.length).toBeGreaterThan(min) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) }) it('Shutdown test', async () => { @@ -93,13 +93,13 @@ describe('Dynamic cluster pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.length).toBe(max) + expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.waitExits(longRunningPool, max - min) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) // We need to clean up the resources after our test await longRunningPool.destroy() }) @@ -115,14 +115,14 @@ describe('Dynamic cluster pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.length).toBe(max) + expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.sleep(1500) - // Here we expect the workers to be at the max size since the task is still running - expect(longRunningPool.workers.length).toBe(max) + // Here we expect the workerNodes to be at the max size since the task is still running + expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy() }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 5a7394a5..796c84ba 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -45,7 +45,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerId + ).nextWorkerNodeId ).toBe(0) // We need to clean up the resources after our test await pool.destroy() @@ -168,13 +168,13 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerId + ).nextWorkerNodeId ).toBeDefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerId + ).nextWorkerNodeId ).toBe(0) await pool.destroy() pool = new DynamicThreadPool( @@ -186,13 +186,13 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerId + ).nextWorkerNodeId ).toBeDefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.ROUND_ROBIN - ).nextWorkerId + ).nextWorkerNodeId ).toBe(0) // We need to clean up the resources after our test await pool.destroy() @@ -395,18 +395,18 @@ describe('Selection strategies test suite', () => { expect(pool.opts.workerChoiceStrategy).toBe( WorkerChoiceStrategies.FAIR_SHARE ) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) .workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).start + .workerLastVirtualTaskTimestamp.get(workerNodeKey).start ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).end + .workerLastVirtualTaskTimestamp.get(workerNodeKey).end ).toBe(0) } // We need to clean up the resources after our test @@ -477,7 +477,7 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.FAIR_SHARE ).workerLastVirtualTaskTimestamp.size - ).toBe(pool.workers.length) + ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) @@ -501,7 +501,7 @@ describe('Selection strategies test suite', () => { // pool.workerChoiceStrategyContext.workerChoiceStrategies.get( // WorkerChoiceStrategies.FAIR_SHARE // ).workerLastVirtualTaskTimestamp.size - // ).toBe(pool.workers.length) + // ).toBe(pool.workerNodes.length) // } // We need to clean up the resources after our test await pool.destroy() @@ -518,18 +518,18 @@ describe('Selection strategies test suite', () => { ).workerLastVirtualTaskTimestamp ).toBeDefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) .workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).start + .workerLastVirtualTaskTimestamp.get(workerNodeKey).start ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).end + .workerLastVirtualTaskTimestamp.get(workerNodeKey).end ).toBe(0) } await pool.destroy() @@ -544,18 +544,18 @@ describe('Selection strategies test suite', () => { ).workerLastVirtualTaskTimestamp ).toBeDefined() pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) .workerLastVirtualTaskTimestamp.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).start + .workerLastVirtualTaskTimestamp.get(workerNodeKey).start ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.FAIR_SHARE) - .workerLastVirtualTaskTimestamp.get(workerKey).end + .workerLastVirtualTaskTimestamp.get(workerNodeKey).end ).toBe(0) } // We need to clean up the resources after our test @@ -574,25 +574,25 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).currentWorkerId + ).currentWorkerNodeId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) .workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - .workersTaskRunTime.get(workerKey).weight + .workersTaskRunTime.get(workerNodeKey).weight ).toBeGreaterThan(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - .workersTaskRunTime.get(workerKey).runTime + .workersTaskRunTime.get(workerNodeKey).runTime ).toBe(0) } // We need to clean up the resources after our test @@ -663,7 +663,7 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ).workersTaskRunTime.size - ).toBe(pool.workers.length) + ).toBe(pool.workerNodes.length) // We need to clean up the resources after our test await pool.destroy() }) @@ -690,7 +690,7 @@ describe('Selection strategies test suite', () => { pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ).workersTaskRunTime.size - ).toBe(pool.workers.length) + ).toBe(pool.workerNodes.length) } // We need to clean up the resources after our test await pool.destroy() @@ -704,7 +704,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).currentWorkerId + ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -720,20 +720,20 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).currentWorkerId + ).currentWorkerNodeId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) .workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - .workersTaskRunTime.get(workerKey).runTime + .workersTaskRunTime.get(workerNodeKey).runTime ).toBe(0) } await pool.destroy() @@ -745,7 +745,7 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).currentWorkerId + ).currentWorkerNodeId ).toBeDefined() expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( @@ -761,20 +761,20 @@ describe('Selection strategies test suite', () => { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN - ).currentWorkerId + ).currentWorkerNodeId ).toBe(0) expect( pool.workerChoiceStrategyContext.workerChoiceStrategies.get( WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN ).defaultWorkerWeight ).toBeGreaterThan(0) - for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies + for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) .workersTaskRunTime.keys()) { expect( pool.workerChoiceStrategyContext.workerChoiceStrategies .get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) - .workersTaskRunTime.get(workerKey).runTime + .workersTaskRunTime.get(workerNodeKey).runTime ).toBe(0) } // We need to clean up the resources after our test diff --git a/tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js b/tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js index cc3e66a4..05f0bb4c 100644 --- a/tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js +++ b/tests/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.test.js @@ -34,7 +34,7 @@ describe('Weighted round robin strategy worker choice strategy test suite', () = .returns() const resetResult = strategy.reset() expect(resetResult).toBe(true) - expect(strategy.currentWorkerId).toBe(0) + expect(strategy.currentWorkerNodeId).toBe(0) expect(workersTaskRunTimeClearStub.calledOnce).toBe(true) expect(initWorkersTaskRunTimeStub.calledOnce).toBe(true) }) diff --git a/tests/pools/thread/dynamic.test.js b/tests/pools/thread/dynamic.test.js index 9f69860a..feb4955d 100644 --- a/tests/pools/thread/dynamic.test.js +++ b/tests/pools/thread/dynamic.test.js @@ -32,8 +32,8 @@ describe('Dynamic thread pool test suite', () => { for (let i = 0; i < max * 2; i++) { pool.execute() } - expect(pool.workers.length).toBeLessThanOrEqual(max) - expect(pool.workers.length).toBeGreaterThan(min) + expect(pool.workerNodes.length).toBeLessThanOrEqual(max) + expect(pool.workerNodes.length).toBeGreaterThan(min) // The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool. // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool. expect(poolBusy).toBe(max + 1) @@ -42,19 +42,19 @@ describe('Dynamic thread pool test suite', () => { }) it('Verify scale thread up and down is working', async () => { - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.length).toBe(max) + expect(pool.workerNodes.length).toBe(max) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { pool.execute() } - expect(pool.workers.length).toBe(max) + expect(pool.workerNodes.length).toBe(max) await TestUtils.waitExits(pool, max - min) - expect(pool.workers.length).toBe(min) + expect(pool.workerNodes.length).toBe(min) }) it('Shutdown test', async () => { @@ -93,13 +93,13 @@ describe('Dynamic thread pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.length).toBe(max) + expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.waitExits(longRunningPool, max - min) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) // We need to clean up the resources after our test await longRunningPool.destroy() }) @@ -115,14 +115,14 @@ describe('Dynamic thread pool test suite', () => { exitHandler: () => console.log('long running worker exited') } ) - expect(longRunningPool.workers.length).toBe(min) + expect(longRunningPool.workerNodes.length).toBe(min) for (let i = 0; i < max * 10; i++) { longRunningPool.execute() } - expect(longRunningPool.workers.length).toBe(max) + expect(longRunningPool.workerNodes.length).toBe(max) await TestUtils.sleep(1500) - // Here we expect the workers to be at the max size since the task is still running - expect(longRunningPool.workers.length).toBe(max) + // Here we expect the workerNodes to be at the max size since the task is still running + expect(longRunningPool.workerNodes.length).toBe(max) // We need to clean up the resources after our test await longRunningPool.destroy() }) diff --git a/tests/test-utils.js b/tests/test-utils.js index 89447169..dc8d5941 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -4,8 +4,8 @@ class TestUtils { static async waitExits (pool, numberOfExitEventsToWait) { return new Promise(resolve => { let exitEvents = 0 - for (const workerItem of pool.workers) { - workerItem.worker.on('exit', () => { + for (const workerNode of pool.workerNodes) { + workerNode.worker.on('exit', () => { ++exitEvents if (exitEvents === numberOfExitEventsToWait) { resolve(exitEvents)