* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
if (message.taskError != null) {
++workerTaskStatistics.failed
}
-
this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateWaitTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
}
): void {
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .aggregate
) {
- workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .avgRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
- workerUsage.runTime.aggregation / workerUsage.tasks.executed
+ workerUsage.runTime.aggregate / workerUsage.tasks.executed
}
if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .medRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .median &&
message.taskPerformance?.runTime != null
) {
workerUsage.runTime.history.push(message.taskPerformance.runTime)
private updateWaitTimeWorkerUsage (
workerUsage: WorkerUsage,
- message: MessageValue<Response>
+ task: Task<Data>
): void {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+ .aggregate
) {
- workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+ workerUsage.waitTime.aggregate += taskWaitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .avgWaitTime &&
+ .waitTime.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
- workerUsage.waitTime.aggregation / workerUsage.tasks.executed
+ workerUsage.waitTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .medWaitTime &&
- message.taskPerformance?.waitTime != null
+ .waitTime.median &&
+ taskWaitTime != null
) {
- workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
statistics: {
runTime:
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime,
- waitTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime,
+ .runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu
}
private getWorkerUsage (worker: Worker): WorkerUsage {
return {
- tasks: this.getTaskStatistics(this, worker),
+ tasks: this.getTaskStatistics(worker),
runTime: {
- aggregation: 0,
+ aggregate: 0,
average: 0,
median: 0,
history: new CircularArray()
},
waitTime: {
- aggregation: 0,
+ aggregate: 0,
average: 0,
median: 0,
history: new CircularArray()
}
}
- private getTaskStatistics (
- self: AbstractPool<Worker, Data, Response>,
- worker: Worker
- ): TaskStatistics {
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
return {
executed: 0,
executing: 0,
get queued (): number {
- return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+ return queueSize ?? 0
},
failed: 0
}