X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=635b8af459363dc022671f089492f155b3470695;hb=be202c2c43686b8d46d7061622e17b6c173c9e33;hp=9601564fa6458ba4c10fff62bbc265660c9ec26a;hpb=776e97a2d35ba2003ddeddecc826353b569a4344;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 9601564f..649d5f0d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,26 +1,34 @@ +import { AsyncResource } from 'node:async_hooks' import { randomUUID } from 'node:crypto' +import { EventEmitterAsyncResource } from 'node:events' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' -import { EventEmitterAsyncResource } from 'node:events' + import type { MessageValue, PromiseResponseWrapper, - Task -} from '../utility-types' + Task, + TaskFunctionProperties +} from '../utility-types.js' import { + average, + buildTaskFunctionProperties, DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, - average, + exponentialDelay, isKillBehavior, isPlainObject, max, median, min, - round -} from '../utils' -import { KillBehaviors } from '../worker/worker-options' -import type { TaskFunction } from '../worker/task-functions' + round, + sleep +} from '../utils.js' +import type { + TaskFunction, + TaskFunctionObject +} from '../worker/task-functions.js' +import { KillBehaviors } from '../worker/worker-options.js' import { type IPool, PoolEvents, @@ -29,31 +37,35 @@ import { type PoolType, PoolTypes, type TasksQueueOptions -} from './pool' -import type { - IWorker, - IWorkerNode, - WorkerInfo, - WorkerNodeEventDetail, - WorkerType, - WorkerUsage -} from './worker' +} from './pool.js' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions -} from './selection-strategies/selection-strategies-types' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' -import { version } from './version' -import { WorkerNode } from './worker-node' +} from './selection-strategies/selection-strategies-types.js' +import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics -} from './utils' + getDefaultTasksQueueOptions, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, + waitWorkerNodeEvents +} from './utils.js' +import { version } from './version.js' +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerNodeEventDetail, + WorkerType +} from './worker.js' +import { WorkerNode } from './worker-node.js' /** * Base class that implements some shared logic for all poolifier pools. @@ -73,25 +85,25 @@ export abstract class AbstractPool< /** @inheritDoc */ public emitter?: EventEmitterAsyncResource - /** - * Dynamic pool maximum size property placeholder. - */ - protected readonly max?: number - /** * The task execution response promise map: * - `key`: The message id of each submitted task. - * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks. + * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource. * * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id. */ - protected promiseResponseMap: Map> = - new Map>() + protected promiseResponseMap: Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + > = new Map< + `${string}-${string}-${string}-${string}-${string}`, + PromiseResponseWrapper + >() /** - * Worker choice strategy context referencing a worker choice algorithm implementation. + * Worker choice strategies context referencing worker choice algorithms implementation. */ - protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< + protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext< Worker, Data, Response @@ -100,9 +112,12 @@ export abstract class AbstractPool< /** * The task functions added at runtime map: * - `key`: The task function name. - * - `value`: The task function itself. + * - `value`: The task function object. */ - private readonly taskFunctions: Map> + private readonly taskFunctions: Map< + string, + TaskFunctionObject + > /** * Whether the pool is started or not. @@ -112,30 +127,45 @@ export abstract class AbstractPool< * Whether the pool is starting or not. */ private starting: boolean + /** + * Whether the pool is destroying or not. + */ + private destroying: boolean + /** + * Whether the minimum number of workers is starting or not. + */ + private startingMinimumNumberOfWorkers: boolean + /** + * Whether the pool ready event has been emitted or not. + */ + private readyEventEmitted: boolean /** * The start timestamp of the pool. */ - private readonly startTimestamp + private startTimestamp?: number /** * Constructs a new poolifier pool. * - * @param numberOfWorkers - Number of workers that this pool should manage. + * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages. * @param filePath - Path to the worker file. * @param opts - Options for the pool. + * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages. */ public constructor ( - protected readonly numberOfWorkers: number, + protected readonly minimumNumberOfWorkers: number, protected readonly filePath: string, - protected readonly opts: PoolOptions + protected readonly opts: PoolOptions, + protected readonly maximumNumberOfWorkers?: number ) { if (!this.isMain()) { throw new Error( 'Cannot start a pool from a worker with the same type as the pool' ) } + this.checkPoolType() checkFilePath(this.filePath) - this.checkNumberOfWorkers(this.numberOfWorkers) + this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -143,45 +173,57 @@ export abstract class AbstractPool< this.enqueueTask = this.enqueueTask.bind(this) if (this.opts.enableEvents === true) { - this.initializeEventEmitter() + this.initEventEmitter() } - this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext< + this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext< Worker, Data, Response >( this, - this.opts.workerChoiceStrategy, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + [this.opts.workerChoiceStrategy!], this.opts.workerChoiceStrategyOptions ) this.setupHook() - this.taskFunctions = new Map>() + this.taskFunctions = new Map>() this.started = false this.starting = false + this.destroying = false + this.readyEventEmitted = false + this.startingMinimumNumberOfWorkers = false if (this.opts.startWorkers === true) { this.start() } + } - this.startTimestamp = performance.now() + private checkPoolType (): void { + if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) { + throw new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + } } - private checkNumberOfWorkers (numberOfWorkers: number): void { - if (numberOfWorkers == null) { + private checkMinimumNumberOfWorkers ( + minimumNumberOfWorkers: number | undefined + ): void { + if (minimumNumberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' ) - } else if (!Number.isSafeInteger(numberOfWorkers)) { + } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) { throw new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) - } else if (numberOfWorkers < 0) { + } else if (minimumNumberOfWorkers < 0) { throw new RangeError( 'Cannot instantiate a pool with a negative number of workers' ) - } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { + } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) { throw new RangeError('Cannot instantiate a fixed pool with zero worker') } } @@ -189,25 +231,22 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - checkValidWorkerChoiceStrategy( - opts.workerChoiceStrategy as WorkerChoiceStrategy - ) + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + opts.workerChoiceStrategyOptions ) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...opts.workerChoiceStrategyOptions + if (opts.workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions) + checkValidTasksQueueOptions(opts.tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions + opts.tasksQueueOptions ) } } else { @@ -216,7 +255,7 @@ export abstract class AbstractPool< } private checkValidWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined ): void { if ( workerChoiceStrategyOptions != null && @@ -226,25 +265,10 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } - if ( - workerChoiceStrategyOptions?.retries != null && - !Number.isSafeInteger(workerChoiceStrategyOptions.retries) - ) { - throw new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - } - if ( - workerChoiceStrategyOptions?.retries != null && - workerChoiceStrategyOptions.retries < 0 - ) { - throw new RangeError( - `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` - ) - } if ( workerChoiceStrategyOptions?.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + Object.keys(workerChoiceStrategyOptions.weights).length !== + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -262,7 +286,7 @@ export abstract class AbstractPool< } } - private initializeEventEmitter (): void { + private initEventEmitter (): void { this.emitter = new EventEmitterAsyncResource({ name: `poolifier:${this.type}-${this.worker}-pool` }) @@ -276,13 +300,17 @@ export abstract class AbstractPool< worker: this.worker, started: this.started, ready: this.ready, - strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, - minSize: this.minSize, - maxSize: this.maxSize, - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .waitTime.aggregate && { utilization: round(this.utilization) }), + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + defaultStrategy: this.opts.workerChoiceStrategy!, + strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0, + minSize: this.minimumNumberOfWorkers, + maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true && + this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .waitTime.aggregate && { + utilization: round(this.utilization) + }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( (accumulator, workerNode) => @@ -291,9 +319,16 @@ export abstract class AbstractPool< : accumulator, 0 ), + ...(this.opts.enableTasksQueue === true && { + stealingWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stealing ? accumulator + 1 : accumulator, + 0 + ) + }), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -316,7 +351,7 @@ export abstract class AbstractPool< ...(this.opts.enableTasksQueue === true && { maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), + accumulator + (workerNode.usage.tasks.maxQueued ?? 0), 0 ) }), @@ -335,24 +370,24 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate && { + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true && { runTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + workerNode => workerNode.usage.runTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + workerNode => workerNode.usage.runTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -364,7 +399,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -378,24 +413,24 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .waitTime.aggregate && { + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true && { waitTime: { minimum: round( min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + workerNode => workerNode.usage.waitTime.minimum ?? Infinity ) ) ), maximum: round( max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + workerNode => workerNode.usage.waitTime.maximum ?? -Infinity ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -407,7 +442,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -420,6 +455,107 @@ export abstract class AbstractPool< ) }) } + }), + ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate === true && { + elu: { + idle: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.idle.minimum ?? Infinity + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.idle.maximum ?? -Infinity + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.elu.idle.history), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.elu.idle.history), + [] + ) + ) + ) + }) + }, + active: { + minimum: round( + min( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.active.minimum ?? Infinity + ) + ) + ), + maximum: round( + max( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.active.maximum ?? -Infinity + ) + ) + ), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.average && { + average: round( + average( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.elu.active.history), + [] + ) + ) + ) + }), + ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements() + .elu.median && { + median: round( + median( + this.workerNodes.reduce( + (accumulator, workerNode) => + accumulator.concat(workerNode.usage.elu.active.history), + [] + ) + ) + ) + }) + }, + utilization: { + average: round( + average( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ), + median: round( + median( + this.workerNodes.map( + workerNode => workerNode.usage.elu.utilization ?? 0 + ) + ) + ) + } + } }) } } @@ -428,6 +564,9 @@ export abstract class AbstractPool< * The pool readiness boolean status. */ private get ready (): boolean { + if (this.empty) { + return false + } return ( this.workerNodes.reduce( (accumulator, workerNode) => @@ -435,26 +574,37 @@ export abstract class AbstractPool< ? accumulator + 1 : accumulator, 0 - ) >= this.minSize + ) >= this.minimumNumberOfWorkers ) } + /** + * The pool emptiness boolean status. + */ + protected get empty (): boolean { + return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0 + } + /** * The approximate pool utilization. * * @returns The pool utilization. */ private get utilization (): number { + if (this.startTimestamp == null) { + return 0 + } const poolTimeCapacity = - (performance.now() - this.startTimestamp) * this.maxSize + (performance.now() - this.startTimestamp) * + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.runTime?.aggregate ?? 0), + accumulator + (workerNode.usage.runTime.aggregate ?? 0), 0 ) const totalTasksWaitTime = this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + (workerNode.usage.waitTime?.aggregate ?? 0), + accumulator + (workerNode.usage.waitTime.aggregate ?? 0), 0 ) return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity @@ -472,20 +622,6 @@ export abstract class AbstractPool< */ protected abstract get worker (): WorkerType - /** - * The pool minimum size. - */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** - * The pool maximum size. - */ - protected get maxSize (): number { - return this.max ?? this.numberOfWorkers - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -495,28 +631,13 @@ export abstract class AbstractPool< private checkMessageWorkerId (message: MessageValue): void { if (message.workerId == null) { throw new Error('Worker message received without worker id') - } else if ( - message.workerId != null && - this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 - ) { + } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) { throw new Error( `Worker message received from unknown worker '${message.workerId}'` ) } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -534,32 +655,52 @@ export abstract class AbstractPool< workerChoiceStrategy: WorkerChoiceStrategy, workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions ): void { + let requireSync = false checkValidWorkerChoiceStrategy(workerChoiceStrategy) - this.opts.workerChoiceStrategy = workerChoiceStrategy - this.workerChoiceStrategyContext.setWorkerChoiceStrategy( - this.opts.workerChoiceStrategy - ) if (workerChoiceStrategyOptions != null) { - this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) + requireSync = !this.setWorkerChoiceStrategyOptions( + workerChoiceStrategyOptions + ) } - for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { - workerNode.resetUsage() - this.sendStatisticsMessageToWorker(workerNodeKey) + if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) { + this.opts.workerChoiceStrategy = workerChoiceStrategy + this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy( + this.opts.workerChoiceStrategy, + this.opts.workerChoiceStrategyOptions + ) + requireSync = true + } + if (requireSync) { + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } } } /** @inheritDoc */ public setWorkerChoiceStrategyOptions ( - workerChoiceStrategyOptions: WorkerChoiceStrategyOptions - ): void { + workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined + ): boolean { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...workerChoiceStrategyOptions + if (workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.workerChoiceStrategiesContext?.setOptions( + this.opts.workerChoiceStrategyOptions + ) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies(), + this.opts.workerChoiceStrategyOptions + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } + return true } - this.workerChoiceStrategyContext.setOptions( - this.opts.workerChoiceStrategyOptions - ) + return false } /** @inheritDoc */ @@ -573,22 +714,27 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions) + this.setTasksQueueOptions(tasksQueueOptions) } /** @inheritDoc */ - public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void { + public setTasksQueueOptions ( + tasksQueueOptions: TasksQueueOptions | undefined + ): void { if (this.opts.enableTasksQueue === true) { checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueSize(this.opts.tasksQueueOptions.size!) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -599,15 +745,12 @@ export abstract class AbstractPool< } private buildTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: TasksQueueOptions | undefined ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ), ...tasksQueueOptions } } @@ -619,37 +762,34 @@ export abstract class AbstractPool< } private setTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( - 'emptyqueue', - this.handleEmptyQueueEvent as EventListener - ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( - 'emptyqueue', - this.handleEmptyQueueEvent as EventListener + for (const workerNodeKey of this.workerNodes.keys()) { + this.workerNodes[workerNodeKey].off( + 'idle', + this.handleWorkerNodeIdleEvent ) } } private setTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( - 'backpressure', - this.handleBackPressureEvent as EventListener + for (const workerNodeKey of this.workerNodes.keys()) { + this.workerNodes[workerNodeKey].on( + 'backPressure', + this.handleWorkerNodeBackPressureEvent ) } } private unsetTasksStealingOnBackPressure (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( - 'backpressure', - this.handleBackPressureEvent as EventListener + for (const workerNodeKey of this.workerNodes.keys()) { + this.workerNodes[workerNodeKey].off( + 'backPressure', + this.handleWorkerNodeBackPressureEvent ) } } @@ -660,7 +800,10 @@ export abstract class AbstractPool< * The pool filling boolean status. */ protected get full (): boolean { - return this.workerNodes.length >= this.maxSize + return ( + this.workerNodes.length >= + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + ) } /** @@ -682,7 +825,8 @@ export abstract class AbstractPool< workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) === -1 ) } @@ -694,6 +838,17 @@ export abstract class AbstractPool< ) } + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing >= + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + } + private async sendTaskFunctionOperationToWorker ( workerNodeKey: number, message: MessageValue @@ -703,21 +858,17 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id as number + const workerId = this.getWorkerInfo(workerNodeKey)?.id if ( message.taskFunctionOperationStatus != null && message.workerId === workerId ) { if (message.taskFunctionOperationStatus) { resolve(true) - } else if (!message.taskFunctionOperationStatus) { + } else { reject( new Error( - `Task function operation '${ - message.taskFunctionOperation as string - }' failed on worker ${message.workerId} with error: '${ - message.workerError?.message as string - }'` + `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'` ) ) } @@ -765,10 +916,8 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${ - errorResponse?.workerId as number - } with error: '${ - errorResponse?.workerError?.message as string + }' failed on worker ${errorResponse?.workerId} with error: '${ + errorResponse?.workerError?.message }'` ) ) @@ -780,7 +929,7 @@ export abstract class AbstractPool< } } } - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.registerWorkerMessageListener( workerNodeKey, taskFunctionOperationsListener @@ -792,21 +941,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasTaskFunction (name: string): boolean { - for (const workerNode of this.workerNodes) { - if ( - Array.isArray(workerNode.info.taskFunctionNames) && - workerNode.info.taskFunctionNames.includes(name) - ) { - return true - } - } - return false + return this.listTaskFunctionsProperties().some( + taskFunctionProperties => taskFunctionProperties.name === name + ) } /** @inheritDoc */ public async addTaskFunction ( name: string, - fn: TaskFunction + fn: TaskFunction | TaskFunctionObject ): Promise { if (typeof name !== 'string') { throw new TypeError('name argument must be a string') @@ -814,15 +957,26 @@ export abstract class AbstractPool< if (typeof name === 'string' && name.trim().length === 0) { throw new TypeError('name argument must not be an empty string') } - if (typeof fn !== 'function') { - throw new TypeError('fn argument must be a function') + if (typeof fn === 'function') { + fn = { taskFunction: fn } satisfies TaskFunctionObject } + if (typeof fn.taskFunction !== 'function') { + throw new TypeError('taskFunction property must be a function') + } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', - taskFunctionName: name, - taskFunction: fn.toString() + taskFunctionProperties: buildTaskFunctionProperties(name, fn), + taskFunction: fn.taskFunction.toString() }) this.taskFunctions.set(name, fn) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies() + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } @@ -835,45 +989,145 @@ export abstract class AbstractPool< } const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'remove', - taskFunctionName: name + taskFunctionProperties: buildTaskFunctionProperties( + name, + this.taskFunctions.get(name) + ) }) - this.deleteTaskFunctionWorkerUsages(name) + for (const workerNode of this.workerNodes) { + workerNode.deleteTaskFunctionWorkerUsage(name) + } this.taskFunctions.delete(name) + this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies( + this.getWorkerChoiceStrategies() + ) + for (const workerNodeKey of this.workerNodes.keys()) { + this.sendStatisticsMessageToWorker(workerNodeKey) + } return opResult } /** @inheritDoc */ - public listTaskFunctionNames (): string[] { + public listTaskFunctionsProperties (): TaskFunctionProperties[] { for (const workerNode of this.workerNodes) { if ( - Array.isArray(workerNode.info.taskFunctionNames) && - workerNode.info.taskFunctionNames.length > 0 + Array.isArray(workerNode.info.taskFunctionsProperties) && + workerNode.info.taskFunctionsProperties.length > 0 ) { - return workerNode.info.taskFunctionNames + return workerNode.info.taskFunctionsProperties } } return [] } + /** + * Gets task function worker choice strategy, if any. + * + * @param name - The task function name. + * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getTaskFunctionWorkerChoiceStrategy = ( + name?: string + ): WorkerChoiceStrategy | undefined => { + name = name ?? DEFAULT_TASK_NAME + const taskFunctionsProperties = this.listTaskFunctionsProperties() + if (name === DEFAULT_TASK_NAME) { + name = taskFunctionsProperties[1]?.name + } + return taskFunctionsProperties.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function worker choice strategy, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function worker choice strategy if the worker node task function worker choice strategy is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionWorkerChoiceStrategy = ( + workerNodeKey: number, + name?: string + ): WorkerChoiceStrategy | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.strategy + } + + /** + * Gets worker node task function priority, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The worker node task function priority if the worker node task function priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + return + } + name = name ?? DEFAULT_TASK_NAME + if (name === DEFAULT_TASK_NAME) { + name = workerInfo.taskFunctionsProperties?.[1]?.name + } + return workerInfo.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority + } + + /** + * Gets the worker choice strategies registered in this pool. + * + * @returns The worker choice strategies. + */ + private readonly getWorkerChoiceStrategies = + (): Set => { + return new Set([ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.workerChoiceStrategy!, + ...(this.listTaskFunctionsProperties() + .map( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.strategy + ) + .filter( + (strategy: WorkerChoiceStrategy | undefined) => strategy != null + ) as WorkerChoiceStrategy[]) + ]) + } + /** @inheritDoc */ public async setDefaultTaskFunction (name: string): Promise { return await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'default', - taskFunctionName: name + taskFunctionProperties: buildTaskFunctionProperties( + name, + this.taskFunctions.get(name) + ) }) } - private deleteTaskFunctionWorkerUsages (name: string): void { - for (const workerNode of this.workerNodes) { - workerNode.deleteTaskFunctionWorkerUsage(name) - } - } - private shallExecuteTask (workerNodeKey: number): boolean { return ( this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } @@ -881,13 +1135,17 @@ export abstract class AbstractPool< public async execute ( data?: Data, name?: string, - transferList?: TransferListItem[] + transferList?: readonly TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { if (!this.started) { reject(new Error('Cannot execute a task on not started pool')) return } + if (this.destroying) { + reject(new Error('Cannot execute a task on destroying pool')) + return + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) return @@ -905,19 +1163,31 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() + const workerNodeKey = this.chooseWorkerNode(name) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: this.getWorkerNodeTaskFunctionWorkerChoiceStrategy( + workerNodeKey, + name + ), transferList, timestamp, taskId: randomUUID() } - this.promiseResponseMap.set(task.taskId as string, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.set(task.taskId!, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -931,24 +1201,55 @@ export abstract class AbstractPool< }) } - /** @inheritdoc */ - public start (): void { - this.starting = true + /** + * Starts the minimum number of workers. + */ + private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void { + this.startingMinimumNumberOfWorkers = true while ( this.workerNodes.reduce( (accumulator, workerNode) => !workerNode.info.dynamic ? accumulator + 1 : accumulator, 0 - ) < this.numberOfWorkers + ) < this.minimumNumberOfWorkers ) { - this.createAndSetupWorkerNode() + const workerNodeKey = this.createAndSetupWorkerNode() + initWorkerNodeUsage && + this.initWorkerNodeUsage(this.workerNodes[workerNodeKey]) + } + this.startingMinimumNumberOfWorkers = false + } + + /** @inheritdoc */ + public start (): void { + if (this.started) { + throw new Error('Cannot start an already started pool') + } + if (this.starting) { + throw new Error('Cannot start an already starting pool') + } + if (this.destroying) { + throw new Error('Cannot start a destroying pool') } + this.starting = true + this.startMinimumNumberOfWorkers() + this.startTimestamp = performance.now() this.starting = false this.started = true } /** @inheritDoc */ public async destroy (): Promise { + if (!this.started) { + throw new Error('Cannot destroy an already destroyed pool') + } + if (this.starting) { + throw new Error('Cannot destroy an starting pool') + } + if (this.destroying) { + throw new Error('Cannot destroy an already destroying pool') + } + this.destroying = true await Promise.all( this.workerNodes.map(async (_, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) @@ -956,13 +1257,19 @@ export abstract class AbstractPool< ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.readyEventEmitted = false + delete this.startTimestamp + this.destroying = false this.started = false } - protected async sendKillMessageToWorker ( - workerNodeKey: number - ): Promise { + private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.workerNodes[workerNodeKey] == null) { + resolve() + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -970,13 +1277,12 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${ - message.workerId as number - }` + `Kill message handling failed on worker ${message.workerId}` ) ) } } + // FIXME: should be registered only once this.registerWorkerMessageListener(workerNodeKey, killMessageListener) this.sendToWorker(workerNodeKey, { kill: true }) }) @@ -987,7 +1293,22 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) + const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).tasksFinishedTimeout + ) + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1000,7 +1321,9 @@ export abstract class AbstractPool< } /** - * Should return whether the worker is the main worker or not. + * Returns whether the worker is the main worker or not. + * + * @returns `true` if the worker is the main worker, `false` otherwise. */ protected abstract isMain (): boolean @@ -1015,22 +1338,33 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategiesContext, + workerUsage, + task + ) } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - task.name as string - ) != null + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) != + null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategiesContext, + taskFunctionWorkerUsage, + task + ) } } @@ -1045,26 +1379,50 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { + let needWorkerChoiceStrategiesUpdate = false + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategiesContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategiesContext, + workerUsage, + message + ) + needWorkerChoiceStrategiesUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.taskPerformance!.name ) != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) - this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategiesContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategiesContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategiesUpdate = true + } + if (needWorkerChoiceStrategiesUpdate) { + this.workerChoiceStrategiesContext?.update(workerNodeKey) } } @@ -1078,106 +1436,31 @@ export abstract class AbstractPool< const workerInfo = this.getWorkerInfo(workerNodeKey) return ( workerInfo != null && - Array.isArray(workerInfo.taskFunctionNames) && - workerInfo.taskFunctionNames.length > 2 - ) - } - - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing > 0 - ) { - --workerTaskStatistics.executing - } - if (message.workerError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed - } - } - - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0 + Array.isArray(workerInfo.taskFunctionsProperties) && + workerInfo.taskFunctionsProperties.length > 2 ) } - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime - ) - } - - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0 - ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0 - ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization - } - } - } - } - /** * Chooses a worker node for the next task. * - * The default worker choice strategy uses a round robin algorithm to distribute the tasks. - * + * @param name - The task function name. * @returns The chosen worker node key */ - private chooseWorkerNode (): number { + private chooseWorkerNode (name?: string): number { if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { return workerNodeKey } } - return this.workerChoiceStrategyContext.execute() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + return this.workerChoiceStrategiesContext!.execute( + this.getTaskFunctionWorkerChoiceStrategy(name) + ) } /** @@ -1185,9 +1468,7 @@ export abstract class AbstractPool< * * @returns Whether to create a dynamic worker or not. */ - private shallCreateDynamicWorker (): boolean { - return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() - } + protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. @@ -1199,15 +1480,46 @@ export abstract class AbstractPool< protected abstract sendToWorker ( workerNodeKey: number, message: MessageValue, - transferList?: TransferListItem[] + transferList?: readonly TransferListItem[] ): void /** - * Creates a new worker. + * Initializes the worker node usage with sensible default values gathered during runtime. * - * @returns Newly created worker. + * @param workerNode - The worker node. */ - protected abstract createWorker (): Worker + private initWorkerNodeUsage (workerNode: IWorkerNode): void { + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate === true + ) { + workerNode.usage.runTime.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.runTime.aggregate ?? Infinity + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .waitTime.aggregate === true + ) { + workerNode.usage.waitTime.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.waitTime.aggregate ?? Infinity + ) + ) + } + if ( + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu + .aggregate === true + ) { + workerNode.usage.elu.active.aggregate = min( + ...this.workerNodes.map( + workerNode => workerNode.usage.elu.active.aggregate ?? Infinity + ) + ) + } + } /** * Creates a new, completely set up worker node. @@ -1215,41 +1527,61 @@ export abstract class AbstractPool< * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { - const worker = this.createWorker() - - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) - workerInfo.ready = false + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('error', (error: Error) => { + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && - !this.starting && + !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() - } else { - this.createAndSetupWorkerNode() + } else if (!this.startingMinimumNumberOfWorkers) { + this.startMinimumNumberOfWorkers(true) } } - if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + if ( + this.started && + !this.destroying && + this.opts.enableTasksQueue === true + ) { + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.terminate().catch((error: unknown) => { + this.emitter?.emit(PoolEvents.error, error) + }) }) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode) + if ( + this.started && + !this.startingMinimumNumberOfWorkers && + !this.destroying + ) { + this.startMinimumNumberOfWorkers(true) + } }) - - const workerNodeKey = this.addWorkerNode(worker) - + const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) - return workerNodeKey } @@ -1265,7 +1597,7 @@ export abstract class AbstractPool< const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) - const workerUsage = this.workerNodes[localWorkerNodeKey].usage + const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || @@ -1276,35 +1608,41 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - // Flag the worker as not ready immediately + // Flag the worker node as not ready immediately this.flagWorkerNodeAsNotReady(localWorkerNodeKey) - this.destroyWorkerNode(localWorkerNodeKey).catch(error => { + this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) if (this.taskFunctions.size > 0) { - for (const [taskFunctionName, taskFunction] of this.taskFunctions) { + for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) { this.sendTaskFunctionOperationToWorker(workerNodeKey, { taskFunctionOperation: 'add', - taskFunctionName, - taskFunction: taskFunction.toString() - }).catch(error => { + taskFunctionProperties: buildTaskFunctionProperties( + taskFunctionName, + taskFunctionObject + ), + taskFunction: taskFunctionObject.taskFunction.toString() + }).catch((error: unknown) => { this.emitter?.emit(PoolEvents.error, error) }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || - this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady === + true || + this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage === + true ) { - workerInfo.ready = true + workerNode.info.ready = true } + this.initWorkerNodeUsage(workerNode) this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -1358,7 +1696,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1366,15 +1704,15 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].addEventListener( - 'emptyqueue', - this.handleEmptyQueueEvent as EventListener + this.workerNodes[workerNodeKey].on( + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].addEventListener( - 'backpressure', - this.handleBackPressureEvent as EventListener + this.workerNodes[workerNodeKey].on( + 'backPressure', + this.handleWorkerNodeBackPressureEvent ) } } @@ -1396,19 +1734,36 @@ export abstract class AbstractPool< this.sendToWorker(workerNodeKey, { statistics: { runTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime.aggregate, - elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu.aggregate + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .runTime.aggregate ?? false, + elu: + this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements() + .elu.aggregate ?? false } }) } - private redistributeQueuedTasks (workerNodeKey: number): void { - while (this.tasksQueueSize(workerNodeKey) > 0) { + private cannotStealTask (): boolean { + return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 + } + + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } + + private redistributeQueuedTasks (sourceWorkerNodeKey: number): void { + if (sourceWorkerNodeKey === -1 || this.cannotStealTask()) { + return + } + while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.info.ready && + return sourceWorkerNodeKey !== workerNodeKey && + workerNode.info.ready && workerNode.usage.tasks.queued < workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey @@ -1416,12 +1771,11 @@ export abstract class AbstractPool< }, 0 ) - const task = this.dequeueTask(workerNodeKey) as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + this.handleTask( + destinationWorkerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.dequeueTask(sourceWorkerNodeKey)! + ) } } @@ -1430,6 +1784,7 @@ export abstract class AbstractPool< taskName: string ): void { const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (workerNode?.usage != null) { ++workerNode.usage.tasks.stolen } @@ -1437,19 +1792,129 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen + } + } + + private updateTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number, + taskName: string, + previousTaskName?: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (workerNode?.usage != null) { + ++workerNode.usage.tasks.sequentiallyStolen + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! + if ( + taskFunctionWorkerUsage.tasks.sequentiallyStolen === 0 || + (previousTaskName != null && + previousTaskName === taskName && + taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) + ) { + ++taskFunctionWorkerUsage.tasks.sequentiallyStolen + } else if (taskFunctionWorkerUsage.tasks.sequentiallyStolen > 0) { + taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 + } + } + } + + private resetTaskSequentiallyStolenStatisticsWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (workerNode?.usage != null) { + workerNode.usage.tasks.sequentiallyStolen = 0 + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + workerNode.getTaskFunctionWorkerUsage(taskName) != null + ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage( taskName - ) as WorkerUsage - ++taskFunctionWorkerUsage.tasks.stolen + )!.tasks.sequentiallyStolen = 0 } } - private readonly handleEmptyQueueEvent = ( - event: CustomEvent + private readonly handleWorkerNodeIdleEvent = ( + eventDetail: WorkerNodeEventDetail, + previousStolenTask?: Task ): void => { - const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( - event.detail.workerId - ) + const { workerNodeKey } = eventDetail + if (workerNodeKey == null) { + throw new Error( + "WorkerNode event detail 'workerNodeKey' property must be defined" + ) + } + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) + } + if ( + this.cannotStealTask() || + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) + ) { + if (previousStolenTask != null) { + workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) + } + return + } + const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + if ( + previousStolenTask != null && + (workerNodeTasksUsage.executing > 0 || + this.tasksQueueSize(workerNodeKey) > 0) + ) { + workerInfo.stealing = false + this.resetTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + previousStolenTask.name! + ) + return + } + workerInfo.stealing = true + const stolenTask = this.workerNodeStealTask(workerNodeKey) + if (stolenTask != null) { + this.updateTaskSequentiallyStolenStatisticsWorkerUsage( + workerNodeKey, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name!, + previousStolenTask?.name + ) + } + sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) + .then(() => { + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) + return undefined + }) + .catch((error: unknown) => { + this.emitter?.emit(PoolEvents.error, error) + }) + } + + private readonly workerNodeStealTask = ( + workerNodeKey: number + ): Task | undefined => { const workerNodes = this.workerNodes .slice() .sort( @@ -1457,34 +1922,41 @@ export abstract class AbstractPool< workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued ) const sourceWorkerNode = workerNodes.find( - workerNode => - workerNode.info.ready && - workerNode.info.id !== event.detail.workerId && - workerNode.usage.tasks.queued > 0 + (sourceWorkerNode, sourceWorkerNodeKey) => + sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stealing && + sourceWorkerNodeKey !== workerNodeKey && + sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } - this.updateTaskStolenStatisticsWorkerUsage( - destinationWorkerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! + this.handleTask(workerNodeKey, task) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) + return task } } - private readonly handleBackPressureEvent = ( - event: CustomEvent + private readonly handleWorkerNodeBackPressureEvent = ( + eventDetail: WorkerNodeEventDetail ): void => { + if ( + this.cannotStealTask() || + this.hasBackPressure() || + (this.info.stealingWorkerNodes ?? 0) > + Math.floor(this.workerNodes.length / 2) + ) { + return + } const sizeOffset = 1 - if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } + const { workerId } = eventDetail const sourceWorkerNode = - this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)] + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] const workerNodes = this.workerNodes .slice() .sort( @@ -1495,20 +1967,25 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && - workerNode.info.id !== event.detail.workerId && + !workerNode.info.stealing && + workerNode.info.id !== workerId && workerNode.usage.tasks.queued < - (this.opts.tasksQueueOptions?.size as number) - sizeOffset + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.size! - sizeOffset ) { - const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo == null) { + throw new Error( + `Worker node with key '${workerNodeKey}' not found in pool` + ) } - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + workerInfo.stealing = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.dequeueLastPrioritizedTask()! + this.handleTask(workerNodeKey, task) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) + workerInfo.stealing = false } } } @@ -1516,62 +1993,101 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctionNames != null) { + const { workerId, ready, taskId, taskFunctionsProperties } = message + if (ready != null && taskFunctionsProperties != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.taskId != null) { + } else if (taskFunctionsProperties != null) { + // Task function properties message received from worker + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + } + } else if (taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctionNames != null) { - // Task function names message received from worker - this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctionNames = message.taskFunctionNames } } - private handleWorkerReadyResponse (message: MessageValue): void { - if (message.ready === false) { - throw new Error( - `Worker ${message.workerId as number} failed to initialize` - ) - } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) - ) - workerInfo.ready = message.ready as boolean - workerInfo.taskFunctionNames = message.taskFunctionNames - if (this.ready) { + private checkAndEmitReadyEvent (): void { + if (!this.readyEventEmitted && this.ready) { this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } + private handleWorkerReadyResponse (message: MessageValue): void { + const { workerId, ready, taskFunctionsProperties } = message + if (ready == null || !ready) { + throw new Error(`Worker ${workerId} failed to initialize`) + } + const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.ready = ready + workerNode.info.taskFunctionsProperties = taskFunctionsProperties + this.sendStatisticsMessageToWorker(workerNodeKey) + this.checkAndEmitReadyEvent() + } + private handleTaskExecutionResponse (message: MessageValue): void { - const { taskId, workerError, data } = message - const promiseResponse = this.promiseResponseMap.get(taskId as string) + const { workerId, taskId, workerError, data } = message + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - promiseResponse.reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - promiseResponse.resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } - const workerNodeKey = promiseResponse.workerNodeKey + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.workerChoiceStrategyContext.update(workerNodeKey) - this.promiseResponseMap.delete(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.delete(taskId!) + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode?.emit('taskFinished', taskId) if ( this.opts.enableTasksQueue === true && - this.tasksQueueSize(workerNodeKey) > 0 && - this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + !this.destroying && + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + workerNode != null ) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + const workerNodeTasksUsage = workerNode.usage.tasks + if ( + this.tasksQueueSize(workerNodeKey) > 0 && + workerNodeTasksUsage.executing < + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! + ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) + } + if ( + workerNodeTasksUsage.executing === 0 && + this.tasksQueueSize(workerNodeKey) === 0 && + workerNodeTasksUsage.sequentiallyStolen === 0 + ) { + workerNode.emit('idle', { + workerId, + workerNodeKey + }) + } } } } @@ -1588,13 +2104,10 @@ export abstract class AbstractPool< } } - private checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.type === PoolTypes.dynamic) { - if (this.full) { - this.emitter?.emit(PoolEvents.full, this.info) - } - } - } + /** + * Emits dynamic worker creation events. + */ + protected abstract checkAndEmitDynamicWorkerCreationEvents (): void /** * Gets the worker information given its worker node key. @@ -1602,57 +2115,80 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { return this.workerNodes[workerNodeKey]?.info } /** - * Adds the given worker in the pool worker nodes. + * Creates a worker node. * - * @param worker - The worker. - * @returns The added worker node key. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + * @returns The created worker node. */ - private addWorkerNode (worker: Worker): number { + private createWorkerNode (): IWorkerNode { const workerNode = new WorkerNode( - worker, - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).size, + tasksQueueBucketSize: + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 + } ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } + return workerNode + } + + /** + * Adds the given worker node in the pool worker nodes. + * + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + */ + private addWorkerNode (workerNode: IWorkerNode): number { this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey === -1) { throw new Error('Worker added not found in worker nodes') } return workerNodeKey } + private checkAndEmitEmptyEvent (): void { + if (this.empty) { + this.emitter?.emit(PoolEvents.empty, this.info) + this.readyEventEmitted = false + } + } + /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + this.workerChoiceStrategiesContext?.remove(workerNodeKey) } + this.checkAndEmitEmptyEvent() } protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { - this.getWorkerInfo(workerNodeKey).ready = false - } - - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) + const workerInfo = this.getWorkerInfo(workerNodeKey) + if (workerInfo != null) { + workerInfo.ready = false + } } private hasBackPressure (): boolean { @@ -1690,18 +2226,19 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void { - for (const [workerNodeKey] of this.workerNodes.entries()) { + for (const workerNodeKey of this.workerNodes.keys()) { this.flushTasksQueue(workerNodeKey) } }