fix: fix fair share worker choice strategy internals update
[poolifier.git] / src / pools / selection-strategies / fair-share-worker-choice-strategy.ts
1 import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
2 import type { IPool } from '../pool'
3 import type { IWorker } from '../worker'
4 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
5 import type {
6 IWorkerChoiceStrategy,
7 RequiredStatistics,
8 WorkerChoiceStrategyOptions
9 } from './selection-strategies-types'
10
/**
 * Virtual task timestamps tracked for a single worker.
 */
interface WorkerVirtualTaskTimestamp {
  /** Timestamp at which the worker's virtual task starts. */
  start: number
  /** Timestamp at which the worker's virtual task ends. */
  end: number
}
18
/**
 * Worker choice strategy that picks the worker whose virtual task is expected
 * to end the earliest (fair share scheduling).
 * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing.
 *
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export class FairShareWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly requiredStatistics: RequiredStatistics = {
    runTime: true,
    avgRunTime: true,
    medRunTime: false
  }

  /**
   * Virtual task start/end timestamps, indexed by worker node key.
   */
  private workersVirtualTaskTimestamp: WorkerVirtualTaskTimestamp[] = []

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  ) {
    super(pool, opts)
    this.checkOptions(this.opts)
  }

  /** @inheritDoc */
  public reset (): boolean {
    // Drop every recorded timestamp by swapping in a fresh array.
    this.workersVirtualTaskTimestamp = []
    return true
  }

  /** @inheritDoc */
  public update (): boolean {
    // Refresh the virtual task timestamp of every worker node in the pool.
    for (const nodeKey of this.pool.workerNodes.keys()) {
      this.computeWorkerVirtualTaskTimestamp(nodeKey)
    }
    return true
  }

  /** @inheritDoc */
  public choose (): number {
    let chosenWorkerNodeKey!: number
    let earliestEnd = Infinity
    for (const nodeKey of this.pool.workerNodes.keys()) {
      // A worker without a recorded timestamp counts as ending at 0.
      const candidateEnd = this.workersVirtualTaskTimestamp[nodeKey]?.end ?? 0
      // Strict comparison: on a tie the first worker encountered is kept.
      if (earliestEnd > candidateEnd) {
        earliestEnd = candidateEnd
        chosenWorkerNodeKey = nodeKey
      }
    }
    return chosenWorkerNodeKey
  }

  /** @inheritDoc */
  public remove (workerNodeKey: number): boolean {
    // Splicing shifts the entries that follow down by one index.
    this.workersVirtualTaskTimestamp.splice(workerNodeKey, 1)
    return true
  }

  /**
   * Computes and stores the virtual task timestamp of the given worker node.
   *
   * The start is the later of the current time and the previously recorded
   * end (if any); the end is the start plus the worker's median or average
   * run time, depending on `requiredStatistics.medRunTime`.
   *
   * @param workerNodeKey - The worker node key.
   */
  private computeWorkerVirtualTaskTimestamp (workerNodeKey: number): void {
    const previousEnd =
      this.workersVirtualTaskTimestamp[workerNodeKey]?.end ?? -Infinity
    const start = Math.max(performance.now(), previousEnd)
    const { tasksUsage } = this.pool.workerNodes[workerNodeKey]
    const runTime = this.requiredStatistics.medRunTime
      ? tasksUsage.medRunTime
      : tasksUsage.avgRunTime
    this.workersVirtualTaskTimestamp[workerNodeKey] = {
      start,
      end: start + runTime
    }
  }
}