refactor: code cleanups
[poolifier.git] / src / pools / worker-node.ts
CommitLineData
85aeb3f3 1import { MessageChannel } from 'node:worker_threads'
4b628b48 2import { CircularArray } from '../circular-array'
5c4d16da 3import type { Task } from '../utility-types'
68cbdc84
JB
4import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
75de9f41
JB
8 getWorkerId,
9 getWorkerType,
68cbdc84
JB
10 sleep
11} from '../utils'
574b351d 12import { Deque } from '../deque'
4b628b48
JB
13import {
14 type IWorker,
15 type IWorkerNode,
f3a91bac 16 type StrategyData,
4b628b48 17 type WorkerInfo,
a9780ad2 18 type WorkerNodeEventCallback,
4b628b48
JB
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22} from './worker'
23
60664f48
JB
24/**
25 * Worker node.
26 *
27 * @typeParam Worker - Type of worker.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 */
4b628b48
JB
30export class WorkerNode<Worker extends IWorker, Data = unknown>
31implements IWorkerNode<Worker, Data> {
671d5154 32 /** @inheritdoc */
4b628b48 33 public readonly worker: Worker
671d5154 34 /** @inheritdoc */
4b628b48 35 public readonly info: WorkerInfo
671d5154 36 /** @inheritdoc */
4b628b48 37 public usage: WorkerUsage
20c6f652 38 /** @inheritdoc */
f3a91bac
JB
39 public strategyData?: StrategyData
40 /** @inheritdoc */
26fb3c18
JB
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
20c6f652 43 public tasksQueueBackPressureSize: number
72695f86 44 /** @inheritdoc */
a9780ad2 45 public onBackPressure?: WorkerNodeEventCallback
dd951876 46 /** @inheritdoc */
a9780ad2 47 public onEmptyQueue?: WorkerNodeEventCallback
574b351d 48 private readonly tasksQueue: Deque<Task<Data>>
68cbdc84 49 private onEmptyQueueCount: number
26fb3c18 50 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
4b628b48 51
60664f48
JB
52 /**
53 * Constructs a new worker node.
54 *
55 * @param worker - The worker.
20c6f652 56 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
60664f48 57 */
75de9f41 58 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
5b49e864 59 this.checkWorkerNodeArguments(worker, tasksQueueBackPressureSize)
4b628b48 60 this.worker = worker
75de9f41 61 this.info = this.initWorkerInfo(worker)
26fb3c18 62 this.usage = this.initWorkerUsage()
75de9f41 63 if (this.info.type === WorkerTypes.thread) {
7884d183
JB
64 this.messageChannel = new MessageChannel()
65 }
20c6f652 66 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
26fb3c18 67 this.tasksQueue = new Deque<Task<Data>>()
68cbdc84 68 this.onEmptyQueueCount = 0
26fb3c18 69 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
4b628b48
JB
70 }
71
72 /** @inheritdoc */
73 public tasksQueueSize (): number {
74 return this.tasksQueue.size
75 }
76
4b628b48
JB
77 /** @inheritdoc */
78 public enqueueTask (task: Task<Data>): number {
72695f86
JB
79 const tasksQueueSize = this.tasksQueue.push(task)
80 if (this.onBackPressure != null && this.hasBackPressure()) {
0741fbeb 81 this.onBackPressure(this.info.id as number)
72695f86
JB
82 }
83 return tasksQueueSize
84 }
85
86 /** @inheritdoc */
87 public unshiftTask (task: Task<Data>): number {
88 const tasksQueueSize = this.tasksQueue.unshift(task)
89 if (this.onBackPressure != null && this.hasBackPressure()) {
0741fbeb 90 this.onBackPressure(this.info.id as number)
72695f86
JB
91 }
92 return tasksQueueSize
4b628b48
JB
93 }
94
95 /** @inheritdoc */
96 public dequeueTask (): Task<Data> | undefined {
dd951876
JB
97 const task = this.tasksQueue.shift()
98 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
68cbdc84 99 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
dd951876
JB
100 }
101 return task
4b628b48
JB
102 }
103
72695f86
JB
104 /** @inheritdoc */
105 public popTask (): Task<Data> | undefined {
dd951876
JB
106 const task = this.tasksQueue.pop()
107 if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
68cbdc84 108 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
dd951876
JB
109 }
110 return task
72695f86
JB
111 }
112
4b628b48
JB
113 /** @inheritdoc */
114 public clearTasksQueue (): void {
115 this.tasksQueue.clear()
116 }
117
671d5154
JB
118 /** @inheritdoc */
119 public hasBackPressure (): boolean {
8735b4e5 120 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
671d5154
JB
121 }
122
ff128cc9 123 /** @inheritdoc */
4b628b48
JB
124 public resetUsage (): void {
125 this.usage = this.initWorkerUsage()
db0e38ee 126 this.taskFunctionsUsage.clear()
ff128cc9
JB
127 }
128
3f09ed9f
JB
129 /** @inheritdoc */
130 public closeChannel (): void {
7884d183
JB
131 if (this.messageChannel != null) {
132 this.messageChannel?.port1.unref()
133 this.messageChannel?.port2.unref()
134 this.messageChannel?.port1.close()
135 this.messageChannel?.port2.close()
136 delete this.messageChannel
3f09ed9f
JB
137 }
138 }
139
ff128cc9 140 /** @inheritdoc */
db0e38ee 141 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
6703b9f4 142 if (!Array.isArray(this.info.taskFunctionNames)) {
71b2b6d8 143 throw new Error(
db0e38ee 144 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
71b2b6d8
JB
145 )
146 }
b558f6b5 147 if (
6703b9f4
JB
148 Array.isArray(this.info.taskFunctionNames) &&
149 this.info.taskFunctionNames.length < 3
b558f6b5 150 ) {
db0e38ee
JB
151 throw new Error(
152 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
153 )
154 }
155 if (name === DEFAULT_TASK_NAME) {
6703b9f4 156 name = this.info.taskFunctionNames[1]
b558f6b5 157 }
db0e38ee
JB
158 if (!this.taskFunctionsUsage.has(name)) {
159 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
ff128cc9 160 }
db0e38ee 161 return this.taskFunctionsUsage.get(name)
4b628b48
JB
162 }
163
68cbdc84 164 private async startOnEmptyQueue (): Promise<void> {
1f0766e7
JB
165 if (
166 this.onEmptyQueueCount > 0 &&
79b197bb 167 (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
1f0766e7 168 ) {
60b7a7cc
JB
169 this.onEmptyQueueCount = 0
170 return
171 }
a9780ad2 172 (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
60b7a7cc
JB
173 ++this.onEmptyQueueCount
174 await sleep(exponentialDelay(this.onEmptyQueueCount))
175 await this.startOnEmptyQueue()
68cbdc84
JB
176 }
177
75de9f41 178 private initWorkerInfo (worker: Worker): WorkerInfo {
4b628b48 179 return {
75de9f41
JB
180 id: getWorkerId(worker),
181 type: getWorkerType(worker) as WorkerType,
4b628b48 182 dynamic: false,
7884d183 183 ready: false
4b628b48
JB
184 }
185 }
186
187 private initWorkerUsage (): WorkerUsage {
188 const getTasksQueueSize = (): number => {
dd951876 189 return this.tasksQueue.size
4b628b48 190 }
bf4ef2ca 191 const getTasksQueueMaxSize = (): number => {
dd951876 192 return this.tasksQueue.maxSize
4b628b48
JB
193 }
194 return {
195 tasks: {
196 executed: 0,
197 executing: 0,
198 get queued (): number {
199 return getTasksQueueSize()
200 },
201 get maxQueued (): number {
bf4ef2ca 202 return getTasksQueueMaxSize()
4b628b48 203 },
68cbdc84 204 stolen: 0,
4b628b48
JB
205 failed: 0
206 },
207 runTime: {
208 history: new CircularArray()
209 },
210 waitTime: {
211 history: new CircularArray()
212 },
213 elu: {
214 idle: {
215 history: new CircularArray()
216 },
217 active: {
218 history: new CircularArray()
219 }
220 }
221 }
222 }
223
db0e38ee 224 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
e5ece61d
JB
225 const getTaskFunctionQueueSize = (): number => {
226 let taskFunctionQueueSize = 0
b25a42cd 227 for (const task of this.tasksQueue) {
dd92a715 228 if (
e5ece61d 229 (task.name === DEFAULT_TASK_NAME &&
6703b9f4 230 name === (this.info.taskFunctionNames as string[])[1]) ||
e5ece61d 231 (task.name !== DEFAULT_TASK_NAME && name === task.name)
dd92a715 232 ) {
e5ece61d 233 ++taskFunctionQueueSize
b25a42cd
JB
234 }
235 }
e5ece61d 236 return taskFunctionQueueSize
b25a42cd
JB
237 }
238 return {
239 tasks: {
240 executed: 0,
241 executing: 0,
242 get queued (): number {
e5ece61d 243 return getTaskFunctionQueueSize()
b25a42cd 244 },
68cbdc84 245 stolen: 0,
b25a42cd
JB
246 failed: 0
247 },
248 runTime: {
249 history: new CircularArray()
250 },
251 waitTime: {
252 history: new CircularArray()
253 },
254 elu: {
255 idle: {
256 history: new CircularArray()
257 },
258 active: {
259 history: new CircularArray()
260 }
261 }
262 }
263 }
5b49e864
JB
264
265 private checkWorkerNodeArguments (
266 worker: Worker,
267 tasksQueueBackPressureSize: number
268 ): void {
269 if (worker == null) {
270 throw new TypeError('Cannot construct a worker node without a worker')
271 }
272 if (tasksQueueBackPressureSize == null) {
273 throw new TypeError(
274 'Cannot construct a worker node without a tasks queue back pressure size'
275 )
276 }
277 if (!Number.isSafeInteger(tasksQueueBackPressureSize)) {
278 throw new TypeError(
279 'Cannot construct a worker node with a tasks queue back pressure size that is not an integer'
280 )
281 }
282 if (tasksQueueBackPressureSize <= 0) {
283 throw new RangeError(
284 'Cannot construct a worker node with a tasks queue back pressure size that is not a positive integer'
285 )
286 }
287 }
4b628b48 288}