1 import { cpus
} from
'node:os'
3 import type { IPool
} from
'../pool.js'
4 import type { IWorker
} from
'../worker.js'
5 import { FairShareWorkerChoiceStrategy
} from
'./fair-share-worker-choice-strategy.js'
6 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy
} from
'./interleaved-weighted-round-robin-worker-choice-strategy.js'
7 import { LeastBusyWorkerChoiceStrategy
} from
'./least-busy-worker-choice-strategy.js'
8 import { LeastEluWorkerChoiceStrategy
} from
'./least-elu-worker-choice-strategy.js'
9 import { LeastUsedWorkerChoiceStrategy
} from
'./least-used-worker-choice-strategy.js'
10 import { RoundRobinWorkerChoiceStrategy
} from
'./round-robin-worker-choice-strategy.js'
12 type IWorkerChoiceStrategy
,
13 type MeasurementStatisticsRequirements
,
15 type TaskStatisticsRequirements
,
16 WorkerChoiceStrategies
,
17 type WorkerChoiceStrategy
,
18 type WorkerChoiceStrategyOptions
,
19 } from
'./selection-strategies-types.js'
20 import { WeightedRoundRobinWorkerChoiceStrategy
} from
'./weighted-round-robin-worker-choice-strategy.js'
21 import type { WorkerChoiceStrategiesContext
} from
'./worker-choice-strategies-context.js'
/**
 * Estimates the CPU speed by timing an empty countdown loop.
 *
 * @returns The estimated CPU speed in MHz, truncated to an integer.
 */
const estimatedCpuSpeed = (): number => {
  const iterations = 150000000
  const startedAt = performance.now()
  // eslint-disable-next-line no-empty
  for (let remaining = iterations; remaining > 0; remaining--) {}
  const elapsedMs = performance.now() - startedAt
  // iterations per millisecond, scaled down by 1000
  return Math.trunc(iterations / elapsedMs / 1000) // in MHz
}
/**
 * Computes the default worker weight from the CPU speeds reported by `cpus()`.
 *
 * Steps visible in this excerpt:
 * - When every reported CPU has a `null`/`undefined` or `0` speed, a speed is
 *   estimated once with `estimatedCpuSpeed()` and kept in `estCpuSpeed`.
 * - For a CPU whose speed is `null`/`undefined` or `0`, the fallback expression
 *   starts with the speed of the first CPU that reports a usable one.
 *   NOTE(review): the rest of that `??` chain and its assignment target are
 *   not visible in this excerpt - confirm against the full file.
 * - For each CPU, `numberOfDigits` is the length of the speed's decimal string
 *   minus one, `cpuCycleTime` is `1 / (speed / 10^numberOfDigits)`, and
 *   `cpuCycleTime * 10^numberOfDigits` is added to `cpusCycleTimeWeight`.
 *
 * NOTE(review): when `cpus()` returns an empty array the loop does not run and
 * the final division is `0 / 0`, so the result would be `NaN`.
 *
 * @returns The accumulated weight divided by the number of CPUs, rounded.
 */
