X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=f64e9ea41cd6ee3779e75e92cd158d963bd56a7b;hb=b4682ec0748f72b7175bbdf4ba5ee339e478567e;hp=ca660951932fcf24219b98a5ba996f524085e0c5;hpb=1c6fe997dcb16b510da6587b992cd3f66d62a259;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index ca660951..f64e9ea4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -68,8 +68,6 @@ export abstract class AbstractPool< /** * Worker choice strategy context referencing a worker choice algorithm implementation. - * - * Default to a round robin algorithm. */ protected workerChoiceStrategyContext: WorkerChoiceStrategyContext< Worker, @@ -486,14 +484,21 @@ export abstract class AbstractPool< ): void { const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage + this.updateTaskStatisticsWorkerUsage(workerUsage, message) + this.updateRunTimeWorkerUsage(workerUsage, message) + this.updateEluWorkerUsage(workerUsage, message) + } + + private updateTaskStatisticsWorkerUsage ( + workerUsage: WorkerUsage, + message: MessageValue + ): void { const workerTaskStatistics = workerUsage.tasks --workerTaskStatistics.executing ++workerTaskStatistics.executed if (message.taskError != null) { ++workerTaskStatistics.failed } - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateEluWorkerUsage(workerUsage, message) } private updateRunTimeWorkerUsage ( @@ -502,19 +507,21 @@ export abstract class AbstractPool< ): void { if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .aggregate ) { - workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0 + workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0 if ( - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .avgRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .average && workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregation / workerUsage.tasks.executed + workerUsage.runTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .medRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .median && message.taskPerformance?.runTime != null ) { workerUsage.runTime.history.push(message.taskPerformance.runTime) @@ -531,19 +538,21 @@ export abstract class AbstractPool< const taskWaitTime = timestamp - (task.timestamp ?? timestamp) if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime + .aggregate ) { - workerUsage.waitTime.aggregation += taskWaitTime ?? 0 + workerUsage.waitTime.aggregate += taskWaitTime ?? 0 if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .avgWaitTime && + .waitTime.average && workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregation / workerUsage.tasks.executed + workerUsage.waitTime.aggregate / + (workerUsage.tasks.executed - workerUsage.tasks.failed) } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .medWaitTime && + .waitTime.median && taskWaitTime != null ) { workerUsage.waitTime.history.push(taskWaitTime) @@ -553,25 +562,46 @@ export abstract class AbstractPool< } private updateEluWorkerUsage ( - workerTasksUsage: WorkerUsage, + workerUsage: WorkerUsage, message: MessageValue ): void { - if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) { + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .aggregate + ) { + if (workerUsage.elu != null && message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate += message.taskPerformance.elu.active + workerUsage.elu.utilization = + (workerUsage.elu.utilization + + message.taskPerformance.elu.utilization) / + 2 + } else if (message.taskPerformance?.elu != null) { + workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle + workerUsage.elu.active.aggregate = message.taskPerformance.elu.active + workerUsage.elu.utilization = message.taskPerformance.elu.utilization + } + if ( + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .average && + workerUsage.tasks.executed !== 0 + ) { + const executedTasks = + workerUsage.tasks.executed - workerUsage.tasks.failed + workerUsage.elu.idle.average = + workerUsage.elu.idle.aggregate / executedTasks + workerUsage.elu.active.average = + workerUsage.elu.active.aggregate / executedTasks + } if ( - workerTasksUsage.elu != null && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu + .median && message.taskPerformance?.elu != null ) { - workerTasksUsage.elu = { - idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle, - active: - workerTasksUsage.elu.active + message.taskPerformance.elu.active, - utilization: - (workerTasksUsage.elu.utilization + - message.taskPerformance.elu.utilization) / - 2 - } - } else if (message.taskPerformance?.elu != null) { - workerTasksUsage.elu = message.taskPerformance.elu + workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle) + workerUsage.elu.active.history.push(message.taskPerformance.elu.active) + workerUsage.elu.idle.median = median(workerUsage.elu.idle.history) + workerUsage.elu.active.median = median(workerUsage.elu.active.history) } } } @@ -825,9 +855,9 @@ export abstract class AbstractPool< statistics: { runTime: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime, + .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .elu + .elu.aggregate } }) } @@ -836,18 +866,32 @@ export abstract class AbstractPool< return { tasks: this.getTaskStatistics(worker), runTime: { - aggregation: 0, + aggregate: 0, average: 0, median: 0, history: new CircularArray() }, waitTime: { - aggregation: 0, + aggregate: 0, average: 0, median: 0, history: new CircularArray() }, - elu: undefined + elu: { + idle: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + active: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + utilization: 0 + } } }