### Added
- Add `LEAST_ELU` worker choice strategy (experimental).
+- Add tasks ELU instead of runtime support to `FAIR_SHARE` worker choice strategy.
### Changed
- `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU) (experimental)
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time
- `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
- - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time
+ - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time or ELU
`WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`, `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN` and `WorkerChoiceStrategies.FAIR_SHARE` strategies are targeted to heavy and long tasks.
Default: `WorkerChoiceStrategies.ROUND_ROBIN`
- `workerChoiceStrategyOptions` (optional) - The worker choice strategy options object to use in this pool.
Properties:
+ - `measurement` (optional) - The measurement to use in worker choice strategies: `runTime`, `waitTime` or `elu`.
- `runTime` (optional) - Use the tasks median runtime instead of the tasks average runtime in worker choice strategies.
- `waitTime` (optional) - Use the tasks median wait time instead of the tasks average wait time in worker choice strategies.
- - `weights` (optional) - The worker weights to use in the weighted round robin worker choice strategy: `{ 0: 200, 1: 300, ..., n: 100 }`
+ - `elu` (optional) - Use the tasks median ELU instead of the tasks average ELU in worker choice strategies.
+ - `weights` (optional) - The worker weights to use in the weighted round robin worker choice strategy: `{ 0: 200, 1: 300, ..., n: 100 }`.
- Default: `{ runTime: { median: false }, waitTime: { median: false } }`
+ Default: `{ runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
Default: `true`
} from './pools/pool'
export type {
ErrorHandler,
+ EventLoopUtilizationMeasurementStatistics,
ExitHandler,
IWorker,
MeasurementStatistics,
WorkerNode,
WorkerUsage
} from './pools/worker'
-export { WorkerChoiceStrategies } from './pools/selection-strategies/selection-strategies-types'
+export {
+ Measurements,
+ WorkerChoiceStrategies
+} from './pools/selection-strategies/selection-strategies-types'
export type {
IWorkerChoiceStrategy,
+ Measurement,
+ MeasurementOptions,
+ MeasurementStatisticsRequirements,
TaskStatisticsRequirements,
WorkerChoiceStrategy,
WorkerChoiceStrategyOptions
.aggregate
) {
if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
- workerUsage.elu.idle.aggregate =
- workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle
- workerUsage.elu.active.aggregate =
- workerUsage.elu.active.aggregate + message.taskPerformance.elu.active
+ workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
+ workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
workerUsage.elu.utilization =
(workerUsage.elu.utilization +
message.taskPerformance.elu.utilization) /
/**
* Gets the worker task ELU.
- * If the task statistics require the ELU, the average ELU is returned.
- * If the task statistics require the ELU, the median ELU is returned.
+ * If the task statistics require the average ELU, the average ELU is returned.
+ * If the task statistics require the median ELU, the median ELU is returned.
*
* @param workerNodeKey - The worker node key.
* @returns The worker task ELU.
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
-import type {
- IWorkerChoiceStrategy,
- TaskStatisticsRequirements,
- WorkerChoiceStrategyOptions
+import {
+ type IWorkerChoiceStrategy,
+ Measurements,
+ type TaskStatisticsRequirements,
+ type WorkerChoiceStrategyOptions
} from './selection-strategies-types'
/**
median: false
},
elu: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
}
}
workerNodeKey: number,
workerVirtualTaskStartTimestamp: number
): number {
- return (
- workerVirtualTaskStartTimestamp + this.getWorkerTaskRunTime(workerNodeKey)
- )
+ const workerTaskRunTime =
+ this.opts.measurement === Measurements.elu
+ ? this.getWorkerTaskElu(workerNodeKey)
+ : this.getWorkerTaskRunTime(workerNodeKey)
+ return workerVirtualTaskStartTimestamp + workerTaskRunTime
}
private getWorkerVirtualTaskStartTimestamp (workerNodeKey: number): number {
*/
export type WorkerChoiceStrategy = keyof typeof WorkerChoiceStrategies
+/**
+ * Enumeration of measurements.
+ */
+export const Measurements = Object.freeze({
+ runTime: 'runTime',
+ waitTime: 'waitTime',
+ elu: 'elu'
+} as const)
+
+/**
+ * Measurement.
+ */
+export type Measurement = keyof typeof Measurements
+
/**
* Measurement options.
*/
-interface MeasurementOptions {
+export interface MeasurementOptions {
/**
* Set measurement median.
*/
* Worker choice strategy options.
*/
export interface WorkerChoiceStrategyOptions {
+ /**
+ * Measurement to use for worker choice strategy.
+ */
+ measurement?: Measurement
/**
* Runtime options.
*
*
* @internal
*/
-interface MeasurementStatisticsRequirements {
+export interface MeasurementStatisticsRequirements {
/**
* Require measurement aggregate.
*/
median: false
},
elu: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
}
})
- pool.setWorkerChoiceStrategyOptions({ runTime: { median: true } })
+ pool.setWorkerChoiceStrategyOptions({
+ runTime: { median: true },
+ elu: { median: true }
+ })
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- runTime: { median: true }
+ runTime: { median: true },
+ elu: { median: true }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- runTime: { median: true }
+ runTime: { median: true },
+ elu: { median: true }
})
}
expect(
median: false
},
elu: {
- aggregate: false,
+ aggregate: true,
average: false,
- median: false
+ median: true
}
})
- pool.setWorkerChoiceStrategyOptions({ runTime: { median: false } })
+ pool.setWorkerChoiceStrategyOptions({
+ runTime: { median: false },
+ elu: { median: false }
+ })
expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- runTime: { median: false }
+ runTime: { median: false },
+ elu: { median: false }
})
for (const [, workerChoiceStrategy] of pool.workerChoiceStrategyContext
.workerChoiceStrategies) {
expect(workerChoiceStrategy.opts).toStrictEqual({
- runTime: { median: false }
+ runTime: { median: false },
+ elu: { median: false }
})
}
expect(
median: false
},
elu: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
}
})
median: false
},
elu: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
}
})
median: false
},
elu: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
}
})
history: expect.any(CircularArray)
},
active: {
- aggregate: 0,
- average: 0,
+ aggregate: expect.any(Number),
+ average: expect.any(Number),
median: 0,
history: expect.any(CircularArray)
},
- utilization: 0
+ utilization: expect.any(Number)
}
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
history: expect.any(CircularArray)
},
active: {
- aggregate: 0,
- average: 0,
+ aggregate: expect.any(Number),
+ average: expect.any(Number),
median: 0,
history: expect.any(CircularArray)
},
- utilization: 0
+ utilization: expect.any(Number)
}
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
history: expect.any(CircularArray)
},
active: {
- aggregate: 0,
- average: 0,
+ aggregate: expect.any(Number),
+ average: expect.any(Number),
median: 0,
history: expect.any(CircularArray)
},
- utilization: 0
+ utilization: expect.any(Number)
}
})
expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
+ expect(workerNode.workerUsage.elu.utilization).toBeLessThanOrEqual(1)
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(