fix: ensure task function ops sync worker choice strategies
[poolifier.git] / src / pools / selection-strategies / selection-strategies-utils.ts
1 import { cpus } from 'node:os'
2
3 import type { IPool } from '../pool.js'
4 import type { IWorker } from '../worker.js'
5 import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strategy.js'
6 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy.js'
7 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy.js'
8 import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy.js'
9 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy.js'
10 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy.js'
11 import {
12 type IWorkerChoiceStrategy,
13 WorkerChoiceStrategies,
14 type WorkerChoiceStrategy,
15 type WorkerChoiceStrategyOptions
16 } from './selection-strategies-types.js'
17 import { WeightedRoundRobinWorkerChoiceStrategy } from './weighted-round-robin-worker-choice-strategy.js'
18 import type { WorkerChoiceStrategiesContext } from './worker-choice-strategies-context.js'
19
/**
 * Estimates the CPU speed by timing a fixed-size, empty countdown loop.
 *
 * @returns The truncated number of loop iterations executed per microsecond,
 * which is used as a CPU speed expressed in MHz.
 */
const estimatedCpuSpeed = (): number => {
  const iterations = 150000000
  const startedAt = performance.now()
  // eslint-disable-next-line no-empty
  for (let remaining = iterations; remaining > 0; remaining--) {}
  const elapsedMs = performance.now() - startedAt
  // Iterations per millisecond divided by 1000 gives iterations per microsecond
  return Math.trunc(iterations / elapsedMs / 1000) // in MHz
}
29
/**
 * Computes the default worker weight from the clock speeds of the host CPUs.
 *
 * Each logical CPU contributes `10^(2 * d) / speed`, where `speed` is the CPU
 * speed reported by `os.cpus()` and `d` is the number of digits of `speed`
 * minus one. The weight is the rounded average of those contributions.
 *
 * A CPU reporting no speed (nullish or `0`) uses a fallback speed: the first
 * known speed among the other CPUs, else the estimated CPU speed, else 2000
 * when the estimate is not a positive finite number.
 * When `os.cpus()` returns no entry at all, a single CPU running at the
 * fallback speed is assumed, so the result is never `NaN`.
 *
 * @returns The default worker weight.
 */
const getDefaultWorkerWeight = (): number => {
  const defaultCpuSpeed = 2000
  const hasKnownSpeed = (speed: number): boolean =>
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    speed != null && speed !== 0
  const estimateFallbackSpeed = (): number => {
    const estCpuSpeed = estimatedCpuSpeed()
    return Number.isFinite(estCpuSpeed) && estCpuSpeed > 0
      ? estCpuSpeed
      : defaultCpuSpeed
  }
  const reportedSpeeds = cpus().map(cpu => cpu.speed)
  // Resolved once: the estimation only runs when no CPU reports a usable speed
  const fallbackSpeed =
    reportedSpeeds.find(hasKnownSpeed) ?? estimateFallbackSpeed()
  // os.cpus() can return an empty array: assume one CPU at the fallback speed
  const cpuSpeeds =
    reportedSpeeds.length > 0
      ? reportedSpeeds.map(speed =>
        hasKnownSpeed(speed) ? speed : fallbackSpeed
      )
      : [fallbackSpeed]
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of cpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / cpuSpeeds.length)
}
54
/**
 * Builds a weights record assigning the same weight to every worker node key
 * from `0` up to, but excluding, `poolMaxSize`.
 *
 * @param poolMaxSize - The exclusive upper bound of the worker node keys.
 * @param defaultWorkerWeight - The weight given to each key. When nullish, the
 * value returned by `getDefaultWorkerWeight()` is used.
 * @returns The weights keyed by worker node key.
 */
const getDefaultWeights = (
  poolMaxSize: number,
  defaultWorkerWeight?: number
): Record<number, number> => {
  const workerWeight = defaultWorkerWeight ?? getDefaultWorkerWeight()
  const defaultWeights: Record<number, number> = {}
  let workerNodeKey = 0
  while (workerNodeKey < poolMaxSize) {
    defaultWeights[workerNodeKey] = workerWeight
    workerNodeKey++
  }
  return defaultWeights
}
66
/**
 * Computes the number of worker choice strategies retries.
 *
 * @param pool - The pool whose `info.maxSize` is read.
 * @param opts - The worker choice strategy options. Only `weights` is read.
 * @returns `pool.info.maxSize` plus the number of weights entries. When no
 * weights are given, the number of default weights entries is used.
 */
