}
private updateEluWorkerUsage (
- workerTasksUsage: WorkerUsage,
+ workerUsage: WorkerUsage,
message: MessageValue<Response>
): void {
- if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .aggregate
+ ) {
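+ // Accumulate the per-task idle and active ELU; keep the utilization as the
+ // mean of its previous value and the last task's utilization.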
+ if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate =
+ workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate =
+ workerUsage.elu.active.aggregate + message.taskPerformance.elu.active
+ workerUsage.elu.utilization =
+ (workerUsage.elu.utilization +
+ message.taskPerformance.elu.utilization) /
+ 2
+ } else if (message.taskPerformance?.elu != null) {
+ workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
+ workerUsage.elu.utilization = message.taskPerformance.elu.utilization
+ }
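+ // Compute the per-task idle and active ELU averages from the aggregates when
+ // the average statistic is required.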
if (
- workerTasksUsage.elu != null &&
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .average &&
+ workerUsage.tasks.executed !== 0
+ ) {
+ workerUsage.elu.idle.average =
+ workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
+ workerUsage.elu.active.average =
+ workerUsage.elu.active.aggregate / workerUsage.tasks.executed
+ }
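+ // Record the per-task idle and active ELU in the history and recompute the
+ // medians when the median statistic is required.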
+ if (
+ this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
+ .median &&
message.taskPerformance?.elu != null
) {
- workerTasksUsage.elu = {
- idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
- active:
- workerTasksUsage.elu.active + message.taskPerformance.elu.active,
- utilization:
- (workerTasksUsage.elu.utilization +
- message.taskPerformance.elu.utilization) /
- 2
- }
- } else if (message.taskPerformance?.elu != null) {
- workerTasksUsage.elu = message.taskPerformance.elu
+ workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
+ workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
+ workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
+ workerUsage.elu.active.median = median(workerUsage.elu.active.history)
}
}
}
this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
.runTime.aggregate,
elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
- .elu
+ .elu.aggregate
}
})
}
median: 0,
history: new CircularArray()
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: new CircularArray()
+ },
+ utilization: 0
+ }
}
}
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
}
/**
this.taskStatisticsRequirements.waitTime.median = opts.waitTime
.median as boolean
}
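+ // ELU average and median statistics are mutually exclusive: requesting the
+ // median disables the average and vice versa.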
+ if (
+ this.taskStatisticsRequirements.elu.average &&
+ opts.elu?.median === true
+ ) {
+ this.taskStatisticsRequirements.elu.average = false
+ this.taskStatisticsRequirements.elu.median = opts.elu.median as boolean
+ }
+ if (
+ this.taskStatisticsRequirements.elu.median &&
+ opts.elu?.median === false
+ ) {
+ this.taskStatisticsRequirements.elu.average = true
+ this.taskStatisticsRequirements.elu.median = opts.elu.median as boolean
+ }
}
/** @inheritDoc */
* @param workerNodeKey - The worker node key.
* @returns The worker task wait time.
*/
- protected getWorkerWaitTime (workerNodeKey: number): number {
+ protected getWorkerTaskWaitTime (workerNodeKey: number): number {
return this.taskStatisticsRequirements.waitTime.median
- ? this.pool.workerNodes[workerNodeKey].workerUsage.runTime.median
- : this.pool.workerNodes[workerNodeKey].workerUsage.runTime.average
+ ? this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.median
+ : this.pool.workerNodes[workerNodeKey].workerUsage.waitTime.average
}
+ /**
+ * Gets the worker task ELU.
+ * If the task statistics require the average ELU, the average ELU is returned.
+ * If the task statistics require the median ELU, the median ELU is returned.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @returns The worker task ELU.
+ */
+ protected getWorkerTaskElu (workerNodeKey: number): number {
+ return this.taskStatisticsRequirements.elu.median
+ ? this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.median
+ : this.pool.workerNodes[workerNodeKey].workerUsage.elu.active.average
+ }
+
protected computeDefaultWorkerWeight (): number {
let cpusCycleTimeWeight = 0
for (const cpu of cpus()) {
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
}
/**
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
}
/** @inheritDoc */
average: false,
median: false
},
- elu: true
+ elu: {
+ aggregate: true,
+ average: false,
+ median: false
+ }
}
/** @inheritDoc */
let leastEluWorkerNodeKey!: number
for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
const workerUsage = workerNode.workerUsage
- const workerElu = workerUsage.elu?.active ?? 0
+ const workerElu = workerUsage.elu?.active.aggregate ?? 0
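+ // A worker with no accumulated active ELU yet is picked immediately.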
if (workerElu === 0) {
return workerNodeKey
} else if (workerElu < minWorkerElu) {
* @defaultValue \{ median: false \}
*/
waitTime?: MeasurementOptions
+ /**
+ * Event loop utilization options.
+ *
+ * @defaultValue \{ median: false \}
+ */
+ elu?: MeasurementOptions
/**
* Worker weights to use for weighted round robin worker selection strategy.
* Weight is the tasks maximum average or median runtime in milliseconds.
*/
waitTime: MeasurementStatisticsRequirements
/**
- * Event loop utilization.
+ * Tasks event loop utilization requirements.
*/
- elu: boolean
+ elu: MeasurementStatisticsRequirements
}
/**
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
}
/**
-import type { EventLoopUtilization } from 'node:perf_hooks'
import type { CircularArray } from '../circular-array'
import type { Queue } from '../queue'
history: CircularArray<number>
}
+/**
+ * Event loop utilization measurement statistics.
+ *
+ * @internal
+ */
+export interface EventLoopUtilizationMeasurementStatistics {
+ /** Event loop idle time measurement statistics. */
+ idle: MeasurementStatistics
+ /** Event loop active time measurement statistics. */
+ active: MeasurementStatistics
+ /** Event loop utilization. */
+ utilization: number
+}
+
/**
* Task statistics.
*
*/
waitTime: MeasurementStatistics
/**
- * Event loop utilization.
+ * Tasks event loop utilization statistics.
*/
- elu: EventLoopUtilization | undefined
+ elu: EventLoopUtilizationMeasurementStatistics
}
/**
export const DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS: WorkerChoiceStrategyOptions =
{
runTime: { median: false },
- waitTime: { median: false }
+ waitTime: { median: false },
+ elu: { median: false }
}
/**
)
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: false },
- waitTime: { median: false }
+ waitTime: { median: false },
+ elu: { median: false }
})
expect(pool.opts.messageHandler).toBeUndefined()
expect(pool.opts.errorHandler).toBeUndefined()
)
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
runTime: { median: false },
- waitTime: { median: false }
+ waitTime: { median: false },
+ elu: { median: false }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
runTime: { median: false },
- waitTime: { median: false }
+ waitTime: { median: false },
+ elu: { median: false }
})
}
expect(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
pool.setWorkerChoiceStrategyOptions({ runTime: { median: true } })
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
pool.setWorkerChoiceStrategyOptions({ runTime: { median: false } })
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
})
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
await Promise.all(promises)
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.runTime.history.length).toBe(0)
expect(workerNode.workerUsage.waitTime.history.length).toBe(0)
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
expect(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
expect(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
// We need to clean up the resources after our test
median: 0,
history: expect.any(CircularArray)
},
-
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
// We need to clean up the resources after our test
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
average: false,
median: false
},
- elu: true
+ elu: {
+ aggregate: true,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: true
+ elu: {
+ aggregate: true,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- const expectedWorkerUsage = {
+ expect(workerNode.workerUsage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
average: 0,
median: 0,
history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: expect.any(Number),
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: expect.any(Number)
}
- }
- if (workerNode.workerUsage.elu === undefined) {
- expect(workerNode.workerUsage).toStrictEqual({
- ...expectedWorkerUsage,
- elu: undefined
- })
- } else {
- expect(workerNode.workerUsage).toStrictEqual({
- ...expectedWorkerUsage,
- elu: {
- active: expect.any(Number),
- idle: 0,
- utilization: 1
- }
- })
- }
+ })
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
+ expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
}
// We need to clean up the resources after our test
await pool.destroy()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- const expectedWorkerUsage = {
+ expect(workerNode.workerUsage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
average: 0,
median: 0,
history: expect.any(CircularArray)
+ },
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: expect.any(Number),
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: expect.any(Number)
}
- }
- if (workerNode.workerUsage.elu === undefined) {
- expect(workerNode.workerUsage).toStrictEqual({
- ...expectedWorkerUsage,
- elu: undefined
- })
- } else {
- expect(workerNode.workerUsage).toStrictEqual({
- ...expectedWorkerUsage,
- elu: {
- active: expect.any(Number),
- idle: 0,
- utilization: 1
- }
- })
- }
+ })
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
+ expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
}
// We need to clean up the resources after our test
await pool.destroy()
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
await pool.destroy()
pool = new DynamicThreadPool(
average: false,
median: false
},
- elu: false
+ elu: {
+ aggregate: false,
+ average: false,
+ median: false
+ }
})
// We need to clean up the resources after our test
await pool.destroy()
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
expect(
median: 0,
history: expect.any(CircularArray)
},
- elu: undefined
+ elu: {
+ idle: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ active: {
+ aggregate: 0,
+ average: 0,
+ median: 0,
+ history: expect.any(CircularArray)
+ },
+ utilization: 0
+ }
})
}
expect(