33 const getDefaultWorkerWeight
= (): number => {
34 const currentCpus
= cpus()
35 let estCpuSpeed
: number | undefined
36 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
37 if (currentCpus
.every(cpu
=> cpu
.speed
== null || cpu
.speed
=== 0)) {
38 estCpuSpeed
= estimatedCpuSpeed()
40 let cpusCycleTimeWeight
= 0
41 for (const cpu
of currentCpus
) {
42 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
43 if (cpu
.speed
== null || cpu
.speed
=== 0) {
45 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
46 currentCpus
.find(cpu
=> cpu
.speed
!= null && cpu
.speed
!== 0)?.speed
??
50 // CPU estimated cycle time
51 const numberOfDigits
= cpu
.speed
.toString().length
- 1
52 const cpuCycleTime
= 1 / (cpu
.speed
/ Math.pow(10, numberOfDigits
))
53 cpusCycleTimeWeight
+= cpuCycleTime
* Math.pow(10, numberOfDigits
)
55 return Math.round(cpusCycleTimeWeight
/ currentCpus
.length
)
/**
 * Builds the default weights: one entry per worker node key, from `0` up to
 * (but excluding) `poolMaxSize`, all sharing the same weight.
 *
 * @param poolMaxSize - Number of worker node keys to generate a weight for.
 * @param defaultWorkerWeight - Weight given to every key. When nullish, it is
 *   obtained from `getDefaultWorkerWeight()`.
 * @returns The weights indexed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const weight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weightsByWorkerNodeKey: Record<number, number> = {}
  let key = 0
  while (key < poolMaxSize) {
    weightsByWorkerNodeKey[key] = weight
    key++
  }
  return weightsByWorkerNodeKey
}
/**
 * Derives a retries value for the worker choice strategies of a pool.
 *
 * The expression visible in this excerpt counts the keys of `opts.weights`,
 * or, when `opts` or `opts.weights` is nullish, the keys of the default
 * weights generated for `pool.info.maxSize`.
 * NOTE(review): the return type, the `return` statement and any other operand
 * of the result are not visible in this excerpt - confirm against the full file.
 *
 * @param pool - The pool; `pool.info.maxSize` is read for the default weights.
 * @param opts - The optional worker choice strategy options.
 */
70 export const getWorkerChoiceStrategiesRetries
= <
71 Worker
extends IWorker
,
75 pool
: IPool
<Worker
, Data
, Response
>,
76 opts
?: WorkerChoiceStrategyOptions
80 Object.keys(opts
?.weights
?? getDefaultWeights(pool
.info
.maxSize
)).length
/**
 * Builds the worker choice strategy options for a pool.
 *
 * `opts` is replaced by a `structuredClone` of itself (or of `{}` when it is
 * nullish), so the `weights` assignment below is applied to the copy. When
 * `weights` is nullish it is filled with the default weights generated for
 * `pool.info.maxSize`.
 *
 * The object literal fragment visible in this excerpt sets `median: false`
 * for `runTime`, `waitTime` and `elu`.
 * NOTE(review): the `return` statement, and how these values are combined
 * with `opts`, is not visible in this excerpt - confirm the precedence against
 * the full file.
 *
 * @param pool - The pool; `pool.info.maxSize` is read for the default weights.
 * @param opts - The optional worker choice strategy options.
 * @returns The worker choice strategy options.
 */
84 export const buildWorkerChoiceStrategyOptions
= <
85 Worker
extends IWorker
,
89 pool
: IPool
<Worker
, Data
, Response
>,
90 opts
?: WorkerChoiceStrategyOptions
91 ): WorkerChoiceStrategyOptions
=> {
92 opts
= structuredClone(opts
?? {})
93 opts
.weights
= opts
.weights
?? getDefaultWeights(pool
.info
.maxSize
)
96 runTime
: { median
: false },
97 waitTime
: { median
: false },
98 elu
: { median
: false },
/**
 * Switches measurement statistics requirements between average and median,
 * mutating the given object in place.
 *
 * - `toggleMedian` is `true` and `average` is set: `average` is cleared and
 *   `median` is set.
 * - `toggleMedian` is `false` and `median` is set: `average` is set and
 *   `median` is cleared.
 * - Any other combination leaves the object untouched.
 *
 * @param measurementStatisticsRequirements - The requirements to update.
 * @param toggleMedian - Whether median should replace average.
 */
export const toggleMedianMeasurementStatisticsRequirements = (
  measurementStatisticsRequirements: MeasurementStatisticsRequirements,
  toggleMedian: boolean
): void => {
  const requirements = measurementStatisticsRequirements
  if (toggleMedian) {
    // Only an active average is swapped for the median.
    if (requirements.average) {
      requirements.average = false
      requirements.median = toggleMedian
    }
    return
  }
  // Only an active median is swapped back for the average.
  if (requirements.median) {
    requirements.average = true
    requirements.median = toggleMedian
  }
}
/**
 * Merges the strategy policies of all the given worker choice strategies.
 * A flag of the merged policy is `true` as soon as at least one strategy
 * policy has it set.
 *
 * @param workerChoiceStrategies - The worker choice strategies to merge.
 * @returns The merged strategy policy.
 */
export const buildWorkerChoiceStrategiesPolicy = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): StrategyPolicy => {
  const strategyPolicies: StrategyPolicy[] = []
  for (const strategy of workerChoiceStrategies.values()) {
    strategyPolicies.push(strategy.strategyPolicy)
  }
  const isSetByAny = (
    flag: 'dynamicWorkerReady' | 'dynamicWorkerUsage'
  ): boolean => strategyPolicies.some(policy => policy[flag])
  return {
    dynamicWorkerUsage: isSetByAny('dynamicWorkerUsage'),
    dynamicWorkerReady: isSetByAny('dynamicWorkerReady'),
  }
}
/**
 * Merges the task statistics requirements of all the given worker choice
 * strategies. For each of `runTime`, `waitTime` and `elu`, the `aggregate`,
 * `average` and `median` flags of the result are `true` as soon as at least
 * one strategy requires them.
 *
 * @param workerChoiceStrategies - The worker choice strategies to merge.
 * @returns The merged task statistics requirements.
 */
export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): TaskStatisticsRequirements => {
  const requirements: TaskStatisticsRequirements[] = [
    ...workerChoiceStrategies.values(),
  ].map(strategy => strategy.taskStatisticsRequirements)
  // Builds the merged flags of a single measurement group.
  const mergeMeasurement = (measurement: 'elu' | 'runTime' | 'waitTime') => ({
    aggregate: requirements.some(r => r[measurement].aggregate),
    average: requirements.some(r => r[measurement].average),
    median: requirements.some(r => r[measurement].median),
  })
  return {
    runTime: mergeMeasurement('runTime'),
    waitTime: mergeMeasurement('waitTime'),
    elu: mergeMeasurement('elu'),
  }
}
158 export const getWorkerChoiceStrategy
= <Worker
extends IWorker
, Data
, Response
>(
159 workerChoiceStrategy
: WorkerChoiceStrategy
,
160 pool
: IPool
<Worker
, Data
, Response
>,
161 context
: ThisType
<WorkerChoiceStrategiesContext
<Worker
, Data
, Response
>>,
162 opts
?: WorkerChoiceStrategyOptions
163 ): IWorkerChoiceStrategy
=> {
164 switch (workerChoiceStrategy
) {
165 case WorkerChoiceStrategies
.ROUND_ROBIN
:
166 return new (RoundRobinWorkerChoiceStrategy
.bind(context
))(pool
, opts
)
167 case WorkerChoiceStrategies
.LEAST_USED
:
168 return new (LeastUsedWorkerChoiceStrategy
.bind(context
))(pool
, opts
)
169 case WorkerChoiceStrategies
.LEAST_BUSY
:
170 return new (LeastBusyWorkerChoiceStrategy
.bind(context
))(pool
, opts
)
171 case WorkerChoiceStrategies
.LEAST_ELU
:
172 return new (LeastEluWorkerChoiceStrategy
.bind(context
))(pool
, opts
)
173 case WorkerChoiceStrategies
.FAIR_SHARE
:
174 return new (FairShareWorkerChoiceStrategy
.bind(context
))(pool
, opts
)
175 case WorkerChoiceStrategies
.WEIGHTED_ROUND_ROBIN
:
176 return new (WeightedRoundRobinWorkerChoiceStrategy
.bind(context
))(
180 case WorkerChoiceStrategies
.INTERLEAVED_WEIGHTED_ROUND_ROBIN
:
181 return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy
.bind(
186 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
187 `Worker choice strategy '${workerChoiceStrategy}' is not valid`