build(deps): bump poolifier
[poolifier.git] / src / pools / selection-strategies / selection-strategies-utils.ts
CommitLineData
bcfb06ce
JB
1import { cpus } from 'node:os'
2
3import type { IPool } from '../pool.js'
4import type { IWorker } from '../worker.js'
5import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
6import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
7import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
8import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
9import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
10import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
11import {
12 type IWorkerChoiceStrategy,
763abb1c 13 type MeasurementStatisticsRequirements,
19b8be8b
JB
14 type StrategyPolicy,
15 type TaskStatisticsRequirements,
bcfb06ce
JB
16 WorkerChoiceStrategies,
17 type WorkerChoiceStrategy,
18 type WorkerChoiceStrategyOptions
19} from './selection-strategies-types.js'
20import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
21import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
22
bcfb06ce
JB
23const estimatedCpuSpeed = (): number => {
24 const runs = 150000000
25 const begin = performance.now()
26 // eslint-disable-next-line no-empty
27 for (let i = runs; i > 0; i--) {}
28 const end = performance.now()
29 const duration = end - begin
30 return Math.trunc(runs / duration / 1000) // in MHz
31}
32
33const getDefaultWorkerWeight = (): number => {
34 const currentCpus = cpus()
35 let estCpuSpeed: number | undefined
36 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
37 if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) {
38 estCpuSpeed = estimatedCpuSpeed()
39 }
40 let cpusCycleTimeWeight = 0
41 for (const cpu of currentCpus) {
42 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
43 if (cpu.speed == null || cpu.speed === 0) {
44 cpu.speed =
45 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
46 currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ??
47 estCpuSpeed ??
48 2000
49 }
50 // CPU estimated cycle time
51 const numberOfDigits = cpu.speed.toString().length - 1
52 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
53 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
54 }
55 return Math.round(cpusCycleTimeWeight / currentCpus.length)
56}
57
58const getDefaultWeights = (
59 poolMaxSize: number,
60 defaultWorkerWeight?: number
61): Record<number, number> => {
62 defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
63 const weights: Record<number, number> = {}
64 for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) {
65 weights[workerNodeKey] = defaultWorkerWeight
66 }
67 return weights
68}
69
70export const getWorkerChoiceStrategiesRetries = <
71 Worker extends IWorker,
72 Data,
73 Response
74>(
75 pool: IPool<Worker, Data, Response>,
76 opts?: WorkerChoiceStrategyOptions
77 ): number => {
78 return (
79 pool.info.maxSize +
80 Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length
81 )
82}
83
84export const buildWorkerChoiceStrategyOptions = <
85 Worker extends IWorker,
86 Data,
87 Response
88>(
89 pool: IPool<Worker, Data, Response>,
90 opts?: WorkerChoiceStrategyOptions
91 ): WorkerChoiceStrategyOptions => {
85bbc7ab 92 opts = structuredClone(opts ?? {})
bcfb06ce
JB
93 opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize)
94 return {
95 ...{
96 runTime: { median: false },
97 waitTime: { median: false },
98 elu: { median: false }
99 },
100 ...opts
101 }
102}
103
763abb1c
JB
104export const toggleMedianMeasurementStatisticsRequirements = (
105 measurementStatisticsRequirements: MeasurementStatisticsRequirements,
106 toggleMedian: boolean
107): void => {
108 if (measurementStatisticsRequirements.average && toggleMedian) {
109 measurementStatisticsRequirements.average = false
110 measurementStatisticsRequirements.median = toggleMedian
111 }
112 if (measurementStatisticsRequirements.median && !toggleMedian) {
113 measurementStatisticsRequirements.average = true
114 measurementStatisticsRequirements.median = toggleMedian
115 }
116}
117
19b8be8b
JB
118export const buildWorkerChoiceStrategiesPolicy = (
119 workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
120): StrategyPolicy => {
63af5400
JB
121 const policies: StrategyPolicy[] = Array.from(
122 workerChoiceStrategies,
123 ([_, workerChoiceStrategy]) => workerChoiceStrategy.strategyPolicy
124 )
19b8be8b
JB
125 return {
126 dynamicWorkerUsage: policies.some(p => p.dynamicWorkerUsage),
127 dynamicWorkerReady: policies.some(p => p.dynamicWorkerReady)
128 }
129}
130
131export const buildWorkerChoiceStrategiesTaskStatisticsRequirements = (
132 workerChoiceStrategies: Map<WorkerChoiceStrategy, IWorkerChoiceStrategy>
133): TaskStatisticsRequirements => {
63af5400
JB
134 const taskStatisticsRequirements: TaskStatisticsRequirements[] = Array.from(
135 workerChoiceStrategies,
136 ([_, workerChoiceStrategy]) =>
19b8be8b 137 workerChoiceStrategy.taskStatisticsRequirements
63af5400 138 )
19b8be8b
JB
139 return {
140 runTime: {
141 aggregate: taskStatisticsRequirements.some(r => r.runTime.aggregate),
142 average: taskStatisticsRequirements.some(r => r.runTime.average),
143 median: taskStatisticsRequirements.some(r => r.runTime.median)
144 },
145 waitTime: {
146 aggregate: taskStatisticsRequirements.some(r => r.waitTime.aggregate),
147 average: taskStatisticsRequirements.some(r => r.waitTime.average),
148 median: taskStatisticsRequirements.some(r => r.waitTime.median)
149 },
150 elu: {
151 aggregate: taskStatisticsRequirements.some(r => r.elu.aggregate),
152 average: taskStatisticsRequirements.some(r => r.elu.average),
153 median: taskStatisticsRequirements.some(r => r.elu.median)
154 }
155 }
156}
157
bcfb06ce
JB
158export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
159 workerChoiceStrategy: WorkerChoiceStrategy,
160 pool: IPool<Worker, Data, Response>,
161 context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
162 opts?: WorkerChoiceStrategyOptions
163): IWorkerChoiceStrategy => {
164 switch (workerChoiceStrategy) {
165 case WorkerChoiceStrategies.ROUND_ROBIN:
166 return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts)
167 case WorkerChoiceStrategies.LEAST_USED:
168 return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts)
169 case WorkerChoiceStrategies.LEAST_BUSY:
170 return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts)
171 case WorkerChoiceStrategies.LEAST_ELU:
172 return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts)
173 case WorkerChoiceStrategies.FAIR_SHARE:
174 return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts)
175 case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
176 return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))(
177 pool,
178 opts
179 )
180 case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
181 return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind(
182 context
183 ))(pool, opts)
184 default:
185 throw new Error(
186 // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
187 `Worker choice strategy '${workerChoiceStrategy}' is not valid`
188 )
189 }
190}