1 import { cpus
} from
'node:os'
3 DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS
,
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
6 import type { IPool
} from
'../pool'
7 import type { IWorker
} from
'../worker'
10 MeasurementStatisticsRequirements
,
12 TaskStatisticsRequirements
,
13 WorkerChoiceStrategyOptions
14 } from
'./selection-strategies-types'
/**
 * Worker choice strategy abstract base class.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
23 export abstract class AbstractWorkerChoiceStrategy
<
24 Worker
extends IWorker
,
27 > implements IWorkerChoiceStrategy
{
/**
 * The next worker node key.
 *
 * Initialized to `0`.
 * NOTE(review): presumably advanced by the concrete strategies — confirm against the subclasses.
 */
protected nextWorkerNodeKey: number = 0
/**
 * Policy of this strategy.
 *
 * The base default sets `useDynamicWorker` to `false`.
 * NOTE(review): presumably part of the `IWorkerChoiceStrategy` contract — confirm.
 */
public readonly strategyPolicy: StrategyPolicy = {
  useDynamicWorker: false
}
/**
 * Task statistics requirements of this strategy, one entry per measurement:
 * `runTime`, `waitTime` and `elu`.
 *
 * Every entry starts as its own shallow copy of
 * `DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS`. The entries are mutated in place by
 * `toggleMedianMeasurementStatisticsRequirements`, so sharing one object between them
 * would make a change to one measurement leak into the two others and into the
 * module-level default itself.
 * NOTE(review): a shallow copy assumes the requirements object is flat — confirm.
 */
public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
  runTime: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS },
  waitTime: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS },
  elu: { ...DEFAULT_MEASUREMENT_STATISTICS_REQUIREMENTS }
}
/**
 * Constructs a worker choice strategy bound to the pool.
 *
 * Both parameters are parameter properties: they are stored on the instance as
 * `this.pool` and `this.opts`.
 *
 * @param pool - The pool instance.
 * @param opts - The worker choice strategy options. Defaults to `DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS`.
 */
public constructor (
  protected readonly pool: IPool<Worker, Data, Response>,
  protected opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
) {
  // Bind `choose` to this instance so that it keeps its `this` when handed over as a bare function reference.
  this.choose = this.choose.bind(this)
}
/**
 * Aligns the task statistics requirements with the given strategy options.
 *
 * For each of the `runTime`, `waitTime` and `elu` measurements, in that order, the
 * `median` flag of the matching options entry is forwarded to
 * `toggleMedianMeasurementStatisticsRequirements`. A measurement whose options entry
 * or whose `median` flag is absent is handled as `median: false`.
 *
 * @param opts - The worker choice strategy options.
 */
protected setTaskStatisticsRequirements (
  opts: WorkerChoiceStrategyOptions
): void {
  for (const measurement of ['runTime', 'waitTime', 'elu'] as const) {
    this.toggleMedianMeasurementStatisticsRequirements(
      this.taskStatisticsRequirements[measurement],
      // Coalesce rather than assert: the flag is optional, and an `undefined` passed
      // off as a boolean would end up stored in the requirements' `median` field.
      opts[measurement]?.median ?? false
    )
  }
}
/**
 * Switches one measurement between its average and its median requirement.
 *
 * - Median requested while the average is required: the average is turned off and
 *   `median` takes the requested flag.
 * - Median not requested while the median is required: the average is turned back on
 *   and `median` takes the requested flag.
 *
 * Every other combination leaves the requirements untouched.
 *
 * @param measurementStatisticsRequirements - The measurement statistics requirements, mutated in place.
 * @param toggleMedian - Whether the median is requested.
 */
private toggleMedianMeasurementStatisticsRequirements (
  measurementStatisticsRequirements: MeasurementStatisticsRequirements,
  toggleMedian: boolean
): void {
  if (toggleMedian) {
    // Median requested: only act when the average is the one currently required.
    if (measurementStatisticsRequirements.average) {
      measurementStatisticsRequirements.average = false
      measurementStatisticsRequirements.median = toggleMedian
    }
    return
  }
  // Median not requested: only act when the median is the one currently required.
  if (measurementStatisticsRequirements.median) {
    measurementStatisticsRequirements.average = true
    measurementStatisticsRequirements.median = toggleMedian
  }
}
/**
 * Resets the strategy. Implemented by the concrete strategies.
 * NOTE(review): the meaning of the returned boolean is presumably defined by `IWorkerChoiceStrategy` — confirm.
 *
 * @returns A boolean reported by the implementation.
 */
public abstract reset (): boolean

