X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=06d09e6b72cc8937a950a3f129e7a97699d4f415;hb=db0e38eed9f8945c38ff579f5b33cb4bc44f62a1;hp=f89af5acf813afc18eeaaa1b9277039c6692e480;hpb=a5d152043363ea21ec667205c88e0b9f2a6ff04e;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index f89af5ac..06d09e6b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -204,9 +204,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -244,6 +245,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -566,7 +583,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -646,14 +666,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if ( - Array.isArray(this.getWorkerInfo(0).taskFunctions) && - (this.getWorkerInfo(0).taskFunctions as string[]).length > 0 - ) { - return this.getWorkerInfo(0).taskFunctions as string[] - } else { - return [] + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } } + return [] } /** @inheritDoc */ @@ -778,12 +799,12 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage(task.name as string) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) } } @@ -802,23 +823,29 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage( + ].getTaskFunctionWorkerUsage( message.taskPerformance?.name ?? DEFAULT_TASK_NAME ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) + this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) } } - private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { const workerInfo = this.getWorkerInfo(workerNodeKey) return ( Array.isArray(workerInfo.taskFunctions) && - workerInfo.taskFunctions.length > 1 + workerInfo.taskFunctions.length > 2 ) } @@ -1211,7 +1238,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1237,6 +1268,17 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + return true + } + return false + } + /** * Executes the given task on the worker given its worker node key. * @@ -1249,7 +1291,14 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasWorkerNodeBackPressure(workerNodeKey)) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {