feat: add worker choice strategies retry mechanism
[poolifier.git] / src / pools / selection-strategies / abstract-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import {
3 DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
5 } from '../../utils'
6 import type { IPool } from '../pool'
7 import type { IWorker } from '../worker'
8 import type {
9 IWorkerChoiceStrategy,
10 MeasurementStatisticsRequirements,
11 StrategyPolicy,
12 TaskStatisticsRequirements,
13 WorkerChoiceStrategyOptions
14 } from './selection-strategies-types'
15
16 /**
17 * Worker choice strategy abstract base class.
18 *
19 * @typeParam Worker - Type of worker which manages the strategy.
20 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
22 */
23 export abstract class AbstractWorkerChoiceStrategy<
24 Worker extends IWorker,
25 Data = unknown,
26 Response = unknown
27 > implements IWorkerChoiceStrategy {
28 /**
29 * The next worker node key.
30 */
31 protected nextWorkerNodeKey: number = 0
32
33 /** @inheritDoc */
34 public readonly strategyPolicy: StrategyPolicy = {
35 useDynamicWorker: false
36 }
37
38 /** @inheritDoc */
39 public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
40 runTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
41 waitTime: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS,
42 elu: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
43 }
44
45 /**
46 * Constructs a worker choice strategy bound to the pool.
47 *
48 * @param pool - The pool instance.
49 * @param opts - The worker choice strategy options.
50 */
51 public constructor (
52 protected readonly pool: IPool<Worker, Data, Response>,
53 protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
54 ) {
55 this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
56 this.choose = this.choose.bind(this)
57 }
58
59 protected setTaskStatisticsRequirements (
60 opts: WorkerChoiceStrategyOptions
61 ): void {
62 this.toggleMedianMeasurementStatisticsRequirements(
63 this.taskStatisticsRequirements.runTime,
64 opts.runTime?.median as boolean
65 )
66 this.toggleMedianMeasurementStatisticsRequirements(
67 this.taskStatisticsRequirements.waitTime,
68 opts.waitTime?.median as boolean
69 )
70 this.toggleMedianMeasurementStatisticsRequirements(
71 this.taskStatisticsRequirements.elu,
72 opts.elu?.median as boolean
73 )
74 }
75
76 private toggleMedianMeasurementStatisticsRequirements (
77 measurementStatisticsRequirements: MeasurementStatisticsRequirements,
78 toggleMedian: boolean
79 ): void {
80 if (measurementStatisticsRequirements.average && toggleMedian) {
81 measurementStatisticsRequirements.average = false
82 measurementStatisticsRequirements.median = toggleMedian
83 }
84 if (measurementStatisticsRequirements.median && !toggleMedian) {
85 measurementStatisticsRequirements.average = true
86 measurementStatisticsRequirements.median = toggleMedian
87 }
88 }
89
90 /** @inheritDoc */
91 public abstract reset (): boolean
92
93 /** @inheritDoc */
94 public abstract update (workerNodeKey: number): boolean
95
96 /** @inheritDoc */
97 public abstract choose (): number
98
99 /** @inheritDoc */
100 public abstract remove (workerNodeKey: number): boolean
101
102 /** @inheritDoc */
103 public setOptions (opts: WorkerChoiceStrategyOptions): void {
104 this.opts = { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS, ...opts }
105 this.setTaskStatisticsRequirements(this.opts)
106 }
107
108 /**
109 * Whether the worker node is ready or not.
110 *
111 * @param workerNodeKey - The worker node key.
112 * @returns Whether the worker node is ready or not.
113 */
114 private isWorkerNodeReady (workerNodeKey: number): boolean {
115 return this.pool.workerNodes[workerNodeKey].info.ready
116 }
117
118 /**
119 * Whether the worker node has back pressure or not (i.e. its tasks queue is full).
120 *
121 * @param workerNodeKey - The worker node key.
122 * @returns `true` if the worker node has back pressure, `false` otherwise.
123 */
124 private hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
125 return this.pool.hasWorkerNodeBackPressure(workerNodeKey)
126 }
127
128 /**
129 * Whether the worker node is eligible or not.
130 * A worker node is eligible if it is ready and does not have back pressure.
131 *
132 * @param workerNodeKey - The worker node key.
133 * @returns `true` if the worker node is eligible, `false` otherwise.
134 * @see {@link isWorkerNodeReady}
135 * @see {@link hasWorkerNodeBackPressure}
136 */
137 protected isWorkerNodeEligible (workerNodeKey: number): boolean {
138 return (
139 this.isWorkerNodeReady(workerNodeKey) &&
140 !this.hasWorkerNodeBackPressure(workerNodeKey)
141 )
142 }
143
144 /**
145 * Gets the worker task runtime.
146 * If the task statistics require the average runtime, the average runtime is returned.
147 * If the task statistics require the median runtime , the median runtime is returned.
148 *
149 * @param workerNodeKey - The worker node key.
150 * @returns The worker task runtime.
151 */
152 protected getWorkerTaskRunTime (workerNodeKey: number): number {
153 return this.taskStatisticsRequirements.runTime.median
154 ? this.pool.workerNodes[workerNodeKey].usage.runTime?.median ?? 0
155 : this.pool.workerNodes[workerNodeKey].usage.runTime?.average ?? 0
156 }
157
158 /**
159 * Gets the worker task wait time.
160 * If the task statistics require the average wait time, the average wait time is returned.
161 * If the task statistics require the median wait time, the median wait time is returned.
162 *
163 * @param workerNodeKey - The worker node key.
164 * @returns The worker task wait time.
165 */
166 protected getWorkerTaskWaitTime (workerNodeKey: number): number {
167 return this.taskStatisticsRequirements.waitTime.median
168 ? this.pool.workerNodes[workerNodeKey].usage.waitTime?.median ?? 0
169 : this.pool.workerNodes[workerNodeKey].usage.waitTime?.average ?? 0
170 }
171
172 /**
173 * Gets the worker task ELU.
174 * If the task statistics require the average ELU, the average ELU is returned.
175 * If the task statistics require the median ELU, the median ELU is returned.
176 *
177 * @param workerNodeKey - The worker node key.
178 * @returns The worker task ELU.
179 */
180 protected getWorkerTaskElu (workerNodeKey: number): number {
181 return this.taskStatisticsRequirements.elu.median
182 ? this.pool.workerNodes[workerNodeKey].usage.elu.active?.median ?? 0
183 : this.pool.workerNodes[workerNodeKey].usage.elu.active?.average ?? 0
184 }
185
186 protected computeDefaultWorkerWeight (): number {
187 let cpusCycleTimeWeight = 0
188 for (const cpu of cpus()) {
189 // CPU estimated cycle time
190 const numberOfDigits = cpu.speed.toString().length - 1
191 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
192 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
193 }
194 return Math.round(cpusCycleTimeWeight / cpus().length)
195 }
196 }