777be6e4d4a2be104afd7986bd835e7e04312e96
[poolifier.git] / src / pools / selection-strategies / selection-strategies-utils.ts
1 import { cpus } from 'node:os'
2
3 import type { IPool } from '../pool.js'
4 import type { IWorker } from '../worker.js'
5 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
6 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
7 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
9 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
10 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
11 import {
12 type IWorkerChoiceStrategy,
13 type StrategyPolicy,
14 type TaskStatisticsRequirements,
15 WorkerChoiceStrategies,
16 type WorkerChoiceStrategy,
17 type WorkerChoiceStrategyOptions
18 } from './selection-strategies-types.js'
19 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
20 import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
21
22 const estimatedCpuSpeed = (): number => {
23 const runs = 150000000
24 const begin = performance.now()
25 // eslint-disable-next-line no-empty
26 for (let i = runs; i > 0; i--) {}
27 const end = performance.now()
28 const duration = end - begin
29 return Math.trunc(runs / duration / 1000) // in MHz
30 }
31
32 const getDefaultWorkerWeight = (): number => {
33 const currentCpus = cpus()
34 let estCpuSpeed: number | undefined
35 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
36 if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
37 estCpuSpeed = estimatedCpuSpeed()
38 }
39 let cpusCycleTimeWeight = 0
40 for (const cpu of currentCpus) {
41 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
42 if (cpu.speed == null || cpu.speed === 0) {
43 cpu.speed =
44 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
45 currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
46 estCpuSpeed ??
47 2000
48 }
49 // CPU estimated cycle time
50 const numberOfDigits = cpu.speed.toString().length - 1
51 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
52 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
53 }
54 return Math.round(cpusCycleTimeWeight / currentCpus.length)
55 }
56
57 const getDefaultWeights = (
58 poolMaxSize: number,
59 defaultWorkerWeight?: number
60 ): Record<number, number> => {
61 defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
62 const weights: Record<number, number> = {}
63 for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
64 weights[workerNodeKey] = defaultWorkerWeight
65 }
66 return weights
67 }
68
69 export const getWorkerChoiceStrategiesRetries = <
70 Worker extends IWorker,
71 Data,
72 Response
73 >(
74 pool: IPool<Worker, Data, Response>,
75 opts?: WorkerChoiceStrategyOptions
76 ): number => {
77 return (
78 pool.info.maxSize +
79 Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
80 )
81 }
82
83 export const buildWorkerChoiceStrategyOptions = <
84 Worker extends IWorker,
85 Data,
86 Response
87 >(
88 pool: IPool<Worker, Data, Response>,
89 opts?: WorkerChoiceStrategyOptions
90 ): WorkerChoiceStrategyOptions => {
91 opts = structuredClone(opts ?? {})
92 opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
93 return {
94 ...{
95 runTime: { median: false },
96 waitTime: { median: false },
97 elu: { median: false }
98 },
99 ...opts
100 }
101 }
102
103 export const buildWorkerChoiceStrategiesPolicy = (
104 workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
105 ): StrategyPolicy => {
106 const policies: StrategyPolicy[] = []
107 for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
108 policies.push(workerChoiceStrategy.strategyPolicy)
109 }
110 return {
111 dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
112 dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
113 }
114 }
115
116 export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
117 workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
118 ): TaskStatisticsRequirements => {
119 const taskStatisticsRequirements: TaskStatisticsRequirements[] = []
120 for (const workerChoiceStrategy of workerChoiceStrategies.values()) {
121 taskStatisticsRequirements.push(
122 workerChoiceStrategy.taskStatisticsRequirements
123 )
124 }
125 return {
126 runTime: {
127 aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
128 average: taskStatisticsRequirements.some(r => r.runTime.average),
129 median: taskStatisticsRequirements.some(r => r.runTime.median)
130 },
131 waitTime: {
132 aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
133 average: taskStatisticsRequirements.some(r => r.waitTime.average),
134 median: taskStatisticsRequirements.some(r => r.waitTime.median)
135 },
136 elu: {
137 aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
138 average: taskStatisticsRequirements.some(r => r.elu.average),
139 median: taskStatisticsRequirements.some(r => r.elu.median)
140 }
141 }
142 }
143
144 export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
145 workerChoiceStrategy: WorkerChoiceStrategy,
146 pool: IPool<Worker, Data, Response>,
147 context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
148 opts?: WorkerChoiceStrategyOptions
149 ): IWorkerChoiceStrategy => {
150 switch (workerChoiceStrategy) {
151 case WorkerChoiceStrategies.ROUND_ROBIN:
152 return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts)
153 case WorkerChoiceStrategies.LEAST_USED:
154 return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts)
155 case WorkerChoiceStrategies.LEAST_BUSY:
156 return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts)
157 case WorkerChoiceStrategies.LEAST_ELU:
158 return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts)
159 case WorkerChoiceStrategies.FAIR_SHARE:
160 return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts)
161 case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
162 return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))(
163 pool,
164 opts
165 )
166 case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
167 return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(
168 context
169 ))(pool, opts)
170 default:
171 throw new Error(
172 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
173 `Worker choice strategy '${workerChoiceStrategy}' is not valid`
174 )
175 }
176 }