type TasksQueueOptions,
type WorkerType
} from './pool'
-import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
+import type {
+ IWorker,
+ Task,
+ TaskStatistics,
+ WorkerNode,
+ WorkerUsage
+} from './worker'
import {
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
for (const workerNode of this.workerNodes) {
- this.setWorkerNodeTasksUsage(workerNode, {
- tasks: {
- executed: 0,
- executing: 0,
- queued:
- this.opts.enableTasksQueue === true
- ? workerNode.tasksQueue.size
- : 0,
- failed: 0
- },
- runTime: {
- aggregation: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- waitTime: {
- aggregation: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- elu: undefined
- })
+ this.setWorkerNodeTasksUsage(
+ workerNode,
+ this.getWorkerUsage(workerNode.worker)
+ )
this.setWorkerStatistics(workerNode.worker)
}
}
* Can be overridden.
*
* @param workerNodeKey - The worker node key.
+ * @param task - The task to execute.
*/
- protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
- if (this.opts.enableTasksQueue === true) {
- this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
- this.tasksQueueSize(workerNodeKey)
- }
+ protected beforeTaskExecutionHook (
+ workerNodeKey: number,
+ task: Task<Data>
+ ): void {
+ const workerUsage = this.workerNodes[workerNodeKey].workerUsage
+ ++workerUsage.tasks.executing
+ this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
if (message.taskError != null) {
++workerTaskStatistics.failed
}
-
this.updateRunTimeWorkerUsage(workerUsage, message)
- this.updateWaitTimeWorkerUsage(workerUsage, message)
this.updateEluWorkerUsage(workerUsage, message)
}
workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
+ ) {
workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .avgRunTime &&
workerUsage.tasks.executed !== 0
) {
workerUsage.runTime.average =
workerUsage.runTime.aggregation / workerUsage.tasks.executed
}
if (
- this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .medRunTime &&
message.taskPerformance?.runTime != null
) {
workerUsage.runTime.history.push(message.taskPerformance.runTime)
private updateWaitTimeWorkerUsage (
workerUsage: WorkerUsage,
- message: MessageValue<Response>
+ task: Task<Data>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
- workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
+ const timestamp = performance.now()
+ const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
+ ) {
+ workerUsage.waitTime.aggregation += taskWaitTime ?? 0
if (
- this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .avgWaitTime &&
workerUsage.tasks.executed !== 0
) {
workerUsage.waitTime.average =
workerUsage.waitTime.aggregation / workerUsage.tasks.executed
}
if (
- this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
- message.taskPerformance?.waitTime != null
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .medWaitTime &&
+ taskWaitTime != null
) {
- workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.history.push(taskWaitTime)
workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
workerTasksUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
+ if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
if (
workerTasksUsage.elu != null &&
message.taskPerformance?.elu != null
private pushWorkerNode (worker: Worker): number {
return this.workerNodes.push({
worker,
- workerUsage: {
- tasks: {
- executed: 0,
- executing: 0,
- queued: 0,
- failed: 0
- },
- runTime: {
- aggregation: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
-
- waitTime: {
- aggregation: 0,
- average: 0,
- median: 0,
- history: new CircularArray()
- },
- elu: undefined
- },
+ workerUsage: this.getWorkerUsage(worker),
tasksQueue: new Queue<Task<Data>>()
})
}
- /**
- * Sets the given worker in the pool worker nodes.
- *
- * @param workerNodeKey - The worker node key.
- * @param worker - The worker.
- * @param workerUsage - The worker usage.
- * @param tasksQueue - The worker task queue.
- */
- private setWorkerNode (
- workerNodeKey: number,
- worker: Worker,
- workerUsage: WorkerUsage,
- tasksQueue: Queue<Task<Data>>
- ): void {
- this.workerNodes[workerNodeKey] = {
- worker,
- workerUsage,
- tasksQueue
- }
- }
+ // /**
+ // * Sets the given worker in the pool worker nodes.
+ // *
+ // * @param workerNodeKey - The worker node key.
+ // * @param worker - The worker.
+ // * @param workerUsage - The worker usage.
+ // * @param tasksQueue - The worker task queue.
+ // */
+ // private setWorkerNode (
+ // workerNodeKey: number,
+ // worker: Worker,
+ // workerUsage: WorkerUsage,
+ // tasksQueue: Queue<Task<Data>>
+ // ): void {
+ // this.workerNodes[workerNodeKey] = {
+ // worker,
+ // workerUsage,
+ // tasksQueue
+ // }
+ // }
/**
* Removes the given worker from the pool worker nodes.
}
private executeTask (workerNodeKey: number, task: Task<Data>): void {
- this.beforeTaskExecutionHook(workerNodeKey)
+ this.beforeTaskExecutionHook(workerNodeKey, task)
this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
}
private setWorkerStatistics (worker: Worker): void {
this.sendToWorker(worker, {
statistics: {
- runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
- waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
- elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
+ runTime:
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .runTime,
+ elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+ .elu
}
})
}
+
+ private getWorkerUsage (worker: Worker): WorkerUsage {
+ return {
+ tasks: this.getTaskStatistics(worker),
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ elu: undefined
+ }
+ }
+
+ private getTaskStatistics (worker: Worker): TaskStatistics {
+ const queueSize =
+ this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
+ return {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return queueSize ?? 0
+ },
+ failed: 0
+ }
+ }
}