Add fair sharing worker choice strategy
[poolifier.git] / src / pools / selection-strategies / weighted-round-robin-choice-strategy.ts
1 import { cpus } from 'os'
2 import type { AbstractPoolWorker } from '../abstract-pool-worker'
3 import type { IPoolInternal } from '../pool-internal'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5
/**
 * Weighted round robin bookkeeping record kept for a single worker.
 */
interface TaskRunTime {
  /** Weight assigned to the worker. */
  weight: number
  /** Task run time accounted to the worker so far. */
  runTime: number
}
13
/**
 * Selects the next worker with a weighted round robin scheduling algorithm.
 * Loosely modeled after the weighted round robin queueing algorithm: https://en.wikipedia.org/wiki/Weighted_round_robin.
 *
 * Each worker owns a run time budget (its weight). Tasks keep being routed to
 * the current worker while the run time accounted to it stays below that
 * budget; once the budget is reached the strategy moves on to the next worker.
 *
 * @template Worker Type of worker which manages the strategy.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export class WeightedRoundRobinWorkerChoiceStrategy<
  Worker extends AbstractPoolWorker,
  Data,
  Response
> extends AbstractWorkerChoiceStrategy<Worker, Data, Response> {
  /**
   * Worker index where the previous task was submitted.
   */
  private previousWorkerIndex: number = 0
  /**
   * Worker index where the current task will be submitted.
   */
  private currentWorkerIndex: number = 0
  /**
   * Default worker weight, computed once at construction time.
   */
  private readonly defaultWorkerWeight: number
  /**
   * Per worker task runtime map.
   */
  private readonly workerTaskRunTime: Map<Worker, TaskRunTime> = new Map<
    Worker,
    TaskRunTime
  >()

  /**
   * Constructs a worker choice strategy that selects with a weighted round robin scheduling algorithm.
   *
   * @param pool The pool instance.
   */
  public constructor (pool: IPoolInternal<Worker, Data, Response>) {
    super(pool)
    this.defaultWorkerWeight = this.computeWorkerWeight()
    this.initWorkerTaskRunTime()
  }

  /** @inheritDoc */
  public choose (): Worker {
    const lastWorkerIndex = this.pool.workers.length - 1
    // The pool may hold fewer workers than on the previous call: never index past its end.
    if (this.currentWorkerIndex > lastWorkerIndex) {
      this.currentWorkerIndex = 0
    }
    const currentWorker = this.pool.workers[this.currentWorkerIndex]
    const workerVirtualTaskRunTime =
      this.getWorkerVirtualTaskRunTime(currentWorker) ?? 0
    // A worker without a record yet (e.g. one created after this strategy)
    // falls back to the default weight and an empty run time.
    const workerRecord = this.workerTaskRunTime.get(currentWorker)
    const workerTaskWeight = workerRecord?.weight ?? this.defaultWorkerWeight
    // Keep accumulating while the same worker stays selected, start over from
    // zero when the selection has just moved onto this worker.
    const workerTaskRunTime =
      this.currentWorkerIndex === this.previousWorkerIndex
        ? (workerRecord?.runTime ?? 0) + workerVirtualTaskRunTime
        : 0
    this.workerTaskRunTime.set(currentWorker, {
      weight: workerTaskWeight,
      runTime: workerTaskRunTime
    })
    this.previousWorkerIndex = this.currentWorkerIndex
    // Compare numbers with numbers: the accumulated run time against the
    // worker weight. Once the budget is used up, move on to the next worker,
    // wrapping around at the end of the pool.
    if (workerTaskRunTime >= workerTaskWeight) {
      this.currentWorkerIndex =
        this.currentWorkerIndex >= lastWorkerIndex
          ? 0
          : this.currentWorkerIndex + 1
    }
    return this.pool.workers[this.currentWorkerIndex]
  }

  /**
   * Computes the default worker weight as the average, over the CPUs reporting
   * a usable speed, of an estimated CPU cycle time scaled by the order of
   * magnitude of that speed.
   *
   * @returns The default worker weight, or `0` when no CPU reports a positive
   * speed. A zero weight makes `choose()` move to the next worker on every
   * call, i.e. the strategy degrades to a plain round robin.
   */
  private computeWorkerWeight (): number {
    let cpusCycleTimeWeight = 0
    let measuredCpus = 0
    // Query the CPU list once instead of on every loop iteration.
    for (const { speed } of cpus()) {
      // A missing or zero speed would turn the weight into Infinity or NaN.
      if (!(speed > 0)) {
        continue
      }
      // CPU estimated cycle time
      const numberOfDigit = speed.toString().length - 1
      const cpuCycleTime = 1 / (speed / Math.pow(10, numberOfDigit))
      cpusCycleTimeWeight += cpuCycleTime * Math.pow(10, numberOfDigit)
      measuredCpus++
    }
    return measuredCpus > 0 ? cpusCycleTimeWeight / measuredCpus : 0
  }

  /**
   * Registers every worker currently in the pool with the default weight and
   * an empty run time.
   */
  private initWorkerTaskRunTime (): void {
    for (const worker of this.pool.workers) {
      this.workerTaskRunTime.set(worker, {
        weight: this.defaultWorkerWeight,
        runTime: 0
      })
    }
  }

  /**
   * Gets the run time charged to a worker for one scheduling decision.
   *
   * @param worker The worker to query.
   * @returns The value reported by the pool's `getWorkerAverageTasksRunTime()`
   * for the worker, possibly `undefined`.
   */
  private getWorkerVirtualTaskRunTime (worker: Worker): number | undefined {
    return this.pool.getWorkerAverageTasksRunTime(worker)
  }
}