- const workerNodeKey = this.getWorkerNodeKey(worker)
- const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
- --workerTasksUsage.running
- ++workerTasksUsage.run
- if (message.error != null) {
- ++workerTasksUsage.error
- }
- if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
- workerTasksUsage.runTime += message.runTime ?? 0
+ const workerUsage =
+ this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ this.updateTaskStatisticsWorkerUsage(workerUsage, message)
+ this.updateRunTimeWorkerUsage(workerUsage, message)
+ this.updateEluWorkerUsage(workerUsage, message)
+ }
+
+ private updateTaskStatisticsWorkerUsage (
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void {
+ const workerTaskStatistics = workerUsage.tasks
+ --workerTaskStatistics.executing
+ ++workerTaskStatistics.executed
+ if (message.taskError != null) {
+ ++workerTaskStatistics.failed
+ }
+ }
+
+ private updateRunTimeWorkerUsage (
+ workerUsage: WorkerUsage,
+ message: MessageValue<Response>
+ ): void {
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .aggregate
+ ) {
+ workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ workerUsage.runTime.average =
+ workerUsage.runTime.aggregate /
+ (workerUsage.tasks.executed - workerUsage.tasks.failed)
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .median &&
+ message.taskPerformance?.runTime != null
+ ) {
+ workerUsage.runTime.history.push(message.taskPerformance.runTime)
+ workerUsage.runTime.median = median(workerUsage.runTime.history)
+ }
+ }
+ }
+
+ private updateWaitTimeWorkerUsage (
+ workerUsage: WorkerUsage,
+ task: Task<Data>
+ ): void {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+ .aggregate
+ ) {
+ workerUsage.waitTime.aggregate += taskWaitTime ?? 0