chore(deps-dev): apply updates
[poolifier.git] / src / pools / selection-strategies / selection-strategies-utils.ts
CommitLineData
bcfb06ce
JB
1import { cpus } from 'node:os'
2
3import type { IPool } from '../pool.js'
4import type { IWorker } from '../worker.js'
97231086
JB
5import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
6
bcfb06ce
JB
7import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
8import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
9import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
10import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
11import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
12import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
13import {
14 type IWorkerChoiceStrategy,
763abb1c 15 type MeasurementStatisticsRequirements,
19b8be8b
JB
16 type StrategyPolicy,
17 type TaskStatisticsRequirements,
bcfb06ce
JB
18 WorkerChoiceStrategies,
19 type WorkerChoiceStrategy,
3a502712 20 type WorkerChoiceStrategyOptions,
bcfb06ce
JB
21} from './selection-strategies-types.js'
22import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
bcfb06ce 23
bcfb06ce
JB
/**
 * Estimates the CPU speed by timing a fixed-size empty countdown loop.
 *
 * @returns The estimated CPU speed in MHz, truncated to an integer.
 */
const estimatedCpuSpeed = (): number => {
  const iterations = 150000000
  const startedAt = performance.now()
  // The loop body is intentionally empty: only the elapsed time matters.
  // eslint-disable-next-line no-empty
  for (let i = iterations; i > 0; i--) {}
  const elapsedMs = performance.now() - startedAt
  const iterationsPerMs = iterations / elapsedMs
  return Math.trunc(iterationsPerMs / 1000) // in MHz
}
33
/**
 * Computes the default worker weight from the CPUs reported by `os.cpus()`.
 *
 * Each CPU contributes `10^(2d) / speed`, where `d` is the number of digits of
 * its speed minus one; the result is the rounded mean of those contributions.
 *
 * A CPU without a usable speed (nullish, non-finite or not strictly positive)
 * borrows the first usable speed reported by another CPU. When no CPU reports
 * one, the speed measured by `estimatedCpuSpeed()` is used, with 2000 as a
 * last resort if that measurement is itself unusable.
 *
 * When `os.cpus()` returns an empty array, the weight is derived from the
 * fallback speed alone instead of evaluating to `NaN` (0 / 0).
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  const isUsableSpeed = (speed: number | undefined): speed is number =>
    speed != null && Number.isFinite(speed) && speed > 0
  // Only measure the CPU speed when the OS does not report any usable one.
  const resolveEstimatedSpeed = (): number => {
    const estCpuSpeed = estimatedCpuSpeed()
    return isUsableSpeed(estCpuSpeed) ? estCpuSpeed : 2000
  }

  const currentCpus = cpus()
  // Loop-invariant lookup: the first usable reported speed never changes.
  const fallbackCpuSpeed =
    currentCpus.find(cpu => isUsableSpeed(cpu.speed))?.speed ??
    resolveEstimatedSpeed()
  // With no CPU information at all, behave as if there were a single CPU
  // running at the fallback speed.
  const cpuSpeeds =
    currentCpus.length > 0
      ? currentCpus.map(cpu =>
        isUsableSpeed(cpu.speed) ? cpu.speed : fallbackCpuSpeed
      )
      : [fallbackCpuSpeed]

  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
58
/**
 * Builds the default weights record: one entry per worker node key in
 * `[0, poolMaxSize)`, every entry holding the same weight.
 *
 * @param poolMaxSize - Upper bound (exclusive) of the generated worker node keys.
 * @param defaultWorkerWeight - Weight given to every key. When nullish, the value returned by `getDefaultWorkerWeight()` is used.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const defaultWeights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    defaultWeights[workerNodeKey] = workerWeight
    workerNodeKey += 1
  }
  return defaultWeights
}
70
/**
 * Computes the number of retries for the worker choice strategies: the pool
 * maximum size plus the number of configured weights.
 *
 * @param pool - The pool whose `info.maxSize` is used.
 * @param opts - The worker choice strategy options; `opts.weights` is used when defined.
 * @returns The pool maximum size plus the number of weight entries.
 */
export const getWorkerChoiceStrategiesRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  // Only the number of entries matters here, not their values. Passing a
  // placeholder weight keeps getDefaultWeights() from computing the default
  // worker weight, which probes the CPUs, just to have its keys counted.
  const weights = opts?.weights ?? getDefaultWeights(pool.info.maxSize, 1)
  return pool.info.maxSize + Object.keys(weights).length
}
84
/**
 * Builds complete worker choice strategy options from possibly partial ones.
 *
 * The given options are deep-cloned with `structuredClone()`, so the caller's
 * object is never modified. Missing weights default to
 * `getDefaultWeights(pool.info.maxSize)`, and missing `elu`, `runTime` and
 * `waitTime` measurement options default to `{ median: false }`.
 *
 * @param pool - The pool whose `info.maxSize` sizes the default weights.
 * @param opts - The user-provided worker choice strategy options, if any.
 * @returns The worker choice strategy options with defaults applied.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts: WorkerChoiceStrategyOptions = structuredClone(opts ?? {})
  clonedOpts.weights =
    clonedOpts.weights ?? getDefaultWeights(pool.info.maxSize)
  return {
    ...{
      elu: { median: false },
      runTime: { median: false },
      waitTime: { median: false },
    },
    ...clonedOpts,
    // A property explicitly set to undefined survives the clone and the
    // spread above, which would overwrite its default: restore it here.
    ...{
      elu: clonedOpts.elu ?? { median: false },
      runTime: clonedOpts.runTime ?? { median: false },
      waitTime: clonedOpts.waitTime ?? { median: false },
    },
  }
}
104
763abb1c
JB
/**
 * Switches measurement statistics requirements between average and median, in place.
 *
 * - When `toggleMedian` is true and the average is required, the average is
 *   dropped and the median becomes required.
 * - When `toggleMedian` is false and the median is required, the median is
 *   dropped and the average becomes required.
 * - Otherwise the requirements are left untouched.
 *
 * @param measurementStatisticsRequirements - The requirements to update in place.
 * @param toggleMedian - Whether the median shall be used instead of the average.
 */
export const toggleMedianMeasurementStatisticsRequirements = (
  measurementStatisticsRequirements: MeasurementStatisticsRequirements,
  toggleMedian: boolean
): void => {
  const requirements = measurementStatisticsRequirements
  if (toggleMedian) {
    if (requirements.average) {
      requirements.average = false
      requirements.median = toggleMedian
    }
    return
  }
  if (requirements.median) {
    requirements.average = true
    requirements.median = toggleMedian
  }
}
118
19b8be8b
JB
/**
 * Merges the policies of all the given worker choice strategies: a flag is
 * set in the result as soon as at least one strategy sets it.
 *
 * @param workerChoiceStrategies - The worker choice strategies, keyed by strategy name.
 * @returns The merged strategy policy.
 */
export const buildWorkerChoiceStrategiesPolicy = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): StrategyPolicy => {
  // Collect every policy first, then fold the flags.
  const strategyPolicies: StrategyPolicy[] = []
  for (const strategy of workerChoiceStrategies.values()) {
    strategyPolicies.push(strategy.strategyPolicy)
  }
  let dynamicWorkerReady = false
  let dynamicWorkerUsage = false
  for (const strategyPolicy of strategyPolicies) {
    dynamicWorkerReady =
      dynamicWorkerReady || Boolean(strategyPolicy.dynamicWorkerReady)
    dynamicWorkerUsage =
      dynamicWorkerUsage || Boolean(strategyPolicy.dynamicWorkerUsage)
  }
  return {
    dynamicWorkerReady,
    dynamicWorkerUsage,
  }
}
131
/**
 * Merges the task statistics requirements of all the given worker choice
 * strategies: for each of `elu`, `runTime` and `waitTime`, a statistic
 * (`aggregate`, `average`, `median`) is required as soon as at least one
 * strategy requires it.
 *
 * @param workerChoiceStrategies - The worker choice strategies, keyed by strategy name.
 * @returns The merged task statistics requirements.
 */
export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
  workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
): TaskStatisticsRequirements => {
  const requirements: TaskStatisticsRequirements[] = [
    ...workerChoiceStrategies.values(),
  ].map(strategy => strategy.taskStatisticsRequirements)
  // Merges one measurement across all the collected requirements.
  const mergeMeasurement = (
    measurement: 'elu' | 'runTime' | 'waitTime'
  ): { aggregate: boolean, average: boolean, median: boolean } => ({
    aggregate: requirements.some(r => r[measurement].aggregate),
    average: requirements.some(r => r[measurement].average),
    median: requirements.some(r => r[measurement].median),
  })
  return {
    elu: mergeMeasurement('elu'),
    runTime: mergeMeasurement('runTime'),
    waitTime: mergeMeasurement('waitTime'),
  }
}
158
bcfb06ce
JB
/**
 * Instantiates the worker choice strategy matching the given strategy name.
 *
 * The matching strategy constructor is bound to `context` and then invoked
 * with `new`, passing `pool` and `opts`.
 *
 * @param workerChoiceStrategy - The name of the worker choice strategy to instantiate.
 * @param pool - The pool handed to the strategy constructor.
 * @param context - The value the strategy constructor is bound to.
 * @param opts - The worker choice strategy options handed to the strategy constructor.
 * @returns The worker choice strategy instance.
 * @throws {Error} If `workerChoiceStrategy` matches none of the known strategies.
 */
export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
  workerChoiceStrategy: WorkerChoiceStrategy,
  pool: IPool<Worker, Data, Response>,
  context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
  opts?: WorkerChoiceStrategyOptions
): IWorkerChoiceStrategy => {
  if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
    return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (
    workerChoiceStrategy ===
    WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
  ) {
    return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(
      context
    ))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_BUSY) {
    return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_ELU) {
    return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.LEAST_USED) {
    return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
    return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts)
  }
  if (workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN) {
    return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))(
      pool,
      opts
    )
  }
  // No known strategy matched.
  throw new Error(
    // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
    `Worker choice strategy '${workerChoiceStrategy}' is not valid`
  )
}
191}