X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8e32218dfd521567663a27b09db1b2747d74b582;hb=1087637e65991bbcf07248b277d0fa99594a1528;hp=56be9a1a9ffe0ca22131b40d9308adab87b9894a;hpb=c3719753af0a9be03abf722a7543495359e817b5;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 56be9a1a..8e32218d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -7,10 +7,9 @@ import type { MessageValue, PromiseResponseWrapper, Task -} from '../utility-types' +} from '../utility-types.js' import { DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, average, exponentialDelay, @@ -21,9 +20,9 @@ import { min, round, sleep -} from '../utils' -import { KillBehaviors } from '../worker/worker-options' -import type { TaskFunction } from '../worker/task-functions' +} from '../utils.js' +import { KillBehaviors } from '../worker/worker-options.js' +import type { TaskFunction } from '../worker/task-functions.js' import { type IPool, PoolEvents, @@ -32,32 +31,34 @@ import { type PoolType, PoolTypes, type TasksQueueOptions -} from './pool' +} from './pool.js' import type { IWorker, IWorkerNode, - TaskStatistics, WorkerInfo, WorkerNodeEventDetail, - WorkerType, - WorkerUsage -} from './worker' + WorkerType +} from './worker.js' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, type WorkerChoiceStrategyOptions -} from './selection-strategies/selection-strategies-types' -import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context' -import { version } from './version' -import { WorkerNode } from './worker-node' +} from './selection-strategies/selection-strategies-types.js' +import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js' +import { version } from './version.js' +import { WorkerNode } from './worker-node.js' import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics -} from './utils' + getDefaultTasksQueueOptions, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, + waitWorkerNodeEvents +} from './utils.js' /** * Base class that implements some shared logic for all poolifier pools. @@ -77,11 +78,6 @@ export abstract class AbstractPool< /** @inheritDoc */ public emitter?: EventEmitterAsyncResource - /** - * Dynamic pool maximum size property placeholder. - */ - protected readonly max?: number - /** * The task execution response promise map: * - `key`: The message id of each submitted task. @@ -132,22 +128,25 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. * - * @param numberOfWorkers - Number of workers that this pool should manage. + * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages. * @param filePath - Path to the worker file. * @param opts - Options for the pool. + * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages. */ public constructor ( - protected readonly numberOfWorkers: number, + protected readonly minimumNumberOfWorkers: number, protected readonly filePath: string, - protected readonly opts: PoolOptions + protected readonly opts: PoolOptions, + protected readonly maximumNumberOfWorkers?: number ) { if (!this.isMain()) { throw new Error( 'Cannot start a pool from a worker with the same type as the pool' ) } + this.checkPoolType() checkFilePath(this.filePath) - this.checkNumberOfWorkers(this.numberOfWorkers) + this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -182,20 +181,28 @@ export abstract class AbstractPool< this.startTimestamp = performance.now() } - private checkNumberOfWorkers (numberOfWorkers: number): void { - if (numberOfWorkers == null) { + private checkPoolType (): void { + if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) { + throw new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + } + } + + private checkMinimumNumberOfWorkers (minimumNumberOfWorkers: number): void { + if (minimumNumberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' ) - } else if (!Number.isSafeInteger(numberOfWorkers)) { + } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) { throw new TypeError( 'Cannot instantiate a pool with a non safe integer number of workers' ) - } else if (numberOfWorkers < 0) { + } else if (minimumNumberOfWorkers < 0) { throw new RangeError( 'Cannot instantiate a pool with a negative number of workers' ) - } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) { + } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) { throw new RangeError('Cannot instantiate a fixed pool with zero worker') } } @@ -203,25 +210,26 @@ export abstract class AbstractPool< private checkPoolOptions (opts: PoolOptions): void { if (isPlainObject(opts)) { this.opts.startWorkers = opts.startWorkers ?? true - checkValidWorkerChoiceStrategy( - opts.workerChoiceStrategy as WorkerChoiceStrategy - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy!) this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategyOptions( - opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + opts.workerChoiceStrategyOptions! ) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...opts.workerChoiceStrategyOptions + if (opts.workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { - checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + checkValidTasksQueueOptions(opts.tasksQueueOptions!) this.opts.tasksQueueOptions = this.buildTasksQueueOptions( - opts.tasksQueueOptions as TasksQueueOptions + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + opts.tasksQueueOptions! ) } } else { @@ -240,25 +248,10 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } - if ( - workerChoiceStrategyOptions?.retries != null && - !Number.isSafeInteger(workerChoiceStrategyOptions.retries) - ) { - throw new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - } - if ( - workerChoiceStrategyOptions?.retries != null && - workerChoiceStrategyOptions.retries < 0 - ) { - throw new RangeError( - `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` - ) - } if ( workerChoiceStrategyOptions?.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + Object.keys(workerChoiceStrategyOptions.weights).length !== + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -290,12 +283,13 @@ export abstract class AbstractPool< worker: this.worker, started: this.started, ready: this.ready, - strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, - minSize: this.minSize, - maxSize: this.maxSize, - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + strategy: this.opts.workerChoiceStrategy!, + minSize: this.minimumNumberOfWorkers, + maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -305,6 +299,13 @@ export abstract class AbstractPool< : accumulator, 0 ), + ...(this.opts.enableTasksQueue === true && { + stealingWorkerNodes: this.workerNodes.reduce( + (accumulator, workerNode) => + workerNode.info.stealing ? accumulator + 1 : accumulator, + 0 + ) + }), busyWorkerNodes: this.workerNodes.reduce( (accumulator, _workerNode, workerNodeKey) => this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, @@ -349,7 +350,7 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && { runTime: { minimum: round( @@ -366,7 +367,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -378,7 +379,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -392,7 +393,7 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { waitTime: { minimum: round( @@ -409,7 +410,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -421,7 +422,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -449,7 +450,7 @@ export abstract class AbstractPool< ? accumulator + 1 : accumulator, 0 - ) >= this.minSize + ) >= this.minimumNumberOfWorkers ) } @@ -460,7 +461,8 @@ export abstract class AbstractPool< */ private get utilization (): number { const poolTimeCapacity = - (performance.now() - this.startTimestamp) * this.maxSize + (performance.now() - this.startTimestamp) * + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.runTime?.aggregate ?? 0), @@ -486,20 +488,6 @@ export abstract class AbstractPool< */ protected abstract get worker (): WorkerType - /** - * The pool minimum size. - */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** - * The pool maximum size. - */ - protected get maxSize (): number { - return this.max ?? this.numberOfWorkers - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -516,18 +504,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -564,9 +540,8 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...workerChoiceStrategyOptions + if (workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions @@ -584,7 +559,8 @@ export abstract class AbstractPool< this.flushTasksQueues() } this.opts.enableTasksQueue = enable - this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueOptions(tasksQueueOptions!) } /** @inheritDoc */ @@ -593,7 +569,8 @@ export abstract class AbstractPool< checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) - this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setTasksQueueSize(this.opts.tasksQueueOptions.size!) if (this.opts.tasksQueueOptions.taskStealing === true) { this.unsetTaskStealing() this.setTaskStealing() @@ -615,12 +592,9 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ), ...tasksQueueOptions } } @@ -633,18 +607,15 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent - ) + this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } } @@ -653,7 +624,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -662,7 +633,7 @@ export abstract class AbstractPool< for (const [workerNodeKey] of this.workerNodes.entries()) { this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -673,7 +644,10 @@ export abstract class AbstractPool< * The pool filling boolean status. */ protected get full (): boolean { - return this.workerNodes.length >= this.maxSize + return ( + this.workerNodes.length >= + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + ) } /** @@ -695,7 +669,8 @@ export abstract class AbstractPool< workerNode => workerNode.info.ready && workerNode.usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) === -1 ) } @@ -711,7 +686,8 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes[workerNodeKey].usage.tasks.executing >= - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 @@ -726,7 +702,8 @@ export abstract class AbstractPool< message: MessageValue ): void => { this.checkMessageWorkerId(message) - const workerId = this.getWorkerInfo(workerNodeKey).id as number + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const workerId = this.getWorkerInfo(workerNodeKey).id! if ( message.taskFunctionOperationStatus != null && message.workerId === workerId @@ -738,8 +715,10 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion }' failed on worker ${message.workerId} with error: '${ - message.workerError?.message as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.workerError!.message }'` ) ) @@ -788,10 +767,11 @@ export abstract class AbstractPool< new Error( `Task function operation '${ message.taskFunctionOperation as string - }' failed on worker ${ - errorResponse?.workerId as number - } with error: '${ - errorResponse?.workerError?.message as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + }' failed on worker ${errorResponse! + .workerId!} with error: '${ + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + errorResponse!.workerError!.message }'` ) ) @@ -896,7 +876,8 @@ export abstract class AbstractPool< return ( this.tasksQueueSize(workerNodeKey) === 0 && this.workerNodes[workerNodeKey].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) } @@ -941,7 +922,8 @@ export abstract class AbstractPool< timestamp, taskId: randomUUID() } - this.promiseResponseMap.set(task.taskId as string, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.set(task.taskId!, { resolve, reject, workerNodeKey, @@ -981,7 +963,7 @@ export abstract class AbstractPool< (accumulator, workerNode) => !workerNode.info.dynamic ? accumulator + 1 : accumulator, 0 - ) < this.numberOfWorkers + ) < this.minimumNumberOfWorkers ) { this.createAndSetupWorkerNode() } @@ -1014,10 +996,12 @@ export abstract class AbstractPool< this.started = false } - protected async sendKillMessageToWorker ( - workerNodeKey: number - ): Promise { + private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { + if (this.workerNodes?.[workerNodeKey] == null) { + resolve() + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1025,9 +1009,8 @@ export abstract class AbstractPool< } else if (message.kill === 'failure') { reject( new Error( - `Kill message handling failed on worker ${ - message.workerId as number - }` + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + `Kill message handling failed on worker ${message.workerId!}` ) ) } @@ -1043,7 +1026,22 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) + const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).tasksFinishedTimeout + ) + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1074,19 +1072,29 @@ export abstract class AbstractPool< if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + task + ) } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && - this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - task.name as string - ) != null + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) != + null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(task.name!)! ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + task + ) } } @@ -1101,26 +1109,49 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { + let needWorkerChoiceStrategyUpdate = false if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + message.taskPerformance!.name ) != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage( - message.taskPerformance?.name as string - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) - this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)! + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if (needWorkerChoiceStrategyUpdate) { + this.workerChoiceStrategyContext.update(workerNodeKey) } } @@ -1139,84 +1170,6 @@ export abstract class AbstractPool< ) } - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing > 0 - ) { - --workerTaskStatistics.executing - } - if (message.workerError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed - } - } - - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0 - ) - } - - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime - ) - } - - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0 - ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0 - ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization - } - } - } - } - /** * Chooses a worker node for the next task. * @@ -1241,9 +1194,7 @@ export abstract class AbstractPool< * * @returns Whether to create a dynamic worker or not. */ - private shallCreateDynamicWorker (): boolean { - return this.type === PoolTypes.dynamic && !this.full && this.internalBusy() - } + protected abstract shallCreateDynamicWorker (): boolean /** * Sends a message to worker given its worker node key. @@ -1278,33 +1229,36 @@ export abstract class AbstractPool< this.opts.errorHandler ?? EMPTY_FUNCTION ) workerNode.registerWorkerEventHandler('error', (error: Error) => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(workerNode.worker) - this.flagWorkerNodeAsNotReady(workerNodeKey) - const workerInfo = this.getWorkerInfo(workerNodeKey) + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && - !this.starting && !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else { this.createAndSetupWorkerNode() } } - if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + if ( + this.started && + !this.destroying && + this.opts.enableTasksQueue === true + ) { + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + workerNode?.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) workerNode.registerWorkerEventHandler( 'exit', this.opts.exitHandler ?? EMPTY_FUNCTION ) workerNode.registerOnceWorkerEventHandler('exit', () => { - this.removeWorkerNode(workerNode.worker) + this.removeWorkerNode(workerNode) }) const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) @@ -1341,7 +1295,6 @@ export abstract class AbstractPool< }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) this.sendToWorker(workerNodeKey, { checkActive: true }) @@ -1356,12 +1309,13 @@ export abstract class AbstractPool< }) } } - workerInfo.dynamic = true + const workerNode = this.workerNodes[workerNodeKey] + workerNode.info.dynamic = true if ( this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { - workerInfo.ready = true + workerNode.info.ready = true } this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey @@ -1425,14 +1379,14 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { this.workerNodes[workerNodeKey].on( - 'idleWorkerNode', - this.handleIdleWorkerNodeEvent + 'idle', + this.handleWorkerNodeIdleEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent + this.handleWorkerNodeBackPressureEvent ) } } @@ -1462,6 +1416,10 @@ export abstract class AbstractPool< }) } + private cannotStealTask (): boolean { + return this.workerNodes.length <= 1 || this.info.queuedTasks === 0 + } + private handleTask (workerNodeKey: number, task: Task): void { if (this.shallExecuteTask(workerNodeKey)) { this.executeTask(workerNodeKey, task) @@ -1471,7 +1429,7 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { - if (this.workerNodes.length <= 1) { + if (workerNodeKey === -1 || this.cannotStealTask()) { return } while (this.tasksQueueSize(workerNodeKey) > 0) { @@ -1487,7 +1445,8 @@ export abstract class AbstractPool< ) this.handleTask( destinationWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.dequeueTask(workerNodeKey)! ) } } @@ -1504,9 +1463,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.stolen } } @@ -1529,9 +1488,9 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! ++taskFunctionWorkerUsage.tasks.sequentiallyStolen } } @@ -1554,26 +1513,33 @@ export abstract class AbstractPool< this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null ) { - const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage( - taskName - ) as WorkerUsage + const taskFunctionWorkerUsage = + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerNode.getTaskFunctionWorkerUsage(taskName)! taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0 } } - private readonly handleIdleWorkerNodeEvent = ( + private readonly handleWorkerNodeIdleEvent = ( eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - if (this.workerNodes.length <= 1) { - return - } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( - 'WorkerNode event detail workerNodeKey attribute must be defined' + 'WorkerNode event detail workerNodeKey property must be defined' ) } + if ( + this.cannotStealTask() || + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + ) { + if (previousStolenTask != null) { + this.getWorkerInfo(workerNodeKey).stealing = false + } + return + } const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks if ( previousStolenTask != null && @@ -1581,8 +1547,10 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { + this.getWorkerInfo(workerNodeKey).stealing = false + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion for (const taskName of this.workerNodes[workerNodeKey].info - .taskFunctionNames as string[]) { + .taskFunctionNames!) { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, taskName @@ -1591,15 +1559,17 @@ export abstract class AbstractPool< this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } + this.getWorkerInfo(workerNodeKey).stealing = true const stolenTask = this.workerNodeStealTask(workerNodeKey) if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && stolenTask != null ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const taskFunctionTasksWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskFunctionWorkerUsage(stolenTask.name as string) - ?.tasks as TaskStatistics + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks if ( taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || (previousStolenTask != null && @@ -1608,18 +1578,20 @@ export abstract class AbstractPool< ) { this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } else { this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( workerNodeKey, - stolenTask.name as string + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + stolenTask.name! ) } } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) + this.handleWorkerNodeIdleEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1637,30 +1609,35 @@ export abstract class AbstractPool< const sourceWorkerNode = workerNodes.find( (sourceWorkerNode, sourceWorkerNodeKey) => sourceWorkerNode.info.ready && + !sourceWorkerNode.info.stealing && sourceWorkerNodeKey !== workerNodeKey && sourceWorkerNode.usage.tasks.queued > 0 ) if (sourceWorkerNode != null) { - const task = sourceWorkerNode.popTask() as Task + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) return task } } - private readonly handleBackPressureEvent = ( + private readonly handleWorkerNodeBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { - if (this.workerNodes.length <= 1) { + if ( + this.cannotStealTask() || + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.info.stealingWorkerNodes! > Math.floor(this.workerNodes.length / 2) + ) { return } const { workerId } = eventDetail const sizeOffset = 1 - if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + if (this.opts.tasksQueueOptions!.size! <= sizeOffset) { return } const sourceWorkerNode = @@ -1675,16 +1652,19 @@ export abstract class AbstractPool< if ( sourceWorkerNode.usage.tasks.queued > 0 && workerNode.info.ready && + !workerNode.info.stealing && workerNode.info.id !== workerId && workerNode.usage.tasks.queued < - (this.opts.tasksQueueOptions?.size as number) - sizeOffset + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.size! - sizeOffset ) { - const task = sourceWorkerNode.popTask() as Task + this.getWorkerInfo(workerNodeKey).stealing = true + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const task = sourceWorkerNode.popTask()! this.handleTask(workerNodeKey, task) - this.updateTaskStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) + this.getWorkerInfo(workerNodeKey).stealing = false } } } @@ -1713,25 +1693,27 @@ export abstract class AbstractPool< private handleWorkerReadyResponse (message: MessageValue): void { const { workerId, ready, taskFunctionNames } = message - if (ready === false) { - throw new Error(`Worker ${workerId as number} failed to initialize`) + if (ready == null || !ready) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + throw new Error(`Worker ${workerId!} failed to initialize`) } - const workerInfo = this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(workerId) - ) - workerInfo.ready = ready as boolean - workerInfo.taskFunctionNames = taskFunctionNames + const workerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + workerNode.info.ready = ready + workerNode.info.taskFunctionNames = taskFunctionNames if (!this.readyEventEmitted && this.ready) { - this.readyEventEmitted = true this.emitter?.emit(PoolEvents.ready, this.info) + this.readyEventEmitted = true } } private handleTaskExecutionResponse (message: MessageValue): void { const { workerId, taskId, workerError, data } = message - const promiseResponse = this.promiseResponseMap.get(taskId as string) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const promiseResponse = this.promiseResponseMap.get(taskId!) if (promiseResponse != null) { const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) asyncResource != null @@ -1748,27 +1730,28 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.workerChoiceStrategyContext.update(workerNodeKey) - this.promiseResponseMap.delete(taskId as string) - if (this.opts.enableTasksQueue === true) { - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.promiseResponseMap.delete(taskId!) + workerNode?.emit('taskFinished', taskId) + if (this.opts.enableTasksQueue === true && !this.destroying) { + const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < - (this.opts.tasksQueueOptions?.concurrency as number) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.opts.tasksQueueOptions!.concurrency! ) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) } if ( workerNodeTasksUsage.executing === 0 && this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].emit('idleWorkerNode', { - workerId: workerId as number, + workerNode.emit('idle', { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + workerId: workerId!, workerNodeKey }) } @@ -1788,13 +1771,10 @@ export abstract class AbstractPool< } } - private checkAndEmitDynamicWorkerCreationEvents (): void { - if (this.type === PoolTypes.dynamic) { - if (this.full) { - this.emitter?.emit(PoolEvents.full, this.info) - } - } - } + /** + * Emits dynamic worker creation events. + */ + protected abstract checkAndEmitDynamicWorkerCreationEvents (): void /** * Gets the worker information given its worker node key. @@ -1819,7 +1799,10 @@ export abstract class AbstractPool< env: this.opts.env, workerOptions: this.opts.workerOptions, tasksQueueBackPressureSize: - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).size } ) // Flag the worker node as ready at pool startup. @@ -1846,12 +1829,12 @@ export abstract class AbstractPool< } /** - * Removes the worker node associated to the give given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1862,14 +1845,6 @@ export abstract class AbstractPool< this.getWorkerInfo(workerNodeKey).ready = false } - /** @inheritDoc */ - public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) - } - private hasBackPressure (): boolean { return ( this.opts.enableTasksQueue === true && @@ -1905,14 +1880,15 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { - this.executeTask( - workerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void {