1 import { cpus
} from
'node:os'
2 import type { IPoolInternal
} from
'../pool-internal'
3 import type { IPoolWorker
} from
'../pool-worker'
4 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
8 } from
'./selection-strategies-types'
/**
 * Virtual task runtime.
 */
13 interface TaskRunTime
{
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
26 export class WeightedRoundRobinWorkerChoiceStrategy
<
27 Worker
extends IPoolWorker
,
31 extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
>
32 implements IWorkerChoiceStrategy
{
34 public readonly requiredStatistics
: RequiredStatistics
= {
/**
 * Worker id where the current task will be submitted.
 */
42 private currentWorkerId
: number = 0
/**
 * Default worker weight.
 */
46 private readonly defaultWorkerWeight
: number
/**
 * Per worker virtual task runtime map.
 */
50 private readonly workersTaskRunTime
: Map
<number, TaskRunTime
> = new Map
<
/**
 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
 *
 * @param pool - The pool instance.
 */
60 public constructor (pool
: IPoolInternal
<Worker
, Data
, Response
>) {
// The default weight has to be known before the entries are seeded:
// initWorkerTaskRunTime() passes defaultWorkerWeight to setWorkerTaskRunTime().
62 this.defaultWorkerWeight
= this.computeWorkerWeight()
// Create one run time entry for every worker currently held by the pool.
63 this.initWorkersTaskRunTime()
// Restarts the rotation from worker 0 and rebuilds every per-worker run time entry.
67 public reset (): boolean {
68 this.currentWorkerId
= 0
// Empty the map before re-seeding so that only keys of workers currently in the pool remain.
69 this.workersTaskRunTime
.clear()
70 this.initWorkersTaskRunTime()
// Returns the key of the worker that receives the next task (the value of
// currentWorkerId on entry) and updates the rotation state for the following call.
75 public choose (): number {
76 const chosenWorkerKey
= this.currentWorkerId
// In a dynamic pool the current worker may have no entry yet: create it on demand.
77 if (this.isDynamicPool
&& !this.workersTaskRunTime
.has(chosenWorkerKey
)) {
78 this.initWorkerTaskRunTime(chosenWorkerKey
)
// A missing entry is read as run time 0 ...
80 const workerTaskRunTime
=
81 this.workersTaskRunTime
.get(chosenWorkerKey
)?.runTime
?? 0
// ... and as the default worker weight.
82 const workerTaskWeight
=
83 this.workersTaskRunTime
.get(chosenWorkerKey
)?.weight
??
84 this.defaultWorkerWeight
// While the recorded run time is below the weight, the entry of the chosen worker is
// updated using the worker's average task run time (0 when that value is nullish).
85 if (workerTaskRunTime
< workerTaskWeight
) {
86 this.setWorkerTaskRunTime(
90 (this.getWorkerVirtualTaskRunTime(chosenWorkerKey
) ?? 0)
// Move the rotation to another worker: unless the current id is the last index of the
// workers list, it is incremented by one (presumably this is the alternative branch).
93 this.currentWorkerId
=
94 this.currentWorkerId
=== this.pool
.workers
.length
- 1
96 : this.currentWorkerId
+ 1
// NOTE(review): the newly selected worker is registered with the weight read from the
// previously chosen worker's entry (workerTaskWeight), not with its own stored weight;
// confirm this is intended if per-worker weights can differ.
97 this.setWorkerTaskRunTime(this.currentWorkerId
, workerTaskWeight
, 0)
99 return chosenWorkerKey
// Updates the strategy bookkeeping for the removal of the worker identified by workerKey:
// re-points the rotation if needed, deletes the worker's entry and re-keys later entries.
103 public remove (workerKey
: number): boolean {
// The rotation only needs attention when it currently targets the removed worker.
104 if (this.currentWorkerId
=== workerKey
) {
105 if (this.pool
.workers
.length
=== 0) {
106 this.currentWorkerId
= 0
// Clamp the current id to the last valid index of the workers list.
108 this.currentWorkerId
=
109 this.currentWorkerId
> this.pool
.workers
.length
- 1
110 ? this.pool
.workers
.length
- 1
111 : this.currentWorkerId
// Map.delete() reports whether an entry existed under workerKey.
114 const workerDeleted
= this.workersTaskRunTime
.delete(workerKey
)
// Entries stored under a key greater than workerKey are written to key - 1.
// NOTE(review): the map is modified while it is being iterated, and entries inserted
// under a new key are visited later by the same loop; confirm an entry cannot be
// shifted twice.
115 for (const [key
, value
] of this.workersTaskRunTime
) {
116 if (key
> workerKey
) {
117 this.workersTaskRunTime
.set(key
- 1, value
)
/**
 * Initializes the task run time entry of every worker currently held by the pool.
 */
private initWorkersTaskRunTime (): void {
  const { workers } = this.pool
  // Walk the workers list by index; the index is the worker key.
  for (let workerKey = 0; workerKey < workers.length; workerKey++) {
    this.initWorkerTaskRunTime(workerKey)
  }
}
/**
 * Registers one worker through `setWorkerTaskRunTime` with the default worker weight
 * and `0` as third argument (presumably the accumulated run time).
 *
 * @param workerKey - Key of the worker to initialize.
 */
private initWorkerTaskRunTime (workerKey: number): void {
  const initialRunTime = 0
  this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, initialRunTime)
}
// Writes the map entry of workerKey, replacing any entry already stored under that key.
133 private setWorkerTaskRunTime (
138 this.workersTaskRunTime
.set(workerKey
, {
/**
 * Reads the average task run time recorded in the tasks usage of one worker.
 *
 * @param workerKey - Index of the worker in `this.pool.workers`.
 * @returns The `avgRunTime` value of that worker's tasks usage.
 */
private getWorkerVirtualTaskRunTime (workerKey: number): number {
  const { tasksUsage } = this.pool.workers[workerKey]
  return tasksUsage.avgRunTime
}
/**
 * Computes the default worker weight from the clock speed reported for each CPU.
 *
 * Every CPU with a usable speed contributes `10^(2 * d) / speed`, where `d` is the
 * number of characters of the speed's string form minus one; the contributions are
 * averaged and rounded to the nearest integer.
 *
 * CPUs whose reported speed is zero, negative or not finite are left out, and a
 * nominal speed is used when no CPU reports a usable one. Without these guards the
 * result would be `Infinity` (division by a zero speed) or `NaN` (empty CPU list),
 * and neither can serve as a weight to compare run times against.
 *
 * @returns The rounded average weight, always a finite number.
 */
private computeWorkerWeight (): number {
  // Assumed clock speed (MHz) when the platform reports no usable CPU speed.
  const fallbackCpuSpeed = 2000
  // Query the CPU list once so the sum and the divisor always agree.
  const usableSpeeds = cpus()
    .map(cpu => cpu.speed)
    .filter(speed => Number.isFinite(speed) && speed > 0)
  if (usableSpeeds.length === 0) {
    usableSpeeds.push(fallbackCpuSpeed)
  }
  let cpusCycleTimeWeight = 0
  for (const speed of usableSpeeds) {
    // CPU estimated cycle time, scaled by the order of magnitude of its speed
    const magnitude = Math.pow(10, speed.toString().length - 1)
    const cpuCycleTime = 1 / (speed / magnitude)
    cpusCycleTimeWeight += cpuCycleTime * magnitude
  }
  return Math.round(cpusCycleTimeWeight / usableSpeeds.length)
}