X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=ebc7b5d95f1d3fd84bb00ff0b689bc8ce18bade9;hb=26ce26ca8861318068427cc86697103e7a3ddbf4;hp=53b7b430efcc626c317e2186e7dbd4b409944dd6;hpb=9ad272eaf0cded8c7e70f1fe6f075c99f23619fa;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 53b7b430..ebc7b5d9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -9,7 +10,6 @@ import type { } from '../utility-types' import { DEFAULT_TASK_NAME, - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, average, exponentialDelay, @@ -42,7 +42,6 @@ import type { WorkerUsage } from './worker' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -55,7 +54,12 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics + getDefaultTasksQueueOptions, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, + waitWorkerNodeEvents } from './utils' /** @@ -76,11 +80,6 @@ export abstract class AbstractPool< /** @inheritDoc */ public emitter?: EventEmitterAsyncResource - /** - * Dynamic pool maximum size property placeholder. - */ - protected readonly max?: number - /** * The task execution response promise map: * - `key`: The message id of each submitted task. @@ -131,22 +130,25 @@ export abstract class AbstractPool< /** * Constructs a new poolifier pool. * - * @param numberOfWorkers - Number of workers that this pool should manage. + * @param minimumNumberOfWorkers - Minimum number of workers that this pool should manage. + * @param maximumNumberOfWorkers - Maximum number of workers that this pool should manage. * @param filePath - Path to the worker file. * @param opts - Options for the pool. */ public constructor ( - protected readonly numberOfWorkers: number, + protected readonly minimumNumberOfWorkers: number, protected readonly filePath: string, - protected readonly opts: PoolOptions + protected readonly opts: PoolOptions, + protected readonly maximumNumberOfWorkers?: number ) { if (!this.isMain()) { throw new Error( 'Cannot start a pool from a worker with the same type as the pool' ) } + this.checkPoolType() checkFilePath(this.filePath) - this.checkNumberOfWorkers(this.numberOfWorkers) + this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers) this.checkPoolOptions(this.opts) this.chooseWorkerNode = this.chooseWorkerNode.bind(this) @@ -181,7 +183,15 @@ export abstract class AbstractPool< this.startTimestamp = performance.now() } - private checkNumberOfWorkers (numberOfWorkers: number): void { + private checkPoolType (): void { + if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) { + throw new Error( + 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization' + ) + } + } + + private checkMinimumNumberOfWorkers (numberOfWorkers: number): void { if (numberOfWorkers == null) { throw new Error( 'Cannot instantiate a pool without specifying the number of workers' @@ -210,9 +220,8 @@ export abstract class AbstractPool< this.checkValidWorkerChoiceStrategyOptions( opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions ) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...opts.workerChoiceStrategyOptions + if (opts.workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions } this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true @@ -239,25 +248,10 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } - if ( - workerChoiceStrategyOptions?.retries != null && - !Number.isSafeInteger(workerChoiceStrategyOptions.retries) - ) { - throw new TypeError( - 'Invalid worker choice strategy options: retries must be an integer' - ) - } - if ( - workerChoiceStrategyOptions?.retries != null && - workerChoiceStrategyOptions.retries < 0 - ) { - throw new RangeError( - `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero` - ) - } if ( workerChoiceStrategyOptions?.weights != null && - Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize + Object.keys(workerChoiceStrategyOptions.weights).length !== + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) ) { throw new Error( 'Invalid worker choice strategy options: must have a weight for each worker node' @@ -290,11 +284,11 @@ export abstract class AbstractPool< started: this.started, ready: this.ready, strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy, - minSize: this.minSize, - maxSize: this.maxSize, - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + minSize: this.minimumNumberOfWorkers, + maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers, + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { utilization: round(this.utilization) }), workerNodes: this.workerNodes.length, idleWorkerNodes: this.workerNodes.reduce( @@ -305,8 +299,8 @@ export abstract class AbstractPool< 0 ), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _workerNode, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -348,7 +342,7 @@ export abstract class AbstractPool< accumulator + workerNode.usage.tasks.failed, 0 ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.aggregate && { runTime: { minimum: round( @@ -365,7 +359,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.average && { average: round( average( @@ -377,7 +371,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .runTime.median && { median: round( median( @@ -391,7 +385,7 @@ export abstract class AbstractPool< }) } }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.aggregate && { waitTime: { minimum: round( @@ -408,7 +402,7 @@ export abstract class AbstractPool< ) ) ), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.average && { average: round( average( @@ -420,7 +414,7 @@ export abstract class AbstractPool< ) ) }), - ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements() + ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements() .waitTime.median && { median: round( median( @@ -448,7 +442,7 @@ export abstract class AbstractPool< ? accumulator + 1 : accumulator, 0 - ) >= this.minSize + ) >= this.minimumNumberOfWorkers ) } @@ -459,7 +453,8 @@ export abstract class AbstractPool< */ private get utilization (): number { const poolTimeCapacity = - (performance.now() - this.startTimestamp) * this.maxSize + (performance.now() - this.startTimestamp) * + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) const totalTasksRunTime = this.workerNodes.reduce( (accumulator, workerNode) => accumulator + (workerNode.usage.runTime?.aggregate ?? 0), @@ -485,20 +480,6 @@ export abstract class AbstractPool< */ protected abstract get worker (): WorkerType - /** - * The pool minimum size. - */ - protected get minSize (): number { - return this.numberOfWorkers - } - - /** - * The pool maximum size. - */ - protected get maxSize (): number { - return this.max ?? this.numberOfWorkers - } - /** * Checks if the worker id sent in the received message from a worker is valid. * @@ -515,18 +496,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -563,11 +532,11 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = { - ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, - ...workerChoiceStrategyOptions + if (workerChoiceStrategyOptions != null) { + this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions } this.workerChoiceStrategyContext.setOptions( + this, this.opts.workerChoiceStrategyOptions ) } @@ -594,11 +563,13 @@ export abstract class AbstractPool< this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -612,12 +583,9 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ), ...tasksQueueOptions } } @@ -670,7 +638,10 @@ export abstract class AbstractPool< * The pool filling boolean status. */ protected get full (): boolean { - return this.workerNodes.length >= this.maxSize + return ( + this.workerNodes.length >= + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) + ) } /** @@ -704,6 +675,16 @@ export abstract class AbstractPool< ) } + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + } + private async sendTaskFunctionOperationToWorker ( workerNodeKey: number, message: MessageValue @@ -931,7 +912,13 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -962,7 +949,7 @@ export abstract class AbstractPool< (accumulator, workerNode) => !workerNode.info.dynamic ? accumulator + 1 : accumulator, 0 - ) < this.numberOfWorkers + ) < this.minimumNumberOfWorkers ) { this.createAndSetupWorkerNode() } @@ -983,7 +970,7 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_, workerNodeKey) => { + this.workerNodes.map(async (_workerNode, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) @@ -995,10 +982,12 @@ export abstract class AbstractPool< this.started = false } - protected async sendKillMessageToWorker ( - workerNodeKey: number - ): Promise { + private async sendKillMessageToWorker (workerNodeKey: number): Promise { await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1024,7 +1013,22 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) + const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).tasksFinishedTimeout + ) + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1055,7 +1059,11 @@ export abstract class AbstractPool< if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + task + ) } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1067,7 +1075,11 @@ export abstract class AbstractPool< workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + task + ) } } @@ -1082,11 +1094,21 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { + let needWorkerChoiceStrategyUpdate = false if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1099,9 +1121,21 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage( message.taskPerformance?.name as string ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) - this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if (needWorkerChoiceStrategyUpdate) { + this.workerChoiceStrategyContext.update(workerNodeKey) } } @@ -1120,84 +1154,6 @@ export abstract class AbstractPool< ) } - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing > 0 - ) { - --workerTaskStatistics.executing - } - if (message.workerError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed - } - } - - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0 - ) - } - - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime - ) - } - - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0 - ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0 - ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization - } - } - } - } - /** * Chooses a worker node for the next task. * @@ -1239,55 +1195,56 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): void - /** - * Creates a new worker. - * - * @returns Newly created worker. - */ - protected abstract createWorker (): Worker - /** * Creates a new, completely set up worker node. * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { - const worker = this.createWorker() - - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - this.flagWorkerNodeAsNotReady(workerNodeKey) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && !this.starting && !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else { this.createAndSetupWorkerNode() } } if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + workerNode?.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode) }) - - const workerNodeKey = this.addWorkerNode(worker) - + const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) - return workerNodeKey } @@ -1396,7 +1353,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1442,7 +1399,21 @@ export abstract class AbstractPool< }) } + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } + private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1454,12 +1425,10 @@ export abstract class AbstractPool< }, 0 ) - const task = this.dequeueTask(workerNodeKey) as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + this.handleTask( + destinationWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } } @@ -1536,6 +1505,9 @@ export abstract class AbstractPool< eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( @@ -1610,11 +1582,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, @@ -1627,6 +1595,9 @@ export abstract class AbstractPool< private readonly handleBackPressureEvent = ( eventDetail: WorkerNodeEventDetail ): void => { + if (this.workerNodes.length <= 1) { + return + } const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { @@ -1649,11 +1620,7 @@ export abstract class AbstractPool< (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1665,7 +1632,9 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) const { workerId, ready, taskId, taskFunctionNames } = message if (ready != null && taskFunctionNames != null) { @@ -1702,18 +1671,28 @@ export abstract class AbstractPool< const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) - if (this.opts.enableTasksQueue === true) { - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + workerNode?.emit('taskFinished', taskId) + if (this.opts.enableTasksQueue === true && !this.destroying) { + const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < @@ -1729,7 +1708,7 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerNode.emit('idleWorkerNode', { workerId: workerId as number, workerNodeKey }) @@ -1769,23 +1748,41 @@ export abstract class AbstractPool< } /** - * Adds the given worker in the pool worker nodes. + * Creates a worker node. * - * @param worker - The worker. - * @returns The added worker node key. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + * @returns The created worker node. */ - private addWorkerNode (worker: Worker): number { + private createWorkerNode (): IWorkerNode { const workerNode = new WorkerNode( - worker, - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions( + this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers + ).size + } ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } + return workerNode + } + + /** + * Adds the given worker node in the pool worker nodes. + * + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + */ + private addWorkerNode (workerNode: IWorkerNode): number { this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey === -1) { throw new Error('Worker added not found in worker nodes') } @@ -1793,12 +1790,12 @@ export abstract class AbstractPool< } /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1852,14 +1849,17 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void {