Commit | Line | Data |
---|---|---|
bcfb06ce JB |
1 | import { cpus } from 'node:os' |
2 | ||
3 | import type { IPool } from '../pool.js' | |
4 | import type { IWorker } from '../worker.js' | |
5 | import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js' | |
6 | import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js' | |
7 | import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js' | |
8 | import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js' | |
9 | import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js' | |
10 | import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js' | |
11 | import { | |
12 | type IWorkerChoiceStrategy, | |
13 | WorkerChoiceStrategies, | |
14 | type WorkerChoiceStrategy, | |
15 | type WorkerChoiceStrategyOptions | |
16 | } from './selection-strategies-types.js' | |
17 | import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js' | |
18 | import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js' | |
19 | ||
bcfb06ce JB |
20 | const estimatedCpuSpeed = (): number => { |
21 | const runs = 150000000 | |
22 | const begin = performance.now() | |
23 | // eslint-disable-next-line no-empty | |
24 | for (let i = runs; i > 0; i--) {} | |
25 | const end = performance.now() | |
26 | const duration = end - begin | |
27 | return Math.trunc(runs / duration / 1000) // in MHz | |
28 | } | |
29 | ||
30 | const getDefaultWorkerWeight = (): number => { | |
31 | const currentCpus = cpus() | |
32 | let estCpuSpeed: number | undefined | |
33 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition | |
34 | if (currentCpus.every(cpu => cpu.speed == null || cpu.speed === 0)) { | |
35 | estCpuSpeed = estimatedCpuSpeed() | |
36 | } | |
37 | let cpusCycleTimeWeight = 0 | |
38 | for (const cpu of currentCpus) { | |
39 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition | |
40 | if (cpu.speed == null || cpu.speed === 0) { | |
41 | cpu.speed = | |
42 | // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition | |
43 | currentCpus.find(cpu => cpu.speed != null && cpu.speed !== 0)?.speed ?? | |
44 | estCpuSpeed ?? | |
45 | 2000 | |
46 | } | |
47 | // CPU estimated cycle time | |
48 | const numberOfDigits = cpu.speed.toString().length - 1 | |
49 | const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits)) | |
50 | cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits) | |
51 | } | |
52 | return Math.round(cpusCycleTimeWeight / currentCpus.length) | |
53 | } | |
54 | ||
55 | const getDefaultWeights = ( | |
56 | poolMaxSize: number, | |
57 | defaultWorkerWeight?: number | |
58 | ): Record<number, number> => { | |
59 | defaultWorkerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight() | |
60 | const weights: Record<number, number> = {} | |
61 | for (let workerNodeKey = 0; workerNodeKey < poolMaxSize; workerNodeKey++) { | |
62 | weights[workerNodeKey] = defaultWorkerWeight | |
63 | } | |
64 | return weights | |
65 | } | |
66 | ||
67 | export const getWorkerChoiceStrategiesRetries = < | |
68 | Worker extends IWorker, | |
69 | Data, | |
70 | Response | |
71 | >( | |
72 | pool: IPool<Worker, Data, Response>, | |
73 | opts?: WorkerChoiceStrategyOptions | |
74 | ): number => { | |
75 | return ( | |
76 | pool.info.maxSize + | |
77 | Object.keys(opts?.weights ?? getDefaultWeights(pool.info.maxSize)).length | |
78 | ) | |
79 | } | |
80 | ||
81 | export const buildWorkerChoiceStrategyOptions = < | |
82 | Worker extends IWorker, | |
83 | Data, | |
84 | Response | |
85 | >( | |
86 | pool: IPool<Worker, Data, Response>, | |
87 | opts?: WorkerChoiceStrategyOptions | |
88 | ): WorkerChoiceStrategyOptions => { | |
85bbc7ab | 89 | opts = structuredClone(opts ?? {}) |
bcfb06ce JB |
90 | opts.weights = opts.weights ?? getDefaultWeights(pool.info.maxSize) |
91 | return { | |
92 | ...{ | |
93 | runTime: { median: false }, | |
94 | waitTime: { median: false }, | |
95 | elu: { median: false } | |
96 | }, | |
97 | ...opts | |
98 | } | |
99 | } | |
100 | ||
101 | export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>( | |
102 | workerChoiceStrategy: WorkerChoiceStrategy, | |
103 | pool: IPool<Worker, Data, Response>, | |
104 | context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>, | |
105 | opts?: WorkerChoiceStrategyOptions | |
106 | ): IWorkerChoiceStrategy => { | |
107 | switch (workerChoiceStrategy) { | |
108 | case WorkerChoiceStrategies.ROUND_ROBIN: | |
109 | return new (RoundRobinWorkerChoiceStrategy.bind(context))(pool, opts) | |
110 | case WorkerChoiceStrategies.LEAST_USED: | |
111 | return new (LeastUsedWorkerChoiceStrategy.bind(context))(pool, opts) | |
112 | case WorkerChoiceStrategies.LEAST_BUSY: | |
113 | return new (LeastBusyWorkerChoiceStrategy.bind(context))(pool, opts) | |
114 | case WorkerChoiceStrategies.LEAST_ELU: | |
115 | return new (LeastEluWorkerChoiceStrategy.bind(context))(pool, opts) | |
116 | case WorkerChoiceStrategies.FAIR_SHARE: | |
117 | return new (FairShareWorkerChoiceStrategy.bind(context))(pool, opts) | |
118 | case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN: | |
119 | return new (WeightedRoundRobinWorkerChoiceStrategy.bind(context))( | |
120 | pool, | |
121 | opts | |
122 | ) | |
123 | case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN: | |
124 | return new (InterleavedWeightedRoundRobinWorkerChoiceStrategy.bind( | |
125 | context | |
126 | ))(pool, opts) | |
127 | default: | |
128 | throw new Error( | |
129 | // eslint-disable-next-line @typescript-eslint/restrict-template-expressions | |
130 | `Worker choice strategy '${workerChoiceStrategy}' is not valid` | |
131 | ) | |
132 | } | |
133 | } |