X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=64fda2545627eec3fbcd3a7bdc3b7edd4ecb7ed2;hb=0ffa432eaff319902a2657c5e44374ea109cfb87;hp=d36e8a4737dbe5514aa936de561780baee5ba528;hpb=cea399c8027c4420ae869f177d9d518d4b51775f;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index d36e8a47..64fda254 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -92,10 +92,6 @@ export abstract class AbstractPool< * The start timestamp of the pool. */ private readonly startTimestamp - /** - * The task function names. - */ - private taskFunctions!: string[] /** * Constructs a new poolifier pool. @@ -208,9 +204,10 @@ export abstract class AbstractPool< this.opts.workerChoiceStrategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy) - this.opts.workerChoiceStrategyOptions = - opts.workerChoiceStrategyOptions ?? - DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...opts.workerChoiceStrategyOptions + } this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) @@ -248,6 +245,22 @@ export abstract class AbstractPool< 'Invalid worker choice strategy options: must be a plain object' ) } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries) + ) { + throw new TypeError( + 'Invalid worker choice strategy options: choice retries must be an integer' + ) + } + if ( + workerChoiceStrategyOptions.choiceRetries != null && + workerChoiceStrategyOptions.choiceRetries <= 0 + ) { + throw new RangeError( + `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero` + ) + } if ( workerChoiceStrategyOptions.weights != null && Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize @@ -510,7 +523,9 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. */ private checkMessageWorkerId (message: MessageValue): void { - if ( + if (message.workerId == null) { + throw new Error('Worker message received without worker id') + } else if ( message.workerId != null && this.getWorkerNodeKeyByWorkerId(message.workerId) === -1 ) { @@ -568,7 +583,10 @@ export abstract class AbstractPool< workerChoiceStrategyOptions: WorkerChoiceStrategyOptions ): void { this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) - this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions + this.opts.workerChoiceStrategyOptions = { + ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, + ...workerChoiceStrategyOptions + } this.workerChoiceStrategyContext.setOptions( this.opts.workerChoiceStrategyOptions ) @@ -648,11 +666,15 @@ export abstract class AbstractPool< /** @inheritDoc */ public listTaskFunctions (): string[] { - if (this.taskFunctions != null) { - return this.taskFunctions - } else { - return [] + for (const workerNode of this.workerNodes) { + if ( + Array.isArray(workerNode.info.taskFunctions) && + workerNode.info.taskFunctions.length > 0 + ) { + return workerNode.info.taskFunctions + } } + return [] } /** @inheritDoc */ @@ -672,27 +694,28 @@ export abstract class AbstractPool< ) { reject(new TypeError('name argument must not be an empty string')) } + if (transferList != null && !Array.isArray(transferList)) { + reject(new TypeError('transferList argument must be an array')) + } + const timestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() + const workerInfo = this.getWorkerInfo(workerNodeKey) if ( name != null && - this.taskFunctions != null && - !this.taskFunctions.includes(name) + Array.isArray(workerInfo.taskFunctions) && + !workerInfo.taskFunctions.includes(name) ) { reject( new Error(`Task function '${name}' is not registered in the pool`) ) } - if (transferList != null && !Array.isArray(transferList)) { - reject(new TypeError('transferList argument must be an array')) - } - const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode() const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), transferList, timestamp, - workerId: this.getWorkerInfo(workerNodeKey).id as number, + workerId: workerInfo.id as number, taskId: randomUUID() } this.promiseResponseMap.set(task.taskId as string, { @@ -721,6 +744,7 @@ export abstract class AbstractPool< await this.destroyWorkerNode(workerNodeKey) }) ) + this.emitter?.emit(PoolEvents.destroy) } protected async sendKillMessageToWorker ( @@ -775,11 +799,13 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - task.name as string - ) as WorkerUsage - ++taskWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage(task.name as string) as WorkerUsage + ++taskWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) + } } /** @@ -797,12 +823,24 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.taskPerformance?.name ?? DEFAULT_TASK_NAME - ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) - this.updateRunTimeWorkerUsage(taskWorkerUsage, message) - this.updateEluWorkerUsage(taskWorkerUsage, message) + if (this.canUpdateTaskWorkerUsage(workerNodeKey)) { + const taskWorkerUsage = this.workerNodes[ + workerNodeKey + ].getTaskWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskWorkerUsage, message) + this.updateEluWorkerUsage(taskWorkerUsage, message) + } + } + + private canUpdateTaskWorkerUsage (workerNodeKey: number): boolean { + const workerInfo = this.getWorkerInfo(workerNodeKey) + return ( + Array.isArray(workerInfo.taskFunctions) && + workerInfo.taskFunctions.length > 1 + ) } private updateTaskStatisticsWorkerUsage ( @@ -932,6 +970,7 @@ export abstract class AbstractPool< protected createAndSetupWorkerNode (): number { const worker = this.createWorker() + worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) worker.on('error', (error) => { @@ -951,7 +990,6 @@ export abstract class AbstractPool< this.redistributeQueuedTasks(workerNodeKey) } }) - worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { this.removeWorkerNode(worker) @@ -1108,7 +1146,7 @@ export abstract class AbstractPool< protected workerListener (): (message: MessageValue) => void { return (message) => { this.checkMessageWorkerId(message) - if (message.ready != null) { + if (message.ready != null && message.taskFunctions != null) { // Worker ready response received from worker this.handleWorkerReadyResponse(message) } else if (message.taskId != null) { @@ -1116,34 +1154,40 @@ export abstract class AbstractPool< this.handleTaskExecutionResponse(message) } else if (message.taskFunctions != null) { // Task functions message received from worker - this.taskFunctions = message.taskFunctions + this.getWorkerInfo( + this.getWorkerNodeKeyByWorkerId(message.workerId) + ).taskFunctions = message.taskFunctions } } } private handleWorkerReadyResponse (message: MessageValue): void { - this.getWorkerInfo( + if (message.ready === false) { + throw new Error(`Worker ${message.workerId} failed to initialize`) + } + const workerInfo = this.getWorkerInfo( this.getWorkerNodeKeyByWorkerId(message.workerId) - ).ready = message.ready as boolean + ) + workerInfo.ready = message.ready as boolean + workerInfo.taskFunctions = message.taskFunctions if (this.emitter != null && this.ready) { this.emitter.emit(PoolEvents.ready, this.info) } } private handleTaskExecutionResponse (message: MessageValue): void { - const promiseResponse = this.promiseResponseMap.get( - message.taskId as string - ) + const { taskId, taskError, data } = message + const promiseResponse = this.promiseResponseMap.get(taskId as string) if (promiseResponse != null) { - if (message.taskError != null) { - this.emitter?.emit(PoolEvents.taskError, message.taskError) - promiseResponse.reject(message.taskError.message) + if (taskError != null) { + this.emitter?.emit(PoolEvents.taskError, taskError) + promiseResponse.reject(taskError.message) } else { - promiseResponse.resolve(message.data as Response) + promiseResponse.resolve(data as Response) } const workerNodeKey = promiseResponse.workerNodeKey this.afterTaskExecutionHook(workerNodeKey, message) - this.promiseResponseMap.delete(message.taskId as string) + this.promiseResponseMap.delete(taskId as string) if ( this.opts.enableTasksQueue === true && this.tasksQueueSize(workerNodeKey) > 0 && @@ -1188,7 +1232,11 @@ export abstract class AbstractPool< * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found. */ private addWorkerNode (worker: Worker): number { - const workerNode = new WorkerNode(worker, this.worker) + const workerNode = new WorkerNode( + worker, + this.worker, + this.maxSize + ) // Flag the worker node as ready at pool startup. if (this.starting) { workerNode.info.ready = true @@ -1196,7 +1244,7 @@ export abstract class AbstractPool< this.workerNodes.push(workerNode) const workerNodeKey = this.getWorkerNodeKeyByWorker(worker) if (workerNodeKey === -1) { - throw new Error('Worker node not found') + throw new Error('Worker node added not found') } return workerNodeKey } @@ -1214,6 +1262,17 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + return true + } + return false + } + /** * Executes the given task on the worker given its worker node key. * @@ -1226,7 +1285,14 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - return this.workerNodes[workerNodeKey].enqueueTask(task) + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasWorkerNodeBackPressure(workerNodeKey)) { + this.emitter?.emit(PoolEvents.backPressure, { + workerId: this.getWorkerInfo(workerNodeKey).id, + ...this.info + }) + } + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined {