build(deps-dev): apply updates
[poolifier.git] / src / pools / selection-strategies / fair-share-worker-choice-strategy.ts
1 import type { IPool } from '../pool.js'
2 import type { IWorker } from '../worker.js'
3 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy.js'
4 import {
5 type IWorkerChoiceStrategy,
6 Measurements,
7 type TaskStatisticsRequirements,
8 type WorkerChoiceStrategyOptions,
9 } from './selection-strategies-types.js'
10
/**
 * Selects the next worker with a fair share scheduling algorithm.
 * Loosely modeled after the fair queueing algorithm: https://en.wikipedia.org/wiki/Fair_queuing.
 *
 * Each worker node carries a virtual task end timestamp in its strategy data.
 * The ready worker node with the smallest virtual task end timestamp is chosen.
 * @typeParam Worker - Type of worker which manages the strategy.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export class FairShareWorkerChoiceStrategy<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
>
  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
  implements IWorkerChoiceStrategy {
  /** @inheritDoc */
  public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
    runTime: {
      aggregate: true,
      average: true,
      median: false,
    },
    waitTime: {
      aggregate: true,
      average: true,
      median: false,
    },
    elu: {
      aggregate: true,
      average: true,
      median: false,
    },
  }

  /** @inheritDoc */
  public constructor (
    pool: IPool<Worker, Data, Response>,
    opts?: WorkerChoiceStrategyOptions
  ) {
    super(pool, opts)
    this.setTaskStatisticsRequirements(this.opts)
  }

  /** @inheritDoc */
  public reset (): boolean {
    // Drop every stored virtual task end timestamp so that it gets lazily
    // recomputed the next time a worker node is considered by choose().
    for (const workerNode of this.pool.workerNodes) {
      delete workerNode.strategyData?.virtualTaskEndTimestamp
    }
    return true
  }

  /** @inheritDoc */
  public update (workerNodeKey: number): boolean {
    // Recompute and store the virtual task end timestamp of the given worker node.
    this.pool.workerNodes[workerNodeKey].strategyData = {
      virtualTaskEndTimestamp:
        this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
    }
    return true
  }

  /** @inheritDoc */
  public choose (): number | undefined {
    // Remember the previous choice before computing the new one.
    this.setPreviousWorkerNodeKey(this.nextWorkerNodeKey)
    this.nextWorkerNodeKey = this.fairShareNextWorkerNodeKey()
    return this.nextWorkerNodeKey
  }

  /** @inheritDoc */
  public remove (): boolean {
    return true
  }

  /**
   * Finds the ready worker node with the smallest virtual task end timestamp.
   * Worker nodes that are not ready are never selected. On equal timestamps
   * the worker node with the lowest key wins.
   * @returns The chosen worker node key, or `undefined` if there are no worker nodes or none of them is ready.
   */
  private fairShareNextWorkerNodeKey (): number | undefined {
    const chosenWorkerNodeKey = this.pool.workerNodes.reduce(
      (minWorkerNodeKey: number, workerNode, workerNodeKey, workerNodes) => {
        // A worker node that is not ready must never become the candidate.
        if (!this.isWorkerNodeReady(workerNodeKey)) {
          return minWorkerNodeKey
        }
        // Lazily initialize the timestamp (first use or after a reset()).
        if (workerNode.strategyData?.virtualTaskEndTimestamp == null) {
          workerNode.strategyData = {
            virtualTaskEndTimestamp:
              this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey),
          }
        }
        // First ready worker node seen: it is the current minimum.
        if (minWorkerNodeKey === -1) {
          return workerNodeKey
        }
        // The current minimum is always a ready worker node that already went
        // through the initialization above, so its timestamp is defined.
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        return workerNode.strategyData.virtualTaskEndTimestamp! <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          workerNodes[minWorkerNodeKey].strategyData!.virtualTaskEndTimestamp!
          ? workerNodeKey
          : minWorkerNodeKey
      },
      // Sentinel meaning "no ready worker node found yet".
      -1
    )
    return chosenWorkerNodeKey === -1 ? undefined : chosenWorkerNodeKey
  }

  /**
   * Computes the worker node key virtual task end timestamp.
   * @param workerNodeKey - The worker node key.
   * @returns The worker node key virtual task end timestamp.
   */
  private computeWorkerNodeVirtualTaskEndTimestamp (
    workerNodeKey: number
  ): number {
    return this.getWorkerNodeVirtualTaskEndTimestamp(
      workerNodeKey,
      this.getWorkerNodeVirtualTaskStartTimestamp(workerNodeKey)
    )
  }

  /**
   * Adds the worker node task execution time to a virtual task start timestamp.
   * The execution time is the task wait time plus either the task ELU (when the
   * configured measurement is `Measurements.elu`) or the task run time, as
   * returned by the corresponding helpers inherited from the base strategy.
   * @param workerNodeKey - The worker node key.
   * @param workerNodeVirtualTaskStartTimestamp - The worker node virtual task start timestamp.
   * @returns The worker node virtual task end timestamp.
   */
  private getWorkerNodeVirtualTaskEndTimestamp (
    workerNodeKey: number,
    workerNodeVirtualTaskStartTimestamp: number
  ): number {
    const workerNodeTaskExecutionTime =
      this.getWorkerNodeTaskWaitTime(workerNodeKey) +
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      (this.opts!.measurement === Measurements.elu
        ? this.getWorkerNodeTaskElu(workerNodeKey)
        : this.getWorkerNodeTaskRunTime(workerNodeKey))
    return workerNodeVirtualTaskStartTimestamp + workerNodeTaskExecutionTime
  }

  /**
   * Gets the worker node virtual task start timestamp: the stored virtual task
   * end timestamp when it still lies in the future, the current
   * `performance.now()` value otherwise (including when none is stored).
   * @param workerNodeKey - The worker node key.
   * @returns The worker node virtual task start timestamp.
   */
  private getWorkerNodeVirtualTaskStartTimestamp (
    workerNodeKey: number
  ): number {
    const virtualTaskEndTimestamp =
      this.pool.workerNodes[workerNodeKey]?.strategyData
        ?.virtualTaskEndTimestamp
    const now = performance.now()
    // A missing timestamp compares as -Infinity, hence `now` is returned.
    return now < (virtualTaskEndTimestamp ?? Number.NEGATIVE_INFINITY)
      ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        virtualTaskEndTimestamp!
      : now
  }
}