X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=8beede87c359e3559e41aa7652a007ee64eacaff;hb=6b81370106fdec4cab2d203f6892a7d79c2cd5c2;hp=026bf918a40ce0158c3c159cd7734a6919f6e9c7;hpb=3557031d88b5ff00ea86695728b3ba3af46e9faa;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 026bf918..8beede87 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -474,6 +474,12 @@ export abstract class AbstractPool< ?.worker } + /** + * Checks if the worker id sent in the received message from a worker is valid. + * + * @param message - The received message. + * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid. + */ private checkMessageWorkerId (message: MessageValue): void { if ( message.workerId != null && @@ -675,11 +681,11 @@ export abstract class AbstractPool< const workerUsage = this.workerNodes[workerNodeKey].usage ++workerUsage.tasks.executing this.updateWaitTimeWorkerUsage(workerUsage, task) - const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( task.name as string ) as WorkerUsage - ++tasksWorkerUsage.tasks.executing - this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task) + ++taskWorkerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(taskWorkerUsage, task) } /** @@ -698,12 +704,12 @@ export abstract class AbstractPool< this.updateTaskStatisticsWorkerUsage(workerUsage, message) this.updateRunTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) - const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( - message.name as string + const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage( + message.taskPerformance?.name ?? DEFAULT_TASK_NAME ) as WorkerUsage - this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message) - this.updateRunTimeWorkerUsage(tasksWorkerUsage, message) - this.updateEluWorkerUsage(tasksWorkerUsage, message) + this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message) + this.updateRunTimeWorkerUsage(taskWorkerUsage, message) + this.updateEluWorkerUsage(taskWorkerUsage, message) } private updateTaskStatisticsWorkerUsage ( @@ -950,9 +956,6 @@ export abstract class AbstractPool< if (this.emitter != null) { this.emitter.emit(PoolEvents.error, error) } - if (this.opts.enableTasksQueue === true) { - this.redistributeQueuedTasks(worker) - } if (this.opts.restartWorkerOnError === true && !this.starting) { if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) { this.createAndSetupDynamicWorker() @@ -960,6 +963,9 @@ export abstract class AbstractPool< this.createAndSetupWorker() } } + if (this.opts.enableTasksQueue === true) { + this.redistributeQueuedTasks(worker) + } }) worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)