Fixes to worker selection strategies
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
1 import { cpus } from 'os'
2 import type { IPoolInternal } from '../pool-internal'
3 import type { IPoolWorker } from '../pool-worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type { RequiredStatistics } from './selection-strategies-types'
6
7 /**
8 * Virtual task runtime.
9 */
10 type TaskRunTime = {
11 weight: number
12 runTime: number
13 }
14
15 /**
16 * Selects the next worker with a weighted round robin scheduling algorithm.
17 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
18 *
19 * @template Worker Type of worker which manages the strategy.
20 * @template Data Type of data sent to the worker. This can only be serializable data.
21 * @template Response Type of response of execution. This can only be serializable data.
22 */
23 export class WeightedRoundRobinWorkerChoiceStrategy<
24 Worker extends IPoolWorker,
25 Data,
26 Response
27 > extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
28 /** @inheritDoc */
29 public readonly requiredStatistics: RequiredStatistics = {
30 runTime: true
31 }
32
33 /**
34 * Worker index where the current task will be submitted.
35 */
36 private currentWorkerIndex: number = 0
37 /**
38 * Default worker weight.
39 */
40 private readonly defaultWorkerWeight: number
41 /**
42 * Per worker virtual task runtime map.
43 */
44 private readonly workersTaskRunTime: Map<Worker, TaskRunTime> = new Map<
45 Worker,
46 TaskRunTime
47 >()
48
49 /**
50 * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
51 *
52 * @param pool The pool instance.
53 */
54 public constructor (pool: IPoolInternal<Worker, Data, Response>) {
55 super(pool)
56 this.defaultWorkerWeight = this.computeWorkerWeight()
57 this.initWorkersTaskRunTime()
58 }
59
60 /** @inheritDoc */
61 public reset (): boolean {
62 this.currentWorkerIndex = 0
63 this.workersTaskRunTime.clear()
64 this.initWorkersTaskRunTime()
65 return true
66 }
67
68 /** @inheritDoc */
69 public choose (): Worker {
70 let chosenWorker = this.pool.workers[this.currentWorkerIndex]
71 if (
72 this.isDynamicPool === true &&
73 this.workersTaskRunTime.has(chosenWorker) === false
74 ) {
75 this.initWorkerTaskRunTime(chosenWorker)
76 }
77 const workerTaskWeight =
78 this.workersTaskRunTime.get(chosenWorker)?.weight ??
79 this.defaultWorkerWeight
80 if (
81 (this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0) <
82 workerTaskWeight
83 ) {
84 this.setWorkerTaskRunTime(
85 chosenWorker,
86 workerTaskWeight,
87 (this.workersTaskRunTime.get(chosenWorker)?.runTime ?? 0) +
88 (this.getWorkerVirtualTaskRunTime(chosenWorker) ?? 0)
89 )
90 } else {
91 this.currentWorkerIndex =
92 this.pool.workers.length - 1 === this.currentWorkerIndex
93 ? 0
94 : this.currentWorkerIndex + 1
95 chosenWorker = this.pool.workers[this.currentWorkerIndex]
96 this.setWorkerTaskRunTime(chosenWorker, workerTaskWeight, 0)
97 }
98 return chosenWorker
99 }
100
101 private initWorkersTaskRunTime (): void {
102 for (const worker of this.pool.workers) {
103 this.initWorkerTaskRunTime(worker)
104 }
105 }
106
107 private initWorkerTaskRunTime (worker: Worker): void {
108 this.setWorkerTaskRunTime(worker, this.defaultWorkerWeight, 0)
109 }
110
111 private setWorkerTaskRunTime (
112 worker: Worker,
113 weight: number,
114 runTime: number
115 ): void {
116 this.workersTaskRunTime.set(worker, {
117 weight,
118 runTime
119 })
120 }
121
122 private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
123 return this.pool.getWorkerAverageTasksRunTime(worker)
124 }
125
126 private computeWorkerWeight (): number {
127 let cpusCycleTimeWeight = 0
128 for (const cpu of cpus()) {
129 // CPU estimated cycle time
130 const numberOfDigit = cpu.speed.toString().length - 1
131 const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
132 cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
133 }
134 return Math.round(cpusCycleTimeWeight / cpus().length)
135 }
136 }