refactor: factor out worker helpers
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 getWorkerId,
9 getWorkerType,
10 sleep
11 } from '../utils'
12 import { Deque } from '../deque'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type StrategyData,
17 type WorkerInfo,
18 type WorkerNodeEventCallback,
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22 } from './worker'
23 import { checkWorkerNodeArguments } from './utils'
24
/**
 * Worker node.
 *
 * Couples a worker with its runtime information, its usage statistics and its
 * own tasks queue, and notifies optional callbacks when the queue reaches the
 * back pressure size or becomes empty.
 *
 * @typeParam Worker - Type of worker.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 */
export class WorkerNode<Worker extends IWorker, Data = unknown>
  implements IWorkerNode<Worker, Data> {
  /** @inheritdoc */
  public readonly worker: Worker
  /** @inheritdoc */
  public readonly info: WorkerInfo
  /** @inheritdoc */
  public usage: WorkerUsage
  /** @inheritdoc */
  public strategyData?: StrategyData
  /** @inheritdoc */
  public messageChannel?: MessageChannel
  /** @inheritdoc */
  public tasksQueueBackPressureSize: number
  /** @inheritdoc */
  public onBackPressure?: WorkerNodeEventCallback
  /** @inheritdoc */
  public onEmptyQueue?: WorkerNodeEventCallback
  /** Tasks waiting on this worker node. */
  private readonly tasksQueue: Deque<Task<Data>>
  /** Whether the back pressure callback is currently running (reentrancy guard). */
  private onBackPressureStarted: boolean
  /** Number of empty queue notifications emitted by the running notification cycle, 0 when no cycle is running. */
  private onEmptyQueueCount: number
  /** Usage statistics per task function name, created on first access. */
  private readonly taskFunctionsUsage: Map<string, WorkerUsage>

  /**
   * Constructs a new worker node.
   *
   * @param worker - The worker.
   * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
   */
  constructor (worker: Worker, tasksQueueBackPressureSize: number) {
    checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
    this.worker = worker
    this.info = this.initWorkerInfo(worker)
    this.usage = this.initWorkerUsage()
    // Only thread workers get a dedicated message channel.
    if (this.info.type === WorkerTypes.thread) {
      this.messageChannel = new MessageChannel()
    }
    this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
    this.tasksQueue = new Deque<Task<Data>>()
    this.onBackPressureStarted = false
    this.onEmptyQueueCount = 0
    this.taskFunctionsUsage = new Map<string, WorkerUsage>()
  }

  /** @inheritdoc */
  public tasksQueueSize (): number {
    return this.tasksQueue.size
  }

  /** @inheritdoc */
  public enqueueTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.push(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public unshiftTask (task: Task<Data>): number {
    const tasksQueueSize = this.tasksQueue.unshift(task)
    this.emitBackPressureIfNeeded()
    return tasksQueueSize
  }

  /** @inheritdoc */
  public dequeueTask (): Task<Data> | undefined {
    const task = this.tasksQueue.shift()
    this.startOnEmptyQueueIfNeeded()
    return task
  }

  /** @inheritdoc */
  public popTask (): Task<Data> | undefined {
    const task = this.tasksQueue.pop()
    this.startOnEmptyQueueIfNeeded()
    return task
  }

  /** @inheritdoc */
  public clearTasksQueue (): void {
    this.tasksQueue.clear()
  }

  /** @inheritdoc */
  public hasBackPressure (): boolean {
    return this.tasksQueue.size >= this.tasksQueueBackPressureSize
  }

  /** @inheritdoc */
  public resetUsage (): void {
    this.usage = this.initWorkerUsage()
    this.taskFunctionsUsage.clear()
  }

  /** @inheritdoc */
  public closeChannel (): void {
    if (this.messageChannel != null) {
      const { port1, port2 } = this.messageChannel
      port1.unref()
      port2.unref()
      port1.close()
      port2.close()
      delete this.messageChannel
    }
  }

  /**
   * @inheritdoc
   *
   * @throws Error if the task function names list is not yet defined or has less than 3 elements.
   */
  public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
    const taskFunctionNames = this.info.taskFunctionNames
    if (!Array.isArray(taskFunctionNames)) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
      )
    }
    if (taskFunctionNames.length < 3) {
      throw new Error(
        `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
      )
    }
    // The default task name is resolved to the second entry of the task function names list.
    const taskFunctionName =
      name === DEFAULT_TASK_NAME ? taskFunctionNames[1] : name
    let taskFunctionUsage = this.taskFunctionsUsage.get(taskFunctionName)
    if (taskFunctionUsage == null) {
      taskFunctionUsage = this.initTaskFunctionWorkerUsage(taskFunctionName)
      this.taskFunctionsUsage.set(taskFunctionName, taskFunctionUsage)
    }
    return taskFunctionUsage
  }

  /** @inheritdoc */
  public deleteTaskFunctionWorkerUsage (name: string): boolean {
    return this.taskFunctionsUsage.delete(name)
  }

  /**
   * Calls the back pressure callback if one is registered, the tasks queue has
   * back pressure and the callback is not already running.
   */
  private emitBackPressureIfNeeded (): void {
    if (
      this.onBackPressure != null &&
      this.hasBackPressure() &&
      !this.onBackPressureStarted
    ) {
      this.onBackPressureStarted = true
      try {
        this.onBackPressure(this.info.id as number)
      } finally {
        // Always release the guard: a throwing callback must not silence
        // every later back pressure notification.
        this.onBackPressureStarted = false
      }
    }
  }

  /**
   * Starts the empty queue notification cycle if a callback is registered,
   * the tasks queue is empty and no cycle is already running.
   */
  private startOnEmptyQueueIfNeeded (): void {
    if (
      this.onEmptyQueue != null &&
      this.tasksQueue.size === 0 &&
      this.onEmptyQueueCount === 0
    ) {
      this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
    }
  }

  /**
   * Runs the empty queue notification cycle: calls the empty queue callback,
   * sleeps for the delay `exponentialDelay()` computes from the number of
   * notifications emitted so far, and repeats. The cycle ends once, after at
   * least one notification, the worker node is executing a task or has queued
   * tasks, or when the callback has been removed.
   *
   * Implemented as a loop rather than through recursion so that a long idle
   * period does not build up a chain of pending promises.
   */
  private async startOnEmptyQueue (): Promise<void> {
    try {
      while (this.onEmptyQueue != null) {
        if (
          this.onEmptyQueueCount > 0 &&
          (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
        ) {
          break
        }
        ++this.onEmptyQueueCount
        this.onEmptyQueue(this.info.id as number)
        await sleep(exponentialDelay(this.onEmptyQueueCount))
      }
    } finally {
      // Whatever ends the cycle (worker node busy again, callback removed or
      // callback throwing), reset the counter so that a later empty queue can
      // start a new cycle.
      this.onEmptyQueueCount = 0
    }
  }

  /**
   * Builds the initial worker information: identifier and type are read from
   * the worker, the node starts as neither dynamic nor ready.
   *
   * @param worker - The worker.
   * @returns The initial worker information.
   */
  private initWorkerInfo (worker: Worker): WorkerInfo {
    return {
      id: getWorkerId(worker),
      type: getWorkerType(worker) as WorkerType,
      dynamic: false,
      ready: false
    }
  }

  /**
   * Builds zeroed worker usage statistics whose `queued` and `maxQueued`
   * values are read live from the tasks queue.
   *
   * @returns The initial worker usage.
   */
  private initWorkerUsage (): WorkerUsage {
    // The getters must resolve `this.tasksQueue` lazily: the constructor calls
    // this method before the tasks queue is assigned.
    const getTasksQueueSize = (): number => {
      return this.tasksQueue.size
    }
    const getTasksQueueMaxSize = (): number => {
      return this.tasksQueue.maxSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize()
        },
        get maxQueued (): number {
          return getTasksQueueMaxSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }

  /**
   * Builds zeroed usage statistics for one task function. Its `queued` value
   * is computed on each read by counting the matching tasks in the tasks
   * queue.
   *
   * @param name - The task function name.
   * @returns The initial task function worker usage.
   */
  private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
    const getTaskFunctionQueueSize = (): number => {
      let taskFunctionQueueSize = 0
      for (const task of this.tasksQueue) {
        // A task queued under the default task name counts for the task
        // function listed second in the task function names.
        const queuedTaskFunctionName =
          task.name === DEFAULT_TASK_NAME
            ? this.info.taskFunctionNames?.[1]
            : task.name
        if (name === queuedTaskFunctionName) {
          ++taskFunctionQueueSize
        }
      }
      return taskFunctionQueueSize
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTaskFunctionQueueSize()
        },
        stolen: 0,
        failed: 0
      },
      runTime: {
        history: new CircularArray()
      },
      waitTime: {
        history: new CircularArray()
      },
      elu: {
        idle: {
          history: new CircularArray()
        },
        active: {
          history: new CircularArray()
        }
      }
    }
  }
}