* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
if (message.taskError != null) {
++workerTaskStatistics.failed
}
-
this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateWaitTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
}
private updateWaitTimeWorkerUsage (
workerUsage: WorkerUsage,
- message: MessageValue<Response>
+ task: Task<Data>
): void {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
) {
- workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+ workerUsage.waitTime.aggregation += taskWaitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.avgWaitTime &&
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.medWaitTime &&
- message.taskPerformance?.waitTime != null
+ taskWaitTime != null
) {
- workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
runTime:
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime,
- waitTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.elu
}
private getWorkerUsage (worker: Worker): WorkerUsage {
return {
- tasks: this.getTaskStatistics(this, worker),
+ tasks: this.getTaskStatistics(worker),
runTime: {
aggregation: 0,
average: 0,
}
}
- private getTaskStatistics (
- self: AbstractPool<Worker, Data, Response>,
- worker: Worker
- ): TaskStatistics {
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
return {
executed: 0,
executing: 0,
get queued (): number {
- return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+ return queueSize ?? 0
},
failed: 0
}