bf2600d57782873bea28f50f413a7de945a35b35
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type ErrorHandler,
9 type ExitHandler,
10 type IWorker,
11 type IWorkerNode,
12 type MessageHandler,
13 type OnlineHandler,
14 type StrategyData,
15 type WorkerInfo,
16 type WorkerNodeOptions,
17 type WorkerType,
18 WorkerTypes,
19 type WorkerUsage
20 } from './worker.js'
21 import { checkWorkerNodeArguments, createWorker } from './utils.js'
22
23 /**
24 * Worker node.
25 *
26 * @typeParam Worker - Type of worker.
27 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
28 */
29 export class WorkerNode<Worker extends IWorker, Data = unknown>
30 extends EventEmitter
31 implements IWorkerNode<Worker, Data> {
32 /** @inheritdoc */
33 public readonly worker: Worker
34 /** @inheritdoc */
35 public readonly info: WorkerInfo
36 /** @inheritdoc */
37 public usage: WorkerUsage
38 /** @inheritdoc */
39 public strategyData?: StrategyData
40 /** @inheritdoc */
41 public messageChannel?: MessageChannel
42 /** @inheritdoc */
43 public tasksQueueBackPressureSize: number
44 private readonly tasksQueue: Deque<Task<Data>>
45 private onBackPressureStarted: boolean
46 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
47
48 /**
49 * Constructs a new worker node.
50 *
51 * @param type - The worker type.
52 * @param filePath - Path to the worker file.
53 * @param opts - The worker node options.
54 */
55 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
56 super()
57 checkWorkerNodeArguments(type, filePath, opts)
58 this.worker = createWorker<Worker>(type, filePath, {
59 env: opts.env,
60 workerOptions: opts.workerOptions
61 })
62 this.info = this.initWorkerInfo(this.worker)
63 this.usage = this.initWorkerUsage()
64 if (this.info.type === WorkerTypes.thread) {
65 this.messageChannel = new MessageChannel()
66 }
67 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
68 this.tasksQueue = new Deque<Task<Data>>()
69 this.onBackPressureStarted = false
70 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
71 }
72
73 /** @inheritdoc */
74 public tasksQueueSize (): number {
75 return this.tasksQueue.size
76 }
77
78 /** @inheritdoc */
79 public enqueueTask (task: Task<Data>): number {
80 const tasksQueueSize = this.tasksQueue.push(task)
81 if (this.hasBackPressure() && !this.onBackPressureStarted) {
82 this.onBackPressureStarted = true
83 this.emit('backPressure', { workerId: this.info.id as number })
84 this.onBackPressureStarted = false
85 }
86 return tasksQueueSize
87 }
88
89 /** @inheritdoc */
90 public unshiftTask (task: Task<Data>): number {
91 const tasksQueueSize = this.tasksQueue.unshift(task)
92 if (this.hasBackPressure() && !this.onBackPressureStarted) {
93 this.onBackPressureStarted = true
94 this.emit('backPressure', { workerId: this.info.id as number })
95 this.onBackPressureStarted = false
96 }
97 return tasksQueueSize
98 }
99
100 /** @inheritdoc */
101 public dequeueTask (): Task<Data> | undefined {
102 return this.tasksQueue.shift()
103 }
104
105 /** @inheritdoc */
106 public popTask (): Task<Data> | undefined {
107 return this.tasksQueue.pop()
108 }
109
110 /** @inheritdoc */
111 public clearTasksQueue (): void {
112 this.tasksQueue.clear()
113 }
114
115 /** @inheritdoc */
116 public hasBackPressure (): boolean {
117 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
118 }
119
120 /** @inheritdoc */
121 public resetUsage (): void {
122 this.usage = this.initWorkerUsage()
123 this.taskFunctionsUsage.clear()
124 }
125
126 /** @inheritdoc */
127 public async terminate (): Promise<void> {
128 const waitWorkerExit = new Promise<void>(resolve => {
129 this.registerOnceWorkerEventHandler('exit', () => {
130 resolve()
131 })
132 })
133 this.closeMessageChannel()
134 this.removeAllListeners()
135 switch (this.info.type) {
136 case WorkerTypes.thread:
137 await this.worker.terminate?.()
138 break
139 case WorkerTypes.cluster:
140 this.registerOnceWorkerEventHandler('disconnect', () => {
141 this.worker.kill?.()
142 })
143 this.worker.disconnect?.()
144 break
145 }
146 await waitWorkerExit
147 }
148
149 /** @inheritdoc */
150 public registerWorkerEventHandler (
151 event: string,
152 handler:
153 | OnlineHandler<Worker>
154 | MessageHandler<Worker>
155 | ErrorHandler<Worker>
156 | ExitHandler<Worker>
157 ): void {
158 this.worker.on(event, handler)
159 }
160
161 /** @inheritdoc */
162 public registerOnceWorkerEventHandler (
163 event: string,
164 handler:
165 | OnlineHandler<Worker>
166 | MessageHandler<Worker>
167 | ErrorHandler<Worker>
168 | ExitHandler<Worker>
169 ): void {
170 this.worker.once(event, handler)
171 }
172
173 /** @inheritdoc */
174 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
175 if (!Array.isArray(this.info.taskFunctionNames)) {
176 throw new Error(
177 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
178 )
179 }
180 if (
181 Array.isArray(this.info.taskFunctionNames) &&
182 this.info.taskFunctionNames.length < 3
183 ) {
184 throw new Error(
185 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
186 )
187 }
188 if (name === DEFAULT_TASK_NAME) {
189 name = this.info.taskFunctionNames[1]
190 }
191 if (!this.taskFunctionsUsage.has(name)) {
192 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
193 }
194 return this.taskFunctionsUsage.get(name)
195 }
196
197 /** @inheritdoc */
198 public deleteTaskFunctionWorkerUsage (name: string): boolean {
199 return this.taskFunctionsUsage.delete(name)
200 }
201
202 private closeMessageChannel (): void {
203 if (this.messageChannel != null) {
204 this.messageChannel.port1.unref()
205 this.messageChannel.port2.unref()
206 this.messageChannel.port1.close()
207 this.messageChannel.port2.close()
208 delete this.messageChannel
209 }
210 }
211
212 private initWorkerInfo (worker: Worker): WorkerInfo {
213 return {
214 id: getWorkerId(worker),
215 type: getWorkerType(worker) as WorkerType,
216 dynamic: false,
217 ready: false,
218 stealing: false
219 }
220 }
221
222 private initWorkerUsage (): WorkerUsage {
223 const getTasksQueueSize = (): number => {
224 return this.tasksQueue.size
225 }
226 const getTasksQueueMaxSize = (): number => {
227 return this.tasksQueue.maxSize
228 }
229 return {
230 tasks: {
231 executed: 0,
232 executing: 0,
233 get queued (): number {
234 return getTasksQueueSize()
235 },
236 get maxQueued (): number {
237 return getTasksQueueMaxSize()
238 },
239 sequentiallyStolen: 0,
240 stolen: 0,
241 failed: 0
242 },
243 runTime: {
244 history: new CircularArray<number>()
245 },
246 waitTime: {
247 history: new CircularArray<number>()
248 },
249 elu: {
250 idle: {
251 history: new CircularArray<number>()
252 },
253 active: {
254 history: new CircularArray<number>()
255 }
256 }
257 }
258 }
259
260 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
261 const getTaskFunctionQueueSize = (): number => {
262 let taskFunctionQueueSize = 0
263 for (const task of this.tasksQueue) {
264 if (
265 (task.name === DEFAULT_TASK_NAME &&
266 name === (this.info.taskFunctionNames as string[])[1]) ||
267 (task.name !== DEFAULT_TASK_NAME && name === task.name)
268 ) {
269 ++taskFunctionQueueSize
270 }
271 }
272 return taskFunctionQueueSize
273 }
274 return {
275 tasks: {
276 executed: 0,
277 executing: 0,
278 get queued (): number {
279 return getTaskFunctionQueueSize()
280 },
281 sequentiallyStolen: 0,
282 stolen: 0,
283 failed: 0
284 },
285 runTime: {
286 history: new CircularArray<number>()
287 },
288 waitTime: {
289 history: new CircularArray<number>()
290 },
291 elu: {
292 idle: {
293 history: new CircularArray<number>()
294 },
295 active: {
296 history: new CircularArray<number>()
297 }
298 }
299 }
300 }
301 }