X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=7ea9379665050a982259a4181c934a7eecc81adb;hb=8c08e336b31164436961b1757f653e8225051b2b;hp=ab319c627884c25c4851dc34ecb2626ebc528fc3;hpb=463226a44b0370911db746052bcad0bcc7878acf;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ab319c62..7ea93796 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -35,6 +35,7 @@ import { import type { IWorker, IWorkerNode, + TaskStatistics, WorkerInfo, WorkerNodeEventDetail, WorkerType, @@ -593,11 +594,13 @@ export abstract class AbstractPool< this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -629,36 +632,36 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -982,12 +985,13 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_, workerNodeKey) => { + this.workerNodes.map(async (_workerNode, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1394,7 +1398,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1402,15 +1406,15 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -1441,6 +1445,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1481,13 +1488,19 @@ export abstract class AbstractPool< } private updateTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { ++workerNode.usage.tasks.sequentiallyStolen } + } + + private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1500,13 +1513,19 @@ export abstract class AbstractPool< } private resetTaskSequentiallyStolenStatisticsWorkerUsage ( - workerNodeKey: number, - taskName: string + workerNodeKey: number ): void { const workerNode = this.workerNodes[workerNodeKey] if (workerNode?.usage != null) { workerNode.usage.tasks.sequentiallyStolen = 0 } + } + + private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage ( + workerNodeKey: number, + taskName: string + ): void { + const workerNode = this.workerNodes[workerNodeKey] if ( this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && workerNode.getTaskFunctionWorkerUsage(taskName) != null @@ -1519,10 +1538,13 @@ export abstract class AbstractPool< } private readonly handleIdleWorkerNodeEvent = ( - event: CustomEvent, + eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - const { workerNodeKey } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( 'WorkerNode event detail workerNodeKey attribute must be defined' @@ -1535,16 +1557,45 @@ export abstract class AbstractPool< (workerNodeTasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0) ) { - this.resetTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - previousStolenTask.name as string - ) + for (const taskName of this.workerNodes[workerNodeKey].info + .taskFunctionNames as string[]) { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + taskName + ) + } + this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) return } const stolenTask = this.workerNodeStealTask(workerNodeKey) + if ( + this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) && + stolenTask != null + ) { + const taskFunctionTasksWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskFunctionWorkerUsage(stolenTask.name as string) + ?.tasks as TaskStatistics + if ( + taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 || + (previousStolenTask != null && + previousStolenTask.name === stolenTask.name && + taskFunctionTasksWorkerUsage.sequentiallyStolen > 0) + ) { + this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } else { + this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage( + workerNodeKey, + stolenTask.name as string + ) + } + } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(event, stolenTask) + this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1572,10 +1623,7 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.updateTaskSequentiallyStolenStatisticsWorkerUsage( - workerNodeKey, - task.name as string - ) + this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) this.updateTaskStolenStatisticsWorkerUsage( workerNodeKey, task.name as string @@ -1585,9 +1633,12 @@ export abstract class AbstractPool< } private readonly handleBackPressureEvent = ( - event: CustomEvent + eventDetail: WorkerNodeEventDetail ): void => { - const { workerId } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return @@ -1625,19 +1676,22 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) - if (message.ready != null && message.taskFunctionNames != null) { + const { workerId, ready, taskId, taskFunctionNames } = message + if (ready != null && taskFunctionNames != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) - } else if (message.taskId != null) { + } else if (taskId != null) { // Task execution response received from worker this.handleTaskExecutionResponse(message) - } else if (message.taskFunctionNames != null) { + } else if (taskFunctionNames != null) { // Task function names message received from worker this.getWorkerInfo( - this.getWorkerNodeKeyByWorkerId(message.workerId) - ).taskFunctionNames = message.taskFunctionNames + this.getWorkerNodeKeyByWorkerId(workerId) + ).taskFunctionNames = taskFunctionNames } } @@ -1688,11 +1742,10 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].dispatchEvent( - new CustomEvent('idleWorkerNode', { - detail: { workerId: workerId as number, workerNodeKey } - }) - ) + this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerId: workerId as number, + workerNodeKey + }) } } }