1 import { cpus
} from
'node:os'
3 DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
,
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
6 import type { IPool
} from
'../pool'
7 import type { IWorker
} from
'../worker'
10 MeasurementStatisticsRequirements
,
12 TaskStatisticsRequirements
,
13 WorkerChoiceStrategyOptions
14 } from
'./selection-strategies-types'
17 * Worker choice strategy abstract base class.
19 * @typeParam Worker - Type of worker which manages the strategy.
20 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
21 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
23 export abstract class AbstractWorkerChoiceStrategy
<
24 Worker
extends IWorker
,
27 > implements IWorkerChoiceStrategy
{
29 * The next worker node key.
31 protected nextWorkerNodeKey
: number | undefined = 0
34 * The previous worker node key.
36 protected previousWorkerNodeKey
: number = 0
39 public readonly strategyPolicy
: StrategyPolicy
= {
40 dynamicWorkerUsage
: false,
41 dynamicWorkerReady
: true
45 public readonly taskStatisticsRequirements
: TaskStatisticsRequirements
= {
46 runTime
: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
,
47 waitTime
: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
,
48 elu
: DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
52 * Constructs a worker choice strategy bound to the pool.
54 * @param pool - The pool instance.
55 * @param opts - The worker choice strategy options.
58 protected readonly pool
: IPool
<Worker
, Data
, Response
>,
59 protected opts
: WorkerChoiceStrategyOptions
= DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
61 this.opts
= { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
, ...opts
}
62 this.choose
= this.choose
.bind(this)
65 protected setTaskStatisticsRequirements (
66 opts
: WorkerChoiceStrategyOptions
68 this.toggleMedianMeasurementStatisticsRequirements(
69 this.taskStatisticsRequirements
.runTime
,
70 opts
.runTime
?.median
as boolean
72 this.toggleMedianMeasurementStatisticsRequirements(
73 this.taskStatisticsRequirements
.waitTime
,
74 opts
.waitTime
?.median
as boolean
76 this.toggleMedianMeasurementStatisticsRequirements(
77 this.taskStatisticsRequirements
.elu
,
78 opts
.elu
?.median
as boolean
82 private toggleMedianMeasurementStatisticsRequirements (
83 measurementStatisticsRequirements
: MeasurementStatisticsRequirements
,
86 if (measurementStatisticsRequirements
.average
&& toggleMedian
) {
87 measurementStatisticsRequirements
.average
= false
88 measurementStatisticsRequirements
.median
= toggleMedian
90 if (measurementStatisticsRequirements
.median
&& !toggleMedian
) {
91 measurementStatisticsRequirements
.average
= true
92 measurementStatisticsRequirements
.median
= toggleMedian
96 protected resetWorkerNodeKeyProperties (): void {
97 this.nextWorkerNodeKey
= 0
98 this.previousWorkerNodeKey
= 0
102 public abstract reset (): boolean
105 public abstract update (workerNodeKey
: number): boolean
108 public abstract choose (): number | undefined
111 public abstract remove (workerNodeKey
: number): boolean
114 public setOptions (opts
: WorkerChoiceStrategyOptions
): void {
115 this.opts
= { ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
, ...opts
}
116 this.setTaskStatisticsRequirements(this.opts
)
120 public hasPoolWorkerNodesReady (): boolean {
121 return this.pool
.workerNodes
.some(workerNode
=> workerNode
.info
.ready
)
125 * Whether the worker node is ready or not.
127 * @param workerNodeKey - The worker node key.
128 * @returns Whether the worker node is ready or not.
130 protected isWorkerNodeReady (workerNodeKey
: number): boolean {
131 return this.pool
.workerNodes
[workerNodeKey
]?.info
?.ready
?? false
135 * Check the next worker node readiness.
137 protected checkNextWorkerNodeReadiness (): void {
138 if (!this.isWorkerNodeReady(this.nextWorkerNodeKey
as number)) {
139 delete this.nextWorkerNodeKey
144 * Gets the worker node task runtime.
145 * If the task statistics require the average runtime, the average runtime is returned.
146 * If the task statistics require the median runtime , the median runtime is returned.
148 * @param workerNodeKey - The worker node key.
149 * @returns The worker node task runtime.
151 protected getWorkerNodeTaskRunTime (workerNodeKey
: number): number {
152 return this.taskStatisticsRequirements
.runTime
.median
153 ? this.pool
.workerNodes
[workerNodeKey
].usage
.runTime
.median
?? 0
154 : this.pool
.workerNodes
[workerNodeKey
].usage
.runTime
.average
?? 0
158 * Gets the worker node task wait time.
159 * If the task statistics require the average wait time, the average wait time is returned.
160 * If the task statistics require the median wait time, the median wait time is returned.
162 * @param workerNodeKey - The worker node key.
163 * @returns The worker node task wait time.
165 protected getWorkerNodeTaskWaitTime (workerNodeKey
: number): number {
166 return this.taskStatisticsRequirements
.waitTime
.median
167 ? this.pool
.workerNodes
[workerNodeKey
].usage
.waitTime
.median
?? 0
168 : this.pool
.workerNodes
[workerNodeKey
].usage
.waitTime
.average
?? 0
172 * Gets the worker node task ELU.
173 * If the task statistics require the average ELU, the average ELU is returned.
174 * If the task statistics require the median ELU, the median ELU is returned.
176 * @param workerNodeKey - The worker node key.
177 * @returns The worker node task ELU.
179 protected getWorkerNodeTaskElu (workerNodeKey
: number): number {
180 return this.taskStatisticsRequirements
.elu
.median
181 ? this.pool
.workerNodes
[workerNodeKey
].usage
.elu
.active
.median
?? 0
182 : this.pool
.workerNodes
[workerNodeKey
].usage
.elu
.active
.average
?? 0
186 * Sets safely the previous worker node key.
188 * @param workerNodeKey - The worker node key.
190 protected setPreviousWorkerNodeKey (workerNodeKey
: number | undefined): void {
191 this.previousWorkerNodeKey
= workerNodeKey
?? this.previousWorkerNodeKey
194 protected computeDefaultWorkerWeight (): number {
195 let cpusCycleTimeWeight
= 0
196 for (const cpu
of cpus()) {
197 // CPU estimated cycle time
198 const numberOfDigits
= cpu
.speed
.toString().length
- 1
199 const cpuCycleTime
= 1 / (cpu
.speed
/ Math.pow(10, numberOfDigits
))
200 cpusCycleTimeWeight
+= cpuCycleTime
* Math.pow(10, numberOfDigits
)
202 return Math.round(cpusCycleTimeWeight
/ cpus().length
)