X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=11cc9cc15ef419346827e5f627150f04e73ba158;hb=55d7d6002049be09a06b08da26febe2e8bfa494b;hp=da0eb335b26f41092fb57560d255b5b0b624ad6e;hpb=f18fd12bf1ec2ea3649374af210a65b9a9b5a54c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index da0eb335..11cc9cc1 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -43,7 +43,6 @@ import type { WorkerUsage } from './worker' import { - type MeasurementStatisticsRequirements, Measurements, WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -56,7 +55,12 @@ import { checkFilePath, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, - updateMeasurementStatistics + getDefaultTasksQueueOptions, + updateEluWorkerUsage, + updateRunTimeWorkerUsage, + updateTaskStatisticsWorkerUsage, + updateWaitTimeWorkerUsage, + waitWorkerNodeEvents } from './utils' /** @@ -306,8 +310,8 @@ export abstract class AbstractPool< 0 ), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _workerNode, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -516,18 +520,6 @@ export abstract class AbstractPool< } } - /** - * Gets the given worker its worker node key. - * - * @param worker - The worker. - * @returns The worker node key if found in the pool worker nodes, `-1` otherwise. - */ - private getWorkerNodeKeyByWorker (worker: Worker): number { - return this.workerNodes.findIndex( - workerNode => workerNode.worker === worker - ) - } - /** * Gets the worker node key given its worker id. * @@ -615,12 +607,7 @@ export abstract class AbstractPool< tasksQueueOptions: TasksQueueOptions ): TasksQueueOptions { return { - ...{ - size: Math.pow(this.maxSize, 2), - concurrency: 1, - taskStealing: true, - tasksStealingOnBackPressure: true - }, + ...getDefaultTasksQueueOptions(this.maxSize), ...tasksQueueOptions } } @@ -707,6 +694,16 @@ export abstract class AbstractPool< ) } + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + } + private async sendTaskFunctionOperationToWorker ( workerNodeKey: number, message: MessageValue @@ -1008,6 +1005,10 @@ export abstract class AbstractPool< workerNodeKey: number ): Promise { await new Promise((resolve, reject) => { + if (workerNodeKey < 0 || workerNodeKey >= this.workerNodes.length) { + reject(new Error(`Invalid worker node key '${workerNodeKey}'`)) + return + } const killMessageListener = (message: MessageValue): void => { this.checkMessageWorkerId(message) if (message.kill === 'success') { @@ -1033,7 +1034,20 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected abstract destroyWorkerNode (workerNodeKey: number): Promise + protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) + const flushedTasks = this.flushTasksQueue(workerNodeKey) + const workerNode = this.workerNodes[workerNodeKey] + await waitWorkerNodeEvents( + workerNode, + 'taskFinished', + flushedTasks, + this.opts.tasksQueueOptions?.tasksFinishedTimeout ?? + getDefaultTasksQueueOptions(this.maxSize).tasksFinishedTimeout + ) + await this.sendKillMessageToWorker(workerNodeKey) + await workerNode.terminate() + } /** * Setup hook to execute code before worker nodes are created in the abstract constructor. @@ -1064,7 +1078,11 @@ export abstract class AbstractPool< if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(workerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + task + ) } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1076,7 +1094,11 @@ export abstract class AbstractPool< workerNodeKey ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage ++taskFunctionWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) + updateWaitTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + task + ) } } @@ -1091,11 +1113,21 @@ export abstract class AbstractPool< workerNodeKey: number, message: MessageValue ): void { + let needWorkerChoiceStrategyUpdate = false if (this.workerNodes[workerNodeKey]?.usage != null) { const workerUsage = this.workerNodes[workerNodeKey].usage - this.updateTaskStatisticsWorkerUsage(workerUsage, message) - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) + updateTaskStatisticsWorkerUsage(workerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + workerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true } if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && @@ -1108,9 +1140,21 @@ export abstract class AbstractPool< ].getTaskFunctionWorkerUsage( message.taskPerformance?.name as string ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) - this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) + updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + updateRunTimeWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + updateEluWorkerUsage( + this.workerChoiceStrategyContext, + taskFunctionWorkerUsage, + message + ) + needWorkerChoiceStrategyUpdate = true + } + if (needWorkerChoiceStrategyUpdate) { + this.workerChoiceStrategyContext.update(workerNodeKey) } } @@ -1129,84 +1173,6 @@ export abstract class AbstractPool< ) } - private updateTaskStatisticsWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - const workerTaskStatistics = workerUsage.tasks - if ( - workerTaskStatistics.executing != null && - workerTaskStatistics.executing > 0 - ) { - --workerTaskStatistics.executing - } - if (message.workerError == null) { - ++workerTaskStatistics.executed - } else { - ++workerTaskStatistics.failed - } - } - - private updateRunTimeWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - updateMeasurementStatistics( - workerUsage.runTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime, - message.taskPerformance?.runTime ?? 0 - ) - } - - private updateWaitTimeWorkerUsage ( - workerUsage: WorkerUsage, - task: Task - ): void { - const timestamp = performance.now() - const taskWaitTime = timestamp - (task.timestamp ?? timestamp) - updateMeasurementStatistics( - workerUsage.waitTime, - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime, - taskWaitTime - ) - } - - private updateEluWorkerUsage ( - workerUsage: WorkerUsage, - message: MessageValue - ): void { - if (message.workerError != null) { - return - } - const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements = - this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu - updateMeasurementStatistics( - workerUsage.elu.active, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.active ?? 0 - ) - updateMeasurementStatistics( - workerUsage.elu.idle, - eluTaskStatisticsRequirements, - message.taskPerformance?.elu?.idle ?? 0 - ) - if (eluTaskStatisticsRequirements.aggregate) { - if (message.taskPerformance?.elu != null) { - if (workerUsage.elu.utilization != null) { - workerUsage.elu.utilization = - (workerUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } else { - workerUsage.elu.utilization = message.taskPerformance.elu.utilization - } - } - } - } - /** * Chooses a worker node for the next task. * @@ -1248,55 +1214,56 @@ export abstract class AbstractPool< transferList?: TransferListItem[] ): void - /** - * Creates a new worker. - * - * @returns Newly created worker. - */ - protected abstract createWorker (): Worker - /** * Creates a new, completely set up worker node. * * @returns New, completely set up worker node key. */ protected createAndSetupWorkerNode (): number { - const worker = this.createWorker() - - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) - worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) - worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) - worker.on('error', error => { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) - this.flagWorkerNodeAsNotReady(workerNodeKey) - const workerInfo = this.getWorkerInfo(workerNodeKey) + const workerNode = this.createWorkerNode() + workerNode.registerWorkerEventHandler( + 'online', + this.opts.onlineHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'message', + this.opts.messageHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler( + 'error', + this.opts.errorHandler ?? EMPTY_FUNCTION + ) + workerNode.registerWorkerEventHandler('error', (error: Error) => { + workerNode.info.ready = false this.emitter?.emit(PoolEvents.error, error) - this.workerNodes[workerNodeKey].closeChannel() if ( this.started && !this.starting && !this.destroying && this.opts.restartWorkerOnError === true ) { - if (workerInfo.dynamic) { + if (workerNode.info.dynamic) { this.createAndSetupDynamicWorkerNode() } else { this.createAndSetupWorkerNode() } } if (this.started && this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(workerNodeKey) + this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode)) } + workerNode?.terminate().catch(error => { + this.emitter?.emit(PoolEvents.error, error) + }) }) - worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) - worker.once('exit', () => { - this.removeWorkerNode(worker) + workerNode.registerWorkerEventHandler( + 'exit', + this.opts.exitHandler ?? EMPTY_FUNCTION + ) + workerNode.registerOnceWorkerEventHandler('exit', () => { + this.removeWorkerNode(workerNode) }) - - const workerNodeKey = this.addWorkerNode(worker) - + const workerNodeKey = this.addWorkerNode(workerNode) this.afterWorkerNodeSetup(workerNodeKey) - return workerNodeKey } @@ -1451,7 +1418,18 @@ export abstract class AbstractPool< }) } + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } + private redistributeQueuedTasks (workerNodeKey: number): void { + if (workerNodeKey === -1) { + return + } if (this.workerNodes.length <= 1) { return } @@ -1466,12 +1444,10 @@ export abstract class AbstractPool< }, 0 ) - const task = this.dequeueTask(workerNodeKey) as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + this.handleTask( + destinationWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } } @@ -1625,11 +1601,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, @@ -1667,11 +1639,7 @@ export abstract class AbstractPool< (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1723,6 +1691,7 @@ export abstract class AbstractPool< const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse + const workerNode = this.workerNodes[workerNodeKey] if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) asyncResource != null @@ -1739,10 +1708,10 @@ export abstract class AbstractPool< } asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) - this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) - if (this.opts.enableTasksQueue === true) { - const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks + workerNode?.emit('taskFinished', taskId) + if (this.opts.enableTasksQueue === true && !this.destroying) { + const workerNodeTasksUsage = workerNode.usage.tasks if ( this.tasksQueueSize(workerNodeKey) > 0 && workerNodeTasksUsage.executing < @@ -1758,7 +1727,7 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerNode.emit('idleWorkerNode', { workerId: workerId as number, workerNodeKey }) @@ -1798,23 +1767,39 @@ export abstract class AbstractPool< } /** - * Adds the given worker in the pool worker nodes. + * Creates a worker node. * - * @param worker - The worker. - * @returns The added worker node key. - * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + * @returns The created worker node. */ - private addWorkerNode (worker: Worker): number { + private createWorkerNode (): IWorkerNode { const workerNode = new WorkerNode( - worker, - this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2) + this.worker, + this.filePath, + { + env: this.opts.env, + workerOptions: this.opts.workerOptions, + tasksQueueBackPressureSize: + this.opts.tasksQueueOptions?.size ?? + getDefaultTasksQueueOptions(this.maxSize).size + } ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true } + return workerNode + } + + /** + * Adds the given worker node in the pool worker nodes. + * + * @param workerNode - The worker node. + * @returns The added worker node key. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. + */ + private addWorkerNode (workerNode: IWorkerNode): number { this.workerNodes.push(workerNode) - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey === -1) { throw new Error('Worker added not found in worker nodes') } @@ -1822,12 +1807,12 @@ export abstract class AbstractPool< } /** - * Removes the given worker from the pool worker nodes. + * Removes the worker node from the pool worker nodes. * - * @param worker - The worker. + * @param workerNode - The worker node. */ - private removeWorkerNode (worker: Worker): void { - const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) + private removeWorkerNode (workerNode: IWorkerNode): void { + const workerNodeKey = this.workerNodes.indexOf(workerNode) if (workerNodeKey !== -1) { this.workerNodes.splice(workerNodeKey, 1) this.workerChoiceStrategyContext.remove(workerNodeKey) @@ -1881,14 +1866,17 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueueSize() } - protected flushTasksQueue (workerNodeKey: number): void { + protected flushTasksQueue (workerNodeKey: number): number { + let flushedTasks = 0 while (this.tasksQueueSize(workerNodeKey) > 0) { this.executeTask( workerNodeKey, this.dequeueTask(workerNodeKey) as Task ) + ++flushedTasks } this.workerNodes[workerNodeKey].clearTasksQueue() + return flushedTasks } private flushTasksQueues (): void {