refactor: explicity extends Task for MessageValue type
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import type { IWorker } from '../worker'
3 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
4 import type {
5 IWorkerChoiceStrategy,
6 RequiredStatistics,
7 WorkerChoiceStrategyOptions
8 } from './selection-strategies-types'
9 import type { IPool } from '../pool'
10
11 /**
12 * Virtual task runtime.
13 */
14 interface TaskRunTime {
15 weight: number
16 runTime: number
17 }
18
19 /**
20 * Selects the next worker with a weighted round robin scheduling algorithm.
21 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
22 *
23 * @typeParam Worker - Type of worker which manages the strategy.
24 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
25 * @typeParam Response - Type of execution response. This can only be serializable data.
26 */
27 export class WeightedRoundRobinWorkerChoiceStrategy<
28 Worker extends IWorker,
29 Data = unknown,
30 Response = unknown
31 >
32 extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
33 implements IWorkerChoiceStrategy {
34 /** @inheritDoc */
35 public readonly requiredStatistics: RequiredStatistics = {
36 runTime: true,
37 avgRunTime: true,
38 medRunTime: false
39 }
40
41 /**
42 * Worker node id where the current task will be submitted.
43 */
44 private currentWorkerNodeId: number = 0
45 /**
46 * Default worker weight.
47 */
48 private readonly defaultWorkerWeight: number
49 /**
50 * Workers' virtual task runtime.
51 */
52 private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
53 number,
54 TaskRunTime
55 >()
56
57 /**
58 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
59 *
60 * @param pool - The pool instance.
61 * @param opts - The worker choice strategy options.
62 */
63 public constructor (
64 pool: IPool<Worker, Data, Response>,
65 opts?: WorkerChoiceStrategyOptions
66 ) {
67 super(pool, opts)
68 this.defaultWorkerWeight = this.computeWorkerWeight()
69 this.initWorkersTaskRunTime()
70 }
71
72 /** @inheritDoc */
73 public reset (): boolean {
74 this.currentWorkerNodeId = 0
75 this.workersTaskRunTime.clear()
76 this.initWorkersTaskRunTime()
77 return true
78 }
79
80 /** @inheritDoc */
81 public choose (): number {
82 const chosenWorkerNodeKey = this.currentWorkerNodeId
83 if (
84 this.isDynamicPool &&
85 !this.workersTaskRunTime.has(chosenWorkerNodeKey)
86 ) {
87 this.initWorkerTaskRunTime(chosenWorkerNodeKey)
88 }
89 const workerTaskRunTime =
90 this.workersTaskRunTime.get(chosenWorkerNodeKey)?.runTime ?? 0
91 const workerTaskWeight =
92 this.workersTaskRunTime.get(chosenWorkerNodeKey)?.weight ??
93 this.defaultWorkerWeight
94 if (workerTaskRunTime < workerTaskWeight) {
95 this.setWorkerTaskRunTime(
96 chosenWorkerNodeKey,
97 workerTaskWeight,
98 workerTaskRunTime +
99 (this.getWorkerVirtualTaskRunTime(chosenWorkerNodeKey) ?? 0)
100 )
101 } else {
102 this.currentWorkerNodeId =
103 this.currentWorkerNodeId === this.pool.workerNodes.length - 1
104 ? 0
105 : this.currentWorkerNodeId + 1
106 this.setWorkerTaskRunTime(this.currentWorkerNodeId, workerTaskWeight, 0)
107 }
108 return chosenWorkerNodeKey
109 }
110
111 /** @inheritDoc */
112 public remove (workerNodeKey: number): boolean {
113 if (this.currentWorkerNodeId === workerNodeKey) {
114 if (this.pool.workerNodes.length === 0) {
115 this.currentWorkerNodeId = 0
116 } else {
117 this.currentWorkerNodeId =
118 this.currentWorkerNodeId > this.pool.workerNodes.length - 1
119 ? this.pool.workerNodes.length - 1
120 : this.currentWorkerNodeId
121 }
122 }
123 const deleted = this.workersTaskRunTime.delete(workerNodeKey)
124 for (const [key, value] of this.workersTaskRunTime) {
125 if (key > workerNodeKey) {
126 this.workersTaskRunTime.set(key - 1, value)
127 }
128 }
129 return deleted
130 }
131
132 private initWorkersTaskRunTime (): void {
133 for (const [index] of this.pool.workerNodes.entries()) {
134 this.initWorkerTaskRunTime(index)
135 }
136 }
137
138 private initWorkerTaskRunTime (workerNodeKey: number): void {
139 this.setWorkerTaskRunTime(workerNodeKey, this.defaultWorkerWeight, 0)
140 }
141
142 private setWorkerTaskRunTime (
143 workerNodeKey: number,
144 weight: number,
145 runTime: number
146 ): void {
147 this.workersTaskRunTime.set(workerNodeKey, {
148 weight,
149 runTime
150 })
151 }
152
153 private getWorkerVirtualTaskRunTime (workerNodeKey: number): number {
154 return this.requiredStatistics.medRunTime
155 ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
156 : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
157 }
158
159 private computeWorkerWeight (): number {
160 let cpusCycleTimeWeight = 0
161 for (const cpu of cpus()) {
162 // CPU estimated cycle time
163 const numberOfDigits = cpu.speed.toString().length - 1
164 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
165 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
166 }
167 return Math.round(cpusCycleTimeWeight / cpus().length)
168 }
169 }