1 import { cpus
} from
'node:os'
2 import type { IWorker
} from
'../worker'
3 import type { IPool
} from
'../pool'
4 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
} from
'../../utils'
5 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
9 WorkerChoiceStrategyOptions
10 } from
'./selection-strategies-types'
13 * Virtual task runtime.
15 interface TaskRunTime
{
21 * Selects the next worker with a weighted round robin scheduling algorithm.
22 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
24 * @typeParam Worker - Type of worker which manages the strategy.
25 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
26 * @typeParam Response - Type of execution response. This can only be serializable data.
28 export class WeightedRoundRobinWorkerChoiceStrategy
<
29 Worker
extends IWorker
,
33 extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
>
34 implements IWorkerChoiceStrategy
{
36 public readonly requiredStatistics
: RequiredStatistics
= {
43 * Worker node id where the current task will be submitted.
45 private currentWorkerNodeId
: number = 0
47 * Default worker weight.
49 private readonly defaultWorkerWeight
: number
51 * Workers' virtual task runtime.
53 private readonly workersTaskRunTime
: Map
<number, TaskRunTime
> = new Map
<
60 pool
: IPool
<Worker
, Data
, Response
>,
61 opts
: WorkerChoiceStrategyOptions
= DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
64 this.checkOptions(this.opts
)
65 this.defaultWorkerWeight
= this.computeWorkerWeight()
66 this.initWorkersTaskRunTime()
/**
 * Resets the strategy's internal state.
 *
 * Points the current worker node id back to 0, empties the workers' task
 * runtime map and then rebuilds it through `initWorkersTaskRunTime()`.
 */
70 public reset (): boolean {
// Restart the rotation from the first worker node.
71 this.currentWorkerNodeId
= 0
// Drop every recorded entry before re-creating them.
72 this.workersTaskRunTime
.clear()
73 this.initWorkersTaskRunTime()
/**
 * Chooses the worker node key for the next task.
 *
 * The key chosen is the value of `currentWorkerNodeId` on entry; the rest of
 * the method updates the bookkeeping (recorded runtime and current worker
 * node id) used by later calls.
 */
78 public choose (): number {
79 const chosenWorkerNodeKey
= this.currentWorkerNodeId
// A chosen key without an entry in the runtime map gets one initialized.
// NOTE(review): the full guard condition is not visible here — confirm.
82 !this.workersTaskRunTime
.has(chosenWorkerNodeKey
)
84 this.initWorkerTaskRunTime(chosenWorkerNodeKey
)
// Runtime recorded for the chosen worker; 0 when it has no entry.
86 const workerTaskRunTime
=
87 this.workersTaskRunTime
.get(chosenWorkerNodeKey
)?.runTime
?? 0
// Weight recorded for the chosen worker; the default weight when it has no entry.
88 const workerTaskWeight
=
89 this.workersTaskRunTime
.get(chosenWorkerNodeKey
)?.weight
??
90 this.defaultWorkerWeight
// Recorded runtime still below the weight: update the runtime entry using
// the worker's virtual task runtime (0 when that value is nullish).
91 if (workerTaskRunTime
< workerTaskWeight
) {
92 this.setWorkerTaskRunTime(
96 (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey
) ?? 0)
// Otherwise move the current worker node id: when it is not the last index
// of the pool's worker nodes it is incremented by one.
// NOTE(review): the value used when it IS the last index is not visible here
// — presumably wraps to the first worker node; confirm.
99 this.currentWorkerNodeId
=
100 this.currentWorkerNodeId
=== this.pool
.workerNodes
.length
- 1
102 : this.currentWorkerNodeId
+ 1
// The new current worker node is stored with the weight read above for the
// previously chosen worker and 0 as the last argument.
103 this.setWorkerTaskRunTime(this.currentWorkerNodeId
, workerTaskWeight
, 0)
105 return chosenWorkerNodeKey
/**
 * Removes a worker node key from the strategy's bookkeeping.
 *
 * Adjusts `currentWorkerNodeId` when it designates the removed key, deletes
 * the key's entry from the workers' task runtime map and moves entries stored
 * under larger keys down by one.
 *
 * @param workerNodeKey - Key of the worker node being removed.
 */
109 public remove (workerNodeKey
: number): boolean {
// Only the case where the rotation currently points at the removed key needs fixing.
110 if (this.currentWorkerNodeId
=== workerNodeKey
) {
// No worker node left in the pool: restart from 0.
111 if (this.pool
.workerNodes
.length
=== 0) {
112 this.currentWorkerNodeId
= 0
// Clamp the current id so it never exceeds the last index of the pool's worker nodes.
114 this.currentWorkerNodeId
=
115 this.currentWorkerNodeId
> this.pool
.workerNodes
.length
- 1
116 ? this.pool
.workerNodes
.length
- 1
117 : this.currentWorkerNodeId
// `deleted` is true when an entry existed for the removed key.
120 const deleted
= this.workersTaskRunTime
.delete(workerNodeKey
)
// Copy every entry stored under a larger key to `key - 1`.
// NOTE(review): no deletion of the original (highest) key is visible here, so
// a stale entry may remain under the old last key — confirm.
121 for (const [key
, value
] of this.workersTaskRunTime
) {
122 if (key
> workerNodeKey
) {
123 this.workersTaskRunTime
.set(key
- 1, value
)
/**
 * Creates the initial task runtime entry of every worker node currently
 * present in the pool, in index order.
 */
private initWorkersTaskRunTime (): void {
  for (const workerNodeKey of this.pool.workerNodes.keys()) {
    this.initWorkerTaskRunTime(workerNodeKey)
  }
}
/**
 * Creates the initial task runtime entry of a single worker node: the default
 * worker weight together with a starting value of 0.
 *
 * @param workerNodeKey - Key of the worker node to initialize.
 */
private initWorkerTaskRunTime (workerNodeKey: number): void {
  const initialWeight = this.defaultWorkerWeight
  const initialValue = 0
  this.setWorkerTaskRunTime(workerNodeKey, initialWeight, initialValue)
}
/**
 * Stores (or overwrites) the entry of a worker node in the workers' task
 * runtime map.
 *
 * NOTE(review): callers in this class pass a weight as second argument and a
 * number as third argument (presumably the runtime) — confirm against the
 * full parameter list.
 *
 * @param workerNodeKey - Key of the worker node whose entry is written.
 */
139 private setWorkerTaskRunTime (
140 workerNodeKey
: number,
// Map.set replaces any entry previously stored under the same key.
144 this.workersTaskRunTime
.set(workerNodeKey
, {
/**
 * Reads the task runtime statistic that serves as the virtual task runtime of
 * a worker node.
 *
 * @param workerNodeKey - Index of the worker node in the pool's worker nodes.
 * @returns The node's median runtime when `requiredStatistics.medRunTime` is
 * truthy, its average runtime otherwise.
 */
private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
  const useMedian = this.requiredStatistics.medRunTime
  const { tasksUsage } = this.pool.workerNodes[workerNodeKey]
  if (useMedian) {
    return tasksUsage.medRunTime
  }
  return tasksUsage.avgRunTime
}
/**
 * Computes the default worker weight from the clock speeds of the host CPUs.
 *
 * Every CPU contributes its estimated cycle time scaled by the order of
 * magnitude of its speed; the weight is the rounded mean of the contributions.
 *
 * CPUs whose reported speed is zero, negative or not finite are ignored: a
 * zero speed would otherwise turn the weight into `Infinity`, and a worker's
 * runtime could then never reach its weight. When no usable speed is reported
 * at all (including an empty CPU list, which would otherwise yield `NaN`), a
 * nominal speed is assumed instead.
 *
 * @returns The default worker weight, always a finite number.
 */
private computeWorkerWeight (): number {
  // Nominal speed (MHz) assumed when the platform reports no usable CPU speed.
  const fallbackCpuSpeed = 2000
  // Query the CPU list once; it is needed for both the sum and the mean.
  const cpuSpeeds = cpus()
    .map(cpu => cpu.speed)
    .filter(speed => Number.isFinite(speed) && speed > 0)
  if (cpuSpeeds.length === 0) {
    cpuSpeeds.push(fallbackCpuSpeed)
  }
  let cpusCycleTimeWeight = 0
  for (const speed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = speed.toString().length - 1
    const cpuCycleTime = 1 / (speed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}