- Use monotonic high resolution timer for worker tasks run time.
- Add worker tasks median run time to statistics.
+- Add worker tasks queue.
## [2.4.4] - 2023-04-07
ExitHandler,
MessageHandler,
OnlineHandler
-} from './pools/pool-worker'
+} from './pools/worker'
export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
export type { WorkerChoiceStrategy } from './pools/selection-strategies/selection-strategies-types'
export { DynamicThreadPool } from './pools/thread/dynamic'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
import { PoolEvents, type PoolOptions } from './pool'
import { PoolEmitter } from './pool'
-import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal'
+import type { IPoolInternal } from './pool-internal'
import { PoolType } from './pool-internal'
-import type { IPoolWorker } from './pool-worker'
+import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
import {
WorkerChoiceStrategies,
type WorkerChoiceStrategy
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractPool<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
/** @inheritDoc */
- public readonly workers: Array<WorkerType<Worker>> = []
+ public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
/** @inheritDoc */
public readonly emitter?: PoolEmitter
}
/**
- * Gets the given worker key.
+ * Gets the given worker its worker node key.
*
* @param worker - The worker.
- * @returns The worker key if the worker is found in the pool, `-1` otherwise.
+ * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
*/
- private getWorkerKey (worker: Worker): number {
- return this.workers.findIndex(workerItem => workerItem.worker === worker)
+ private getWorkerNodeKey (worker: Worker): number {
+ return this.workerNodes.findIndex(
+ workerNode => workerNode.worker === worker
+ )
}
/** @inheritDoc */
): void {
this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
this.opts.workerChoiceStrategy = workerChoiceStrategy
- for (const [index, workerItem] of this.workers.entries()) {
- this.setWorker(index, workerItem.worker, {
- run: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- error: 0
- })
+ for (const [index, workerNode] of this.workerNodes.entries()) {
+ this.setWorkerNode(
+ index,
+ workerNode.worker,
+ {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ },
+ workerNode.tasksQueue
+ )
}
this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
workerChoiceStrategy
protected internalBusy (): boolean {
return (
this.numberOfRunningTasks >= this.numberOfWorkers &&
- this.findFreeWorkerKey() === -1
+ this.findFreeWorkerNodeKey() === -1
)
}
/** @inheritDoc */
- public findFreeWorkerKey (): number {
- return this.workers.findIndex(workerItem => {
- return workerItem.tasksUsage.running === 0
+ public findFreeWorkerNodeKey (): number {
+ return this.workerNodes.findIndex(workerNode => {
+ return workerNode.tasksUsage?.running === 0
})
}
/** @inheritDoc */
public async execute (data: Data): Promise<Response> {
- const [workerKey, worker] = this.chooseWorker()
+ const [workerNodeKey, worker] = this.chooseWorker()
const messageId = crypto.randomUUID()
- const res = this.internalExecute(workerKey, worker, messageId)
+ const res = this.internalExecute(workerNodeKey, worker, messageId)
this.checkAndEmitFull()
this.checkAndEmitBusy()
this.sendToWorker(worker, {
/** @inheritDoc */
public async destroy (): Promise<void> {
await Promise.all(
- this.workers.map(async workerItem => {
- await this.destroyWorker(workerItem.worker)
+ this.workerNodes.map(async workerNode => {
+ await this.destroyWorker(workerNode.worker)
})
)
}
/**
- * Shutdowns given worker in the pool.
+ * Shutdowns the given worker.
*
- * @param worker - A worker within `workers`.
+ * @param worker - A worker within `workerNodes`.
*/
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
- * Setup hook that can be overridden by a Poolifier pool implementation
- * to run code before workers are created in the abstract constructor.
+ * Setup hook to run code before worker node are created in the abstract constructor.
* Can be overridden
*
* @virtual
* Hook executed before the worker task promise resolution.
* Can be overridden.
*
- * @param workerKey - The worker key.
+ * @param workerNodeKey - The worker node key.
*/
- protected beforePromiseResponseHook (workerKey: number): void {
- ++this.workers[workerKey].tasksUsage.running
+ protected beforePromiseResponseHook (workerNodeKey: number): void {
+ ++this.workerNodes[workerNodeKey].tasksUsage.running
}
/**
}
/**
- * Chooses a worker for the next task.
+ * Chooses a worker node for the next task.
*
* The default uses a round robin algorithm to distribute the load.
*
- * @returns [worker key, worker].
+ * @returns [worker node key, worker].
*/
protected chooseWorker (): [number, Worker] {
- let workerKey: number
+ let workerNodeKey: number
if (
this.type === PoolType.DYNAMIC &&
!this.full &&
- this.findFreeWorkerKey() === -1
+ this.findFreeWorkerNodeKey() === -1
) {
const createdWorker = this.createAndSetupWorker()
this.registerWorkerMessageListener(createdWorker, message => {
void this.destroyWorker(createdWorker)
}
})
- workerKey = this.getWorkerKey(createdWorker)
+ workerNodeKey = this.getWorkerNodeKey(createdWorker)
} else {
- workerKey = this.workerChoiceStrategyContext.execute()
+ workerNodeKey = this.workerChoiceStrategyContext.execute()
}
- return [workerKey, this.workers[workerKey].worker]
+ return [workerNodeKey, this.workerNodes[workerNodeKey].worker]
}
/**
): void
/**
- * Registers a listener callback on a given worker.
+ * Registers a listener callback on the given worker.
*
* @param worker - The worker which should register a listener.
* @param listener - The message listener callback.
protected abstract createWorker (): Worker
/**
- * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
+ * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
*
* Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
*
* @param worker - The newly created worker.
- * @virtual
*/
protected abstract afterWorkerSetup (worker: Worker): void
/**
- * Creates a new worker for this pool and sets it up completely.
+ * Creates a new worker and sets it up completely in the pool worker nodes.
*
* @returns New, completely set up worker.
*/
worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
worker.once('exit', () => {
- this.removeWorker(worker)
+ this.removeWorkerNode(worker)
})
- this.pushWorker(worker, {
- run: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- error: 0
- })
+ this.pushWorkerNode(worker)
this.afterWorkerSetup(worker)
}
private async internalExecute (
- workerKey: number,
+ workerNodeKey: number,
worker: Worker,
messageId: string
): Promise<Response> {
- this.beforePromiseResponseHook(workerKey)
+ this.beforePromiseResponseHook(workerNodeKey)
return await new Promise<Response>((resolve, reject) => {
this.promiseResponseMap.set(messageId, { resolve, reject, worker })
})
}
/**
- * Gets the given worker tasks usage in the pool.
+ * Gets the given worker its tasks usage in the pool.
*
* @param worker - The worker.
* @returns The worker tasks usage.
*/
private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
- const workerKey = this.getWorkerKey(worker)
- if (workerKey !== -1) {
- return this.workers[workerKey].tasksUsage
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ if (workerNodeKey !== -1) {
+ return this.workerNodes[workerNodeKey].tasksUsage
}
- throw new Error('Worker could not be found in the pool')
+ throw new Error('Worker could not be found in the pool worker nodes')
}
/**
- * Pushes the given worker in the pool.
+ * Pushes the given worker in the pool worker nodes.
*
* @param worker - The worker.
- * @param tasksUsage - The worker tasks usage.
+ * @returns The worker nodes length.
*/
- private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
- this.workers.push({
+ private pushWorkerNode (worker: Worker): number {
+ return this.workerNodes.push({
worker,
- tasksUsage
+ tasksUsage: {
+ run: 0,
+ running: 0,
+ runTime: 0,
+ runTimeHistory: new CircularArray(),
+ avgRunTime: 0,
+ medRunTime: 0,
+ error: 0
+ },
+ tasksQueue: []
})
}
/**
- * Sets the given worker in the pool.
+ * Sets the given worker in the pool worker nodes.
*
- * @param workerKey - The worker key.
+ * @param workerNodeKey - The worker node key.
* @param worker - The worker.
* @param tasksUsage - The worker tasks usage.
+ * @param tasksQueue - The worker task queue.
*/
- private setWorker (
- workerKey: number,
+ private setWorkerNode (
+ workerNodeKey: number,
worker: Worker,
- tasksUsage: TasksUsage
+ tasksUsage: TasksUsage,
+ tasksQueue: Array<Task<Data>>
): void {
- this.workers[workerKey] = {
+ this.workerNodes[workerNodeKey] = {
worker,
- tasksUsage
+ tasksUsage,
+ tasksQueue
}
}
/**
- * Removes the given worker from the pool.
+ * Removes the given worker from the pool worker nodes.
*
- * @param worker - The worker that will be removed.
+ * @param worker - The worker.
*/
- protected removeWorker (worker: Worker): void {
- const workerKey = this.getWorkerKey(worker)
- this.workers.splice(workerKey, 1)
- this.workerChoiceStrategyContext.remove(workerKey)
+ protected removeWorkerNode (worker: Worker): void {
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ this.workerNodes.splice(workerNodeKey, 1)
+ this.workerChoiceStrategyContext.remove(workerNodeKey)
}
}
/** @inheritDoc */
public get full (): boolean {
- return this.workers.length === this.max
+ return this.workerNodes.length === this.max
}
/** @inheritDoc */
public get busy (): boolean {
- return this.full && this.findFreeWorkerKey() === -1
+ return this.full && this.findFreeWorkerNodeKey() === -1
}
}
/** @inheritDoc */
public get full (): boolean {
- return this.workers.length === this.numberOfWorkers
+ return this.workerNodes.length === this.numberOfWorkers
}
/** @inheritDoc */
-import type { CircularArray } from '../circular-array'
import type { IPool } from './pool'
-import type { IPoolWorker } from './pool-worker'
+import type { IWorker, WorkerNode } from './worker'
/**
* Internal pool types.
DYNAMIC = 'dynamic'
}
-/**
- * Internal tasks usage statistics.
- */
-export interface TasksUsage {
- run: number
- running: number
- runTime: number
- runTimeHistory: CircularArray<number>
- avgRunTime: number
- medRunTime: number
- error: number
-}
-
-/**
- * Internal worker type.
- *
- * @typeParam Worker - Type of worker type items which manages this pool.
- */
-export interface WorkerType<Worker extends IPoolWorker> {
- worker: Worker
- tasksUsage: TasksUsage
-}
-
/**
* Internal contract definition for a poolifier pool.
*
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export interface IPoolInternal<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
> extends IPool<Data, Response> {
/**
- * Pool worker type items array.
+ * Pool worker nodes.
*/
- readonly workers: Array<WorkerType<Worker>>
+ readonly workerNodes: Array<WorkerNode<Worker, Data>>
/**
* Pool type.
readonly busy: boolean
/**
- * Finds a free worker key based on the number of tasks the worker has applied.
+ * Finds a free worker node key based on the number of tasks the worker has applied.
*
- * If a worker is found with `0` running tasks, it is detected as free and its key is returned.
+ * If a worker is found with `0` running tasks, it is detected as free and its worker node key is returned.
*
* If no free worker is found, `-1` is returned.
*
- * @returns A worker key if there is one, `-1` otherwise.
+ * @returns A worker node key if there is one, `-1` otherwise.
*/
- findFreeWorkerKey: () => number
+ findFreeWorkerNodeKey: () => number
}
ExitHandler,
MessageHandler,
OnlineHandler
-} from './pool-worker'
+} from './worker'
import type { WorkerChoiceStrategy } from './selection-strategies/selection-strategies-types'
/**
import type { IPoolInternal } from '../pool-internal'
import { PoolType } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import type {
IWorkerChoiceStrategy,
RequiredStatistics
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export abstract class AbstractWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
> implements IWorkerChoiceStrategy {
public abstract choose (): number
/** @inheritDoc */
- public abstract remove (workerKey: number): boolean
+ public abstract remove (workerNodeKey: number): boolean
}
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class FairShareWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
>
}
/**
- * Worker last virtual task execution timestamp.
+ * Worker last virtual task execution timestamp.
*/
private readonly workerLastVirtualTaskTimestamp: Map<
number,
/** @inheritDoc */
public choose (): number {
let minWorkerVirtualTaskEndTimestamp = Infinity
- let chosenWorkerKey!: number
- for (const [index] of this.pool.workers.entries()) {
+ let chosenWorkerNodeKey!: number
+ for (const [index] of this.pool.workerNodes.entries()) {
this.computeWorkerLastVirtualTaskTimestamp(index)
const workerLastVirtualTaskEndTimestamp =
this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0
workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
) {
minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
- chosenWorkerKey = index
+ chosenWorkerNodeKey = index
}
}
- return chosenWorkerKey
+ return chosenWorkerNodeKey
}
/** @inheritDoc */
- public remove (workerKey: number): boolean {
- const workerDeleted = this.workerLastVirtualTaskTimestamp.delete(workerKey)
+ public remove (workerNodeKey: number): boolean {
+ const deleted = this.workerLastVirtualTaskTimestamp.delete(workerNodeKey)
for (const [key, value] of this.workerLastVirtualTaskTimestamp.entries()) {
- if (key > workerKey) {
+ if (key > workerNodeKey) {
this.workerLastVirtualTaskTimestamp.set(key - 1, value)
}
}
- return workerDeleted
+ return deleted
}
/**
* Computes worker last virtual task timestamp.
*
- * @param workerKey - The worker key.
+ * @param workerNodeKey - The worker node key.
*/
- private computeWorkerLastVirtualTaskTimestamp (workerKey: number): void {
+ private computeWorkerLastVirtualTaskTimestamp (workerNodeKey: number): void {
const workerVirtualTaskStartTimestamp = Math.max(
performance.now(),
- this.workerLastVirtualTaskTimestamp.get(workerKey)?.end ?? -Infinity
+ this.workerLastVirtualTaskTimestamp.get(workerNodeKey)?.end ?? -Infinity
)
- this.workerLastVirtualTaskTimestamp.set(workerKey, {
+ this.workerLastVirtualTaskTimestamp.set(workerNodeKey, {
start: workerVirtualTaskStartTimestamp,
end:
workerVirtualTaskStartTimestamp +
- (this.pool.workers[workerKey].tasksUsage.avgRunTime ?? 0)
+ (this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime ?? 0)
})
}
}
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class LessBusyWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
>
/** @inheritDoc */
public choose (): number {
- const freeWorkerKey = this.pool.findFreeWorkerKey()
- if (freeWorkerKey !== -1) {
- return freeWorkerKey
+ const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey()
+ if (freeWorkerNodeKey !== -1) {
+ return freeWorkerNodeKey
}
let minRunTime = Infinity
- let lessBusyWorkerKey!: number
- for (const [index, workerItem] of this.pool.workers.entries()) {
- const workerRunTime = workerItem.tasksUsage.runTime
+ let lessBusyWorkerNodeKey!: number
+ for (const [index, workerNode] of this.pool.workerNodes.entries()) {
+ const workerRunTime = workerNode.tasksUsage.runTime
if (workerRunTime === 0) {
return index
} else if (workerRunTime < minRunTime) {
minRunTime = workerRunTime
- lessBusyWorkerKey = index
+ lessBusyWorkerNodeKey = index
}
}
- return lessBusyWorkerKey
+ return lessBusyWorkerNodeKey
}
/** @inheritDoc */
- public remove (workerKey: number): boolean {
+ public remove (workerNodeKey: number): boolean {
return true
}
}
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type { IWorkerChoiceStrategy } from './selection-strategies-types'
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class LessUsedWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
>
/** @inheritDoc */
public choose (): number {
- const freeWorkerKey = this.pool.findFreeWorkerKey()
- if (freeWorkerKey !== -1) {
- return freeWorkerKey
+ const freeWorkerNodeKey = this.pool.findFreeWorkerNodeKey()
+ if (freeWorkerNodeKey !== -1) {
+ return freeWorkerNodeKey
}
let minNumberOfTasks = Infinity
- let lessUsedWorkerKey!: number
- for (const [index, workerItem] of this.pool.workers.entries()) {
- const tasksUsage = workerItem.tasksUsage
+ let lessUsedWorkerNodeKey!: number
+ for (const [index, workerNode] of this.pool.workerNodes.entries()) {
+ const tasksUsage = workerNode.tasksUsage
const workerTasks = tasksUsage.run + tasksUsage.running
if (workerTasks === 0) {
return index
} else if (workerTasks < minNumberOfTasks) {
minNumberOfTasks = workerTasks
- lessUsedWorkerKey = index
+ lessUsedWorkerNodeKey = index
}
}
- return lessUsedWorkerKey
+ return lessUsedWorkerNodeKey
}
/** @inheritDoc */
- public remove (workerKey: number): boolean {
+ public remove (workerNodeKey: number): boolean {
return true
}
}
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type { IWorkerChoiceStrategy } from './selection-strategies-types'
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class RoundRobinWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
/**
- * Id of the next worker.
+ * Id of the next worker node.
*/
- private nextWorkerId: number = 0
+ private nextWorkerNodeId: number = 0
/** @inheritDoc */
public reset (): boolean {
- this.nextWorkerId = 0
+ this.nextWorkerNodeId = 0
return true
}
/** @inheritDoc */
public choose (): number {
- const chosenWorkerKey = this.nextWorkerId
- this.nextWorkerId =
- this.nextWorkerId === this.pool.workers.length - 1
+ const chosenWorkerNodeKey = this.nextWorkerNodeId
+ this.nextWorkerNodeId =
+ this.nextWorkerNodeId === this.pool.workerNodes.length - 1
? 0
- : this.nextWorkerId + 1
- return chosenWorkerKey
+ : this.nextWorkerNodeId + 1
+ return chosenWorkerNodeKey
}
/** @inheritDoc */
- public remove (workerKey: number): boolean {
- if (this.nextWorkerId === workerKey) {
- if (this.pool.workers.length === 0) {
- this.nextWorkerId = 0
+ public remove (workerNodeKey: number): boolean {
+ if (this.nextWorkerNodeId === workerNodeKey) {
+ if (this.pool.workerNodes.length === 0) {
+ this.nextWorkerNodeId = 0
} else {
- this.nextWorkerId =
- this.nextWorkerId > this.pool.workers.length - 1
- ? this.pool.workers.length - 1
- : this.nextWorkerId
+ this.nextWorkerNodeId =
+ this.nextWorkerNodeId > this.pool.workerNodes.length - 1
+ ? this.pool.workerNodes.length - 1
+ : this.nextWorkerNodeId
}
}
return true
*/
export interface IWorkerChoiceStrategy {
/**
- * Required pool tasks usage statistics.
+ * Required tasks usage statistics.
*/
readonly requiredStatistics: RequiredStatistics
/**
*/
reset: () => boolean
/**
- * Chooses a worker in the pool and returns its key.
+ * Chooses a worker node in the pool and returns its key.
*/
choose: () => number
/**
- * Removes a worker reference from strategy internals.
+ * Removes a worker node key from strategy internals.
*
- * @param workerKey - The worker key.
+ * @param workerNodeKey - The worker node key.
*/
- remove: (workerKey: number) => boolean
+ remove: (workerNodeKey: number) => boolean
}
import { cpus } from 'node:os'
import type { IPoolInternal } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class WeightedRoundRobinWorkerChoiceStrategy<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
>
}
/**
- * Worker id where the current task will be submitted.
+ * Worker node id where the current task will be submitted.
*/
- private currentWorkerId: number = 0
+ private currentWorkerNodeId: number = 0
/**
* Default worker weight.
*/
private readonly defaultWorkerWeight: number
/**
- * Per worker virtual task runtime map.
+ * Workers' virtual task runtime.
*/
private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
number,
/** @inheritDoc */
public reset (): boolean {
- this.currentWorkerId = 0
+ this.currentWorkerNodeId = 0
this.workersTaskRunTime.clear()
this.initWorkersTaskRunTime()
return true
/** @inheritDoc */
public choose (): number {
- const chosenWorkerKey = this.currentWorkerId
- if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
- this.initWorkerTaskRunTime(chosenWorkerKey)
+ const chosenWorkerNodeKey = this.currentWorkerNodeId
+ if (
+ this.isDynamicPool &&
+ !this.workersTaskRunTime.has(chosenWorkerNodeKey)
+ ) {
+ this.initWorkerTaskRunTime(chosenWorkerNodeKey)
}
const workerTaskRunTime =
- this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
+ this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0
const workerTaskWeight =
- this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
+ this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ??
this.defaultWorkerWeight
if (workerTaskRunTime < workerTaskWeight) {
this.setWorkerTaskRunTime(
- chosenWorkerKey,
+ chosenWorkerNodeKey,
workerTaskWeight,
workerTaskRunTime +
- (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
+ (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
)
} else {
- this.currentWorkerId =
- this.currentWorkerId === this.pool.workers.length - 1
+ this.currentWorkerNodeId =
+ this.currentWorkerNodeId === this.pool.workerNodes.length - 1
? 0
- : this.currentWorkerId + 1
- this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
+ : this.currentWorkerNodeId + 1
+ this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0)
}
- return chosenWorkerKey
+ return chosenWorkerNodeKey
}
/** @inheritDoc */
- public remove (workerKey: number): boolean {
- if (this.currentWorkerId === workerKey) {
- if (this.pool.workers.length === 0) {
- this.currentWorkerId = 0
+ public remove (workerNodeKey: number): boolean {
+ if (this.currentWorkerNodeId === workerNodeKey) {
+ if (this.pool.workerNodes.length === 0) {
+ this.currentWorkerNodeId = 0
} else {
- this.currentWorkerId =
- this.currentWorkerId > this.pool.workers.length - 1
- ? this.pool.workers.length - 1
- : this.currentWorkerId
+ this.currentWorkerNodeId =
+ this.currentWorkerNodeId > this.pool.workerNodes.length - 1
+ ? this.pool.workerNodes.length - 1
+ : this.currentWorkerNodeId
}
}
- const workerDeleted = this.workersTaskRunTime.delete(workerKey)
+ const deleted = this.workersTaskRunTime.delete(workerNodeKey)
for (const [key, value] of this.workersTaskRunTime) {
- if (key > workerKey) {
+ if (key > workerNodeKey) {
this.workersTaskRunTime.set(key - 1, value)
}
}
- return workerDeleted
+ return deleted
}
private initWorkersTaskRunTime (): void {
- for (const [index] of this.pool.workers.entries()) {
+ for (const [index] of this.pool.workerNodes.entries()) {
this.initWorkerTaskRunTime(index)
}
}
- private initWorkerTaskRunTime (workerKey: number): void {
- this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
+ private initWorkerTaskRunTime (workerNodeKey: number): void {
+ this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
}
private setWorkerTaskRunTime (
- workerKey: number,
+ workerNodeKey: number,
weight: number,
runTime: number
): void {
- this.workersTaskRunTime.set(workerKey, {
+ this.workersTaskRunTime.set(workerNodeKey, {
weight,
runTime
})
}
- private getWorkerVirtualTaskRunTime (workerKey: number): number {
- return this.pool.workers[workerKey].tasksUsage.avgRunTime
+ private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
+ return this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
}
private computeWorkerWeight (): number {
import type { IPoolInternal } from '../pool-internal'
-import type { IPoolWorker } from '../pool-worker'
+import type { IWorker } from '../worker'
import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy'
import { LessBusyWorkerChoiceStrategy } from './less-busy-worker-choice-strategy'
import { LessUsedWorkerChoiceStrategy } from './less-used-worker-choice-strategy'
* @typeParam Response - Type of response of execution. This can only be serializable data.
*/
export class WorkerChoiceStrategyContext<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Data = unknown,
Response = unknown
> {
/**
* Executes the worker choice strategy algorithm in the context.
*
- * @returns The key of the chosen one.
+ * @returns The key of the worker node.
*/
public execute (): number {
return (
}
/**
- * Removes a worker from the worker choice strategy in the context.
+ * Removes a worker node key from the worker choice strategy in the context.
*
- * @param workerKey - The key of the worker to remove.
+ * @param workerNodeKey - The key of the worker node.
* @returns `true` if the removal is successful, `false` otherwise.
*/
- public remove (workerKey: number): boolean {
+ public remove (workerNodeKey: number): boolean {
return (
this.workerChoiceStrategies.get(
this.workerChoiceStrategyType
) as IWorkerChoiceStrategy
- ).remove(workerKey)
+ ).remove(workerNodeKey)
}
}
/** @inheritDoc */
public get full (): boolean {
- return this.workers.length === this.max
+ return this.workerNodes.length === this.max
}
/** @inheritDoc */
public get busy (): boolean {
- return this.full && this.findFreeWorkerKey() === -1
+ return this.full && this.findFreeWorkerNodeKey() === -1
}
}
/** @inheritDoc */
public get full (): boolean {
- return this.workers.length === this.numberOfWorkers
+ return this.workerNodes.length === this.numberOfWorkers
}
/** @inheritDoc */
+import type { CircularArray } from '../circular-array'
+
/**
* Callback invoked if the worker has received a message.
*/
export type ExitHandler<Worker> = (this: Worker, code: number) => void
/**
- * Interface that describes the minimum required implementation of listener events for a pool worker.
+ * Worker task interface.
*/
-export interface IPoolWorker {
+export interface Task<Data = unknown> {
+ data: Data
+ id: string
+}
+
+/**
+ * Worker tasks usage statistics.
+ */
+export interface TasksUsage {
+ run: number
+ running: number
+ runTime: number
+ runTimeHistory: CircularArray<number>
+ avgRunTime: number
+ medRunTime: number
+ error: number
+}
+
+/**
+ * Worker interface.
+ */
+export interface IWorker {
/**
* Register an event listener.
*
*/
once: (event: 'exit', handler: ExitHandler<this>) => void
}
+
+/**
+ * Worker node interface.
+ */
+export interface WorkerNode<Worker extends IWorker, Data = unknown> {
+ worker: Worker
+ tasksUsage: TasksUsage
+ tasksQueue: Array<Task<Data>>
+}
import type { Worker as ClusterWorker } from 'node:cluster'
import type { MessagePort } from 'node:worker_threads'
import type { KillBehavior } from './worker/worker-options'
-import type { IPoolWorker } from './pools/pool-worker'
+import type { IWorker } from './pools/worker'
/**
* Make all properties in T non-readonly.
* @typeParam Response - Type of execution response. This can only be serializable data.
*/
export interface PromiseResponseWrapper<
- Worker extends IPoolWorker,
+ Worker extends IWorker,
Response = unknown
> {
/**
describe('Abstract pool test suite', () => {
const numberOfWorkers = 1
const workerNotFoundInPoolError = new Error(
- 'Worker could not be found in the pool'
+ 'Worker could not be found in the pool worker nodes'
)
class StubPoolWithRemoveAllWorker extends FixedThreadPool {
removeAllWorker () {
numberOfWorkers,
'./tests/worker-files/cluster/testWorker.js'
)
- for (const workerItem of pool.workers) {
- expect(workerItem.tasksUsage).toBeDefined()
- expect(workerItem.tasksUsage.run).toBe(0)
- expect(workerItem.tasksUsage.running).toBe(0)
- expect(workerItem.tasksUsage.runTime).toBe(0)
- expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
- expect(workerItem.tasksUsage.avgRunTime).toBe(0)
- expect(workerItem.tasksUsage.medRunTime).toBe(0)
- expect(workerItem.tasksUsage.error).toBe(0)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksUsage).toBeDefined()
+ expect(workerNode.tasksUsage.run).toBe(0)
+ expect(workerNode.tasksUsage.running).toBe(0)
+ expect(workerNode.tasksUsage.runTime).toBe(0)
+ expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+ expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+ expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+ expect(workerNode.tasksUsage.medRunTime).toBe(0)
+ expect(workerNode.tasksUsage.error).toBe(0)
+ }
+ await pool.destroy()
+ })
+
+ it('Verify that worker pool tasks queue are initialized', async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js'
+ )
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode.tasksQueue).toBeInstanceOf(Array)
+ expect(workerNode.tasksQueue.length).toBe(0)
}
await pool.destroy()
})
for (let i = 0; i < numberOfWorkers * 2; i++) {
promises.push(pool.execute())
}
- for (const workerItem of pool.workers) {
- expect(workerItem.tasksUsage).toBeDefined()
- expect(workerItem.tasksUsage.run).toBe(0)
- expect(workerItem.tasksUsage.running).toBe(numberOfWorkers * 2)
- expect(workerItem.tasksUsage.runTime).toBe(0)
- expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
- expect(workerItem.tasksUsage.avgRunTime).toBe(0)
- expect(workerItem.tasksUsage.medRunTime).toBe(0)
- expect(workerItem.tasksUsage.error).toBe(0)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksUsage).toBeDefined()
+ expect(workerNode.tasksUsage.run).toBe(0)
+ expect(workerNode.tasksUsage.running).toBe(numberOfWorkers * 2)
+ expect(workerNode.tasksUsage.runTime).toBe(0)
+ expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+ expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+ expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+ expect(workerNode.tasksUsage.medRunTime).toBe(0)
+ expect(workerNode.tasksUsage.error).toBe(0)
}
await Promise.all(promises)
- for (const workerItem of pool.workers) {
- expect(workerItem.tasksUsage).toBeDefined()
- expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
- expect(workerItem.tasksUsage.running).toBe(0)
- expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
- expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
- expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
- expect(workerItem.tasksUsage.medRunTime).toBe(0)
- expect(workerItem.tasksUsage.error).toBe(0)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksUsage).toBeDefined()
+ expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
+ expect(workerNode.tasksUsage.running).toBe(0)
+ expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+ expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+ expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.tasksUsage.medRunTime).toBe(0)
+ expect(workerNode.tasksUsage.error).toBe(0)
}
await pool.destroy()
})
promises.push(pool.execute())
}
await Promise.all(promises)
- for (const workerItem of pool.workers) {
- expect(workerItem.tasksUsage).toBeDefined()
- expect(workerItem.tasksUsage.run).toBe(numberOfWorkers * 2)
- expect(workerItem.tasksUsage.running).toBe(0)
- expect(workerItem.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
- expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
- expect(workerItem.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
- expect(workerItem.tasksUsage.medRunTime).toBe(0)
- expect(workerItem.tasksUsage.error).toBe(0)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksUsage).toBeDefined()
+ expect(workerNode.tasksUsage.run).toBe(numberOfWorkers * 2)
+ expect(workerNode.tasksUsage.running).toBe(0)
+ expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+ expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+ expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.tasksUsage.medRunTime).toBe(0)
+ expect(workerNode.tasksUsage.error).toBe(0)
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const workerItem of pool.workers) {
- expect(workerItem.tasksUsage).toBeDefined()
- expect(workerItem.tasksUsage.run).toBe(0)
- expect(workerItem.tasksUsage.running).toBe(0)
- expect(workerItem.tasksUsage.runTime).toBe(0)
- expect(workerItem.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
- expect(workerItem.tasksUsage.avgRunTime).toBe(0)
- expect(workerItem.tasksUsage.medRunTime).toBe(0)
- expect(workerItem.tasksUsage.error).toBe(0)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode.tasksUsage).toBeDefined()
+ expect(workerNode.tasksUsage.run).toBe(0)
+ expect(workerNode.tasksUsage.running).toBe(0)
+ expect(workerNode.tasksUsage.runTime).toBe(0)
+ expect(workerNode.tasksUsage.runTimeHistory).toBeInstanceOf(CircularArray)
+ expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
+ expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+ expect(workerNode.tasksUsage.medRunTime).toBe(0)
+ expect(workerNode.tasksUsage.error).toBe(0)
}
await pool.destroy()
})
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
- expect(pool.workers.length).toBeLessThanOrEqual(max)
- expect(pool.workers.length).toBeGreaterThan(min)
+ expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
+ expect(pool.workerNodes.length).toBeGreaterThan(min)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
})
it('Verify scale worker up and down is working', async () => {
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.length).toBeGreaterThan(min)
+ expect(pool.workerNodes.length).toBeGreaterThan(min)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.length).toBeGreaterThan(min)
+ expect(pool.workerNodes.length).toBeGreaterThan(min)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
})
it('Shutdown test', async () => {
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.length).toBe(max)
+ expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.waitExits(longRunningPool, max - min)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.length).toBe(max)
+ expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.sleep(1500)
- // Here we expect the workers to be at the max size since the task is still running
- expect(longRunningPool.workers.length).toBe(max)
+ // Here we expect the workerNodes to be at the max size since the task is still running
+ expect(longRunningPool.workerNodes.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerId
+ ).nextWorkerNodeId
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerId
+ ).nextWorkerNodeId
).toBeDefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerId
+ ).nextWorkerNodeId
).toBe(0)
await pool.destroy()
pool = new DynamicThreadPool(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerId
+ ).nextWorkerNodeId
).toBeDefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.ROUND_ROBIN
- ).nextWorkerId
+ ).nextWorkerNodeId
).toBe(0)
// We need to clean up the resources after our test
await pool.destroy()
expect(pool.opts.workerChoiceStrategy).toBe(
WorkerChoiceStrategies.FAIR_SHARE
)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).start
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).end
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
).toBe(0)
}
// We need to clean up the resources after our test
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.FAIR_SHARE
).workerLastVirtualTaskTimestamp.size
- ).toBe(pool.workers.length)
+ ).toBe(pool.workerNodes.length)
// We need to clean up the resources after our test
await pool.destroy()
})
// pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
// WorkerChoiceStrategies.FAIR_SHARE
// ).workerLastVirtualTaskTimestamp.size
- // ).toBe(pool.workers.length)
+ // ).toBe(pool.workerNodes.length)
// }
// We need to clean up the resources after our test
await pool.destroy()
).workerLastVirtualTaskTimestamp
).toBeDefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).start
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).end
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
).toBe(0)
}
await pool.destroy()
).workerLastVirtualTaskTimestamp
).toBeDefined()
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
.workerLastVirtualTaskTimestamp.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).start
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.FAIR_SHARE)
- .workerLastVirtualTaskTimestamp.get(workerKey).end
+ .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
).toBe(0)
}
// We need to clean up the resources after our test
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- ).currentWorkerId
+ ).currentWorkerNodeId
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
).defaultWorkerWeight
).toBeGreaterThan(0)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
- .workersTaskRunTime.get(workerKey).weight
+ .workersTaskRunTime.get(workerNodeKey).weight
).toBeGreaterThan(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
- .workersTaskRunTime.get(workerKey).runTime
+ .workersTaskRunTime.get(workerNodeKey).runTime
).toBe(0)
}
// We need to clean up the resources after our test
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
).workersTaskRunTime.size
- ).toBe(pool.workers.length)
+ ).toBe(pool.workerNodes.length)
// We need to clean up the resources after our test
await pool.destroy()
})
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
).workersTaskRunTime.size
- ).toBe(pool.workers.length)
+ ).toBe(pool.workerNodes.length)
}
// We need to clean up the resources after our test
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- ).currentWorkerId
+ ).currentWorkerNodeId
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- ).currentWorkerId
+ ).currentWorkerNodeId
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
).defaultWorkerWeight
).toBeGreaterThan(0)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
- .workersTaskRunTime.get(workerKey).runTime
+ .workersTaskRunTime.get(workerNodeKey).runTime
).toBe(0)
}
await pool.destroy()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- ).currentWorkerId
+ ).currentWorkerNodeId
).toBeDefined()
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
- ).currentWorkerId
+ ).currentWorkerNodeId
).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
).defaultWorkerWeight
).toBeGreaterThan(0)
- for (const workerKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+ for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
.workersTaskRunTime.keys()) {
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies
.get(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
- .workersTaskRunTime.get(workerKey).runTime
+ .workersTaskRunTime.get(workerNodeKey).runTime
).toBe(0)
}
// We need to clean up the resources after our test
.returns()
const resetResult = strategy.reset()
expect(resetResult).toBe(true)
- expect(strategy.currentWorkerId).toBe(0)
+ expect(strategy.currentWorkerNodeId).toBe(0)
expect(workersTaskRunTimeClearStub.calledOnce).toBe(true)
expect(initWorkersTaskRunTimeStub.calledOnce).toBe(true)
})
for (let i = 0; i < max * 2; i++) {
pool.execute()
}
- expect(pool.workers.length).toBeLessThanOrEqual(max)
- expect(pool.workers.length).toBeGreaterThan(min)
+ expect(pool.workerNodes.length).toBeLessThanOrEqual(max)
+ expect(pool.workerNodes.length).toBeGreaterThan(min)
// The `busy` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
// So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
expect(poolBusy).toBe(max + 1)
})
it('Verify scale thread up and down is working', async () => {
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.length).toBe(max)
+ expect(pool.workerNodes.length).toBe(max)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
pool.execute()
}
- expect(pool.workers.length).toBe(max)
+ expect(pool.workerNodes.length).toBe(max)
await TestUtils.waitExits(pool, max - min)
- expect(pool.workers.length).toBe(min)
+ expect(pool.workerNodes.length).toBe(min)
})
it('Shutdown test', async () => {
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.length).toBe(max)
+ expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.waitExits(longRunningPool, max - min)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
exitHandler: () => console.log('long running worker exited')
}
)
- expect(longRunningPool.workers.length).toBe(min)
+ expect(longRunningPool.workerNodes.length).toBe(min)
for (let i = 0; i < max * 10; i++) {
longRunningPool.execute()
}
- expect(longRunningPool.workers.length).toBe(max)
+ expect(longRunningPool.workerNodes.length).toBe(max)
await TestUtils.sleep(1500)
- // Here we expect the workers to be at the max size since the task is still running
- expect(longRunningPool.workers.length).toBe(max)
+ // Here we expect the workerNodes to be at the max size since the task is still running
+ expect(longRunningPool.workerNodes.length).toBe(max)
// We need to clean up the resources after our test
await longRunningPool.destroy()
})
static async waitExits (pool, numberOfExitEventsToWait) {
return new Promise(resolve => {
let exitEvents = 0
- for (const workerItem of pool.workers) {
- workerItem.worker.on('exit', () => {
+ for (const workerNode of pool.workerNodes) {
+ workerNode.worker.on('exit', () => {
++exitEvents
if (exitEvents === numberOfExitEventsToWait) {
resolve(exitEvents)