X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=183344f84b42a2a55411788d1d414fb82383f9dc;hb=refs%2Ftags%2Fv2.6.30;hp=53aad1fdfcb548f56c9b1e87e93e86799b11271d;hpb=9e8442454c11d9fba371dee3deadc9c26a49a335;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 53aad1fd..183344f8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -84,6 +84,11 @@ export abstract class AbstractPool< Response > + /** + * Dynamic pool maximum size property placeholder. + */ + protected readonly max?: number + /** * Whether the pool is starting or not. */ @@ -117,8 +122,6 @@ export abstract class AbstractPool< this.chooseWorkerNode = this.chooseWorkerNode.bind(this) this.executeTask = this.executeTask.bind(this) this.enqueueTask = this.enqueueTask.bind(this) - this.dequeueTask = this.dequeueTask.bind(this) - this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this) if (this.opts.enableEvents === true) { this.emitter = new PoolEmitter() @@ -300,7 +303,7 @@ export abstract class AbstractPool< tasksQueueOptions.concurrency <= 0 ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` + `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero` ) } } @@ -368,6 +371,9 @@ export abstract class AbstractPool< 0 ) }), + ...(this.opts.enableTasksQueue === true && { + backPressure: this.hasBackPressure() + }), failedTasks: this.workerNodes.reduce( (accumulator, workerNode) => accumulator + workerNode.usage.tasks.failed, @@ -509,12 +515,16 @@ export abstract class AbstractPool< /** * The pool minimum size. */ - protected abstract get minSize (): number + protected get minSize (): number { + return this.numberOfWorkers + } /** * The pool maximum size. */ - protected abstract get maxSize (): number + protected get maxSize (): number { + return this.max ?? this.numberOfWorkers + } /** * Checks if the worker id sent in the received message from a worker is valid. @@ -733,7 +743,6 @@ export abstract class AbstractPool< } else { this.enqueueTask(workerNodeKey, task) } - this.checkAndEmitEvents() }) } @@ -744,7 +753,7 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) - this.emitter?.emit(PoolEvents.destroy) + this.emitter?.emit(PoolEvents.destroy, this.info) } protected async sendKillMessageToWorker ( @@ -854,7 +863,19 @@ export abstract class AbstractPool< message: MessageValue ): void { const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } else if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing < 0 + ) { + throw new Error( + 'Worker usage statistic for tasks executing cannot be negative' + ) + } if (message.taskError == null) { ++workerTaskStatistics.executed } else { @@ -1044,6 +1065,7 @@ export abstract class AbstractPool< if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) { workerInfo.ready = true } + this.checkAndEmitDynamicWorkerCreationEvents() return workerNodeKey } @@ -1209,13 +1231,22 @@ export abstract class AbstractPool< } } - private checkAndEmitEvents (): void { - if (this.emitter != null) { - if (this.busy) { - this.emitter.emit(PoolEvents.busy, this.info) - } - if (this.type === PoolTypes.dynamic && this.full) { - this.emitter.emit(PoolEvents.full, this.info) + private checkAndEmitTaskExecutionEvents (): void { + if (this.busy) { + this.emitter?.emit(PoolEvents.busy, this.info) + } + } + + private checkAndEmitTaskQueuingEvents (): void { + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) + } + } + + private checkAndEmitDynamicWorkerCreationEvents (): void { + if (this.type === PoolTypes.dynamic) { + if (this.full) { + this.emitter?.emit(PoolEvents.full, this.info) } } } @@ -1281,7 +1312,7 @@ export abstract class AbstractPool< this.opts.enableTasksQueue === true && this.workerNodes.findIndex( (workerNode) => !workerNode.hasBackPressure() - ) !== -1 + ) === -1 ) } @@ -1294,13 +1325,12 @@ export abstract class AbstractPool< private executeTask (workerNodeKey: number, task: Task): void { this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(workerNodeKey, task, task.transferList) + this.checkAndEmitTaskExecutionEvents() } private enqueueTask (workerNodeKey: number, task: Task): number { const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) - if (this.hasBackPressure()) { - this.emitter?.emit(PoolEvents.backPressure, this.info) - } + this.checkAndEmitTaskQueuingEvents() return tasksQueueSize }