feat: add pool runtime setters
[poolifier.git] / src / pools / selection-strategies / fair-share-worker-choice-strategy.ts
1 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics,
8 WorkerChoiceStrategyOptions
9 } from './selection-strategies-types'
10
/**
 * Worker virtual task timestamp.
 *
 * Immutable value object: a new instance is created every time a worker's
 * virtual timestamp is recomputed, so the fields are never reassigned.
 */
interface WorkerVirtualTaskTimestamp {
  /** Virtual start timestamp of the task. */
  readonly start: number
  /** Virtual end timestamp of the task (start plus the task run time estimate). */
  readonly end: number
}
18
/**
 * Selects the next worker with a fair share scheduling algorithm.
 * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing.
 *
 * For every worker node the strategy keeps a virtual task timestamp whose end
 * is its start plus the worker's average (or median) task run time. The worker
 * node with the smallest virtual end timestamp is chosen.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class FairShareWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
  >
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Worker last virtual task execution timestamp, keyed by worker node key
   * (the index of the worker node in `pool.workerNodes`).
   */
  private readonly workerLastVirtualTaskTimestamp: Map<
  number,
  WorkerVirtualTaskTimestamp
  > = new Map<number, WorkerVirtualTaskTimestamp>()

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    this.checkOptions(this.opts)
  }

  /**
   * Forgets every recorded virtual task timestamp.
   *
   * @inheritDoc
   */
  public reset (): boolean {
    this.workerLastVirtualTaskTimestamp.clear()
    return true
  }

  /**
   * Returns the key of the worker node having the smallest virtual task end
   * timestamp. On ties the lowest worker node key wins (strict `<` comparison).
   *
   * NOTE(review): the virtual timestamp of every worker node is recomputed on
   * each call, not only the one of the chosen worker node - confirm this is
   * the intended accounting.
   * NOTE(review): with an empty `pool.workerNodes` the loop never assigns the
   * chosen key, so `undefined` is returned despite the `number` return type.
   *
   * @inheritDoc
   */
  public choose (): number {
    let minWorkerVirtualTaskEndTimestamp = Infinity
    let chosenWorkerNodeKey!: number
    for (const [index] of this.pool.workerNodes.entries()) {
      this.computeWorkerLastVirtualTaskTimestamp(index)
      const workerLastVirtualTaskEndTimestamp =
        this.workerLastVirtualTaskTimestamp.get(index)?.end ?? 0
      if (
        workerLastVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp
      ) {
        minWorkerVirtualTaskEndTimestamp = workerLastVirtualTaskEndTimestamp
        chosenWorkerNodeKey = index
      }
    }
    return chosenWorkerNodeKey
  }

  /**
   * Drops the virtual task timestamp of the given worker node and shifts the
   * timestamps recorded under greater keys down by one, so that the map keeps
   * matching worker node indexes once the node has been removed.
   *
   * @param workerNodeKey - The key of the removed worker node.
   * @returns `true` if a timestamp was recorded for `workerNodeKey`, `false` otherwise.
   */
  public remove (workerNodeKey: number): boolean {
    const deleted = this.workerLastVirtualTaskTimestamp.delete(workerNodeKey)
    // Work on a snapshot: the map must not be mutated while being iterated.
    const entriesToShift = Array.from(
      this.workerLastVirtualTaskTimestamp.entries()
    )
      .filter(([key]) => key > workerNodeKey)
      // Ascending order guarantees that `key - 1` is free when it gets written.
      .sort(([keyA], [keyB]) => keyA - keyB)
    for (const [key, value] of entriesToShift) {
      // Move, do not copy: leaving the old key behind would let a worker node
      // added later at that index inherit a stale timestamp.
      this.workerLastVirtualTaskTimestamp.delete(key)
      this.workerLastVirtualTaskTimestamp.set(key - 1, value)
    }
    return deleted
  }

  /**
   * Computes worker last virtual task timestamp.
   *
   * The virtual task starts at the later of the current `performance.now()`
   * value and the end of the previously recorded virtual task, and lasts for
   * the worker node median run time when `requiredStatistics.medRunTime` is
   * set, its average run time otherwise (0 if the statistic is nullish).
   *
   * @param workerNodeKey - The worker node key.
   */
  private computeWorkerLastVirtualTaskTimestamp (workerNodeKey: number): void {
    const workerVirtualTaskStartTimestamp = Math.max(
      performance.now(),
      this.workerLastVirtualTaskTimestamp.get(workerNodeKey)?.end ?? -Infinity
    )
    const workerVirtualTaskTRunTime = this.requiredStatistics.medRunTime
      ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
      : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
    this.workerLastVirtualTaskTimestamp.set(workerNodeKey, {
      start: workerVirtualTaskStartTimestamp,
      end: workerVirtualTaskStartTimestamp + (workerVirtualTaskTRunTime ?? 0)
    })
  }
}