X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=dceef6decb534f083510d90a947a05861c2ec2df;hb=a1763c54c962c69b5e02a30c0909724986495fcd;hp=06d09e6b72cc8937a950a3f129e7a97699d4f415;hpb=db0e38eed9f8945c38ff579f5b33cb4bc44f62a1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 06d09e6b..dceef6de 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -118,7 +118,10 @@ export abstract class AbstractPool< this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) this.dequeueTask = this.dequeueTask.bind(this) - this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) + this.checkAndEmitTaskExecutionEvents = + this.checkAndEmitTaskExecutionEvents.bind(this) + this.checkAndEmitTaskQueuingEvents = + this.checkAndEmitTaskQueuingEvents.bind(this) if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() @@ -300,7 +303,7 @@ export abstract class AbstractPool< tasksQueueOptions.concurrency <= 0 ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` + `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero` ) } } @@ -368,6 +371,9 @@ export abstract class AbstractPool< 0 ) }), + ...(this.opts.enableTasksQueue === true && { + backPressure: this.hasBackPressure() + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -733,7 +739,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.checkAndEmitEvents() }) } @@ -854,7 +859,19 @@ export abstract class AbstractPool< message: MessageValue ): void { const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } else if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing < 0 + ) { + throw new Error( + 'Worker usage statistic for tasks executing cannot be negative' + ) + } if (message.taskError == null) { ++workerTaskStatistics.executed } else { @@ -1209,7 +1226,7 @@ export abstract class AbstractPool< } } - private checkAndEmitEvents (): void { + private checkAndEmitTaskExecutionEvents (): void { if (this.emitter != null) { if (this.busy) { this.emitter.emit(PoolEvents.busy, this.info) @@ -1220,6 +1237,12 @@ export abstract class AbstractPool< } } + private checkAndEmitTaskQueuingEvents (): void { + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) + } + } + /** * Gets the worker information given its worker node key. * @@ -1270,13 +1293,19 @@ export abstract class AbstractPool< /** @inheritDoc */ public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - if ( + return ( this.opts.enableTasksQueue === true && this.workerNodes[workerNodeKey].hasBackPressure() - ) { - return true - } - return false + ) + } + + private hasBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.findIndex( + (workerNode) => !workerNode.hasBackPressure() + ) === -1 + ) } /** @@ -1288,16 +1317,12 @@ export abstract class AbstractPool< private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(workerNodeKey, task, task.transferList) + this.checkAndEmitTaskExecutionEvents() } private enqueueTask (workerNodeKey: number, task: Task): number { const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) - if (this.hasWorkerNodeBackPressure(workerNodeKey)) { - this.emitter?.emit(PoolEvents.backPressure, { - workerId: this.getWorkerInfo(workerNodeKey).id, - ...this.info - }) - } + this.checkAndEmitTaskQueuingEvents() return tasksQueueSize }