fix: ensure worker key can't be negative in worker choice strategies
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'node:os'
2 import type { IPoolInternal } from '../pool-internal'
3 import type { IPoolWorker } from '../pool-worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics
8 } from './selection-strategies-types'
9
/**
 * Virtual task runtime bookkeeping kept for a single worker.
 */
interface TaskRunTime {
  /**
   * Weight assigned to the worker.
   */
  weight: number
  /**
   * Virtual task runtime recorded for the worker.
   */
  runTime: number
}
17
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
    Worker extends IPoolWorker,
    Data = unknown,
    Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy<Worker, Data, Response> {
  /** {@inheritDoc} */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true
  }

  /**
   * Worker id where the current task will be submitted.
   */
  private currentWorkerId: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker virtual task runtime map.
   */
  private readonly workersTaskRunTime: Map<number, TaskRunTime> = new Map<
  number,
  TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool - The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkersTaskRunTime()
  }

  /** {@inheritDoc} */
  public reset (): boolean {
    this.currentWorkerId = 0
    this.workersTaskRunTime.clear()
    this.initWorkersTaskRunTime()
    return true
  }

  /**
   * {@inheritDoc}
   *
   * Returns the key that is current when the call starts. While that worker's
   * accumulated virtual runtime is below its weight, its average task runtime
   * is added to the accumulator and the cursor stays put. Otherwise the cursor
   * advances to the next worker (wrapping to 0 after the last one) and that
   * worker's accumulator is reset to 0.
   */
  public choose (): number {
    const chosenWorkerKey = this.currentWorkerId
    // Dynamic pools: lazily create the bookkeeping entry for a key that has none yet.
    if (this.isDynamicPool && !this.workersTaskRunTime.has(chosenWorkerKey)) {
      this.initWorkerTaskRunTime(chosenWorkerKey)
    }
    const workerTaskRunTime =
      this.workersTaskRunTime.get(chosenWorkerKey)?.runTime ?? 0
    const workerTaskWeight =
      this.workersTaskRunTime.get(chosenWorkerKey)?.weight ??
      this.defaultWorkerWeight
    if (workerTaskRunTime < workerTaskWeight) {
      this.setWorkerTaskRunTime(
        chosenWorkerKey,
        workerTaskWeight,
        workerTaskRunTime +
          (this.getWorkerVirtualTaskRunTime(chosenWorkerKey) ?? 0)
      )
    } else {
      this.currentWorkerId =
        this.currentWorkerId === this.pool.workers.length - 1
          ? 0
          : this.currentWorkerId + 1
      // The next worker starts a fresh round; the weight of the worker just left is carried over.
      this.setWorkerTaskRunTime(this.currentWorkerId, workerTaskWeight, 0)
    }
    return chosenWorkerKey
  }

  /**
   * {@inheritDoc}
   *
   * Drops the bookkeeping entry of `workerKey`, moves every entry with a higher
   * key down by one and keeps the cursor inside the valid key range.
   *
   * NOTE(review): the clamp below compares against `pool.workers.length - 1`,
   * which assumes the pool has already dropped the worker from `pool.workers`
   * before calling this method; confirm against the caller.
   *
   * @param workerKey - Key of the worker being removed.
   * @returns `true` if an entry existed for `workerKey` and was deleted, `false` otherwise.
   */
  public remove (workerKey: number): boolean {
    // Clamp the cursor whenever it would fall outside the pool, not only when it
    // equals the removed key: removing a lower-keyed worker while the cursor is on
    // the last worker would otherwise leave it out of range. Never go below 0.
    const lastWorkerKey = Math.max(this.pool.workers.length - 1, 0)
    if (this.currentWorkerId > lastWorkerKey) {
      this.currentWorkerId = lastWorkerKey
    }
    const workerDeleted = this.workersTaskRunTime.delete(workerKey)
    // Snapshot and sort the entries to move before touching the map: mutating a Map
    // while iterating it revisits re-inserted keys, and ascending order guarantees
    // the destination key (key - 1) has already been vacated.
    const entriesToShift = Array.from(this.workersTaskRunTime.entries())
      .filter(([key]) => key > workerKey)
      .sort(([keyA], [keyB]) => keyA - keyB)
    for (const [key, value] of entriesToShift) {
      // Delete the old key so no stale trailing entry is left behind.
      this.workersTaskRunTime.delete(key)
      this.workersTaskRunTime.set(key - 1, value)
    }
    return workerDeleted
  }

  /**
   * Creates a zeroed bookkeeping entry for every worker currently in the pool.
   */
  private initWorkersTaskRunTime (): void {
    for (const [index] of this.pool.workers.entries()) {
      this.initWorkerTaskRunTime(index)
    }
  }

  /**
   * Creates (or overwrites) the entry of a worker with the default weight and a zero runtime.
   *
   * @param workerKey - Key of the worker to initialize.
   */
  private initWorkerTaskRunTime (workerKey: number): void {
    this.setWorkerTaskRunTime(workerKey, this.defaultWorkerWeight, 0)
  }

  /**
   * Stores a fresh bookkeeping entry for a worker, replacing any previous one.
   *
   * @param workerKey - Key of the worker.
   * @param weight - Weight to record for the worker.
   * @param runTime - Virtual task runtime to record for the worker.
   */
  private setWorkerTaskRunTime (
    workerKey: number,
    weight: number,
    runTime: number
  ): void {
    this.workersTaskRunTime.set(workerKey, {
      weight,
      runTime
    })
  }

  /**
   * Reads the average task runtime from the tasks usage of the worker stored at `workerKey` in the pool.
   *
   * @param workerKey - Key of the worker in `pool.workers`.
   * @returns The worker's `tasksUsage.avgRunTime`.
   */
  private getWorkerVirtualTaskRunTime (workerKey: number): number {
    return this.pool.workers[workerKey].tasksUsage.avgRunTime
  }

  /**
   * Computes the default worker weight from the speed of the host CPUs.
   *
   * CPUs that report no positive speed are ignored, because dividing by a zero
   * speed would yield an infinite weight and `choose()` would then never move
   * off the first worker. When no CPU reports a usable speed the weight is 0,
   * which makes `choose()` advance on every call (plain round robin).
   *
   * @returns The rounded average weight over the CPUs with a usable speed, or 0 if there is none.
   */
  private computeWorkerWeight (): number {
    // `speed > 0` also rejects NaN and undefined values.
    const usableCpus = cpus().filter(cpu => cpu.speed > 0)
    if (usableCpus.length === 0) {
      return 0
    }
    let cpusCycleTimeWeight = 0
    for (const cpu of usableCpus) {
      // CPU estimated cycle time
      const numberOfDigits = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigits))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigits)
    }
    return Math.round(cpusCycleTimeWeight / usableCpus.length)
  }
}