1 import { cpus
} from
'os'
2 import type { IPoolInternal
} from
'../pool-internal'
3 import type { IPoolWorker
} from
'../pool-worker'
4 import { AbstractWorkerChoiceStrategy
} from
'./abstract-worker-choice-strategy'
5 import type { RequiredStatistics
} from
'./selection-strategies-types'
/**
 * Virtual task runtime.
 */
10 interface TaskRunTime
{
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
23 export class WeightedRoundRobinWorkerChoiceStrategy
<
24 Worker
extends IPoolWorker
,
27 > extends AbstractWorkerChoiceStrategy
<Worker
, Data
, Response
> {
29 public readonly requiredStatistics
: RequiredStatistics
= {
/**
 * Worker id where the current task will be submitted.
 */
36 private currentWorkerId
: number = 0
/**
 * Default worker weight.
 */
40 private readonly defaultWorkerWeight
: number
/**
 * Per worker virtual task runtime map.
 */
44 private readonly workersTaskRunTime
: Map
<Worker
, TaskRunTime
> = new Map
<
/**
 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
 *
 * @param pool - The pool instance.
 */
54 public constructor (pool
: IPoolInternal
<Worker
, Data
, Response
>) {
// The default weight is computed once here and then reused for every worker that gets registered.
56 this.defaultWorkerWeight
= this.computeWorkerWeight()
// Register a task run time entry for each worker currently held by the pool.
57 this.initWorkersTaskRunTime()
/**
 * Resets the strategy state: selection restarts at worker id 0 and the per
 * worker task run time map is cleared, then rebuilt from the pool's workers.
 *
 * NOTE(review): the return statement is not visible in this extract — confirm the returned value.
 */
61 public reset (): boolean {
62 this.currentWorkerId
= 0
63 this.workersTaskRunTime
.clear()
// Re-register every worker currently in the pool with its initial entry.
64 this.initWorkersTaskRunTime()
/**
 * Chooses a worker, starting from the pool entry stored at `currentWorkerId`.
 *
 * NOTE(review): several lines of this method are missing from this extract; the
 * comments below only describe the statements that are visible.
 */
69 public choose (): Worker
{
70 const chosenWorker
= this.pool
.workers
.get(this.currentWorkerId
)
// In a dynamic pool, a worker without an entry in the map is registered here first.
72 if (this.isDynamicPool
&& !this.workersTaskRunTime
.has(chosenWorker
)) {
73 this.initWorkerTaskRunTime(chosenWorker
)
// Run time recorded for the chosen worker; 0 when it has no entry.
75 const workerTaskRunTime
=
76 this.workersTaskRunTime
.get(chosenWorker
)?.runTime
?? 0
// Weight recorded for the chosen worker, falling back to the default worker weight.
77 const workerTaskWeight
=
78 this.workersTaskRunTime
.get(chosenWorker
)?.weight
??
79 this.defaultWorkerWeight
// While the recorded run time is below the weight, the worker's entry is updated
// using its virtual task run time (treated as 0 when undefined).
80 if (workerTaskRunTime
< workerTaskWeight
) {
81 this.setWorkerTaskRunTime(
85 (this.getWorkerVirtualTaskRunTime(chosenWorker
) ?? 0)
// Move currentWorkerId forward: it is compared against the last index (size - 1)
// and otherwise incremented by one.
// NOTE(review): the value used on the last index is not visible here — presumably wraps to 0; confirm.
88 this.currentWorkerId
=
89 this.currentWorkerId
=== this.pool
.workers
.size
- 1
91 : this.currentWorkerId
+ 1
// Update the entry of the worker found at the new currentWorkerId.
92 this.setWorkerTaskRunTime(
93 this.pool
.workers
.get(this.currentWorkerId
)?.worker
as Worker
,
/**
 * Registers an initial task run time entry for every worker currently held by the pool.
 */
private initWorkersTaskRunTime (): void {
  for (const { worker } of this.pool.workers.values()) {
    this.initWorkerTaskRunTime(worker)
  }
}
/**
 * Registers the given worker with the default worker weight and an initial value of zero
 * (presumably its starting task run time — confirm against `setWorkerTaskRunTime`).
 *
 * @param worker - The worker to register.
 */
private initWorkerTaskRunTime (worker: Worker): void {
  const initialRunTime = 0
  this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, initialRunTime)
}
/**
 * Stores the entry for `worker` in the per worker task run time map, replacing any previous one.
 *
 * NOTE(review): the parameter list and the fields of the stored object are not visible in this
 * extract; callers in this file pass a worker, a weight and a third numeric value — confirm.
 */
111 private setWorkerTaskRunTime (
116 this.workersTaskRunTime
.set(worker
, {
/**
 * Gets the virtual task run time of a worker, taken from the average tasks run time
 * the pool reports for it.
 *
 * @param worker - The worker to look up.
 * @returns The value reported by the pool, which may be `undefined`.
 */
private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
  const averageTasksRunTime = this.pool.getWorkerAverageTasksRunTime(worker)
  return averageTasksRunTime
}
/**
 * Computes the default worker weight from the speed reported for each logical CPU.
 *
 * For every CPU an estimated cycle time is derived from its reported speed and
 * scaled by the order of magnitude of that speed; the result is the rounded
 * average over the CPUs taken into account.
 *
 * CPUs whose reported speed is not a finite positive number are skipped: a zero
 * speed would otherwise turn the weight into `Infinity`, and the
 * `runTime < weight` check in `choose()` would then always hold. When no CPU is
 * usable (`os.cpus()` may return an empty array), a finite fallback is returned
 * instead of `NaN`.
 *
 * @returns A finite default worker weight.
 */
private computeWorkerWeight (): number {
  // NOTE(review): arbitrary finite fallback used when no CPU speed is available — confirm the desired value.
  const fallbackWorkerWeight = 1
  let cpusCycleTimeWeight = 0
  let usableCpus = 0
  // The CPU list is queried once and reused for both the sum and the average.
  for (const { speed } of cpus()) {
    if (!Number.isFinite(speed) || speed <= 0) {
      continue
    }
    // CPU estimated cycle time
    const numberOfDigits = speed.toString().length - 1
    const speedScale = Math.pow(10, numberOfDigits)
    const cpuCycleTime = 1 / (speed / speedScale)
    cpusCycleTimeWeight += cpuCycleTime * speedScale
    usableCpus++
  }
  if (usableCpus === 0) {
    return fallbackWorkerWeight
  }
  return Math.round(cpusCycleTimeWeight / usableCpus)
}