X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=8beede87c359e3559e41aa7652a007ee64eacaff;hb=6b81370106fdec4cab2d203f6892a7d79c2cd5c2;hp=192725082942b1aa7b8904c07eaf8a2731865f45;hpb=079de9913bd2a225fe8f08c51be74b1f4d4a78d4;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 19272508..8beede87 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,7 +1,12 @@ import { randomUUID } from 'node:crypto' import { performance } from 'node:perf_hooks' -import type { MessageValue, PromiseResponseWrapper } from '../utility-types' +import type { + MessageValue, + PromiseResponseWrapper, + Task +} from '../utility-types' import { + DEFAULT_TASK_NAME, DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, EMPTY_FUNCTION, isKillBehavior, @@ -24,7 +29,6 @@ import type { IWorker, IWorkerNode, MessageHandler, - Task, WorkerInfo, WorkerType, WorkerUsage @@ -308,7 +312,7 @@ export abstract class AbstractPool< ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.usage.tasks.maxQueued, + accumulator + (workerNode.usage.tasks?.maxQueued ?? 0), 0 ), failedTasks: this.workerNodes.reduce( @@ -403,14 +407,16 @@ export abstract class AbstractPool< private get starting (): boolean { return ( - !this.full || - (this.full && this.workerNodes.some(workerNode => !workerNode.info.ready)) + this.workerNodes.length < this.minSize || + (this.workerNodes.length >= this.minSize && + this.workerNodes.some(workerNode => !workerNode.info.ready)) ) } private get ready (): boolean { return ( - this.full && this.workerNodes.every(workerNode => workerNode.info.ready) + this.workerNodes.length >= this.minSize && + this.workerNodes.every(workerNode => workerNode.info.ready) ) } @@ -468,6 +474,12 @@ export abstract class AbstractPool< ?.worker } + /** + * Checks if the worker id sent in the received message from a worker is valid. + * + * @param message - The received message. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. + */ private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && @@ -586,7 +598,7 @@ export abstract class AbstractPool< const timestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { - name, + name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), timestamp, @@ -669,6 +681,11 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) + const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + task.name as string + ) as WorkerUsage + ++taskWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) } /** @@ -682,10 +699,17 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage + const workerNodeKey = this.getWorkerNodeKey(worker) + const workerUsage = this.workerNodes[workerNodeKey].usage this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) + const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME + ) as WorkerUsage + this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskWorkerUsage, message) + this.updateEluWorkerUsage(taskWorkerUsage, message) } private updateTaskStatisticsWorkerUsage ( @@ -932,9 +956,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(worker) - } if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() @@ -942,6 +963,9 @@ export abstract class AbstractPool< this.createAndSetupWorker() } } + if (this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(worker) + } }) worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)