1 import type { IPool
} from
'../pool.js'
2 import type { IWorker
} from
'../worker.js'
5 TaskStatisticsRequirements
,
6 WorkerChoiceStrategyOptions
,
7 } from
'./selection-strategies-types.js'
9 import { DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
} from
'../utils.js'
10 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy.js'
/**
 * Selects the next worker with an interleaved weighted round robin scheduling algorithm.
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
18 export class InterleavedWeightedRoundRobinWorkerChoiceStrategy
<
19 Worker
extends IWorker
,
23 extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
>
24 implements IWorkerChoiceStrategy
{
33 private roundWeights
: number[]
37 private workerNodeId
= 0
/** Worker node virtual task execution time. */
41 private workerNodeVirtualTaskExecutionTime
= 0
43 public override
readonly taskStatisticsRequirements
: TaskStatisticsRequirements
=
45 elu
: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
,
60 pool
: IPool
<Worker
, Data
, Response
>,
61 opts
?: WorkerChoiceStrategyOptions
64 this.setTaskStatisticsRequirements(this.opts
)
65 this.roundWeights
= this.getRoundWeights()
/**
 * Derives the round weights from the values of `this.opts.weights`, sorted in
 * ascending numeric order.
 *
 * NOTE(review): only a fragment of this method is visible in this view (the
 * statement wrapping the sorted values and the `return` are not shown), so any
 * further processing of the sorted values cannot be confirmed from here.
 * Both `opts` and `weights` are non-null asserted — presumably guaranteed by
 * the time this runs; verify against the constructor / base class.
 * @returns The round weights as an array of numbers.
 */
