07517535e57ba7f7fc645973c3bfe406f9900271
[poolifier.git] / src / pools / selection-strategies / selection-strategies-utils.ts
1 import { cpus } from 'node:os'
2
3 import type { IPool } from '../pool.js'
4 import type { IWorker } from '../worker.js'
5 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
6 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
7 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
9 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
10 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
11 import {
12 type IWorkerChoiceStrategy,
13 type MeasurementStatisticsRequirements,
14 type StrategyPolicy,
15 type TaskStatisticsRequirements,
16 WorkerChoiceStrategies,
17 type WorkerChoiceStrategy,
18 type WorkerChoiceStrategyOptions,
19 } from './selection-strategies-types.js'
20 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
21 import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
22
/**
 * Estimates the CPU speed by timing a fixed-size empty countdown loop.
 *
 * The result is derived from the number of loop iterations executed per
 * millisecond, so it is only a rough, load-dependent approximation.
 *
 * @returns The estimated CPU speed in MHz, truncated to an integer.
 */
const estimatedCpuSpeed = (): number => {
  const iterations = 150000000
  const startedAt = performance.now()
  // The loop body is intentionally empty: only the elapsed time matters.
  // eslint-disable-next-line no-empty
  for (let remaining = iterations; remaining > 0; remaining--) {}
  const elapsedMs = performance.now() - startedAt
  return Math.trunc(iterations / elapsedMs / 1000) // in MHz
}
32
/**
 * Computes the default worker weight from the speed of the host CPUs.
 *
 * Each CPU contributes a cycle-time based weight derived from its reported
 * speed; the returned value is the rounded average over all CPUs.
 *
 * CPUs reporting no usable speed (missing, zero, negative or non-finite)
 * borrow the speed of the first CPU that reports one. If no CPU reports a
 * usable speed, the speed is estimated with `estimatedCpuSpeed()`, falling
 * back to 2000 when that estimate is itself unusable.
 *
 * When `os.cpus()` returns an empty array, the weight is computed from that
 * single fallback speed instead of dividing by zero and returning `NaN`.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  const isUsableSpeed = (speed: number | null | undefined): boolean =>
    speed != null && Number.isFinite(speed) && speed > 0

  // Work on a copy of the speeds: the objects returned by cpus() are not mutated.
  const reportedSpeeds: number[] = cpus().map(cpu => cpu.speed)

  // Resolve the substitute speed once, outside of the accumulation loop.
  let fallbackSpeed: number
  const firstUsableSpeed = reportedSpeeds.find(speed => isUsableSpeed(speed))
  if (firstUsableSpeed != null) {
    fallbackSpeed = firstUsableSpeed
  } else {
    // The (costly) estimation only runs when no CPU reports a usable speed.
    const estCpuSpeed = estimatedCpuSpeed()
    fallbackSpeed = isUsableSpeed(estCpuSpeed) ? estCpuSpeed : 2000
  }

  const cpuSpeeds: number[] = []
  for (const speed of reportedSpeeds) {
    cpuSpeeds.push(isUsableSpeed(speed) ? speed : fallbackSpeed)
  }
  if (cpuSpeeds.length === 0) {
    // No CPU information available: behave as if a single CPU was present.
    cpuSpeeds.push(fallbackSpeed)
  }

  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
55
/**
 * Builds a weights record assigning the same weight to every worker node key
 * from `0` up to (but excluding) `poolMaxSize`.
 *
 * @param poolMaxSize - The upper bound (exclusive) of the worker node keys.
 * @param defaultWorkerWeight - The weight given to each worker node key.
 * When omitted, `getDefaultWorkerWeight()` is used.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const weightsByWorkerNodeKey: Record<number, number> = {}
  let nodeKey = 0
  while (nodeKey < poolMaxSize) {
    weightsByWorkerNodeKey[nodeKey] = workerWeight
    nodeKey++
  }
  return weightsByWorkerNodeKey
}
67
/**
 * Computes the number of retries for the worker choice strategies: the pool
 * maximum size plus the number of entries in the weights.
 *
 * @param pool - The pool whose `info.maxSize` is read.
 * @param opts - The worker choice strategy options. When they carry no
 * weights, default weights sized after the pool maximum size are used.
 * @returns The number of retries.
 */
export const getWorkerChoiceStrategiesRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const poolMaxSize = pool.info.maxSize
  const effectiveWeights =
    opts?.weights ?? getDefaultWeights(pool.info.maxSize)
  const weightsCount = Object.keys(effectiveWeights).length
  return poolMaxSize + weightsCount
}
81
/**
 * Builds the effective worker choice strategy options.
 *
 * The given options are deep-cloned with `structuredClone`, so the caller's
 * object is left untouched. Missing weights are replaced by default weights
 * sized after the pool maximum size, and `runTime`, `waitTime` and `elu`
 * default to `{ median: false }` unless the options define them.
 *
 * @param pool - The pool whose `info.maxSize` is read for the default weights.
 * @param opts - The user provided worker choice strategy options.
 * @returns The worker choice strategy options with defaults applied.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts: WorkerChoiceStrategyOptions = structuredClone(opts ?? {})
  if (clonedOpts.weights == null) {
    clonedOpts.weights = getDefaultWeights(pool.info.maxSize)
  }
  // Defaults come first so that any key present in the options overrides them.
  return {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    ...clonedOpts,
  }
}
101
/**
 * Switches measurement statistics requirements between average and median,
 * mutating the given object in place.
 *
 * - When `toggleMedian` is set and the average is required, the average
 *   requirement is replaced by the median one.
 * - When `toggleMedian` is unset and the median is required, the median
 *   requirement is replaced by the average one.
 * - Otherwise the requirements are left unchanged.
 *
 * @param measurementStatisticsRequirements - The requirements to update.
 * @param toggleMedian - Whether the median shall be used instead of the average.
 */
export const toggleMedianMeasurementStatisticsRequirements = (
  measurementStatisticsRequirements: MeasurementStatisticsRequirements,
  toggleMedian: boolean
): void => {
  const requirements = measurementStatisticsRequirements
  if (toggleMedian) {
    if (requirements.average) {
      requirements.average = false
      requirements.median = toggleMedian
    }
  } else if (requirements.median) {
    requirements.average = true
    requirements.median = toggleMedian
  }
}
115
/**
 * Merges the strategy policies of the given worker choice strategies.
 *
 * A flag of the merged policy is `true` as soon as at least one strategy
 * sets it; with no strategies, every flag is `false`.
 *
 * @param workerChoiceStrategies - The worker choice strategies to merge.
 * @returns The merged strategy policy.
 */
export const buildWorkerChoiceStrategiesPolicy = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): StrategyPolicy => {
  const strategyPolicies: StrategyPolicy[] = Array.from(
    workerChoiceStrategies.values(),
    strategy => strategy.strategyPolicy
  )
  const isSetBySomeStrategy = (
    flag: 'dynamicWorkerUsage' | 'dynamicWorkerReady'
  ): boolean => strategyPolicies.some(policy => policy[flag])
  return {
    dynamicWorkerUsage: isSetBySomeStrategy('dynamicWorkerUsage'),
    dynamicWorkerReady: isSetBySomeStrategy('dynamicWorkerReady'),
  }
}
128
/**
 * Merges the task statistics requirements of the given worker choice
 * strategies.
 *
 * For each measurement (`runTime`, `waitTime`, `elu`), a statistic
 * (`aggregate`, `average`, `median`) is required as soon as at least one
 * strategy requires it; with no strategies, nothing is required.
 *
 * @param workerChoiceStrategies - The worker choice strategies to merge.
 * @returns The merged task statistics requirements.
 */
export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): TaskStatisticsRequirements => {
  const requirementsList: TaskStatisticsRequirements[] = Array.from(
    workerChoiceStrategies.values(),
    strategy => strategy.taskStatisticsRequirements
  )
  // Merges one measurement across all the strategies requirements.
  const mergeMeasurement = (measurement: 'runTime' | 'waitTime' | 'elu') => ({
    aggregate: requirementsList.some(req => req[measurement].aggregate),
    average: requirementsList.some(req => req[measurement].average),
    median: requirementsList.some(req => req[measurement].median),
  })
  return {
    runTime: mergeMeasurement('runTime'),
    waitTime: mergeMeasurement('waitTime'),
    elu: mergeMeasurement('elu'),
  }
}
155
/**
 * Instantiates the worker choice strategy matching the given strategy name.
 *
 * Each strategy constructor is bound to `context` before being invoked with
 * `new` and receives `pool` and `opts` as arguments.
 *
 * @param workerChoiceStrategy - The worker choice strategy to instantiate.
 * @param pool - The pool passed to the strategy constructor.
 * @param context - The value the strategy constructor is bound to.
 * @param opts - The worker choice strategy options passed to the constructor.
 * @returns The worker choice strategy instance.
 * @throws {Error} If the worker choice strategy is not one of the known ones.
 */
export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
  workerChoiceStrategy: WorkerChoiceStrategy,
  pool: IPool<Worker, Data, Response>,
  context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
  opts?: WorkerChoiceStrategyOptions
): IWorkerChoiceStrategy => {
  if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
    return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_USED) {
    return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_BUSY) {
    return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_ELU) {
    return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
    return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) {
    return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))(
      pool,
      opts
    )
  }
  if (
    workerChoiceStrategy ===
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  ) {
    return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(
      context
    ))(pool, opts)
  }
  // No known strategy matched.
  throw new Error(
    `Worker choice strategy '${workerChoiceStrategy}' is not valid`
  )
}