X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=c6712d198bc28a9d77c08ae817e5c96c3b9227c6;hb=0fa1d6f0433baf6ff3f67fefdbdf610612a4f3e9;hp=eb47bc2f24f67b983b4829a763705c6fb8b45828;hpb=f7b2b8101acbc121ec9c787a8b1c72ba3375175b;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb47bc2f..c6712d19 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -5,7 +5,8 @@ import { type TransferListItem } from 'node:worker_threads' import type { MessageValue, PromiseResponseWrapper, - Task + Task, + Writable } from '../utility-types' import { DEFAULT_TASK_NAME, @@ -28,13 +29,12 @@ import { PoolTypes, type TasksQueueOptions } from './pool' -import { - type IWorker, - type IWorkerNode, - type WorkerInfo, - type WorkerType, - WorkerTypes, - type WorkerUsage +import type { + IWorker, + IWorkerNode, + WorkerInfo, + WorkerType, + WorkerUsage } from './worker' import { type MeasurementStatisticsRequirements, @@ -85,10 +85,19 @@ export abstract class AbstractPool< Response > + /** + * Dynamic pool maximum size property placeholder. + */ + protected readonly max?: number + /** * Whether the pool is starting or not. */ private readonly starting: boolean + /** + * Whether the pool is started or not. + */ + private started: boolean /** * The start timestamp of the pool. */ @@ -107,7 +116,9 @@ export abstract class AbstractPool< protected readonly opts: PoolOptions ) { if (!this.isMain()) { - throw new Error('Cannot start a pool from a worker!') + throw new Error( + 'Cannot start a pool from a worker with the same type as the pool' + ) } this.checkNumberOfWorkers(this.numberOfWorkers) this.checkFilePath(this.filePath) @@ -116,8 +127,6 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) - this.dequeueTask = this.dequeueTask.bind(this) - this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() @@ -137,6 +146,7 @@ export abstract class AbstractPool< this.starting = true this.startPool() this.starting = false + this.started = true this.startTimestamp = performance.now() } @@ -175,7 +185,7 @@ export abstract class AbstractPool< protected checkDynamicPoolSize (min: number, max: number): void { if (this.type === PoolTypes.dynamic) { if (max == null) { - throw new Error( + throw new TypeError( 'Cannot instantiate a dynamic pool without specifying the maximum pool size' ) } else if (!Number.isSafeInteger(max)) { @@ -203,9 +213,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -243,6 +254,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -264,7 +291,7 @@ export abstract class AbstractPool< } private checkValidTasksQueueOptions ( - tasksQueueOptions: TasksQueueOptions + tasksQueueOptions: Writable ): void { if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) { throw new TypeError('Invalid tasks queue options: must be a plain object') @@ -274,15 +301,39 @@ export abstract class AbstractPool< !Number.isSafeInteger(tasksQueueOptions.concurrency) ) { throw new TypeError( - 'Invalid worker tasks concurrency: must be an integer' + 'Invalid worker node tasks concurrency: must be an integer' ) } if ( tasksQueueOptions?.concurrency != null && tasksQueueOptions.concurrency <= 0 + ) { + throw new RangeError( + `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` + ) + } + if ( + tasksQueueOptions?.queueMaxSize != null && + tasksQueueOptions?.size != null ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` + 'Invalid tasks queue options: cannot specify both queueMaxSize and size' + ) + } + if (tasksQueueOptions?.queueMaxSize != null) { + tasksQueueOptions.size = tasksQueueOptions.queueMaxSize + } + if ( + tasksQueueOptions?.size != null && + !Number.isSafeInteger(tasksQueueOptions.size) + ) { + throw new TypeError( + 'Invalid worker node tasks queue max size: must be an integer' + ) + } + if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) { + throw new RangeError( + `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero` ) } } @@ -350,6 +401,9 @@ export abstract class AbstractPool< 0 ) }), + ...(this.opts.enableTasksQueue === true && { + backPressure: this.hasBackPressure() + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -361,14 +415,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.runTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity ) ) ), @@ -389,7 +443,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.runTime?.median ?? 0 + (workerNode) => workerNode.usage.runTime?.median ?? 0 ) ) ) @@ -402,14 +456,14 @@ export abstract class AbstractPool< minimum: round( Math.min( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.minimum ?? Infinity + (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity ) ) ), maximum: round( Math.max( ...this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity + (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity ) ) ), @@ -430,7 +484,7 @@ export abstract class AbstractPool< median: round( median( this.workerNodes.map( - workerNode => workerNode.usage.waitTime?.median ?? 0 + (workerNode) => workerNode.usage.waitTime?.median ?? 0 ) ) ) @@ -491,12 +545,16 @@ export abstract class AbstractPool< /** * The pool minimum size. */ - protected abstract get minSize (): number + protected get minSize (): number { + return this.numberOfWorkers + } /** * The pool maximum size. */ - protected abstract get maxSize (): number + protected get maxSize (): number { + return this.max ?? this.numberOfWorkers + } /** * Checks if the worker id sent in the received message from a worker is valid. @@ -505,7 +563,9 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ private checkMessageWorkerId (message: MessageValue): void { - if ( + if (message.workerId == null) { + throw new Error('Worker message received without worker id') + } else if ( message.workerId != null && this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { @@ -523,7 +583,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorker (worker: Worker): number { return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker + (workerNode) => workerNode.worker === worker ) } @@ -535,7 +595,7 @@ export abstract class AbstractPool< */ private getWorkerNodeKeyByWorkerId (workerId: number): number { return this.workerNodes.findIndex( - workerNode => workerNode.info.id === workerId + (workerNode) => workerNode.info.id === workerId ) } @@ -554,7 +614,7 @@ export abstract class AbstractPool< } for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { workerNode.resetUsage() - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + this.sendStatisticsMessageToWorker(workerNodeKey) } } @@ -563,7 +623,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -587,16 +650,27 @@ export abstract class AbstractPool< this.checkValidTasksQueueOptions(tasksQueueOptions) this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions) + this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number) } else if (this.opts.tasksQueueOptions != null) { delete this.opts.tasksQueueOptions } } + private setTasksQueueMaxSize (size: number): void { + for (const workerNode of this.workerNodes) { + workerNode.tasksQueueBackPressureSize = size + } + } + private buildTasksQueueOptions ( tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - concurrency: tasksQueueOptions?.concurrency ?? 1 + ...{ + size: Math.pow(this.maxSize, 2), + concurrency: 1 + }, + ...tasksQueueOptions } } @@ -625,7 +699,7 @@ export abstract class AbstractPool< if (this.opts.enableTasksQueue === true) { return ( this.workerNodes.findIndex( - workerNode => + (workerNode) => workerNode.info.ready && workerNode.usage.tasks.executing < (this.opts.tasksQueueOptions?.concurrency as number) @@ -634,13 +708,26 @@ export abstract class AbstractPool< } else { return ( this.workerNodes.findIndex( - workerNode => + (workerNode) => workerNode.info.ready && workerNode.usage.tasks.executing === 0 ) === -1 ) } } + /** @inheritDoc */ + public listTaskFunctions (): string[] { + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } + } + return [] + } + /** @inheritDoc */ public async execute ( data?: Data, @@ -648,21 +735,41 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): Promise { return await new Promise((resolve, reject) => { + if (!this.started) { + reject(new Error('Cannot execute a task on destroyed pool')) + } if (name != null && typeof name !== 'string') { reject(new TypeError('name argument must be a string')) } + if ( + name != null && + typeof name === 'string' && + name.trim().length === 0 + ) { + reject(new TypeError('name argument must not be an empty string')) + } if (transferList != null && !Array.isArray(transferList)) { reject(new TypeError('transferList argument must be an array')) } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo + if ( + name != null && + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) + ) { + reject( + new Error(`Task function '${name}' is not registered in the pool`) + ) + } const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -680,7 +787,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.checkAndEmitEvents() }) } @@ -691,6 +797,24 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) + this.emitter?.emit(PoolEvents.destroy, this.info) + this.started = false + } + + protected async sendKillMessageToWorker ( + workerNodeKey: number, + workerId: number + ): Promise { + await new Promise((resolve, reject) => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { + if (message.kill === 'success') { + resolve() + } else if (message.kill === 'failure') { + reject(new Error(`Worker ${workerId} kill message handling failed`)) + } + }) + this.sendToWorker(workerNodeKey, { kill: true, workerId }) + }) } /** @@ -726,14 +850,23 @@ export abstract class AbstractPool< workerNodeKey: number, task: Task ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - task.name as string - ) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + task.name as string + ) != null + ) { + const taskFunctionWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + } } /** @@ -747,16 +880,42 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { - const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + if (this.workerNodes[workerNodeKey]?.usage != null) { + const workerUsage = this.workerNodes[workerNodeKey].usage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) != null + ) { + const taskFunctionWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage( + message.taskPerformance?.name as string + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) + this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + } + } + + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) + return ( + workerInfo != null && + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 2 + ) } private updateTaskStatisticsWorkerUsage ( @@ -764,7 +923,12 @@ export abstract class AbstractPool< message: MessageValue ): void { const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } if (message.taskError == null) { ++workerTaskStatistics.executed } else { @@ -841,7 +1005,7 @@ export abstract class AbstractPool< if (this.shallCreateDynamicWorker()) { const workerNodeKey = this.createAndSetupDynamicWorkerNode() if ( - this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage ) { return workerNodeKey } @@ -886,15 +1050,20 @@ export abstract class AbstractPool< protected createAndSetupWorkerNode (): number { const worker = this.createWorker() + worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { + worker.on('error', (error) => { const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo workerInfo.ready = false this.workerNodes[workerNodeKey].closeChannel() this.emitter?.emit(PoolEvents.error, error) - if (this.opts.restartWorkerOnError === true && !this.starting) { + if ( + this.opts.restartWorkerOnError === true && + !this.starting && + this.started + ) { if (workerInfo.dynamic) { this.createAndSetupDynamicWorkerNode() } else { @@ -905,7 +1074,6 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(workerNodeKey) } }) - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { this.removeWorkerNode(worker) @@ -925,7 +1093,7 @@ export abstract class AbstractPool< */ protected createAndSetupDynamicWorkerNode (): number { const workerNodeKey = this.createAndSetupWorkerNode() - this.registerWorkerMessageListener(workerNodeKey, message => { + this.registerWorkerMessageListener(workerNodeKey, (message) => { const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId( message.workerId ) @@ -933,25 +1101,31 @@ export abstract class AbstractPool< // Kill message received from worker if ( isKillBehavior(KillBehaviors.HARD, message.kill) || - (message.kill != null && + (isKillBehavior(KillBehaviors.SOFT, message.kill) && ((this.opts.enableTasksQueue === false && workerUsage.tasks.executing === 0) || (this.opts.enableTasksQueue === true && workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { - this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION) + this.destroyWorkerNode(localWorkerNodeKey).catch((error) => { + this.emitter?.emit(PoolEvents.error, error) + }) } }) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo this.sendToWorker(workerNodeKey, { checkActive: true, workerId: workerInfo.id as number }) workerInfo.dynamic = true - if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { + if ( + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady || + this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage + ) { workerInfo.ready = true } + this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -979,8 +1153,14 @@ export abstract class AbstractPool< this.registerWorkerMessageListener(workerNodeKey, this.workerListener()) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) - // Send the worker statistics message to worker. - this.sendWorkerStatisticsMessageToWorker(workerNodeKey) + // Send the statistics message to worker. + this.sendStatisticsMessageToWorker(workerNodeKey) + if (this.opts.enableTasksQueue === true) { + this.workerNodes[workerNodeKey].onEmptyQueue = + this.taskStealingOnEmptyQueue.bind(this) + this.workerNodes[workerNodeKey].onBackPressure = + this.tasksStealingOnBackPressure.bind(this) + } } /** @@ -991,11 +1171,11 @@ export abstract class AbstractPool< protected abstract sendStartupMessageToWorker (workerNodeKey: number): void /** - * Sends the worker statistics message to worker given its worker node key. + * Sends the statistics message to worker given its worker node key. * * @param workerNodeKey - The worker node key. */ - private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void { + private sendStatisticsMessageToWorker (workerNodeKey: number): void { this.sendToWorker(workerNodeKey, { statistics: { runTime: @@ -1004,50 +1184,114 @@ export abstract class AbstractPool< elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu.aggregate }, - workerId: this.getWorkerInfo(workerNodeKey).id as number + workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number }) } private redistributeQueuedTasks (workerNodeKey: number): void { while (this.tasksQueueSize(workerNodeKey) > 0) { - let targetWorkerNodeKey: number = workerNodeKey + let destinationWorkerNodeKey: number = workerNodeKey let minQueuedTasks = Infinity let executeTask = false for (const [workerNodeId, workerNode] of this.workerNodes.entries()) { - const workerInfo = this.getWorkerInfo(workerNodeId) if ( + workerNode.info.ready && + workerNodeId !== workerNodeKey && + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + executeTask = true + } + if ( + workerNode.info.ready && workerNodeId !== workerNodeKey && - workerInfo.ready && workerNode.usage.tasks.queued === 0 ) { - if ( - this.workerNodes[workerNodeId].usage.tasks.executing < - (this.opts.tasksQueueOptions?.concurrency as number) - ) { - executeTask = true - } - targetWorkerNodeKey = workerNodeId + destinationWorkerNodeKey = workerNodeId break } if ( + workerNode.info.ready && workerNodeId !== workerNodeKey && - workerInfo.ready && workerNode.usage.tasks.queued < minQueuedTasks ) { minQueuedTasks = workerNode.usage.tasks.queued - targetWorkerNodeKey = workerNodeId + destinationWorkerNodeKey = workerNodeId } } + const task = { + ...(this.dequeueTask(workerNodeKey) as Task), + workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo) + .id as number + } if (executeTask) { - this.executeTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.executeTask(destinationWorkerNodeKey, task) } else { - this.enqueueTask( - targetWorkerNodeKey, - this.dequeueTask(workerNodeKey) as Task - ) + this.enqueueTask(destinationWorkerNodeKey, task) + } + } + } + + private taskStealingOnEmptyQueue (workerId: number): void { + const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId) + const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued + ) + for (const sourceWorkerNode of workerNodes) { + if ( + sourceWorkerNode.info.ready && + sourceWorkerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: destinationWorkerNode.info.id as number + } + if ( + destinationWorkerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(destinationWorkerNodeKey, task) + } else { + this.enqueueTask(destinationWorkerNodeKey, task) + } + break + } + } + } + + private tasksStealingOnBackPressure (workerId: number): void { + const sourceWorkerNode = + this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)] + const workerNodes = this.workerNodes + .slice() + .sort( + (workerNodeA, workerNodeB) => + workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued + ) + for (const [workerNodeKey, workerNode] of workerNodes.entries()) { + if ( + workerNode.info.ready && + workerNode.info.id !== workerId && + sourceWorkerNode.usage.tasks.queued > 0 && + !workerNode.hasBackPressure() + ) { + const task = { + ...(sourceWorkerNode.popTask() as Task), + workerId: workerNode.info.id as number + } + if ( + workerNode.usage.tasks.executing < + (this.opts.tasksQueueOptions?.concurrency as number) + ) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } } } } @@ -1058,41 +1302,52 @@ export abstract class AbstractPool< * @returns The listener function to execute when a message is received from a worker. */ protected workerListener (): (message: MessageValue) => void { - return message => { + return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) + } else if (message.taskFunctions != null) { + // Task functions message received from worker + ( + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ) as WorkerInfo + ).taskFunctions = message.taskFunctions } } } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfo( + if (message.ready === false) { + throw new Error(`Worker ${message.workerId} failed to initialize`) + } + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) as WorkerInfo + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get( - message.taskId as string - ) + const { taskId, taskError, data } = message + const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (message.taskError != null) { - this.emitter?.emit(PoolEvents.taskError, message.taskError) - promiseResponse.reject(message.taskError.message) + if (taskError != null) { + this.emitter?.emit(PoolEvents.taskError, taskError) + promiseResponse.reject(taskError.message) } else { - promiseResponse.resolve(message.data as Response) + promiseResponse.resolve(data as Response) } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.taskId as string) + this.promiseResponseMap.delete(taskId as string) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 && @@ -1108,13 +1363,22 @@ export abstract class AbstractPool< } } - private checkAndEmitEvents (): void { - if (this.emitter != null) { - if (this.busy) { - this.emitter.emit(PoolEvents.busy, this.info) - } - if (this.type === PoolTypes.dynamic && this.full) { - this.emitter.emit(PoolEvents.full, this.info) + private checkAndEmitTaskExecutionEvents (): void { + if (this.busy) { + this.emitter?.emit(PoolEvents.busy, this.info) + } + } + + private checkAndEmitTaskQueuingEvents (): void { + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) + } + } + + private checkAndEmitDynamicWorkerCreationEvents (): void { + if (this.type === PoolTypes.dynamic) { + if (this.full) { + this.emitter?.emit(PoolEvents.full, this.info) } } } @@ -1125,8 +1389,8 @@ export abstract class AbstractPool< * @param workerNodeKey - The worker node key. * @returns The worker information. */ - protected getWorkerInfo (workerNodeKey: number): WorkerInfo { - return this.workerNodes[workerNodeKey].info + protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined { + return this.workerNodes[workerNodeKey]?.info } /** @@ -1137,7 +1401,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1145,7 +1413,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1163,6 +1431,23 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) + } + + private hasBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.findIndex( + (workerNode) => !workerNode.hasBackPressure() + ) === -1 + ) + } + /** * Executes the given task on the worker given its worker node key. * @@ -1171,17 +1456,14 @@ export abstract class AbstractPool< */ private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) - this.sendToWorker( - workerNodeKey, - task, - this.worker === WorkerTypes.thread && task.transferList != null - ? task.transferList - : undefined - ) + this.sendToWorker(workerNodeKey, task, task.transferList) + this.checkAndEmitTaskExecutionEvents() } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + this.checkAndEmitTaskQueuingEvents() + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {