ef0133ac716b2a01087799154f3ff414518309cc
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { CircularArray } from '../circular-array'
3 import type { Task } from '../utility-types'
4 import {
5 DEFAULT_TASK_NAME,
6 EMPTY_FUNCTION,
7 exponentialDelay,
8 getWorkerId,
9 getWorkerType,
10 sleep
11 } from '../utils'
12 import { Deque } from '../deque'
13 import {
14 type IWorker,
15 type IWorkerNode,
16 type StrategyData,
17 type WorkerInfo,
18 type WorkerNodeEventDetail,
19 type WorkerType,
20 WorkerTypes,
21 type WorkerUsage
22 } from './worker'
23 import { checkWorkerNodeArguments } from './utils'
24
25 /**
26 * Worker node.
27 *
28 * @typeParam Worker - Type of worker.
29 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
30 */
31 export class WorkerNode<Worker extends IWorker, Data = unknown>
32 extends EventTarget
33 implements IWorkerNode<Worker, Data> {
34 /** @inheritdoc */
35 public readonly worker: Worker
36 /** @inheritdoc */
37 public readonly info: WorkerInfo
38 /** @inheritdoc */
39 public usage: WorkerUsage
40 /** @inheritdoc */
41 public strategyData?: StrategyData
42 /** @inheritdoc */
43 public messageChannel?: MessageChannel
44 /** @inheritdoc */
45 public tasksQueueBackPressureSize: number
46 private readonly tasksQueue: Deque<Task<Data>>
47 private onBackPressureStarted: boolean
48 private onEmptyQueueCount: number
49 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
50
51 /**
52 * Constructs a new worker node.
53 *
54 * @param worker - The worker.
55 * @param tasksQueueBackPressureSize - The tasks queue back pressure size.
56 */
57 constructor (worker: Worker, tasksQueueBackPressureSize: number) {
58 super()
59 checkWorkerNodeArguments<Worker>(worker, tasksQueueBackPressureSize)
60 this.worker = worker
61 this.info = this.initWorkerInfo(worker)
62 this.usage = this.initWorkerUsage()
63 if (this.info.type === WorkerTypes.thread) {
64 this.messageChannel = new MessageChannel()
65 }
66 this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
67 this.tasksQueue = new Deque<Task<Data>>()
68 this.onBackPressureStarted = false
69 this.onEmptyQueueCount = 0
70 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
71 }
72
73 /** @inheritdoc */
74 public tasksQueueSize (): number {
75 return this.tasksQueue.size
76 }
77
78 /** @inheritdoc */
79 public enqueueTask (task: Task<Data>): number {
80 const tasksQueueSize = this.tasksQueue.push(task)
81 if (this.hasBackPressure() && !this.onBackPressureStarted) {
82 this.onBackPressureStarted = true
83 this.dispatchEvent(
84 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
85 detail: { workerId: this.info.id as number }
86 })
87 )
88 this.onBackPressureStarted = false
89 }
90 return tasksQueueSize
91 }
92
93 /** @inheritdoc */
94 public unshiftTask (task: Task<Data>): number {
95 const tasksQueueSize = this.tasksQueue.unshift(task)
96 if (this.hasBackPressure() && !this.onBackPressureStarted) {
97 this.onBackPressureStarted = true
98 this.dispatchEvent(
99 new CustomEvent<WorkerNodeEventDetail>('backPressure', {
100 detail: { workerId: this.info.id as number }
101 })
102 )
103 this.onBackPressureStarted = false
104 }
105 return tasksQueueSize
106 }
107
108 /** @inheritdoc */
109 public dequeueTask (): Task<Data> | undefined {
110 const task = this.tasksQueue.shift()
111 if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
112 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
113 }
114 return task
115 }
116
117 /** @inheritdoc */
118 public popTask (): Task<Data> | undefined {
119 const task = this.tasksQueue.pop()
120 if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
121 this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
122 }
123 return task
124 }
125
126 /** @inheritdoc */
127 public clearTasksQueue (): void {
128 this.tasksQueue.clear()
129 }
130
131 /** @inheritdoc */
132 public hasBackPressure (): boolean {
133 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
134 }
135
136 /** @inheritdoc */
137 public resetUsage (): void {
138 this.usage = this.initWorkerUsage()
139 this.taskFunctionsUsage.clear()
140 }
141
142 /** @inheritdoc */
143 public closeChannel (): void {
144 if (this.messageChannel != null) {
145 this.messageChannel.port1.unref()
146 this.messageChannel.port2.unref()
147 this.messageChannel.port1.close()
148 this.messageChannel.port2.close()
149 delete this.messageChannel
150 }
151 }
152
153 /** @inheritdoc */
154 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
155 if (!Array.isArray(this.info.taskFunctionNames)) {
156 throw new Error(
157 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
158 )
159 }
160 if (
161 Array.isArray(this.info.taskFunctionNames) &&
162 this.info.taskFunctionNames.length < 3
163 ) {
164 throw new Error(
165 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
166 )
167 }
168 if (name === DEFAULT_TASK_NAME) {
169 name = this.info.taskFunctionNames[1]
170 }
171 if (!this.taskFunctionsUsage.has(name)) {
172 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
173 }
174 return this.taskFunctionsUsage.get(name)
175 }
176
177 /** @inheritdoc */
178 public deleteTaskFunctionWorkerUsage (name: string): boolean {
179 return this.taskFunctionsUsage.delete(name)
180 }
181
182 private async startOnEmptyQueue (): Promise<void> {
183 if (
184 this.onEmptyQueueCount > 0 &&
185 (this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
186 ) {
187 this.onEmptyQueueCount = 0
188 return
189 }
190 ++this.onEmptyQueueCount
191 this.dispatchEvent(
192 new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
193 detail: { workerId: this.info.id as number }
194 })
195 )
196 await sleep(exponentialDelay(this.onEmptyQueueCount))
197 await this.startOnEmptyQueue()
198 }
199
200 private initWorkerInfo (worker: Worker): WorkerInfo {
201 return {
202 id: getWorkerId(worker),
203 type: getWorkerType(worker) as WorkerType,
204 dynamic: false,
205 ready: false
206 }
207 }
208
209 private initWorkerUsage (): WorkerUsage {
210 const getTasksQueueSize = (): number => {
211 return this.tasksQueue.size
212 }
213 const getTasksQueueMaxSize = (): number => {
214 return this.tasksQueue.maxSize
215 }
216 return {
217 tasks: {
218 executed: 0,
219 executing: 0,
220 get queued (): number {
221 return getTasksQueueSize()
222 },
223 get maxQueued (): number {
224 return getTasksQueueMaxSize()
225 },
226 stolen: 0,
227 failed: 0
228 },
229 runTime: {
230 history: new CircularArray<number>()
231 },
232 waitTime: {
233 history: new CircularArray<number>()
234 },
235 elu: {
236 idle: {
237 history: new CircularArray<number>()
238 },
239 active: {
240 history: new CircularArray<number>()
241 }
242 }
243 }
244 }
245
246 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
247 const getTaskFunctionQueueSize = (): number => {
248 let taskFunctionQueueSize = 0
249 for (const task of this.tasksQueue) {
250 if (
251 (task.name === DEFAULT_TASK_NAME &&
252 name === (this.info.taskFunctionNames as string[])[1]) ||
253 (task.name !== DEFAULT_TASK_NAME && name === task.name)
254 ) {
255 ++taskFunctionQueueSize
256 }
257 }
258 return taskFunctionQueueSize
259 }
260 return {
261 tasks: {
262 executed: 0,
263 executing: 0,
264 get queued (): number {
265 return getTaskFunctionQueueSize()
266 },
267 stolen: 0,
268 failed: 0
269 },
270 runTime: {
271 history: new CircularArray<number>()
272 },
273 waitTime: {
274 history: new CircularArray<number>()
275 },
276 elu: {
277 idle: {
278 history: new CircularArray<number>()
279 },
280 active: {
281 history: new CircularArray<number>()
282 }
283 }
284 }
285 }
286 }