1 import { cpus
} from
'node:os'
2 import type { IPoolInternal
} from
'../pool-internal'
3 import type { IWorker
} from
'../worker'
4 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
8 WorkerChoiceStrategyOptions
9 } from
'./selection-strategies-types'
12 * Virtual task runtime.
14 interface TaskRunTime
{
20 * Selects the next worker with a weighted round robin scheduling algorithm.
21 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
23 * @typeParam Worker - Type of worker which manages the strategy.
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
25 * @typeParam Response - Type of response of execution. This can only be serializable data.
27 export class WeightedRoundRobinWorkerChoiceStrategy
<
28 Worker
extends IWorker
,
32 extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
>
33 implements IWorkerChoiceStrategy
{
35 public readonly requiredStatistics
: RequiredStatistics
= {
42 * Worker node id where the current task will be submitted.
44 private currentWorkerNodeId
: number = 0
46 * Default worker weight.
48 private readonly defaultWorkerWeight
: number
50 * Workers' virtual task runtime.
52 private readonly workersTaskRunTime
: Map
<number, TaskRunTime
> = new Map
<
58 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
60 * @param pool - The pool instance.
61 * @param opts - The worker choice strategy options.
64 pool
: IPoolInternal
<Worker
, Data
, Response
>,
65 opts
?: WorkerChoiceStrategyOptions
68 this.defaultWorkerWeight
= this.computeWorkerWeight()
69 this.initWorkersTaskRunTime()
// Resets the strategy state: the current worker node id goes back to 0, every
// recorded task run time entry is cleared, and the entries are re-created for
// the pool's worker nodes through initWorkersTaskRunTime().
// NOTE(review): the return statement is not visible in this view; the declared
// return type is boolean - confirm the returned value against the full file.
73 public reset (): boolean {
74 this.currentWorkerNodeId
= 0
75 this.workersTaskRunTime
.clear()
76 this.initWorkersTaskRunTime()
// Picks the worker node key for the next task. The key returned is the value
// currentWorkerNodeId had on entry; the per-worker bookkeeping and the current
// id are updated along the way.
// NOTE(review): this view omits some lines of the method (part of the first
// condition, the arguments of the first setWorkerTaskRunTime call, and the
// branch structure around the id update). The comments below describe only
// what is shown - confirm against the full file.
81 public choose (): number {
// Candidate for this call: the current worker node id.
82 const chosenWorkerNodeKey
= this.currentWorkerNodeId
// Create the bookkeeping entry for the candidate when it has none yet.
85 !this.workersTaskRunTime
.has(chosenWorkerNodeKey
)
87 this.initWorkerTaskRunTime(chosenWorkerNodeKey
)
// Run time recorded for the candidate; 0 when no entry exists.
89 const workerTaskRunTime
=
90 this.workersTaskRunTime
.get(chosenWorkerNodeKey
)?.runTime
?? 0
// Weight recorded for the candidate; the default worker weight when no entry exists.
91 const workerTaskWeight
=
92 this.workersTaskRunTime
.get(chosenWorkerNodeKey
)?.weight
??
93 this.defaultWorkerWeight
// While the recorded run time is below the weight, the entry is updated using
// the worker's virtual task run time (0 when that value is nullish).
94 if (workerTaskRunTime
< workerTaskWeight
) {
95 this.setWorkerTaskRunTime(
99 (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey
) ?? 0)
// Move the current id forward. The visible fragment compares it with the last
// index (length - 1) and otherwise increments it - presumably wrapping back to
// the first node at the end; TODO confirm, the wrap value is not visible here.
102 this.currentWorkerNodeId
=
103 this.currentWorkerNodeId
=== this.pool
.workerNodes
.length
- 1
105 : this.currentWorkerNodeId
+ 1
// The entry of the newly selected node is set with the same weight and a
// third argument of 0.
106 this.setWorkerTaskRunTime(this.currentWorkerNodeId
, workerTaskWeight
, 0)
108 return chosenWorkerNodeKey
// Drops the bookkeeping entry stored under workerNodeKey and re-keys the
// entries that follow it.
// NOTE(review): some lines of this method are not visible in this view (the
// branch structure between the two assignments below, the closing braces and
// the return statement). `deleted` holds the result of Map.delete - presumably
// what is returned; confirm against the full file.
112 public remove (workerNodeKey
: number): boolean {
// Only adjust the current id when it is the key being removed.
113 if (this.currentWorkerNodeId
=== workerNodeKey
) {
// With no worker nodes in the pool the current id is reset to 0.
114 if (this.pool
.workerNodes
.length
=== 0) {
115 this.currentWorkerNodeId
= 0
// Clamp the current id to the last valid index (length - 1).
// NOTE(review): the line between the two assignments is not visible -
// presumably an else branch; confirm.
117 this.currentWorkerNodeId
=
118 this.currentWorkerNodeId
> this.pool
.workerNodes
.length
- 1
119 ? this.pool
.workerNodes
.length
- 1
120 : this.currentWorkerNodeId
123 const deleted
= this.workersTaskRunTime
.delete(workerNodeKey
)
// Entries with a larger key are stored again under key - 1.
// NOTE(review): no visible line removes the entry at the old key, so the
// highest key may linger after the shift; the Map is also written to while it
// is being iterated - verify both against the full file.
124 for (const [key
, value
] of this.workersTaskRunTime
) {
125 if (key
> workerNodeKey
) {
126 this.workersTaskRunTime
.set(key
- 1, value
)
/**
 * Initializes the task run time entry of every worker node currently in the pool,
 * one index at a time.
 */
private initWorkersTaskRunTime (): void {
  const { workerNodes } = this.pool
  for (let workerNodeKey = 0; workerNodeKey < workerNodes.length; workerNodeKey++) {
    this.initWorkerTaskRunTime(workerNodeKey)
  }
}
/**
 * Creates the initial task run time entry of one worker node by handing the
 * default worker weight and a starting value of 0 to `setWorkerTaskRunTime`.
 *
 * @param workerNodeKey - Key of the worker node to initialize.
 */
private initWorkerTaskRunTime (workerNodeKey: number): void {
  const initialValue = 0
  this.setWorkerTaskRunTime(
    workerNodeKey,
    this.defaultWorkerWeight,
    initialValue
  )
}
// Stores an entry for workerNodeKey in the workersTaskRunTime map, replacing
// any entry already stored under that key.
// NOTE(review): the remaining parameters and the fields of the stored object
// are not visible in this view. Call sites pass (key, weight, number) and
// readers use `.weight` / `.runTime` - presumably those are the two fields;
// confirm against the full file.
142 private setWorkerTaskRunTime (
143 workerNodeKey
: number,
147 this.workersTaskRunTime
.set(workerNodeKey
, {
/**
 * Reads the virtual task run time of a worker node from its tasks usage.
 *
 * @param workerNodeKey - Key of the worker node to read.
 * @returns The median run time when the strategy's required statistics enable
 * `medRunTime`, the average run time otherwise.
 */
private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
  const useMedian = this.requiredStatistics.medRunTime
  const { tasksUsage } = this.pool.workerNodes[workerNodeKey]
  if (useMedian) {
    return tasksUsage.medRunTime
  }
  return tasksUsage.avgRunTime
}
/**
 * Computes the default worker weight from the host CPUs.
 *
 * Each CPU contributes an estimated cycle time derived from its reported clock
 * speed and scaled by that speed's order of magnitude; the weight is the
 * rounded mean of those contributions.
 *
 * `os.cpus()` can report a speed of 0 on some platforms, or return no CPU at
 * all. Left unguarded, that turns the weight into `Infinity` or `NaN`, and the
 * `workerTaskRunTime < workerTaskWeight` check in `choose()` then always or
 * never passes. Unusable speeds are therefore replaced by the first usable
 * speed found, or by a fixed fallback when no CPU reports one.
 *
 * @returns The default worker weight, always a finite number.
 */
private computeWorkerWeight (): number {
  // Clock speed (MHz) assumed when the platform reports no usable value.
  const fallbackCpuSpeed = 2000
  const isUsableSpeed = (speed: number): boolean =>
    Number.isFinite(speed) && speed > 0
  // Query the CPU list once: it is loop-invariant and not free to build.
  const hostCpus = cpus()
  const referenceSpeed =
    hostCpus.find(cpu => isUsableSpeed(cpu.speed))?.speed ?? fallbackCpuSpeed
  const cpuSpeeds =
    hostCpus.length > 0
      ? hostCpus.map(cpu =>
        isUsableSpeed(cpu.speed) ? cpu.speed : referenceSpeed
      )
      : [referenceSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}