X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=ebc7b5d95f1d3fd84bb00ff0b689bc8ce18bade9;hb=26ce26ca8861318068427cc86697103e7a3ddbf4;hp=670d490daa18c0cd4e3e03217df69275aa35b824;hpb=c329fd41c48904770df633b6d5ea2b3d37f3eafd;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 670d490d..ebc7b5d9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -10,7 +10,6 @@ import type { } from '../utility-types' import { DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, average, exponentialDelay, @@ -55,6 +54,7 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, + getDefaultTasksQueueOptions, updateEluWorkerUsage, updateRunTimeWorkerUsage, updateTaskStatisticsWorkerUsage, @@ -80,11 +80,6 @@ export abstract class AbstractPool< /** @inheritDoc */ public emitter?: EventEmitterAsyncResource - /** - * Dynamic pool maximum size property placeholder. - */ - protected readonly max?: number - /** * The task execution response promise map: * - `key`: The message id of each submitted task. @@ -135,22 +130,25 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. * - * @param numberOfWorkers - Number of workers that this pool should manage. + * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage. + * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage. * @param filePath - Path to the worker file. * @param opts - Options for the pool. */ public constructor ( - protected readonly numberOfWorkers: number, + protected readonly minimumNumberOfWorkers: number, protected readonly filePath: string, - protected readonly opts: PoolOptions + protected readonly opts: PoolOptions, + protected readonly maximumNumberOfWorkers?: number ) { if (!this.isMain()) { throw new Error( 'Cannot start a pool from a worker with the same type as the pool' ) } + this.checkPoolType() checkFilePath(this.filePath) - this.checkNumberOfWorkers(this.numberOfWorkers) + this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -185,7 +183,15 @@ export abstract class AbstractPool< this.startTimestamp = performance.now() } - private checkNumberOfWorkers (numberOfWorkers: number): void { + private checkPoolType (): void { + if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) { + throw new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + } + } + + private checkMinimumNumberOfWorkers (numberOfWorkers: number): void { if (numberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -214,9 +220,8 @@ export abstract class AbstractPool< this.checkValidWorkerChoiceStrategyOptions( opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions ) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...opts.workerChoiceStrategyOptions + if (opts.workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true @@ -243,25 +248,10 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } - if ( - workerChoiceStrategyOptions?.retries != null && - !Number.isSafeInteger(workerChoiceStrategyOptions.retries) - ) { - throw new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - } - if ( - workerChoiceStrategyOptions?.retries != null && - workerChoiceStrategyOptions.retries < 0 - ) { - throw new RangeError( - `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` - ) - } if ( workerChoiceStrategyOptions?.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + Object.keys(workerChoiceStrategyOptions.weights).length !== + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -294,11 +284,11 @@ export abstract class AbstractPool< started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, - minSize: this.minSize, - maxSize: this.maxSize, - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + minSize: this.minimumNumberOfWorkers, + maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -352,7 +342,7 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && { runTime: { minimum: round( @@ -369,7 +359,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -381,7 +371,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -395,7 +385,7 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { waitTime: { minimum: round( @@ -412,7 +402,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -424,7 +414,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -452,7 +442,7 @@ export abstract class AbstractPool< ? accumulator + 1 : accumulator, 0 - ) >= this.minSize + ) >= this.minimumNumberOfWorkers ) } @@ -463,7 +453,8 @@ export abstract class AbstractPool< */ private get utilization (): number { const poolTimeCapacity = - (performance.now() - this.startTimestamp) * this.maxSize + (performance.now() - this.startTimestamp) * + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.runTime?.aggregate ?? 0), @@ -489,20 +480,6 @@ export abstract class AbstractPool< */ protected abstract get worker (): WorkerType - /** - * The pool minimum size. - */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** - * The pool maximum size. - */ - protected get maxSize (): number { - return this.max ?? this.numberOfWorkers - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -519,18 +496,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -567,11 +532,11 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...workerChoiceStrategyOptions + if (workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( + this, this.opts.workerChoiceStrategyOptions ) } @@ -618,12 +583,9 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ), ...tasksQueueOptions } } @@ -676,7 +638,10 @@ export abstract class AbstractPool< * The pool filling boolean status. */ protected get full (): boolean { - return this.workerNodes.length >= this.maxSize + return ( + this.workerNodes.length >= + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + ) } /** @@ -984,7 +949,7 @@ export abstract class AbstractPool< (accumulator, workerNode) => !workerNode.info.dynamic ? accumulator + 1 : accumulator, 0 - ) < this.numberOfWorkers + ) < this.minimumNumberOfWorkers ) { this.createAndSetupWorkerNode() } @@ -1017,10 +982,12 @@ export abstract class AbstractPool< this.started = false } - protected async sendKillMessageToWorker ( - workerNodeKey: number - ): Promise { + private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1050,7 +1017,15 @@ export abstract class AbstractPool< this.flagWorkerNodeAsNotReady(workerNodeKey) const flushedTasks = this.flushTasksQueue(workerNodeKey) const workerNode = this.workerNodes[workerNodeKey] - await waitWorkerNodeEvents(workerNode, 'taskFinished', flushedTasks) + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).tasksFinishedTimeout + ) await this.sendKillMessageToWorker(workerNodeKey) await workerNode.terminate() } @@ -1257,7 +1232,7 @@ export abstract class AbstractPool< if (this.started && this.opts.enableTasksQueue === true) { this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } - workerNode.terminate().catch(error => { + workerNode?.terminate().catch(error => { this.emitter?.emit(PoolEvents.error, error) }) }) @@ -1433,6 +1408,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } if (this.workerNodes.length <= 1) { return } @@ -1713,7 +1691,7 @@ export abstract class AbstractPool< this.afterTaskExecutionHook(workerNodeKey, message) this.promiseResponseMap.delete(taskId as string) workerNode?.emit('taskFinished', taskId) - if (this.opts.enableTasksQueue === true) { + if (this.opts.enableTasksQueue === true && !this.destroying) { const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && @@ -1782,7 +1760,10 @@ export abstract class AbstractPool< env: this.opts.env, workerOptions: this.opts.workerOptions, tasksQueueBackPressureSize: - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).size } ) // Flag the worker node as ready at pool startup.