feat: add pool runtime setters
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import type { IWorker } from '../worker'
3 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
4 import type {
5 IWorkerChoiceStrategy,
6 RequiredStatistics,
7 WorkerChoiceStrategyOptions
8 } from './selection-strategies-types'
9 import type { IPool } from '../pool'
10 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
11
/**
 * Virtual task run time bookkeeping entry kept for a single worker node.
 */
interface TaskRunTime {
  /**
   * Virtual task run time accumulated by the worker node so far.
   */
  runTime: number
  /**
   * Weight of the worker node: the accumulated run time is compared against it.
   */
  weight: number
}
19
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Each worker node owns a weight and an accumulated virtual task run time.
 * The current worker node keeps being chosen while its accumulated run time
 * is below its weight; afterwards the strategy moves on to the next worker node.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /**
   * Weight used when no usable CPU speed is reported by the operating system.
   * It equals the weight the CPU based formula yields for a reported speed of 1000.
   */
  private static readonly FALLBACK_WORKER_WEIGHT = 1000

  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker node id where the current task will be submitted.
   */
  private currentWorkerNodeId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Workers' virtual task runtime.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    this.checkOptions(this.opts)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** @inheritDoc */
  public reset (): boolean {
    this.currentWorkerNodeId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** @inheritDoc */
  public choose (): number {
    const chosenWorkerNodeKey = this.currentWorkerNodeId
    if (
      this.isDynamicPool &&
      !this.workersTaskRunTime.has(chosenWorkerNodeKey)
    ) {
      this.initWorkerTaskRunTime(chosenWorkerNodeKey)
    }
    const chosenTaskRunTime = this.workersTaskRunTime.get(chosenWorkerNodeKey)
    const workerTaskRunTime = chosenTaskRunTime?.runTime ?? 0
    const workerTaskWeight =
      chosenTaskRunTime?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      // Budget not used up yet: charge one virtual task run time to the worker.
      this.setWorkerTaskRunTime(
        chosenWorkerNodeKey,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
      )
    } else {
      // Budget used up: move to the next worker node, wrapping around. `>=`
      // (rather than `===`) also brings an out-of-range cursor back to 0.
      this.currentWorkerNodeId =
        this.currentWorkerNodeId >= this.pool.workerNodes.length - 1
          ? 0
          : this.currentWorkerNodeId + 1
      // Restart the next worker's accounting while keeping its own weight.
      this.setWorkerTaskRunTime(
        this.currentWorkerNodeId,
        this.workersTaskRunTime.get(this.currentWorkerNodeId)?.weight ??
          this.defaultWorkerWeight,
        0
      )
    }
    return chosenWorkerNodeKey
  }

  /** @inheritDoc */
  public remove (workerNodeKey: number): boolean {
    // Worker node keys above the removed one are shifted down by one below,
    // so the cursor has to follow the worker node it was pointing at.
    if (this.currentWorkerNodeId > workerNodeKey) {
      this.currentWorkerNodeId -= 1
    }
    // Never leave the cursor past the last worker node of the pool.
    const lastWorkerNodeKey = this.pool.workerNodes.length - 1
    if (lastWorkerNodeKey < 0) {
      this.currentWorkerNodeId = 0
    } else if (this.currentWorkerNodeId > lastWorkerNodeKey) {
      this.currentWorkerNodeId = lastWorkerNodeKey
    }
    const deleted = this.workersTaskRunTime.delete(workerNodeKey)
    // Snapshot the entries to re-key before touching the map: mutating a Map
    // while iterating it would leave a stale entry under the highest key.
    const entriesToShift = [...this.workersTaskRunTime].filter(
      ([key]) => key > workerNodeKey
    )
    for (const [key] of entriesToShift) {
      this.workersTaskRunTime.delete(key)
    }
    for (const [key, value] of entriesToShift) {
      this.workersTaskRunTime.set(key - 1, value)
    }
    return deleted
  }

  /**
   * Initializes the virtual task run time entry of every worker node of the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workerNodes.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Initializes the entry of one worker node with the default weight and a zero run time.
   *
   * @param workerNodeKey - The worker node key.
   */
  private initWorkerTaskRunTime (workerNodeKey: number): void {
    this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores the weight and the accumulated virtual task run time of a worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param weight - The worker node weight.
   * @param runTime - The accumulated virtual task run time.
   */
  private setWorkerTaskRunTime (
    workerNodeKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerNodeKey, {
      weight,
      runTime
    })
  }

  /**
   * Gets the run time charged to a worker node for one task: its median tasks
   * run time when `requiredStatistics.medRunTime` is set, its average otherwise.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The worker node virtual task run time.
   */
  private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
    return this.requiredStatistics.medRunTime
      ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
      : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
  }

  /**
   * Computes the default worker weight from the CPU speeds reported by `os.cpus()`.
   *
   * CPUs reporting a non-positive or non-finite speed are ignored, since they
   * would turn the weight into `Infinity` or `NaN`. When no CPU is usable, a
   * fixed fallback weight is returned.
   *
   * @returns The default worker weight.
   */
  private computeWorkerWeight (): number {
    let cpusCycleTimeWeight = 0
    let usableCpus = 0
    for (const { speed } of cpus()) {
      if (!Number.isFinite(speed) || speed <= 0) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigits = speed.toString().length - 1
      const scale = Math.pow(10, numberOfDigits)
      const cpuCycleTime = 1 / (speed / scale)
      cpusCycleTimeWeight += cpuCycleTime * scale
      ++usableCpus
    }
    if (usableCpus === 0) {
      return WeightedRoundRobinWorkerChoiceStrategy.FALLBACK_WORKER_WEIGHT
    }
    return Math.round(cpusCycleTimeWeight / usableCpus)
  }
}