export const getWorkerChoiceStrategiesRetries = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): number => {
  const { maxSize } = pool.info
  // The default weights hold one entry per worker node key in [0, maxSize),
  // so their count is maxSize: no need to build them (and probe the CPUs)
  // just to count their keys. Assumes maxSize is a non-negative integer.
  const weightsCount =
    opts?.weights != null ? Object.keys(opts.weights).length : maxSize
  return maxSize + weightsCount
}
80
/**
 * Builds the worker choice strategy options from the given ones, without
 * modifying them: the given options are deep cloned first.
 *
 * Missing `weights` are replaced by the default weights computed for
 * `pool.info.maxSize`. `runTime`, `waitTime` and `elu` default to
 * `{ median: false }` when the given options do not define these keys.
 *
 * @param pool - The pool whose `info.maxSize` is read for the default weights.
 * @param opts - The worker choice strategy options to complete.
 * @returns The completed worker choice strategy options.
 */
export const buildWorkerChoiceStrategyOptions = <
  Worker extends IWorker,
  Data,
  Response
>(
  pool: IPool<Worker, Data, Response>,
  opts?: WorkerChoiceStrategyOptions
): WorkerChoiceStrategyOptions => {
  const clonedOpts: WorkerChoiceStrategyOptions = structuredClone(opts ?? {})
  if (clonedOpts.weights == null) {
    clonedOpts.weights = getDefaultWeights(pool.info.maxSize)
  }
  return {
    runTime: { median: false },
    waitTime: { median: false },
    elu: { median: false },
    // Keys present in the cloned options take precedence over the defaults
    ...clonedOpts
  }
}
100
/**
 * Instantiates the worker choice strategy matching the given strategy name.
 *
 * The strategy constructor is bound to `context` before being invoked with
 * `pool` and `opts`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to instantiate.
 * @param pool - The pool passed to the strategy constructor.
 * @param context - The value the strategy constructor is bound to.
 * @param opts - The worker choice strategy options passed to the constructor.
 * @returns The worker choice strategy instance.
 * @throws {Error} If the worker choice strategy is not one of the handled ones.
 */
export const getWorkerChoiceStrategy = <Worker extends IWorker, Data, Response>(
  workerChoiceStrategy: WorkerChoiceStrategy,
  pool: IPool<Worker, Data, Response>,
  context: ThisType<WorkerChoiceStrategiesContext<Worker, Data, Response>>,
  opts?: WorkerChoiceStrategyOptions
): IWorkerChoiceStrategy => {
  // Single place where a strategy constructor is bound and invoked
  const instantiate = (
    Strategy: new (
      pool: IPool<Worker, Data, Response>,
      opts?: WorkerChoiceStrategyOptions
    ) => IWorkerChoiceStrategy
  ): IWorkerChoiceStrategy => new (Strategy.bind(context))(pool, opts)

  switch (workerChoiceStrategy) {
    case WorkerChoiceStrategies.ROUND_ROBIN:
      return instantiate(RoundRobinWorkerChoiceStrategy)
    case WorkerChoiceStrategies.LEAST_USED:
      return instantiate(LeastUsedWorkerChoiceStrategy)
    case WorkerChoiceStrategies.LEAST_BUSY:
      return instantiate(LeastBusyWorkerChoiceStrategy)
    case WorkerChoiceStrategies.LEAST_ELU:
      return instantiate(LeastEluWorkerChoiceStrategy)
    case WorkerChoiceStrategies.FAIR_SHARE:
      return instantiate(FairShareWorkerChoiceStrategy)
    case WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN:
      return instantiate(WeightedRoundRobinWorkerChoiceStrategy)
    case WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN:
      return instantiate(InterleavedWeightedRoundRobinWorkerChoiceStrategy)
    default:
      throw new Error(
        // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
        `Worker choice strategy '${workerChoiceStrategy}' is not valid`
      )
  }
}