MessageHandler,
OnlineHandler,
Task,
- TasksUsage,
+ WorkerUsage,
WorkerNode
} from './pools/worker'
export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
type TasksQueueOptions,
type WorkerType
} from './pool'
-import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
+import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
import {
WorkerChoiceStrategies,
type WorkerChoiceStrategy,
workerNodes: this.workerNodes.length,
idleWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing === 0
+ ? accumulator + 1
+ : accumulator,
0
),
busyWorkerNodes: this.workerNodes.reduce(
(accumulator, workerNode) =>
- workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
+ workerNode.workerUsage.tasks.executing > 0
+ ? accumulator + 1
+ : accumulator,
0
),
- runningTasks: this.workerNodes.reduce(
+ executedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
- accumulator + workerNode.tasksUsage.running,
+ accumulator + workerNode.workerUsage.tasks.executed,
+ 0
+ ),
+ executingTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.executing,
0
),
queuedTasks: this.workerNodes.reduce(
(accumulator, workerNode) =>
accumulator + workerNode.tasksQueue.maxSize,
0
+ ),
+ failedTasks: this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ accumulator + workerNode.workerUsage.tasks.failed,
+ 0
)
}
}
}
for (const workerNode of this.workerNodes) {
this.setWorkerNodeTasksUsage(workerNode, {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ tasks: {
+ executing: 0,
+ executed: 0,
+ queued:
+ this.opts.enableTasksQueue === true
+ ? workerNode.tasksQueue.size
+ : 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
elu: undefined
})
this.setWorkerStatistics(workerNode.worker)
protected internalBusy (): boolean {
return (
this.workerNodes.findIndex(workerNode => {
- return workerNode.tasksUsage.running === 0
+ return workerNode.workerUsage.tasks.executing === 0
}) === -1
)
}
if (
this.opts.enableTasksQueue === true &&
(this.busy ||
- this.workerNodes[workerNodeKey].tasksUsage.running >=
+ this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
((this.opts.tasksQueueOptions as TasksQueueOptions)
.concurrency as number))
) {
* @param workerNodeKey - The worker node key.
*/
protected beforeTaskExecutionHook (workerNodeKey: number): void {
- ++this.workerNodes[workerNodeKey].tasksUsage.running
+ ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
+ if (this.opts.enableTasksQueue === true) {
+ this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
+ this.tasksQueueSize(workerNodeKey)
+ }
}
/**
worker: Worker,
message: MessageValue<Response>
): void {
- const workerTasksUsage =
- this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
- --workerTasksUsage.running
- ++workerTasksUsage.ran
+ const workerUsage =
+ this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
+ const workerTaskStatistics = workerUsage.tasks
+ --workerTaskStatistics.executing
+ ++workerTaskStatistics.executed
if (message.taskError != null) {
- ++workerTasksUsage.error
+ ++workerTaskStatistics.failed
}
- this.updateRunTimeTasksUsage(workerTasksUsage, message)
- this.updateWaitTimeTasksUsage(workerTasksUsage, message)
- this.updateEluTasksUsage(workerTasksUsage, message)
+
+ this.updateRunTimeWorkerUsage(workerUsage, message)
+ this.updateWaitTimeWorkerUsage(workerUsage, message)
+ this.updateEluWorkerUsage(workerUsage, message)
}
- private updateRunTimeTasksUsage (
- workerTasksUsage: TasksUsage,
+ private updateRunTimeWorkerUsage (
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
- workerTasksUsage.runTime += message.taskPerformance?.runTime ?? 0
+ workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
- workerTasksUsage.ran !== 0
+ workerUsage.tasks.executed !== 0
) {
- workerTasksUsage.avgRunTime =
- workerTasksUsage.runTime / workerTasksUsage.ran
+ workerUsage.runTime.average =
+ workerUsage.runTime.aggregation / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
message.taskPerformance?.runTime != null
) {
- workerTasksUsage.runTimeHistory.push(message.taskPerformance.runTime)
- workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
+ workerUsage.runTime.history.push(message.taskPerformance.runTime)
+ workerUsage.runTime.median = median(workerUsage.runTime.history)
}
}
}
- private updateWaitTimeTasksUsage (
- workerTasksUsage: TasksUsage,
+ private updateWaitTimeWorkerUsage (
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
- workerTasksUsage.waitTime += message.taskPerformance?.waitTime ?? 0
+ workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
if (
this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
- workerTasksUsage.ran !== 0
+ workerUsage.tasks.executed !== 0
) {
- workerTasksUsage.avgWaitTime =
- workerTasksUsage.waitTime / workerTasksUsage.ran
+ workerUsage.waitTime.average =
+ workerUsage.waitTime.aggregation / workerUsage.tasks.executed
}
if (
this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
message.taskPerformance?.waitTime != null
) {
- workerTasksUsage.waitTimeHistory.push(message.taskPerformance.waitTime)
- workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
+ workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
+ workerUsage.waitTime.median = median(workerUsage.waitTime.history)
}
}
}
- private updateEluTasksUsage (
- workerTasksUsage: TasksUsage,
+ private updateEluWorkerUsage (
+ workerTasksUsage: WorkerUsage,
message: MessageValue<Response>
): void {
if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
if (
isKillBehavior(KillBehaviors.HARD, message.kill) ||
(message.kill != null &&
- this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
+ this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
+ .executing === 0)
) {
// Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
this.flushTasksQueue(currentWorkerNodeKey)
* Sets the given worker node its tasks usage in the pool.
*
* @param workerNode - The worker node.
- * @param tasksUsage - The worker node tasks usage.
+ * @param workerUsage - The worker usage.
*/
private setWorkerNodeTasksUsage (
workerNode: WorkerNode<Worker, Data>,
- tasksUsage: TasksUsage
+ workerUsage: WorkerUsage
): void {
- workerNode.tasksUsage = tasksUsage
+ workerNode.workerUsage = workerUsage
}
/**
private pushWorkerNode (worker: Worker): number {
return this.workerNodes.push({
worker,
- tasksUsage: {
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: new CircularArray(),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: new CircularArray(),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ workerUsage: {
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
elu: undefined
},
tasksQueue: new Queue<Task<Data>>()
*
* @param workerNodeKey - The worker node key.
* @param worker - The worker.
- * @param tasksUsage - The worker tasks usage.
+ * @param workerUsage - The worker usage.
* @param tasksQueue - The worker task queue.
*/
private setWorkerNode (
workerNodeKey: number,
worker: Worker,
- tasksUsage: TasksUsage,
+ workerUsage: WorkerUsage,
tasksQueue: Queue<Task<Data>>
): void {
this.workerNodes[workerNodeKey] = {
worker,
- tasksUsage,
+ workerUsage,
tasksQueue
}
}
workerNodes: number
idleWorkerNodes: number
busyWorkerNodes: number
- runningTasks: number
+ executedTasks: number
+ executingTasks: number
queuedTasks: number
maxQueuedTasks: number
+ failedTasks: number
}
/**
/**
* Gets the worker task runtime.
- * If the required statistics are `avgRunTime`, the average runtime is returned.
- * If the required statistics are `medRunTime`, the median runtime is returned.
+ * If the task statistics wants `avgRunTime`, the average runtime is returned.
+ * If the task statistics wants `medRunTime`, the median runtime is returned.
*
* @param workerNodeKey - The worker node key.
* @returns The worker task runtime.
*/
protected getWorkerTaskRunTime (workerNodeKey: number): number {
return this.taskStatistics.medRunTime
- ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
- : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
+ ? this.pool.workerNodes[workerNodeKey].workerUsage.runTime.median
+ : this.pool.workerNodes[workerNodeKey].workerUsage.runTime.average
}
/**
* Gets the worker task wait time.
- * If the required statistics are `avgWaitTime`, the average wait time is returned.
- * If the required statistics are `medWaitTime`, the median wait time is returned.
+ * If the task statistics wants `avgWaitTime`, the average wait time is returned.
+ * If the task statistics wants `medWaitTime`, the median wait time is returned.
*
* @param workerNodeKey - The worker node key.
* @returns The worker task wait time.
*/
protected getWorkerWaitTime (workerNodeKey: number): number {
return this.taskStatistics.medWaitTime
- ? this.pool.workerNodes[workerNodeKey].tasksUsage.medWaitTime
- : this.pool.workerNodes[workerNodeKey].tasksUsage.avgWaitTime
+ ? this.pool.workerNodes[workerNodeKey].workerUsage.runTime.median
+ : this.pool.workerNodes[workerNodeKey].workerUsage.runTime.average
}
protected computeDefaultWorkerWeight (): number {
*/
private findFirstFreeWorkerNodeKey (): number {
return this.pool.workerNodes.findIndex(workerNode => {
- return workerNode.tasksUsage.running === 0
+ return workerNode.workerUsage.tasks.executing === 0
})
}
private findLastFreeWorkerNodeKey (): number {
// It requires node >= 18.0.0:
// return this.workerNodes.findLastIndex(workerNode => {
- // return workerNode.tasksUsage.running === 0
+ // return workerNode.workerUsage.tasks.executing === 0
// })
for (
let workerNodeKey = this.pool.workerNodes.length - 1;
workerNodeKey >= 0;
workerNodeKey--
) {
- if (this.pool.workerNodes[workerNodeKey].tasksUsage.running === 0) {
+ if (
+ this.pool.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
+ ) {
return workerNodeKey
}
}
let minRunTime = Infinity
let leastBusyWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
- const workerRunTime = workerNode.tasksUsage.runTime
+ const workerRunTime = workerNode.workerUsage.runTime.aggregation
if (workerRunTime === 0) {
return workerNodeKey
} else if (workerRunTime < minRunTime) {
let minNumberOfTasks = Infinity
let leastUsedWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
- const tasksUsage = workerNode.tasksUsage
- const workerTasks = tasksUsage.ran + tasksUsage.running
+ const workerTaskStatistics = workerNode.workerUsage.tasks
+ const workerTasks =
+ workerTaskStatistics.executed + workerTaskStatistics.executing
if (workerTasks === 0) {
return workerNodeKey
} else if (workerTasks < minNumberOfTasks) {
}
/**
- * Pool worker tasks usage statistics requirements.
+ * Pool worker node worker usage statistics requirements.
*
* @internal
*/
}
/**
- * Worker tasks usage statistics.
+ * Measure statistics.
*
* @internal
*/
-export interface TasksUsage {
+export interface MeasureStatistics {
/**
- * Number of tasks executed.
+ * Measure aggregation.
*/
- ran: number
+ aggregation: number
/**
- * Number of tasks running.
+ * Measure average.
*/
- running: number
+ average: number
/**
- * Tasks runtime.
+ * Measure median.
*/
- runTime: number
+ median: number
/**
- * Tasks runtime history.
+ * Measure history.
*/
- runTimeHistory: CircularArray<number>
+ history: CircularArray<number>
+}
+
+/**
+ * Task statistics.
+ *
+ * @internal
+ */
+
+export interface TaskStatistics {
/**
- * Average tasks runtime.
+ * Number of tasks executed.
*/
- avgRunTime: number
+ executed: number
/**
- * Median tasks runtime.
+ * Number of tasks executing.
*/
- medRunTime: number
+ executing: number
/**
- * Tasks wait time.
+ * Number of tasks queued.
*/
- waitTime: number
+ queued: number
/**
- * Tasks wait time history.
+ * Number of tasks failed.
*/
- waitTimeHistory: CircularArray<number>
+ failed: number
+}
+
+/**
+ * Worker usage statistics.
+ *
+ * @internal
+ */
+export interface WorkerUsage {
/**
- * Average tasks wait time.
+ * Tasks statistics.
*/
- avgWaitTime: number
+ tasks: TaskStatistics
/**
- * Median tasks wait time.
+ * Tasks runtime statistics.
*/
- medWaitTime: number
+ runTime: MeasureStatistics
/**
- * Number of tasks errored.
+ * Tasks wait time statistics.
*/
- error: number
+ waitTime: MeasureStatistics
/**
* Event loop utilization.
*/
*/
readonly worker: Worker
/**
- * Worker node tasks usage statistics.
+ * Worker node worker usage statistics.
*/
- tasksUsage: TasksUsage
+ workerUsage: WorkerUsage
/**
* Worker node tasks queue.
*/
workerNodes: numberOfWorkers,
idleWorkerNodes: numberOfWorkers,
busyWorkerNodes: 0,
- runningTasks: 0,
+ executedTasks: 0,
+ executingTasks: 0,
queuedTasks: 0,
- maxQueuedTasks: 0
+ maxQueuedTasks: 0,
+ failedTasks: 0
})
await pool.destroy()
pool = new DynamicClusterPool(
workerNodes: numberOfWorkers,
idleWorkerNodes: numberOfWorkers,
busyWorkerNodes: 0,
- runningTasks: 0,
+ executedTasks: 0,
+ executingTasks: 0,
queuedTasks: 0,
- maxQueuedTasks: 0
+ maxQueuedTasks: 0,
+ failedTasks: 0
})
await pool.destroy()
})
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
promises.add(pool.execute())
}
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: 0,
- running: maxMultiplier,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: 0,
+ executing: maxMultiplier,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ maxMultiplier
+ )
}
pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: 0,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: 0,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.runTimeHistory.length).toBe(0)
- expect(workerNode.tasksUsage.waitTimeHistory.length).toBe(0)
+ expect(workerNode.workerUsage.runTime.history.length).toBe(0)
+ expect(workerNode.workerUsage.waitTime.history.length).toBe(0)
}
await pool.destroy()
})
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
- runningTasks: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
queuedTasks: expect.any(Number),
- maxQueuedTasks: expect.any(Number)
+ maxQueuedTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
})
await pool.destroy()
})
workerNodes: expect.any(Number),
idleWorkerNodes: expect.any(Number),
busyWorkerNodes: expect.any(Number),
- runningTasks: expect.any(Number),
+ executedTasks: expect.any(Number),
+ executingTasks: expect.any(Number),
queuedTasks: expect.any(Number),
- maxQueuedTasks: expect.any(Number)
+ maxQueuedTasks: expect.any(Number),
+ failedTasks: expect.any(Number)
})
await pool.destroy()
})
}
expect(promises.size).toBe(numberOfWorkers * maxMultiplier)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
+ expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
- expect(workerNode.tasksUsage.ran).toBe(0)
+ expect(workerNode.workerUsage.tasks.executed).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
- expect(queuePool.info.runningTasks).toBe(numberOfWorkers)
+ expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
expect(queuePool.info.queuedTasks).toBe(
numberOfWorkers * maxMultiplier - numberOfWorkers
)
)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.tasksUsage.running).toBe(0)
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.workerUsage.tasks.executing).toBe(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ maxMultiplier
+ )
expect(workerNode.tasksQueue.size).toBe(0)
}
})
})
expect(
errorPool.workerNodes.some(
- workerNode => workerNode.tasksUsage.error === 1
+ workerNode => workerNode.workerUsage.tasks.failed === 1
)
).toBe(true)
})
})
expect(
asyncErrorPool.workerNodes.some(
- workerNode => workerNode.tasksUsage.error === 1
+ workerNode => workerNode.workerUsage.tasks.failed === 1
)
).toBe(true)
})
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThanOrEqual(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
- expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual(
+ 0
+ )
}
// We need to clean up the resources after our test
await pool.destroy()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
}
// We need to clean up the resources after our test
await pool.destroy()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: expect.any(Number),
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: expect.any(Number),
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: expect.any(Number),
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: 0,
+ median: expect.any(Number),
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: expect.any(Number),
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThanOrEqual(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
- expect(workerNode.tasksUsage.runTime).toBeGreaterThanOrEqual(0)
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThanOrEqual(
+ 0
+ )
+ expect(workerNode.workerUsage.runTime.average).toBeGreaterThanOrEqual(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: expect.any(Number),
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: expect.any(Number),
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: expect.any(Number),
- running: 0,
- runTime: expect.any(Number),
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: expect.any(Number),
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: expect.any(Number),
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: expect.any(Number),
+ average: 0,
+ median: expect.any(Number),
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(max * maxMultiplier)
- expect(workerNode.tasksUsage.runTime).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.medRunTime).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
+ expect(workerNode.workerUsage.runTime.aggregation).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksUsage).toStrictEqual({
- ran: maxMultiplier,
- running: 0,
- runTime: 0,
- runTimeHistory: expect.any(CircularArray),
- avgRunTime: 0,
- medRunTime: 0,
- waitTime: 0,
- waitTimeHistory: expect.any(CircularArray),
- avgWaitTime: 0,
- medWaitTime: 0,
- error: 0,
+ expect(workerNode.workerUsage).toStrictEqual({
+ tasks: {
+ executed: maxMultiplier,
+ executing: 0,
+ queued: 0,
+ failed: 0
+ },
+ runTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ waitTime: {
+ aggregation: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
elu: undefined
})
}
}
expect(promises.size).toBe(numberOfThreads * maxMultiplier)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
+ expect(workerNode.workerUsage.tasks.executing).toBeLessThanOrEqual(
queuePool.opts.tasksQueueOptions.concurrency
)
- expect(workerNode.tasksUsage.ran).toBe(0)
+ expect(workerNode.workerUsage.tasks.executed).toBe(0)
expect(workerNode.tasksQueue.size).toBeGreaterThan(0)
}
- expect(queuePool.info.runningTasks).toBe(numberOfThreads)
+ expect(queuePool.info.executingTasks).toBe(numberOfThreads)
expect(queuePool.info.queuedTasks).toBe(
numberOfThreads * maxMultiplier - numberOfThreads
)
)
await Promise.all(promises)
for (const workerNode of queuePool.workerNodes) {
- expect(workerNode.tasksUsage.running).toBe(0)
- expect(workerNode.tasksUsage.ran).toBeGreaterThan(0)
- expect(workerNode.tasksUsage.ran).toBeLessThanOrEqual(maxMultiplier)
+ expect(workerNode.workerUsage.tasks.executing).toBe(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+ maxMultiplier
+ )
expect(workerNode.tasksQueue.size).toBe(0)
}
})
})
expect(
errorPool.workerNodes.some(
- workerNode => workerNode.tasksUsage.error === 1
+ workerNode => workerNode.workerUsage.tasks.failed === 1
)
).toBe(true)
})
})
expect(
asyncErrorPool.workerNodes.some(
- workerNode => workerNode.tasksUsage.error === 1
+ workerNode => workerNode.workerUsage.tasks.failed === 1
)
).toBe(true)
})