/**
* Worker choice strategy context referencing a worker choice algorithm implementation.
- *
- * Default to a round robin algorithm.
*/
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
if (message.taskError != null) {
++workerTaskStatistics.failed
}
-
this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateWaitTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
}
): void {
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .aggregate
) {
- workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .avgRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
- workerUsage.runTime.aggregation / workerUsage.tasks.executed
+ workerUsage.runTime.aggregate / workerUsage.tasks.executed
}
if (
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .medRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ .median &&
message.taskPerformance?.runTime != null
) {
workerUsage.runTime.history.push(message.taskPerformance.runTime)
private updateWaitTimeWorkerUsage (
workerUsage: WorkerUsage,
- message: MessageValue<Response>
+ task: Task<Data>
): void {
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+ .aggregate
) {
- workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+ workerUsage.waitTime.aggregate += taskWaitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .avgWaitTime &&
+ .waitTime.average &&
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
- workerUsage.waitTime.aggregation / workerUsage.tasks.executed
+ workerUsage.waitTime.aggregate / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .medWaitTime &&
- message.taskPerformance?.waitTime != null
+ .waitTime.median &&
+ taskWaitTime != null
) {
- workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
private updateEluWorkerUsage (
- workerTasksUsage: WorkerUsage,
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .aggregate
+ ) {
+ if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / workerUsage.tasks.executed
+ }
if (
- workerTasksUsage.elu != null &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median &&
message.taskPerformance?.elu != null
) {
- workerTasksUsage.elu = {
- idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
- active:
- workerTasksUsage.elu.active + message.taskPerformance.elu.active,
- utilization:
- (workerTasksUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- }
- } else if (message.taskPerformance?.elu != null) {
- workerTasksUsage.elu = message.taskPerformance.elu
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
}
}
}
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
statistics: {
runTime:
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .runTime,
- waitTime:
- this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .waitTime,
+ .runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu
+ .elu.aggregate
}
})
}
private getWorkerUsage (worker: Worker): WorkerUsage {
return {
- tasks: this.getTaskStatistics(this, worker),
+ tasks: this.getTaskStatistics(worker),
runTime: {
- aggregation: 0,
+ aggregate: 0,
average: 0,
median: 0,
history: new CircularArray()
},
waitTime: {
- aggregation: 0,
+ aggregate: 0,
average: 0,
median: 0,
history: new CircularArray()
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ utilization: 0
+ }
}
}
- private getTaskStatistics (
- self: AbstractPool<Worker, Data, Response>,
- worker: Worker
- ): TaskStatistics {
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
return {
executed: 0,
executing: 0,
get queued (): number {
- return self.tasksQueueSize(self.getWorkerNodeKey(worker))
+ return queueSize ?? 0
},
failed: 0
}