fix: fix worker choice strategy retries mechanism on some edge cases
[poolifier.git] / src / pools / selection-strategies / abstract-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import {
3 DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
5 } from '../../utils'
6 import type { IPool } from '../pool'
7 import type { IWorker } from '../worker'
8 import type {
9 IWorkerChoiceStrategy,
10 MeasurementStatisticsRequirements,
11 StrategyPolicy,
12 TaskStatisticsRequirements,
13 WorkerChoiceStrategyOptions
14 } from './selection-strategies-types'
15
/**
 * Worker choice strategy abstract base class.
 *
 * Provides the state and helpers shared by worker choice strategies: the next
 * worker node key, the task statistics requirements, worker node eligibility
 * checks, accessors on worker node usage statistics and the default worker
 * weight computation.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export abstract class AbstractWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IWorkerChoiceStrategy {
  /**
   * The next worker node key.
   * `undefined` means that no worker node key is currently chosen.
   */
  protected nextWorkerNodeKey: number | undefined = 0

  /** @inheritDoc */
  public readonly strategyPolicy: StrategyPolicy = {
    dynamicWorkerUsage: false,
    dynamicWorkerReady: false
  }

  /** @inheritDoc */
  public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
    // One copy per measurement: these objects are mutated in place by
    // toggleMedianMeasurementStatisticsRequirements(), which must never write
    // through to the shared default object nor couple the measurements together.
    runTime: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS },
    waitTime: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS },
    elu: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS }
  }

  /**
   * Constructs a worker choice strategy bound to the pool.
   *
   * The given options are merged over the default worker choice strategy
   * options and `choose()` is bound to the instance so that it can be passed
   * around as a standalone function.
   *
   * @param pool - The pool instance.
   * @param opts - The worker choice strategy options.
   */
  public constructor (
    protected readonly pool: IPool<Worker, Data, Response>,
    protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
    this.choose = this.choose.bind(this)
  }

  /**
   * Aligns the task statistics requirements with the given options: for each
   * measurement (runtime, wait time, ELU), switches between average and median
   * according to the measurement `median` option.
   *
   * A missing `median` option is handled as `false`.
   *
   * @param opts - The worker choice strategy options.
   */
  protected setTaskStatisticsRequirements (
    opts: WorkerChoiceStrategyOptions
  ): void {
    // `=== true` yields a real boolean even if the option is not defined, so
    // that `undefined` is never stored in the requirements.
    this.toggleMedianMeasurementStatisticsRequirements(
      this.taskStatisticsRequirements.runTime,
      opts.runTime?.median === true
    )
    this.toggleMedianMeasurementStatisticsRequirements(
      this.taskStatisticsRequirements.waitTime,
      opts.waitTime?.median === true
    )
    this.toggleMedianMeasurementStatisticsRequirements(
      this.taskStatisticsRequirements.elu,
      opts.elu?.median === true
    )
  }

  /**
   * Switches, in place, a measurement statistics requirements between average
   * and median.
   * Requirements having neither average nor median enabled are left untouched.
   *
   * @param measurementStatisticsRequirements - The measurement statistics requirements to update.
   * @param toggleMedian - `true` to require the median instead of the average, `false` to require the average instead of the median.
   */
  private toggleMedianMeasurementStatisticsRequirements (
    measurementStatisticsRequirements: MeasurementStatisticsRequirements,
    toggleMedian: boolean
  ): void {
    if (measurementStatisticsRequirements.average && toggleMedian) {
      measurementStatisticsRequirements.average = false
      measurementStatisticsRequirements.median = true
    }
    if (measurementStatisticsRequirements.median && !toggleMedian) {
      measurementStatisticsRequirements.average = true
      measurementStatisticsRequirements.median = false
    }
  }

  /** @inheritDoc */
  public abstract reset (): boolean

  /** @inheritDoc */
  public abstract update (workerNodeKey: number): boolean

  /** @inheritDoc */
  public abstract choose (): number | undefined

  /** @inheritDoc */
  public abstract remove (workerNodeKey: number): boolean

  /** @inheritDoc */
  public setOptions (opts: WorkerChoiceStrategyOptions): void {
    this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
    this.setTaskStatisticsRequirements(this.opts)
  }

  /**
   * Whether the worker node is ready or not.
   * A worker node key that does not match any worker node is reported as not ready.
   *
   * @param workerNodeKey - The worker node key.
   * @returns Whether the worker node is ready or not.
   */
  private isWorkerNodeReady (workerNodeKey: number): boolean {
    // The key may not match a worker node (stale or out of range key):
    // answer `false` instead of throwing a TypeError.
    return this.pool.workerNodes[workerNodeKey]?.info?.ready ?? false
  }

  /**
   * Whether the worker node has back pressure or not (i.e. its tasks queue is full).
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the worker node has back pressure, `false` otherwise.
   */
  private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
    return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
  }

  /**
   * Whether the worker node is eligible or not.
   * A worker node is eligible if it is ready and does not have back pressure.
   *
   * @param workerNodeKey - The worker node key.
   * @returns `true` if the worker node is eligible, `false` otherwise.
   * @see {@link isWorkerNodeReady}
   * @see {@link hasWorkerNodeBackPressure}
   */
  protected isWorkerNodeEligible (workerNodeKey: number): boolean {
    // Readiness is checked first: the pool is only queried about back
    // pressure for a worker node that is ready.
    return (
      this.isWorkerNodeReady(workerNodeKey) &&
      !this.hasWorkerNodeBackPressure(workerNodeKey)
    )
  }

  /**
   * Gets the worker task runtime.
   * If the task statistics require the average runtime, the average runtime is returned.
   * If the task statistics require the median runtime, the median runtime is returned.
   * A statistic that is not available is returned as `0`.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker task runtime.
   */
  protected getWorkerTaskRunTime (workerNodeKey: number): number {
    return this.taskStatisticsRequirements.runTime.median
      ? this.pool.workerNodes[workerNodeKey].usage.runTime?.median ?? 0
      : this.pool.workerNodes[workerNodeKey].usage.runTime?.average ?? 0
  }

  /**
   * Gets the worker task wait time.
   * If the task statistics require the average wait time, the average wait time is returned.
   * If the task statistics require the median wait time, the median wait time is returned.
   * A statistic that is not available is returned as `0`.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker task wait time.
   */
  protected getWorkerTaskWaitTime (workerNodeKey: number): number {
    return this.taskStatisticsRequirements.waitTime.median
      ? this.pool.workerNodes[workerNodeKey].usage.waitTime?.median ?? 0
      : this.pool.workerNodes[workerNodeKey].usage.waitTime?.average ?? 0
  }

  /**
   * Gets the worker task ELU.
   * If the task statistics require the average ELU, the average ELU is returned.
   * If the task statistics require the median ELU, the median ELU is returned.
   * A statistic that is not available is returned as `0`.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker task ELU.
   */
  protected getWorkerTaskElu (workerNodeKey: number): number {
    return this.taskStatisticsRequirements.elu.median
      ? this.pool.workerNodes[workerNodeKey].usage.elu.active?.median ?? 0
      : this.pool.workerNodes[workerNodeKey].usage.elu.active?.average ?? 0
  }

  /**
   * Assign to nextWorkerNodeKey property the chosen worker node key.
   * A `null` or `undefined` chosen worker node key is stored as `undefined`.
   *
   * @param chosenWorkerNodeKey - The chosen worker node key.
   */
  protected assignChosenWorkerNodeKey (
    chosenWorkerNodeKey: number | undefined
  ): void {
    this.nextWorkerNodeKey = chosenWorkerNodeKey ?? undefined
  }

  /**
   * Computes the default worker weight from the speed of the CPUs reported by
   * the operating system: the rounded mean, over all CPUs, of an estimated
   * CPU cycle time.
   *
   * A CPU without a usable speed (e.g. reported as `0`) is given the speed of
   * the first CPU having one, or a fallback speed if there is none. If no CPU
   * is reported at all, the weight is computed from the fallback speed alone.
   * The result is therefore always a finite number.
   *
   * @returns The default worker weight.
   */
  protected computeDefaultWorkerWeight (): number {
    // Fallback CPU speed in MHz, only used if no CPU reports a usable speed.
    const fallbackCpuSpeed = 2000
    const cpusInfo = cpus()
    const referenceCpuSpeed =
      cpusInfo.find(cpu => cpu.speed > 0)?.speed ?? fallbackCpuSpeed
    // Never divide by a zero speed nor by a zero CPUs count.
    const cpusSpeed =
      cpusInfo.length > 0
        ? cpusInfo.map(cpu => (cpu.speed > 0 ? cpu.speed : referenceCpuSpeed))
        : [referenceCpuSpeed]
    let cpusCycleTimeWeight = 0
    for (const cpuSpeed of cpusSpeed) {
      // CPU estimated cycle time
      const numberOfDigits = cpuSpeed.toString().length - 1
      const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
    }
    return Math.round(cpusCycleTimeWeight / cpusSpeed.length)
  }
}