/**
 * Updates the strategy for the given worker node. Implemented by the concrete strategies.
 * NOTE(review): the meaning of the returned boolean is presumably defined by `IWorkerChoiceStrategy` — confirm.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A boolean reported by the implementation.
 */
public abstract update (workerNodeKey: number): boolean

/**
 * Chooses a worker node. Implemented by the concrete strategies.
 * The constructor binds this method to the instance.
 * NOTE(review): the returned number is presumably the chosen worker node key — confirm.
 *
 * @returns A number reported by the implementation.
 */
public abstract choose (): number

/**
 * Removes the given worker node from the strategy. Implemented by the concrete strategies.
 * NOTE(review): the meaning of the returned boolean is presumably defined by `IWorkerChoiceStrategy` — confirm.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A boolean reported by the implementation.
 */
public abstract remove (workerNodeKey: number): boolean
/**
 * Replaces the strategy options and re-derives the task statistics requirements from them.
 *
 * A nullish `opts` is replaced by `DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS`.
 *
 * @param opts - The worker choice strategy options.
 */
public setOptions (opts: WorkerChoiceStrategyOptions): void {
  const effectiveOpts = opts ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts = effectiveOpts
  this.setTaskStatisticsRequirements(effectiveOpts)
}
/**
 * Tells whether the worker node is ready, as reported by the `info.ready` flag
 * of the pool's worker node stored under the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns Whether the worker node is ready or not.
 */
protected isWorkerNodeReady (workerNodeKey: number): boolean {
  const { info } = this.pool.workerNodes[workerNodeKey]
  return info.ready
}
/**
 * Gets the worker task runtime.
 * The median runtime is returned when the task statistics require the median,
 * the average runtime otherwise. A missing value yields `0`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker task runtime.
 */
protected getWorkerTaskRunTime (workerNodeKey: number): number {
  const { runTime } = this.pool.workerNodes[workerNodeKey].usage
  if (this.taskStatisticsRequirements.runTime.median) {
    return runTime?.median ?? 0
  }
  return runTime?.average ?? 0
}
/**
 * Gets the worker task wait time.
 * The median wait time is returned when the task statistics require the median,
 * the average wait time otherwise. A missing value yields `0`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker task wait time.
 */
protected getWorkerTaskWaitTime (workerNodeKey: number): number {
  const { waitTime } = this.pool.workerNodes[workerNodeKey].usage
  if (this.taskStatisticsRequirements.waitTime.median) {
    return waitTime?.median ?? 0
  }
  return waitTime?.average ?? 0
}
/**
 * Gets the worker task ELU, read from the `active` part of the worker node ELU usage.
 * The median ELU is returned when the task statistics require the median,
 * the average ELU otherwise. A missing value yields `0`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker task ELU.
 */
protected getWorkerTaskElu (workerNodeKey: number): number {
  const { active } = this.pool.workerNodes[workerNodeKey].usage.elu
  if (this.taskStatisticsRequirements.elu.median) {
    return active?.median ?? 0
  }
  return active?.average ?? 0
}
/**
 * Computes the default worker weight from the clock speed reported for the host CPUs.
 *
 * For every CPU, an estimated cycle time is derived from its speed and scaled by the
 * speed's order of magnitude; the weight is the rounded mean of these values.
 *
 * CPUs reporting an unusable speed (zero, negative or non-finite) are ignored. When no
 * CPU is usable — which includes an empty CPU list — a fallback speed is assumed, so
 * the result is always a finite number instead of `NaN` or `Infinity`.
 *
 * @returns The default worker weight.
 */
protected computeDefaultWorkerWeight (): number {
  // Speed assumed when the platform reports none (`os.cpus()` speeds are in MHz).
  // NOTE(review): arbitrary value — adjust if another default is preferred.
  const fallbackCpuSpeed = 2000
  // Query the CPU list once: it feeds both the sum and the divisor of the mean.
  const usableCpuSpeeds = cpus()
    .map(cpu => cpu.speed)
    .filter(speed => Number.isFinite(speed) && speed > 0)
  if (usableCpuSpeeds.length === 0) {
    usableCpuSpeeds.push(fallbackCpuSpeed)
  }
  let cpusCycleTimeWeight = 0
  for (const cpuSpeed of usableCpuSpeeds) {
    // CPU estimated cycle time
    const numberOfDigits = cpuSpeed.toString().length - 1
    const cpuCycleTime = 1 / (cpuSpeed / Math.pow(10, numberOfDigits))
    cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
  }
  return Math.round(cpusCycleTimeWeight / usableCpuSpeeds.length)
}