X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=06d09e6b72cc8937a950a3f129e7a97699d4f415;hb=db0e38eed9f8945c38ff579f5b33cb4bc44f62a1;hp=e7c27cdae8a881392aa3cb4c51e8ab0386365f99;hpb=5441aea6894a5031667db6447634fa5d97f69e36;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e7c27cda..06d09e6b 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -204,9 +204,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -244,6 +245,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -566,7 +583,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -646,14 +666,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if ( - Array.isArray(this.getWorkerInfo(0).taskFunctions) && - (this.getWorkerInfo(0).taskFunctions as string[]).length > 0 - ) { - return this.getWorkerInfo(0).taskFunctions as string[] - } else { - return [] + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } } + return [] } /** @inheritDoc */ @@ -678,12 +699,11 @@ export abstract class AbstractPool< } const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( name != null && - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - !(this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).includes( - name - ) + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) @@ -695,7 +715,7 @@ export abstract class AbstractPool< data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -779,12 +799,12 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage(task.name as string) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage + ++taskFunctionWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task) } } @@ -803,22 +823,29 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { - const taskWorkerUsage = this.workerNodes[ + if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) { + const taskFunctionWorkerUsage = this.workerNodes[ workerNodeKey - ].getTaskWorkerUsage( + ].getTaskFunctionWorkerUsage( message.taskPerformance?.name ?? DEFAULT_TASK_NAME ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message) + this.updateEluWorkerUsage(taskFunctionWorkerUsage, message) } } - private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + /** + * Whether the worker node shall update its task function worker usage or not. + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise. + */ + private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) return ( - Array.isArray(this.getWorkerInfo(workerNodeKey).taskFunctions) && - (this.getWorkerInfo(workerNodeKey).taskFunctions as string[]).length > 1 + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 2 ) } @@ -1125,7 +1152,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { @@ -1144,9 +1171,11 @@ export abstract class AbstractPool< if (message.ready === false) { throw new Error(`Worker ${message.workerId} failed to initialize`) } - this.getWorkerInfo( + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } @@ -1209,7 +1238,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1217,7 +1250,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1235,6 +1268,17 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + return true + } + return false + } + /** * Executes the given task on the worker given its worker node key. * @@ -1247,7 +1291,14 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasWorkerNodeBackPressure(workerNodeKey)) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {