*
* @param workerNodeKey - The worker node key.
*/
- protected beforeTaskExecutionHook (
- workerNodeKey: number,
- task: Task<Data>
- ): void {
- const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
- ++workerTasksUsage.running
- if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
- const waitTime = performance.now() - (task.submissionTimestamp ?? 0)
- workerTasksUsage.waitTime += waitTime
- if (
- this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime
- ) {
- workerTasksUsage.waitTimeHistory.push(waitTime)
- workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
- }
- }
+ protected beforeTaskExecutionHook (workerNodeKey: number): void {
+ ++this.workerNodes[workerNodeKey].tasksUsage.running
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerNodeKey = this.getWorkerNodeKey(worker)
- const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage
+ const workerTasksUsage =
+ this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
--workerTasksUsage.running
++workerTasksUsage.run
if (message.error != null) {
++workerTasksUsage.error
}
+ this.updateRunTimeTasksUsage(workerTasksUsage, message)
+ this.updateWaitTimeTasksUsage(workerTasksUsage, message)
+ }
+
+ private updateRunTimeTasksUsage (
+ workerTasksUsage: TasksUsage,
+ message: MessageValue<Response>
+ ): void {
if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
workerTasksUsage.runTime += message.runTime ?? 0
if (
workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
}
}
- if (
- this.workerChoiceStrategyContext.getRequiredStatistics().waitTime &&
- this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
- workerTasksUsage.run !== 0
- ) {
- workerTasksUsage.avgWaitTime =
- workerTasksUsage.waitTime / workerTasksUsage.run
+ }
+
+ private updateWaitTimeTasksUsage (
+ workerTasksUsage: TasksUsage,
+ message: MessageValue<Response>
+ ): void {
+ if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
+ workerTasksUsage.waitTime += message.waitTime ?? 0
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
+ workerTasksUsage.run !== 0
+ ) {
+ workerTasksUsage.avgWaitTime =
+ workerTasksUsage.waitTime / workerTasksUsage.run
+ }
+ if (
+ this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
+ message.waitTime != null
+ ) {
+ workerTasksUsage.waitTimeHistory.push(message.waitTime)
+ workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+ }
}
}
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey, task)
+ this.beforeTaskExecutionHook(workerNodeKey)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}