Apply dependencies update
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-worker-choice-strategy.ts
CommitLineData
b3432a63
JB
1import { cpus } from 'os'
2import type { AbstractPoolWorker } from '../abstract-pool-worker'
3import type { IPoolInternal } from '../pool-internal'
4import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
10fcfaf4 5import type { RequiredStatistics } from './selection-strategies-types'
b3432a63
JB
6
/**
 * Per worker scheduling state used by the weighted round robin strategy.
 */
type TaskRunTime = {
  /**
   * Run time budget a worker may accumulate before the strategy moves on to the next worker.
   */
  weight: number
  /**
   * Task run time accumulated by the worker during its current turn.
   */
  runTime: number
}
14
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Every worker gets a weight derived from the host CPUs speed. The worker at the
 * current index keeps being chosen while the run time accumulated during its turn
 * stays below its weight; once the weight is reached the strategy moves on to the
 * next worker (wrapping around at the end of the worker list).
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends AbstractPoolWorker,
  Data,
  Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
  /** @inheritDoc */
  public requiredStatistics: RequiredStatistics = {
    runTime: true
  }

  /**
   * Worker index where the previous task was submitted.
   */
  private previousWorkerIndex: number = 0
  /**
   * Worker index where the current task will be submitted.
   */
  private currentWorkerIndex: number = 0
  /**
   * Default worker weight.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker task runtime map.
   */
  private readonly workerTaskRunTime: Map<Worker, TaskRunTime> = new Map<
    Worker,
    TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkerTaskRunTime()
  }

  /** @inheritDoc */
  public choose (): Worker {
    // The worker list may have shrunk since the last call: never index past its end.
    if (this.currentWorkerIndex >= this.pool.workers.length) {
      this.currentWorkerIndex = 0
    }
    const currentWorker = this.pool.workers[this.currentWorkerIndex]
    // A worker that is not tracked yet (e.g. created after construction) simply
    // starts from the default weight and an empty run time.
    const trackedState = this.workerTaskRunTime.get(currentWorker)
    const workerTaskWeight = trackedState?.weight ?? this.defaultWorkerWeight
    const workerVirtualTaskRunTime =
      this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
    // Run time only accumulates while the same worker is chosen consecutively;
    // the first choice after a switch starts a new turn from zero.
    const accumulatedRunTime =
      this.currentWorkerIndex === this.previousWorkerIndex
        ? (trackedState?.runTime ?? 0) + workerVirtualTaskRunTime
        : 0
    this.workerTaskRunTime.set(currentWorker, {
      weight: workerTaskWeight,
      runTime: accumulatedRunTime
    })
    this.previousWorkerIndex = this.currentWorkerIndex
    // Compare the accumulated run time with the worker weight (both numbers).
    // Written as a `<` test so that a NaN weight ends the turn instead of
    // pinning every task on the same worker.
    if (accumulatedRunTime < workerTaskWeight) {
      return currentWorker
    }
    this.currentWorkerIndex =
      this.currentWorkerIndex >= this.pool.workers.length - 1
        ? 0
        : this.currentWorkerIndex + 1
    return this.pool.workers[this.currentWorkerIndex]
  }

  /**
   * Computes the default worker weight as the average, over all host CPUs, of an
   * estimated cycle time derived from the CPU speed reported by `os.cpus()`.
   *
   * NOTE(review): a CPU reporting a speed of 0 yields an infinite weight and an
   * empty CPU list yields NaN - confirm whether such hosts must be supported.
   *
   * @returns The default worker weight.
   */
  private computeWorkerWeight (): number {
    const hostCpus = cpus()
    let cpusCycleTimeWeight = 0
    for (const cpu of hostCpus) {
      // CPU estimated cycle time
      const numberOfDigit = cpu.speed.toString().length - 1
      const cpuCycleTime = 1 / (cpu.speed / Math.pow(10, numberOfDigit))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
    }
    return cpusCycleTimeWeight / hostCpus.length
  }

  /**
   * Registers every worker currently in the pool with the default weight and
   * an empty accumulated run time.
   */
  private initWorkerTaskRunTime (): void {
    for (const worker of this.pool.workers) {
      this.workerTaskRunTime.set(worker, {
        weight: this.defaultWorkerWeight,
        runTime: 0
      })
    }
  }

  /**
   * Gets the average tasks run time the pool reports for the given worker.
   *
   * @param worker The worker.
   * @returns The worker average tasks run time, or `undefined` when the pool has none to report.
   */
  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
    return this.pool.getWorkerAverageTasksRunTime(worker)
  }
}