Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
- Bundle typescript types declaration into one file.
- Bundle typescript types declaration into one file.
+### Changed
+
+- Improve interleaved weighted round robin worker choice strategy implementation.
+
## [2.6.37] - 2023-08-28
### Fixed
## [2.6.37] - 2023-08-28
### Fixed
-The worker weights are maximum tasks execution time, once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers.
+The worker weights are maximum tasks execution time. Once the worker has reached its maximum tasks execution time, the next task is assigned to the next worker. The default worker weight is the same for each and computed given the CPU cores speed and theirs numbers.
-### Interleaved weighted round robin
+### Interleaved weighted round robin (experimental)
The worker weights are maximum tasks execution time. The rounds are the deduplicated worker weights.
The worker weights are maximum tasks execution time. The rounds are the deduplicated worker weights.
-During a round, if the worker weight is superior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts.
+During a round, if the worker weight is superior or equal to the current round weight and its tasks execution time is inferior or equal to the current round weight, the task is assigned to the worker. Once all workers weight have been tested, the next round starts.
The default worker weights is the same for each and computed given the CPU cores speed and theirs numbers. So the default 'rounds' consists of a unique worker weight.
## Statistics
The default worker weights is the same for each and computed given the CPU cores speed and theirs numbers. So the default 'rounds' consists of a unique worker weight.
## Statistics
/**
* Check the next worker node eligibility.
*
/**
* Check the next worker node eligibility.
*
- * @param chosenNextWorkerNodeKey - The chosen worker node key.
+ * @param chosenNextWorkerNodeKey - The chosen next worker node key.
*/
protected checkNextWorkerNodeEligibility (
chosenNextWorkerNodeKey: number | undefined
*/
protected checkNextWorkerNodeEligibility (
chosenNextWorkerNodeKey: number | undefined
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
import type { IWorker } from '../worker'
import type { IPool } from '../pool'
-import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import {
+ DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+} from '../../utils'
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
import type {
IWorkerChoiceStrategy,
+ TaskStatisticsRequirements,
WorkerChoiceStrategyOptions
} from './selection-strategies-types'
WorkerChoiceStrategyOptions
} from './selection-strategies-types'
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
>
extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
implements IWorkerChoiceStrategy {
+ /** @inheritDoc */
+ public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
+ runTime: {
+ aggregate: true,
+ average: true,
+ median: false
+ },
+ waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
+ elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
+ }
+
- * This is used to determine the current round weight.
*/
private roundId: number = 0
*/
private roundId: number = 0
+ /**
+ * Default worker weight.
+ */
+ private readonly defaultWorkerWeight: number
/**
* Round weights.
*/
private roundWeights: number[]
/**
/**
* Round weights.
*/
private roundWeights: number[]
/**
- * Default worker weight.
- private readonly defaultWorkerWeight: number
+ private workerNodeId: number = 0
+ /**
+ * Worker virtual task runtime.
+ */
+ private workerVirtualTaskRunTime: number = 0
/** @inheritDoc */
public constructor (
/** @inheritDoc */
public constructor (
public reset (): boolean {
this.resetWorkerNodeKeyProperties()
this.roundId = 0
public reset (): boolean {
this.resetWorkerNodeKeyProperties()
this.roundId = 0
+ this.workerNodeId = 0
+ this.workerVirtualTaskRunTime = 0
/** @inheritDoc */
public choose (): number | undefined {
/** @inheritDoc */
public choose (): number | undefined {
- let roundId!: number
- let workerNodeId: number | undefined
for (
let roundIndex = this.roundId;
roundIndex < this.roundWeights.length;
roundIndex++
) {
for (
let roundIndex = this.roundId;
roundIndex < this.roundWeights.length;
roundIndex++
) {
+ this.roundId = roundIndex
- let workerNodeKey =
- this.nextWorkerNodeKey ?? this.previousWorkerNodeKey;
+ let workerNodeKey = this.workerNodeId;
workerNodeKey < this.pool.workerNodes.length;
workerNodeKey++
) {
workerNodeKey < this.pool.workerNodes.length;
workerNodeKey++
) {
+ this.workerNodeId = workerNodeKey
if (!this.isWorkerNodeEligible(workerNodeKey)) {
continue
}
if (!this.isWorkerNodeEligible(workerNodeKey)) {
continue
}
+ if (
+ this.workerNodeId !== this.nextWorkerNodeKey &&
+ this.workerVirtualTaskRunTime !== 0
+ ) {
+ this.workerVirtualTaskRunTime = 0
+ }
const workerWeight =
this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
const workerWeight =
this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight
- if (workerWeight >= this.roundWeights[roundIndex]) {
- workerNodeId = workerNodeKey
- break
+ if (
+ workerWeight >= this.roundWeights[roundIndex] &&
+ this.workerVirtualTaskRunTime < workerWeight
+ ) {
+ this.workerVirtualTaskRunTime =
+ this.workerVirtualTaskRunTime +
+ this.getWorkerTaskRunTime(workerNodeKey)
+ this.previousWorkerNodeKey =
+ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
+ this.nextWorkerNodeKey = workerNodeKey
+ return this.nextWorkerNodeKey
- this.roundId = roundId
- if (workerNodeId == null) {
- this.previousWorkerNodeKey =
- this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
- }
- this.nextWorkerNodeKey = workerNodeId
- const chosenWorkerNodeKey = this.nextWorkerNodeKey
- if (this.nextWorkerNodeKey === this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeKey = 0
- this.roundId =
- this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
+ this.interleavedWeightedRoundRobinNextWorkerNodeId()
+ }
+
+ private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
+ if (
+ this.roundId === this.roundWeights.length - 1 &&
+ this.workerNodeId === this.pool.workerNodes.length - 1
+ ) {
+ this.roundId = 0
+ this.workerNodeId = 0
+ } else if (this.workerNodeId === this.pool.workerNodes.length - 1) {
+ this.roundId = this.roundId + 1
+ this.workerNodeId = 0
- this.nextWorkerNodeKey =
- (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1
+ this.workerNodeId = this.workerNodeId + 1
- return chosenWorkerNodeKey
if (this.pool.workerNodes.length === 0) {
this.nextWorkerNodeKey = 0
} else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
if (this.pool.workerNodes.length === 0) {
this.nextWorkerNodeKey = 0
} else if (this.nextWorkerNodeKey > this.pool.workerNodes.length - 1) {
- this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
this.roundId =
this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
this.roundId =
this.roundId === this.roundWeights.length - 1 ? 0 : this.roundId + 1
+ this.nextWorkerNodeKey = this.pool.workerNodes.length - 1
+ this.workerVirtualTaskRunTime = 0
}
private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
}
private weightedRoundRobinNextWorkerNodeKey (): number | undefined {
- const workerVirtualTaskRunTime = this.workerVirtualTaskRunTime
const workerWeight =
this.opts.weights?.[
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
] ?? this.defaultWorkerWeight
const workerWeight =
this.opts.weights?.[
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
] ?? this.defaultWorkerWeight
- if (workerVirtualTaskRunTime < workerWeight) {
+ if (this.workerVirtualTaskRunTime < workerWeight) {
this.workerVirtualTaskRunTime =
this.workerVirtualTaskRunTime =
- workerVirtualTaskRunTime +
+ this.workerVirtualTaskRunTime +
this.getWorkerTaskRunTime(
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
)
this.getWorkerTaskRunTime(
this.nextWorkerNodeKey ?? this.previousWorkerNodeKey
)
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ waitTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ waitTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
- history: expect.any(CircularArray)
+ history: new CircularArray()
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
- history: expect.any(CircularArray)
+ history: new CircularArray()
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
}
await Promise.all(promises)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.usage).toMatchObject({
+ expect(workerNode.usage).toStrictEqual({
tasks: {
executed: expect.any(Number),
executing: 0,
tasks: {
executed: expect.any(Number),
executing: 0,
+ runTime: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
- history: expect.any(CircularArray)
+ history: new CircularArray()
+ elu: expect.objectContaining({
+ idle: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
+ }),
+ active: expect.objectContaining({
history: expect.any(CircularArray)
history: expect.any(CircularArray)
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
})
expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
history: expect.any(CircularArray)
}),
waitTime: {
history: expect.any(CircularArray)
}),
waitTime: {
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
history: expect.any(CircularArray)
}),
waitTime: {
history: expect.any(CircularArray)
}),
waitTime: {
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
history: expect.any(CircularArray)
}),
waitTime: {
history: expect.any(CircularArray)
}),
waitTime: {
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
- history: expect.any(CircularArray)
+ history: new CircularArray()
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
waitTime: {
median: false
},
waitTime: {
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
).toStrictEqual({
runTime: {
- aggregate: false,
- average: false,
+ aggregate: true,
+ average: true,
median: false
},
waitTime: {
median: false
},
waitTime: {
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
for (const workerNode of pool.workerNodes) {
expect(workerNode.usage).toStrictEqual({
tasks: {
- executed: maxMultiplier,
+ executed: expect.any(Number),
executing: 0,
queued: 0,
maxQueued: 0,
stolen: 0,
failed: 0
},
executing: 0,
queued: 0,
maxQueued: 0,
stolen: 0,
failed: 0
},
- runTime: {
- history: new CircularArray()
- },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
waitTime: {
history: new CircularArray()
},
waitTime: {
history: new CircularArray()
},
+ expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+ expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+ max * maxMultiplier
+ )
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
}
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
).roundId
).toBe(0)
pool.workerChoiceStrategyContext.workerChoiceStrategy
).roundId
).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).workerNodeId
+ ).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
- runTime: {
- history: new CircularArray()
- },
+ runTime: expect.objectContaining({
+ history: expect.any(CircularArray)
+ }),
waitTime: {
history: new CircularArray()
},
waitTime: {
history: new CircularArray()
},
pool.workerChoiceStrategyContext.workerChoiceStrategy
).roundId
).toBe(0)
pool.workerChoiceStrategyContext.workerChoiceStrategy
).roundId
).toBe(0)
+ expect(
+ pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+ pool.workerChoiceStrategyContext.workerChoiceStrategy
+ ).workerNodeId
+ ).toBe(0)
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy
expect(
pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
pool.workerChoiceStrategyContext.workerChoiceStrategy