1 import { cpus
} from
'os'
2 import type { AbstractPoolWorker
} from
'../abstract-pool-worker'
3 import type { IPoolInternal
} from
'../pool-internal'
4 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
5 import type { RequiredStatistics
} from
'./selection-strategies-types'
16 * Selects the next worker with a weighted round robin scheduling algorithm.
17 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
19 * @template Worker Type of worker which manages the strategy.
20 * @template Data Type of data sent to the worker. This can only be serializable data.
21 * @template Response Type of response of execution. This can only be serializable data.
23 export class WeightedRoundRobinWorkerChoiceStrategy
<
24 Worker
extends AbstractPoolWorker
,
27 > extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
> {
29 public requiredStatistics
: RequiredStatistics
= {
34 * Worker index where the previous task was submitted.
36 private previousWorkerIndex
: number = 0
38 * Worker index where the current task will be submitted.
40 private currentWorkerIndex
: number = 0
42 * Default worker weight.
44 private defaultWorkerWeight
: number
46 * Per worker task runtime map.
48 private workerTaskRunTime
: Map
<Worker
, TaskRunTime
> = new Map
<
/**
 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
 *
 * @param pool The pool instance.
 */
public constructor (pool: IPoolInternal<Worker, Data, Response>) {
  // A derived class must invoke the base constructor before it touches `this`.
  // NOTE(review): the base class presumably keeps `pool` as `this.pool` — confirm.
  super(pool)
  // The default weight has to exist before the map is seeded, because
  // initWorkerTaskRunTime() reads this.defaultWorkerWeight for every worker.
  this.defaultWorkerWeight = this.computeWorkerWeight()
  this.initWorkerTaskRunTime()
}
/**
 * Chooses the worker that will receive the next task.
 *
 * The current worker is kept as long as the run time figure the pool reports
 * for it (`getWorkerVirtualTaskRunTime`, defaulting to 0) is strictly below
 * the worker's weight; otherwise the index moves to the next worker, wrapping
 * to 0 after the last one.
 *
 * @returns The worker found at the current index after the keep/advance decision.
 */
public choose (): Worker {
  const currentWorker = this.pool.workers[this.currentWorkerIndex]
  // NOTE(review): presumably a dynamic pool can gain workers after
  // construction, hence the lazy registration here — confirm.
  if (
    this.isDynamicPool === true &&
    !this.workerTaskRunTime.has(currentWorker)
  ) {
    this.workerTaskRunTime.set(currentWorker, {
      weight: this.defaultWorkerWeight,
      runTime: 0
    })
  }
  const workerVirtualTaskRunTime =
    this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
  const workerTaskWeight =
    this.workerTaskRunTime.get(currentWorker)?.weight ??
    this.defaultWorkerWeight
  // Accumulate run time while the same worker is selected consecutively;
  // start again from 0 when the selection has just moved to this worker.
  const sameWorkerAsBefore =
    this.currentWorkerIndex === this.previousWorkerIndex
  const accumulatedRunTime = sameWorkerAsBefore
    ? (this.workerTaskRunTime.get(currentWorker)?.runTime ?? 0) +
      workerVirtualTaskRunTime
    : 0
  this.workerTaskRunTime.set(currentWorker, {
    weight: workerTaskWeight,
    runTime: accumulatedRunTime
  })
  // Compare against the numeric weight. The previous code compared against
  // the whole map entry (an object), which coerces to NaN and made this test
  // always false, so the weight never influenced the selection.
  // NOTE(review): the accumulated runTime stored above is not consulted here;
  // confirm whether the decision was meant to use it instead.
  const keepCurrentWorker = workerVirtualTaskRunTime < workerTaskWeight
  this.previousWorkerIndex = this.currentWorkerIndex
  if (!keepCurrentWorker) {
    this.currentWorkerIndex =
      this.pool.workers.length - 1 === this.currentWorkerIndex
        ? 0
        : this.currentWorkerIndex + 1
  }
  return this.pool.workers[this.currentWorkerIndex]
}
/**
 * Computes the default worker weight from the speed of the host CPUs.
 *
 * For each CPU the contribution is `10^(2 * d) / speed`, where `d` is the
 * number of digits of `speed` minus one; the result is the mean contribution
 * over the CPUs that report a usable speed.
 *
 * @returns The mean weight, or 0 when no CPU reports a positive, finite speed
 * (including when `cpus()` returns an empty array).
 */
private computeWorkerWeight (): number {
  let cpusCycleTimeWeight = 0
  let usableCpus = 0
  // Query the CPU list once instead of once for the loop and once for the mean.
  for (const cpu of cpus()) {
    // A non-positive or non-finite speed would turn the weight into
    // Infinity/NaN, so such entries are left out of the mean.
    if (!Number.isFinite(cpu.speed) || cpu.speed <= 0) {
      continue
    }
    // CPU estimated cycle time
    const numberOfDigit = cpu.speed.toString().length - 1
    const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
    usableCpus++
  }
  // Avoid 0 / 0 when there is nothing to average.
  return usableCpus > 0 ? cpusCycleTimeWeight / usableCpus : 0
}
/**
 * Seeds the per worker task run time map: every worker currently listed by
 * the pool gets the default weight and a run time of 0.
 */
private initWorkerTaskRunTime () {
  const { workers } = this.pool
  for (const poolWorker of workers) {
    // A fresh object per worker, so entries are never shared between keys.
    const initialEntry = {
      weight: this.defaultWorkerWeight,
      runTime: 0
    }
    this.workerTaskRunTime.set(poolWorker, initialEntry)
  }
}
/**
 * Fetches the run time figure the pool reports for a worker.
 *
 * @param worker The worker to look up.
 * @returns The value of `pool.getWorkerAverageTasksRunTime(worker)`, which may be `undefined`.
 */
private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
  const reportedRunTime = this.pool.getWorkerAverageTasksRunTime(worker)
  return reportedRunTime
}