X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=9b0ec92f3df1f9a2d5cdec26608cde6595a9cf96;hb=563d12ded5ff23152ed70f6dc675f01eb7cd9ee5;hp=ab319c627884c25c4851dc34ecb2626ebc528fc3;hpb=6349b7551b1060472ed8c97dd01ef0c827d62278;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ab319c62..9b0ec92f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' import type { TransferListItem } from 'node:worker_threads' import { EventEmitterAsyncResource } from 'node:events' +import { AsyncResource } from 'node:async_hooks' import type { MessageValue, PromiseResponseWrapper, @@ -35,6 +36,7 @@ import { import type { IWorker, IWorkerNode, + TaskStatistics, WorkerInfo, WorkerNodeEventDetail, WorkerType, @@ -304,8 +306,8 @@ export abstract class AbstractPool< 0 ), busyWorkerNodes: this.workerNodes.reduce( - (accumulator, workerNode) => - workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator, + (accumulator, _workerNode, workerNodeKey) => + this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator, 0 ), executedTasks: this.workerNodes.reduce( @@ -593,11 +595,13 @@ export abstract class AbstractPool< this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -629,36 +633,36 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -703,6 +707,16 @@ export abstract class AbstractPool< ) } + private isWorkerNodeBusy (workerNodeKey: number): boolean { + if (this.opts.enableTasksQueue === true) { + return ( + this.workerNodes[workerNodeKey].usage.tasks.executing >= + (this.opts.tasksQueueOptions?.concurrency as number) + ) + } + return this.workerNodes[workerNodeKey].usage.tasks.executing > 0 + } + private async sendTaskFunctionOperationToWorker ( workerNodeKey: number, message: MessageValue @@ -930,7 +944,13 @@ export abstract class AbstractPool< this.promiseResponseMap.set(task.taskId as string, { resolve, reject, - workerNodeKey + workerNodeKey, + ...(this.emitter != null && { + asyncResource: new AsyncResource('poolifier:task', { + triggerAsyncId: this.emitter.asyncId, + requireManualDestroy: true + }) + }) }) if ( this.opts.enableTasksQueue === false || @@ -982,12 +1002,13 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_, workerNodeKey) => { + this.workerNodes.map(async (_workerNode, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1394,7 +1415,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1402,15 +1423,15 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -1440,7 +1461,18 @@ export abstract class AbstractPool< }) } + private handleTask (workerNodeKey: number, task: Task): void { + if (this.shallExecuteTask(workerNodeKey)) { + this.executeTask(workerNodeKey, task) + } else { + this.enqueueTask(workerNodeKey, task) + } + } + private redistributeQueuedTasks (workerNodeKey: number): void { + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1452,12 +1484,10 @@ export abstract class AbstractPool< }, 0 ) - const task = this.dequeueTask(workerNodeKey) as Task - if (this.shallExecuteTask(destinationWorkerNodeKey)) { - this.executeTask(destinationWorkerNodeKey, task) - } else { - this.enqueueTask(destinationWorkerNodeKey, task) - } + this.handleTask( + destinationWorkerNodeKey, + this.dequeueTask(workerNodeKey) as Task + ) } } @@ -1481,13 +1511,19 @@ export abstract class AbstractPool< } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } + } + + private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1500,13 +1536,19 @@ export abstract class AbstractPool< } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } + } + + private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1519,10 +1561,13 @@ export abstract class AbstractPool< } private readonly handleIdleWorkerNodeEvent = ( - event: CustomEvent, + eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - const { workerNodeKey } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( 'WorkerNode event detail workerNodeKey attribute must be defined' @@ -1535,16 +1580,45 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.resetTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - previousStolenTask.name as string - ) + for (const taskName of this.workerNodes[workerNodeKey].info + .taskFunctionNames as string[]) { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + taskName + ) + } + this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } const stolenTask = this.workerNodeStealTask(workerNodeKey) + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + stolenTask != null + ) { + const taskFunctionTasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(stolenTask.name as string) + ?.tasks as TaskStatistics + if ( + taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || + (previousStolenTask != null && + previousStolenTask.name === stolenTask.name && + taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) + ) { + this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } else { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } + } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(event, stolenTask) + this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1567,15 +1641,8 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } - this.updateTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + this.handleTask(workerNodeKey, task) + this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1585,9 +1652,12 @@ export abstract class AbstractPool< } private readonly handleBackPressureEvent = ( - event: CustomEvent + eventDetail: WorkerNodeEventDetail ): void => { - const { workerId } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return @@ -1609,11 +1679,7 @@ export abstract class AbstractPool< (this.opts.tasksQueueOptions?.size as number) - sizeOffset ) { const task = sourceWorkerNode.popTask() as Task - if (this.shallExecuteTask(workerNodeKey)) { - this.executeTask(workerNodeKey, task) - } else { - this.enqueueTask(workerNodeKey, task) - } + this.handleTask(workerNodeKey, task) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1625,19 +1691,22 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctionNames != null) { + const { workerId, ready, taskId, taskFunctionNames } = message + if (ready != null && taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.taskId != null) { + } else if (taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctionNames != null) { + } else if (taskFunctionNames != null) { // Task function names message received from worker this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctionNames = message.taskFunctionNames + this.getWorkerNodeKeyByWorkerId(workerId) + ).taskFunctionNames = taskFunctionNames } } @@ -1661,13 +1730,22 @@ export abstract class AbstractPool< const { workerId, taskId, workerError, data } = message const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - const { resolve, reject, workerNodeKey } = promiseResponse + const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse if (workerError != null) { this.emitter?.emit(PoolEvents.taskError, workerError) - reject(workerError.message) + asyncResource != null + ? asyncResource.runInAsyncScope( + reject, + this.emitter, + workerError.message + ) + : reject(workerError.message) } else { - resolve(data as Response) + asyncResource != null + ? asyncResource.runInAsyncScope(resolve, this.emitter, data) + : resolve(data as Response) } + asyncResource?.emitDestroy() this.afterTaskExecutionHook(workerNodeKey, message) this.workerChoiceStrategyContext.update(workerNodeKey) this.promiseResponseMap.delete(taskId as string) @@ -1688,11 +1766,10 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].dispatchEvent( - new CustomEvent('idleWorkerNode', { - detail: { workerId: workerId as number, workerNodeKey } - }) - ) + this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerId: workerId as number, + workerNodeKey + }) } } }