68 private getRoundWeights (): number[] {
71 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
72 Object.values(this.opts
!.weights
!)
// Numeric comparator: smallest weight first.
74 .sort((a
, b
) => a
- b
)
/**
 * Moves the scheduling cursor (`roundId`, `workerNodeId`) one step forward.
 *
 * NOTE(review): several statements of this method are not visible in this view
 * (some branch bodies and the closing braces), so only the visible conditions
 * and assignments are described — confirm the rest against the full source.
 */
79 private interleavedWeightedRoundRobinNextWorkerNodeId (): void {
// Empty pool case; the statements executed in this branch are not visible here.
80 if (this.pool
.workerNodes
.length
=== 0) {
// Condition: the cursor sits on the last round AND on the last worker node
// (presumably the wrap-around case; its body is not visible — TODO confirm).
83 this.roundId
=== this.roundWeights
.length
- 1 &&
84 this.workerNodeId
=== this.pool
.workerNodes
.length
- 1
// Last worker node of the pool reached: move on to the next round.
88 } else if (this.workerNodeId
=== this.pool
.workerNodes
.length
- 1) {
89 this.roundId
= this.roundId
+ 1
// Step to the next worker node (presumably the remaining `else` case — TODO confirm).
92 this.workerNodeId
= this.workerNodeId
+ 1
/**
 * Chooses a worker node key by scanning rounds and worker nodes starting from
 * the stored cursor (`roundId`, `workerNodeId`).
 *
 * NOTE(review): the loop / conditional keywords, loop increments and closing
 * braces are not visible in this view; the comments below describe only the
 * visible expressions and hedge the structure around them.
 * @returns `this.nextWorkerNodeKey` once a worker node is selected. The
 * declared type also allows `undefined`, but that path is not visible here.
 */
97 public choose (): number | undefined {
// Round scan (presumably a `for` header): starts at the stored round id and
// stays below the number of round weights.
99 let roundIndex
= this.roundId
;
100 roundIndex
< this.roundWeights
.length
;
// Persist the round currently being scanned.
103 this.roundId
= roundIndex
// Worker node scan (presumably a nested `for` header): starts at the stored
// worker node id and stays below the number of worker nodes in the pool.
105 let workerNodeKey
= this.workerNodeId
;
106 workerNodeKey
< this.pool
.workerNodes
.length
;
// Persist the worker node currently being examined.
109 this.workerNodeId
= workerNodeKey
// When the examined worker node is not the last chosen one and some virtual
// execution time has been accumulated, the accumulator is cleared.
111 this.workerNodeId
!== this.nextWorkerNodeKey
&&
112 this.workerNodeVirtualTaskExecutionTime
!== 0
114 this.workerNodeVirtualTaskExecutionTime
= 0
// Weight configured for the examined worker node, read from `opts.weights`.
116 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
117 const workerWeight
= this.opts
!.weights
![workerNodeKey
]
// Selection condition: the worker node is ready, its weight is at least the
// current round weight, and its accumulated virtual execution time is still
// below its weight.
119 this.isWorkerNodeReady(workerNodeKey
) &&
120 workerWeight
>= this.roundWeights
[roundIndex
] &&
121 this.workerNodeVirtualTaskExecutionTime
< workerWeight
// Accumulate the worker node's task wait time plus task run time.
123 this.workerNodeVirtualTaskExecutionTime
+=
124 this.getWorkerNodeTaskWaitTime(workerNodeKey
) +
125 this.getWorkerNodeTaskRunTime(workerNodeKey
)
// Record the previous choice, store the new one and return it.
126 this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey
)
127 this.nextWorkerNodeKey
= workerNodeKey
128 return this.nextWorkerNodeKey
// Advance the cursor (presumably reached when no worker node was returned above — TODO confirm).
132 this.interleavedWeightedRoundRobinNextWorkerNodeId()
/**
 * Adjusts the strategy's internal state for the removal of a worker node.
 *
 * NOTE(review): the conditional keywords, `return` statements and closing
 * braces are not visible in this view. The comparisons against
 * `this.pool.workerNodes.length` suggest the pool has already been updated
 * when this runs — confirm against the caller.
 * @param workerNodeKey - Key of the worker node concerned by the removal.
 * @returns A boolean; the returned values are not visible in this view.
 */
136 public remove (workerNodeKey
: number): boolean {
// Empty pool: reset the worker node key properties and the local cursor state.
137 if (this.pool
.workerNodes
.length
=== 0) {
138 this.resetWorkerNodeKeyProperties()
139 this.workerNodeId
= 0
140 this.workerNodeVirtualTaskExecutionTime
= 0
// If the cursor points at the given key and now lies past the last valid
// index, clamp it to the last worker node index.
144 this.workerNodeId
=== workerNodeKey
&&
145 this.workerNodeId
> this.pool
.workerNodes
.length
- 1
147 this.workerNodeId
= this.pool
.workerNodes
.length
- 1
// Same clamping for the previously chosen worker node key.
150 this.previousWorkerNodeKey
=== workerNodeKey
&&
151 this.previousWorkerNodeKey
> this.pool
.workerNodes
.length
- 1
153 this.previousWorkerNodeKey
= this.pool
.workerNodes
.length
- 1
/**
 * Resets the strategy state: the worker node key properties, the worker node
 * id and the accumulated virtual task execution time.
 *
 * NOTE(review): not every statement of this method is visible in this view
 * (e.g. the `return`), so other fields may be reset as well — confirm against
 * the full source.
 * @returns A boolean; the returned value is not visible in this view.
 */
159 public reset (): boolean {
160 this.resetWorkerNodeKeyProperties()
162 this.workerNodeId
= 0
163 this.workerNodeVirtualTaskExecutionTime
= 0
/**
 * Applies new strategy options through the parent implementation, then
 * recomputes the round weights, which are derived from the options.
 *
 * NOTE(review): the end of the signature (return type) and the closing brace
 * are not visible in this view.
 * @param opts - The worker choice strategy options, or `undefined`.
 */
168 public override
setOptions (
169 opts
: undefined | WorkerChoiceStrategyOptions
171 super.setOptions(opts
)
// Keep the cached round weights in sync with the new options.
172 this.roundWeights
= this.getRoundWeights()
176 public update (): boolean {