type TasksQueueOptions,
type WorkerType
} from './pool'
-import type {
- IWorker,
- Task,
- TaskStatistics,
- WorkerNode,
- WorkerUsage
-} from './worker'
+import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
import {
Measurements,
WorkerChoiceStrategies,
0
),
queuedTasks: this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.queued,
0
),
maxQueuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.tasksQueue.maxSize,
+ accumulator + workerNode.workerUsage.tasks.maxQueued,
0
),
failedTasks: this.workerNodes.reduce(
if (workerChoiceStrategyOptions != null) {
this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
- for (const workerNode of this.workerNodes) {
+ for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
this.setWorkerNodeTasksUsage(
workerNode,
- this.getWorkerUsage(workerNode.worker)
+ this.getWorkerUsage(workerNodeKey)
)
this.setWorkerStatistics(workerNode.worker)
}
const promiseResponse = this.promiseResponseMap.get(message.id)
if (promiseResponse != null) {
if (message.taskError != null) {
- promiseResponse.reject(message.taskError.message)
if (this.emitter != null) {
this.emitter.emit(PoolEvents.taskError, message.taskError)
}
+ promiseResponse.reject(message.taskError.message)
} else {
promiseResponse.resolve(message.data as Response)
}
* @returns The worker nodes length.
*/
private pushWorkerNode (worker: Worker): number {
- return this.workerNodes.push({
+ this.workerNodes.push({
worker,
- workerUsage: this.getWorkerUsage(worker),
+ workerUsage: this.getWorkerUsage(),
tasksQueue: new Queue<Task<Data>>()
})
+ const workerNodeKey = this.getWorkerNodeKey(worker)
+ this.setWorkerNodeTasksUsage(
+ this.workerNodes[workerNodeKey],
+ this.getWorkerUsage(workerNodeKey)
+ )
+ return this.workerNodes.length
}
// /**
return this.workerNodes[workerNodeKey].tasksQueue.size
}
+ private tasksMaxQueueSize (workerNodeKey: number): number {
+ return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+ }
+
private flushTasksQueue (workerNodeKey: number): void {
if (this.tasksQueueSize(workerNodeKey) > 0) {
for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
)
}
}
+ this.workerNodes[workerNodeKey].tasksQueue.clear()
}
private flushTasksQueues (): void {
})
}
- private getWorkerUsage (worker: Worker): WorkerUsage {
+ private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
+ const getTasksQueueSize = (workerNodeKey?: number): number => {
+ return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
+ }
+ const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
+ return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
+ }
return {
- tasks: this.getTaskStatistics(worker),
+ tasks: {
+ executed: 0,
+ executing: 0,
+ get queued (): number {
+ return getTasksQueueSize(workerNodeKey)
+ },
+ get maxQueued (): number {
+ return getTasksMaxQueueSize(workerNodeKey)
+ },
+ failed: 0
+ },
runTime: {
aggregate: 0,
average: 0,
}
}
}
-
- private getTaskStatistics (worker: Worker): TaskStatistics {
- const queueSize =
- this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
- return {
- executed: 0,
- executing: 0,
- get queued (): number {
- return queueSize ?? 0
- },
- failed: 0
- }
- }
}