X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=dc1fc53f90bfbda6031a56cfd2c92e62867f07ec;hb=refs%2Ftags%2Fv2.6.29;hp=eb2c2f7818a687ac8f8016eb27489895cadfaaee;hpb=671d515455c745dc74f4c385fe23683975bfc3df;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb2c2f78..dc1fc53f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -204,9 +204,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -244,6 +245,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -283,7 +300,7 @@ export abstract class AbstractPool< tasksQueueOptions.concurrency <= 0 ) { throw new Error( - `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'` + `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero` ) } } @@ -566,7 +583,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -779,12 +799,12 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage(task.name as string) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) } } @@ -803,23 +823,29 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage( + ].getTaskFunctionWorkerUsage( message.taskPerformance?.name ?? DEFAULT_TASK_NAME ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) + this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) } } - private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 1 + workerInfo.taskFunctions.length > 2 ) } @@ -828,7 +854,19 @@ export abstract class AbstractPool< message: MessageValue ): void { const workerTaskStatistics = workerUsage.tasks - --workerTaskStatistics.executing + if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing > 0 + ) { + --workerTaskStatistics.executing + } else if ( + workerTaskStatistics.executing != null && + workerTaskStatistics.executing < 0 + ) { + throw new Error( + 'Worker usage statistic for tasks executing cannot be negative' + ) + } if (message.taskError == null) { ++workerTaskStatistics.executed } else { @@ -1242,6 +1280,23 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) + } + + private hasBackPressure (): boolean { + return ( + this.opts.enableTasksQueue === true && + this.workerNodes.findIndex( + (workerNode) => !workerNode.hasBackPressure() + ) !== -1 + ) + } + /** * Executes the given task on the worker given its worker node key. * @@ -1254,16 +1309,11 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - if ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) { - this.emitter?.emit(PoolEvents.backPressure, { - workerId: this.getWorkerInfo(workerNodeKey).id, - ...this.info - }) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasBackPressure()) { + this.emitter?.emit(PoolEvents.backPressure, this.info) } - return this.workerNodes[workerNodeKey].enqueueTask(task) + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {