X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=7ea9379665050a982259a4181c934a7eecc81adb;hb=cde8e887e8e2268ab9ecd0ca32a24ecae3e9b4a8;hp=5da69ca1cfd8190cdf39e78c0d5867e4218b140d;hpb=b641345c7f4990af9f93a94159d55a922f6df973;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5da69ca1..7ea93796 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -594,11 +594,13 @@ export abstract class AbstractPool< this.buildTasksQueueOptions(tasksQueueOptions) this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number) if (this.opts.tasksQueueOptions.taskStealing === true) { + this.unsetTaskStealing() this.setTaskStealing() } else { this.unsetTaskStealing() } if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) { + this.unsetTasksStealingOnBackPressure() this.setTasksStealingOnBackPressure() } else { this.unsetTasksStealingOnBackPressure() @@ -630,36 +632,36 @@ export abstract class AbstractPool< private setTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private unsetTaskStealing (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } } private setTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } private unsetTasksStealingOnBackPressure (): void { for (const [workerNodeKey] of this.workerNodes.entries()) { - this.workerNodes[workerNodeKey].removeEventListener( + this.workerNodes[workerNodeKey].off( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -983,12 +985,13 @@ export abstract class AbstractPool< } this.destroying = true await Promise.all( - this.workerNodes.map(async (_, workerNodeKey) => { + this.workerNodes.map(async (_workerNode, workerNodeKey) => { await this.destroyWorkerNode(workerNodeKey) }) ) this.emitter?.emit(PoolEvents.destroy, this.info) this.emitter?.emitDestroy() + this.emitter?.removeAllListeners() this.readyEventEmitted = false this.destroying = false this.started = false @@ -1395,7 +1398,7 @@ export abstract class AbstractPool< // Listen to worker messages. this.registerWorkerMessageListener( workerNodeKey, - this.workerMessageListener.bind(this) + this.workerMessageListener ) // Send the startup message to worker. this.sendStartupMessageToWorker(workerNodeKey) @@ -1403,15 +1406,15 @@ export abstract class AbstractPool< this.sendStatisticsMessageToWorker(workerNodeKey) if (this.opts.enableTasksQueue === true) { if (this.opts.tasksQueueOptions?.taskStealing === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'idleWorkerNode', - this.handleIdleWorkerNodeEvent as EventListener + this.handleIdleWorkerNodeEvent ) } if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) { - this.workerNodes[workerNodeKey].addEventListener( + this.workerNodes[workerNodeKey].on( 'backPressure', - this.handleBackPressureEvent as EventListener + this.handleBackPressureEvent ) } } @@ -1442,6 +1445,9 @@ export abstract class AbstractPool< } private redistributeQueuedTasks (workerNodeKey: number): void { + if (this.workerNodes.length <= 1) { + return + } while (this.tasksQueueSize(workerNodeKey) > 0) { const destinationWorkerNodeKey = this.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { @@ -1532,10 +1538,13 @@ export abstract class AbstractPool< } private readonly handleIdleWorkerNodeEvent = ( - event: CustomEvent, + eventDetail: WorkerNodeEventDetail, previousStolenTask?: Task ): void => { - const { workerNodeKey } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerNodeKey } = eventDetail if (workerNodeKey == null) { throw new Error( 'WorkerNode event detail workerNodeKey attribute must be defined' @@ -1586,7 +1595,7 @@ export abstract class AbstractPool< } sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)) .then(() => { - this.handleIdleWorkerNodeEvent(event, stolenTask) + this.handleIdleWorkerNodeEvent(eventDetail, stolenTask) return undefined }) .catch(EMPTY_FUNCTION) @@ -1624,9 +1633,12 @@ export abstract class AbstractPool< } private readonly handleBackPressureEvent = ( - event: CustomEvent + eventDetail: WorkerNodeEventDetail ): void => { - const { workerId } = event.detail + if (this.workerNodes.length <= 1) { + return + } + const { workerId } = eventDetail const sizeOffset = 1 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) { return @@ -1664,7 +1676,9 @@ export abstract class AbstractPool< /** * This method is the message listener registered on each worker. */ - protected workerMessageListener (message: MessageValue): void { + protected readonly workerMessageListener = ( + message: MessageValue + ): void => { this.checkMessageWorkerId(message) const { workerId, ready, taskId, taskFunctionNames } = message if (ready != null && taskFunctionNames != null) { @@ -1728,11 +1742,10 @@ export abstract class AbstractPool< this.tasksQueueSize(workerNodeKey) === 0 && workerNodeTasksUsage.sequentiallyStolen === 0 ) { - this.workerNodes[workerNodeKey].dispatchEvent( - new CustomEvent('idleWorkerNode', { - detail: { workerId: workerId as number, workerNodeKey } - }) - ) + this.workerNodes[workerNodeKey].emit('idleWorkerNode', { + workerId: workerId as number, + workerNodeKey + }) } } }