fix: fix fair share worker choice strategy internals update
[poolifier.git] / src / pools / selection-strategies / fair-share-worker-choice-strategy.ts
1 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics,
8 WorkerChoiceStrategyOptions
9 } from './selection-strategies-types'
10
11 /**
12 * Selects the next worker with a fair share scheduling algorithm.
13 * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing.
14 *
15 * @typeParam Worker - Type of worker which manages the strategy.
16 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
17 * @typeParam Response - Type of execution response. This can only be serializable data.
18 */
19 export class FairShareWorkerChoiceStrategy<
20 Worker extends IWorker,
21 Data = unknown,
22 Response = unknown
23 >
24 extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
25 implements IWorkerChoiceStrategy {
26 /** @inheritDoc */
27 public readonly requiredStatistics: RequiredStatistics = {
28 runTime: true,
29 avgRunTime: true,
30 medRunTime: false
31 }
32
33 /**
34 * Workers' virtual task end execution timestamp.
35 */
36 private workersVirtualTaskEndTimestamp: number[] = []
37
38 /** @inheritDoc */
39 public constructor (
40 pool: IPool<Worker, Data, Response>,
41 opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
42 ) {
43 super(pool, opts)
44 this.checkOptions(this.opts)
45 }
46
47 /** @inheritDoc */
48 public reset (): boolean {
49 this.workersVirtualTaskEndTimestamp = []
50 return true
51 }
52
53 /** @inheritDoc */
54 public update (workerNodeKey: number): boolean {
55 this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey)
56 return true
57 }
58
59 /** @inheritDoc */
60 public choose (): number {
61 let minWorkerVirtualTaskEndTimestamp = Infinity
62 let chosenWorkerNodeKey!: number
63 for (const [workerNodeKey] of this.pool.workerNodes.entries()) {
64 if (this.workersVirtualTaskEndTimestamp[workerNodeKey] == null) {
65 this.computeWorkerVirtualTaskEndTimestamp(workerNodeKey)
66 }
67 const workerVirtualTaskEndTimestamp =
68 this.workersVirtualTaskEndTimestamp[workerNodeKey]
69 if (workerVirtualTaskEndTimestamp < minWorkerVirtualTaskEndTimestamp) {
70 minWorkerVirtualTaskEndTimestamp = workerVirtualTaskEndTimestamp
71 chosenWorkerNodeKey = workerNodeKey
72 }
73 }
74 return chosenWorkerNodeKey
75 }
76
77 /** @inheritDoc */
78 public remove (workerNodeKey: number): boolean {
79 this.workersVirtualTaskEndTimestamp.splice(workerNodeKey, 1)
80 return true
81 }
82
83 /**
84 * Computes the worker node key virtual task end timestamp.
85 *
86 * @param workerNodeKey - The worker node key.
87 */
88 private computeWorkerVirtualTaskEndTimestamp (workerNodeKey: number): void {
89 this.workersVirtualTaskEndTimestamp[workerNodeKey] =
90 this.getWorkerVirtualTaskEndTimestamp(
91 workerNodeKey,
92 this.getWorkerVirtualTaskStartTimestamp(workerNodeKey)
93 )
94 }
95
96 private getWorkerVirtualTaskEndTimestamp (
97 workerNodeKey: number,
98 workerVirtualTaskStartTimestamp: number
99 ): number {
100 const workerVirtualTaskRunTime = this.requiredStatistics.medRunTime
101 ? this.pool.workerNodes[workerNodeKey].tasksUsage.medRunTime
102 : this.pool.workerNodes[workerNodeKey].tasksUsage.avgRunTime
103 return workerVirtualTaskStartTimestamp + workerVirtualTaskRunTime
104 }
105
106 private getWorkerVirtualTaskStartTimestamp (workerNodeKey: number): number {
107 return Math.max(
108 performance.now(),
109 this.workersVirtualTaskEndTimestamp[workerNodeKey] ?? -Infinity
110 )
111 }
112 }