## [Unreleased]
+### Fixed
+
+- Ensure dynamic worker node are initialized with sensible worker node usage default values to avoid worker choice strategies biased decisions.
+- Account for tasks wait time in task execution time computation in worker choice strategies to avoid biased decisions under load with several prioritized task functions and tasks queue enabled.
+
## [4.0.0] - 2024-04-30
### Changed
- `workerChoiceStrategy` (optional) - The default worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
- - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
+ - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executing and queued tasks
- `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time
- `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU)
- `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a [weighted round robin scheduling algorithm](./worker-choice-strategies.md#weighted-round-robin) based on tasks execution time
this.enqueueTask = this.enqueueTask.bind(this)
if (this.opts.enableEvents === true) {
- this.initializeEventEmitter()
+ this.initEventEmitter()
}
this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
Worker,
}
}
- private initializeEventEmitter (): void {
+ private initEventEmitter (): void {
this.emitter = new EventEmitterAsyncResource({
name: `poolifier:${this.type}-${this.worker}-pool`
})
/**
* Starts the minimum number of workers.
*/
- private startMinimumNumberOfWorkers (): void {
+ private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
this.startingMinimumNumberOfWorkers = true
while (
this.workerNodes.reduce(
0
) < this.minimumNumberOfWorkers
) {
- this.createAndSetupWorkerNode()
+ const workerNodeKey = this.createAndSetupWorkerNode()
+ initWorkerNodeUsage &&
+ this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
}
this.startingMinimumNumberOfWorkers = false
}
transferList?: readonly TransferListItem[]
): void
+ /**
+ * Initializes the worker node usage with sensible default values gathered during runtime.
+ *
+ * @param workerNode - The worker node.
+ */
+ private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .runTime.aggregate === true
+ ) {
+ workerNode.usage.runTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.runTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
+ .waitTime.aggregate === true
+ ) {
+ workerNode.usage.waitTime.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
+ )
+ )
+ }
+ if (
+ this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
+ .aggregate === true
+ ) {
+ workerNode.usage.elu.active.aggregate = min(
+ ...this.workerNodes.map(
+ workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
+ )
+ )
+ }
+ }
+
/**
* Creates a new, completely set up worker node.
*
if (workerNode.info.dynamic) {
this.createAndSetupDynamicWorkerNode()
} else if (!this.startingMinimumNumberOfWorkers) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
}
if (
!this.startingMinimumNumberOfWorkers &&
!this.destroying
) {
- this.startMinimumNumberOfWorkers()
+ this.startMinimumNumberOfWorkers(true)
}
})
const workerNodeKey = this.addWorkerNode(workerNode)
) {
workerNode.info.ready = true
}
+ this.initWorkerNodeUsage(workerNode)
this.checkAndEmitDynamicWorkerCreationEvents()
return workerNodeKey
}
/**
* Gets the worker node task runtime.
* If the task statistics require the average runtime, the average runtime is returned.
- * If the task statistics require the median runtime , the median runtime is returned.
+ * If the task statistics require the median runtime, the median runtime is returned.
*
* @param workerNodeKey - The worker node key.
* @returns The worker node task runtime.
import type { IPool } from '../pool.js'
-import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS } from '../utils.js'
import type { IWorker } from '../worker.js'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
import {
average: true,
median: false
},
- waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ waitTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
elu: {
aggregate: true,
average: true,
workerNodeKey: number,
workerNodeVirtualTaskStartTimestamp: number
): number {
- const workerNodeTaskRunTime =
+ const workerNodeTaskExecutionTime =
+ this.getWorkerNodeTaskWaitTime(workerNodeKey) +
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- this.opts!.measurement === Measurements.elu
+ (this.opts!.measurement === Measurements.elu
? this.getWorkerNodeTaskElu(workerNodeKey)
- : this.getWorkerNodeTaskRunTime(workerNodeKey)
- return workerNodeVirtualTaskStartTimestamp + workerNodeTaskRunTime
+ : this.getWorkerNodeTaskRunTime(workerNodeKey))
+ return workerNodeVirtualTaskStartTimestamp + workerNodeTaskExecutionTime
}
private getWorkerNodeVirtualTaskStartTimestamp (
average: true,
median: false
},
- waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ waitTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
}
*/
private workerNodeId = 0
/**
- * Worker node virtual task runtime.
+ * Worker node virtual execution time.
*/
- private workerNodeVirtualTaskRunTime = 0
+ private workerNodeVirtualTaskExecutionTime = 0
/** @inheritDoc */
public constructor (
this.resetWorkerNodeKeyProperties()
this.roundId = 0
this.workerNodeId = 0
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
return true
}
this.workerNodeId = workerNodeKey
if (
this.workerNodeId !== this.nextWorkerNodeKey &&
- this.workerNodeVirtualTaskRunTime !== 0
+ this.workerNodeVirtualTaskExecutionTime !== 0
) {
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const workerWeight = this.opts!.weights![workerNodeKey]
if (
this.isWorkerNodeReady(workerNodeKey) &&
workerWeight >= this.roundWeights[roundIndex] &&
- this.workerNodeVirtualTaskRunTime < workerWeight
+ this.workerNodeVirtualTaskExecutionTime < workerWeight
) {
- this.workerNodeVirtualTaskRunTime =
- this.workerNodeVirtualTaskRunTime +
+ this.workerNodeVirtualTaskExecutionTime +=
+ this.getWorkerNodeTaskWaitTime(workerNodeKey) +
this.getWorkerNodeTaskRunTime(workerNodeKey)
this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
this.nextWorkerNodeKey = workerNodeKey
if (this.pool.workerNodes.length === 0) {
this.resetWorkerNodeKeyProperties()
this.workerNodeId = 0
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
return true
}
if (
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
- (workerNode.usage.runTime.aggregate ?? 0) +
- (workerNode.usage.waitTime.aggregate ?? 0) <
- (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) +
- (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0)
+ (workerNode.usage.waitTime.aggregate ?? 0) +
+ (workerNode.usage.runTime.aggregate ?? 0) <
+ (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) +
+ (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0)
? workerNodeKey
: minWorkerNodeKey
},
return this.pool.workerNodes.reduce(
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
return this.isWorkerNodeReady(workerNodeKey) &&
- workerNode.usage.tasks.executed +
- workerNode.usage.tasks.executing +
- workerNode.usage.tasks.queued <
- workerNodes[minWorkerNodeKey].usage.tasks.executed +
- workerNodes[minWorkerNodeKey].usage.tasks.executing +
+ workerNode.usage.tasks.executing + workerNode.usage.tasks.queued <
+ workerNodes[minWorkerNodeKey].usage.tasks.executing +
workerNodes[minWorkerNodeKey].usage.tasks.queued
? workerNodeKey
: minWorkerNodeKey
average: true,
median: false
},
- waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ waitTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
}
/**
- * Worker node virtual task runtime.
+ * Worker node virtual execution time.
*/
- private workerNodeVirtualTaskRunTime = 0
+ private workerNodeVirtualTaskExecutionTime = 0
/** @inheritDoc */
public constructor (
/** @inheritDoc */
public reset (): boolean {
this.resetWorkerNodeKeyProperties()
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
return true
}
return true
}
if (this.nextWorkerNodeKey === workerNodeKey) {
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
}
const workerWeight =
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.opts!.weights![this.nextWorkerNodeKey ?? this.previousWorkerNodeKey]
- if (this.workerNodeVirtualTaskRunTime < workerWeight) {
- this.workerNodeVirtualTaskRunTime =
- this.workerNodeVirtualTaskRunTime +
+ if (this.workerNodeVirtualTaskExecutionTime < workerWeight) {
+ this.workerNodeVirtualTaskExecutionTime +=
+ this.getWorkerNodeTaskWaitTime(
+ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ ) +
this.getWorkerNodeTaskRunTime(
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
)
this.nextWorkerNodeKey === this.pool.workerNodes.length - 1
? 0
: (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
- this.workerNodeVirtualTaskRunTime = 0
+ this.workerNodeVirtualTaskExecutionTime = 0
}
return this.nextWorkerNodeKey
}
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: true
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
+ ).workerNodeVirtualTaskExecutionTime
).toBe(0)
} else if (
workerChoiceStrategy ===
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
workerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
+ ).workerNodeVirtualTaskExecutionTime
).toBe(0)
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: expect.objectContaining({
idle: expect.objectContaining({
history: expect.any(CircularArray)
} else {
expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
if (workerNode.usage.elu.active.aggregate == null) {
expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
} else {
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: expect.objectContaining({
idle: expect.objectContaining({
history: expect.any(CircularArray)
} else {
expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
if (workerNode.usage.elu.active.aggregate == null) {
expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
} else {
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: expect.objectContaining({
idle: expect.objectContaining({
history: expect.any(CircularArray)
} else {
expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.median == null) {
+ expect(workerNode.usage.waitTime.median).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.median).toBeGreaterThan(0)
+ }
if (workerNode.usage.elu.active.aggregate == null) {
expect(workerNode.usage.elu.active.aggregate).toBeUndefined()
} else {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: {
idle: {
history: new CircularArray()
} else {
expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
+ ).workerNodeVirtualTaskExecutionTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
await pool.destroy()
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: {
idle: {
history: new CircularArray()
} else {
expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
+ ).workerNodeVirtualTaskExecutionTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
await pool.destroy()
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: {
idle: {
history: new CircularArray()
} else {
expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
}
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.median == null) {
+ expect(workerNode.usage.waitTime.median).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.median).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
pool.workerChoiceStrategiesContext.defaultWorkerChoiceStrategy
- ).workerNodeVirtualTaskRunTime
+ ).workerNodeVirtualTaskExecutionTime
).toBeGreaterThanOrEqual(0)
// We need to clean up the resources after our test
await pool.destroy()
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
median: false
},
waitTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
elu: {
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: {
idle: {
history: new CircularArray()
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
runTime: expect.objectContaining({
history: expect.any(CircularArray)
}),
- waitTime: {
- history: new CircularArray()
- },
+ waitTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
elu: {
idle: {
history: new CircularArray()
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
max * maxMultiplier
)
+ if (workerNode.usage.runTime.aggregate == null) {
+ expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.runTime.average == null) {
+ expect(workerNode.usage.runTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.aggregate == null) {
+ expect(workerNode.usage.waitTime.aggregate).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.aggregate).toBeGreaterThan(0)
+ }
+ if (workerNode.usage.waitTime.average == null) {
+ expect(workerNode.usage.waitTime.average).toBeUndefined()
+ } else {
+ expect(workerNode.usage.waitTime.average).toBeGreaterThan(0)
+ }
}
expect(
pool.workerChoiceStrategiesContext.workerChoiceStrategies.get(
expect(strategy.reset()).toBe(true)
expect(strategy.nextWorkerNodeKey).toBe(0)
expect(strategy.previousWorkerNodeKey).toBe(0)
- expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
+ expect(strategy.workerNodeVirtualTaskExecutionTime).toBe(0)
})
it('Verify that IWRR reset() resets internals', () => {
expect(strategy.previousWorkerNodeKey).toBe(0)
expect(strategy.roundId).toBe(0)
expect(strategy.workerNodeId).toBe(0)
- expect(strategy.workerNodeVirtualTaskRunTime).toBe(0)
+ expect(strategy.workerNodeVirtualTaskExecutionTime).toBe(0)
})
})