1 import { cpus
} from
'os'
2 import type { AbstractPoolWorker
} from
'../abstract-pool-worker'
3 import type { IPoolInternal
} from
'../pool-internal'
4 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
15 * Selects the next worker with a weighted round robin scheduling algorithm.
16 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
18 * @template Worker Type of worker which manages the strategy.
19 * @template Data Type of data sent to the worker. This can only be serializable data.
20 * @template Response Type of response of execution. This can only be serializable data.
22 export class WeightedRoundRobinWorkerChoiceStrategy
<
23 Worker
extends AbstractPoolWorker
,
26 > extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
> {
28 * Worker index where the previous task was submitted.
30 private previousWorkerIndex
: number = 0
32 * Worker index where the current task will be submitted.
34 private currentWorkerIndex
: number = 0
36 * Default worker weight.
38 private defaultWorkerWeight
: number
40 * Per worker task runtime map.
42 private workerTaskRunTime
: Map
<Worker
, TaskRunTime
> = new Map
<
48 * Constructs a worker choice strategy that selects based a weighted round robin scheduling algorithm.
50 * @param pool The pool instance.
52 public constructor (pool
: IPoolInternal
<Worker
, Data
, Response
>) {
54 this.defaultWorkerWeight
= this.computeWorkerWeight()
55 this.initWorkerTaskRunTime()
59 public choose (): Worker
{
60 const currentWorker
= this.pool
.workers
[this.currentWorkerIndex
]
61 if (this.isDynamicPool
=== true) {
62 this.workerTaskRunTime
.has(currentWorker
) === false &&
63 this.workerTaskRunTime
.set(currentWorker
, {
64 weight
: this.defaultWorkerWeight
,
68 const workerVirtualTaskRunTime
=
69 this.getWorkerVirtualTaskRunTime(currentWorker
) ?? 0
70 const workerTaskWeight
=
71 this.workerTaskRunTime
.get(currentWorker
)?.weight
??
72 this.defaultWorkerWeight
73 if (this.currentWorkerIndex
=== this.previousWorkerIndex
) {
74 const workerTaskRunTime
=
75 (this.workerTaskRunTime
.get(currentWorker
)?.runTime
?? 0) +
76 workerVirtualTaskRunTime
77 this.workerTaskRunTime
.set(currentWorker
, {
78 weight
: workerTaskWeight
,
79 runTime
: workerTaskRunTime
82 this.workerTaskRunTime
.set(currentWorker
, {
83 weight
: workerTaskWeight
,
88 workerVirtualTaskRunTime
<
89 (this.workerTaskRunTime
.get(currentWorker
) ?? this.defaultWorkerWeight
)
91 this.previousWorkerIndex
= this.currentWorkerIndex
93 this.previousWorkerIndex
= this.currentWorkerIndex
94 this.currentWorkerIndex
=
95 this.pool
.workers
.length
- 1 === this.currentWorkerIndex
97 : this.currentWorkerIndex
+ 1
99 return this.pool
.workers
[this.currentWorkerIndex
]
102 private computeWorkerWeight () {
103 let cpusCycleTimeWeight
= 0
104 for (let cpu
= 0; cpu
< cpus().length
; cpu
++) {
105 // CPU estimated cycle time
106 const numberOfDigit
= cpus()[cpu
].speed
.toString().length
- 1
107 const cpuCycleTime
= 1 / (cpus()[cpu
].speed
/ Math.pow(10, numberOfDigit
))
108 cpusCycleTimeWeight
+= cpuCycleTime
* Math.pow(10, numberOfDigit
)
110 return cpusCycleTimeWeight
/ cpus().length
113 private initWorkerTaskRunTime () {
114 for (const worker
of this.pool
.workers
) {
115 this.workerTaskRunTime
.set(worker
, {
116 weight
: this.defaultWorkerWeight
,
122 private getWorkerVirtualTaskRunTime (worker
: Worker
): number | undefined {
123 return this.pool
.getWorkerAverageTasksRunTime(worker
)