chore: v2.4.0-3
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import type { IPoolInternal } from '../pool-internal'
3 import type { IPoolWorker } from '../pool-worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics
8 } from './selection-strategies-types'
9
/**
 * Bookkeeping record for the virtual task run time of a single worker.
 */
interface TaskRunTime {
  /** Weight the accumulated run time is compared against. */
  weight: number
  /** Virtual task run time accumulated so far. */
  runTime: number
}
17
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * The strategy keeps one cursor (`currentWorkerId`) and, per worker key, a
 * weight and an accumulated virtual run time. The worker under the cursor keeps
 * being chosen while its accumulated run time is below its weight; once it is
 * not, the cursor moves to the next worker key.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends IPoolWorker,
  Data,
  Response
>
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** {@inheritDoc} */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true
  }

  /**
   * Worker id where the current task will be submitted.
   */
  private currentWorkerId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool - The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** {@inheritDoc} */
  public reset (): boolean {
    // Rewind the cursor and rebuild one fresh entry per worker currently in the pool.
    this.currentWorkerId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /** {@inheritDoc} */
  public choose (): number {
    const chosenWorkerKey = this.currentWorkerId
    // Dynamic pools can grow after construction: lazily create the missing entry.
    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
      this.initWorkerTaskRunTime(chosenWorkerKey)
    }
    const chosenEntry = this.workersTaskRunTime.get(chosenWorkerKey)
    const workerTaskRunTime = chosenEntry?.runTime ?? 0
    const workerTaskWeight = chosenEntry?.weight ?? this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      // Budget not exhausted: charge the worker average run time to this worker.
      this.setWorkerTaskRunTime(
        chosenWorkerKey,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
      )
    } else {
      // Budget exhausted: move the cursor to the next worker key, wrapping to 0.
      // `>=` (instead of `===`) also recovers from a cursor left out of range.
      this.currentWorkerId =
        this.currentWorkerId >= this.pool.workers.length - 1
          ? 0
          : this.currentWorkerId + 1
      // Restart the next worker accounting with its own weight, not the
      // weight of the worker that has just been chosen.
      const nextWorkerWeight =
        this.workersTaskRunTime.get(this.currentWorkerId)?.weight ??
        this.defaultWorkerWeight
      this.setWorkerTaskRunTime(this.currentWorkerId, nextWorkerWeight, 0)
    }
    // The key read on entry is returned, even when the cursor has just moved.
    return chosenWorkerKey
  }

  /** {@inheritDoc} */
  public remove (workerKey: number): boolean {
    const workerDeleted = this.workersTaskRunTime.delete(workerKey)
    // Shift every entry above the removed key down by one. Work from a snapshot
    // sorted in ascending key order so that:
    // - the live map is not mutated while it is being iterated,
    // - the target key `key - 1` is always free when it is written,
    // - no stale entry is left behind at the highest key.
    const entriesToShift = [...this.workersTaskRunTime.entries()]
      .filter(([key]) => key > workerKey)
      .sort(([keyA], [keyB]) => keyA - keyB)
    for (const [key, value] of entriesToShift) {
      this.workersTaskRunTime.delete(key)
      this.workersTaskRunTime.set(key - 1, value)
    }
    // Keep the cursor on the same worker when a lower key has been removed.
    if (this.currentWorkerId > workerKey) {
      this.currentWorkerId -= 1
    }
    // NOTE(review): assumes the pool has already removed the worker from
    // `pool.workers` when this method is called - confirm against the caller.
    const lastWorkerKey = this.pool.workers.length - 1
    if (this.currentWorkerId > lastWorkerKey) {
      // Never let the cursor become negative when the pool is empty.
      this.currentWorkerId = Math.max(lastWorkerKey, 0)
    }
    return workerDeleted
  }

  /**
   * Creates a default entry for every worker currently in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workers.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Creates (or overwrites) the entry of a worker with the default weight and
   * a zero accumulated run time.
   *
   * @param workerKey - The worker key.
   */
  private initWorkerTaskRunTime (workerKey: number): void {
    this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores a new entry for a worker, replacing any previous one.
   *
   * @param workerKey - The worker key.
   * @param weight - The worker weight.
   * @param runTime - The accumulated virtual task run time.
   */
  private setWorkerTaskRunTime (
    workerKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerKey, {
      weight,
      runTime
    })
  }

  /**
   * Reads the average task run time from the tasks usage of a worker.
   *
   * @param workerKey - The worker key, used as an index into `pool.workers`.
   * @returns The `tasksUsage.avgRunTime` value of the worker.
   */
  private getWorkerVirtualTaskRunTime (workerKey: number): number {
    return this.pool.workers[workerKey].tasksUsage.avgRunTime
  }

  /**
   * Computes the default worker weight from the speed reported for each CPU.
   *
   * For each CPU, with `d` being the number of digits of its speed minus one,
   * the contribution is `(1 / (speed / 10^d)) * 10^d`. The result is the
   * rounded average of the contributions over all CPUs.
   *
   * @returns The default worker weight.
   */
  private computeWorkerWeight (): number {
    // Query the CPU list once: it is needed for both the sum and the average.
    const cpusInfo = cpus()
    // NOTE(review): an empty CPU list yields NaN and a CPU reporting a speed of
    // 0 yields Infinity - confirm whether a fallback weight is wanted here.
    let cpusCycleTimeWeight = 0
    for (const cpu of cpusInfo) {
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
    }
    return Math.round(cpusCycleTimeWeight / cpusInfo.length)
  }
}