73b2c70c3f3b05ce1031bf57fe5522c40874cfa6
[poolifier.git] / src / pools / worker-node.ts
1 import { MessageChannel } from 'node:worker_threads'
2 import { EventEmitter } from 'node:events'
3 import { CircularArray } from '../circular-array.js'
4 import type { Task } from '../utility-types.js'
5 import { DEFAULT_TASK_NAME, getWorkerId, getWorkerType } from '../utils.js'
6 import { Deque } from '../deque.js'
7 import {
8 type EventHandler,
9 type IWorker,
10 type IWorkerNode,
11 type StrategyData,
12 type WorkerInfo,
13 type WorkerNodeOptions,
14 type WorkerType,
15 WorkerTypes,
16 type WorkerUsage
17 } from './worker.js'
18 import { checkWorkerNodeArguments, createWorker } from './utils.js'
19
20 /**
21 * Worker node.
22 *
23 * @typeParam Worker - Type of worker.
24 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
25 */
26 export class WorkerNode<Worker extends IWorker, Data = unknown>
27 extends EventEmitter
28 implements IWorkerNode<Worker, Data> {
29 /** @inheritdoc */
30 public readonly worker: Worker
31 /** @inheritdoc */
32 public readonly info: WorkerInfo
33 /** @inheritdoc */
34 public usage: WorkerUsage
35 /** @inheritdoc */
36 public strategyData?: StrategyData
37 /** @inheritdoc */
38 public messageChannel?: MessageChannel
39 /** @inheritdoc */
40 public tasksQueueBackPressureSize: number
41 private readonly tasksQueue: Deque<Task<Data>>
42 private onBackPressureStarted: boolean
43 private readonly taskFunctionsUsage: Map<string, WorkerUsage>
44
45 /**
46 * Constructs a new worker node.
47 *
48 * @param type - The worker type.
49 * @param filePath - Path to the worker file.
50 * @param opts - The worker node options.
51 */
52 constructor (type: WorkerType, filePath: string, opts: WorkerNodeOptions) {
53 super()
54 checkWorkerNodeArguments(type, filePath, opts)
55 this.worker = createWorker<Worker>(type, filePath, {
56 env: opts.env,
57 workerOptions: opts.workerOptions
58 })
59 this.info = this.initWorkerInfo(this.worker)
60 this.usage = this.initWorkerUsage()
61 if (this.info.type === WorkerTypes.thread) {
62 this.messageChannel = new MessageChannel()
63 }
64 this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize
65 this.tasksQueue = new Deque<Task<Data>>()
66 this.onBackPressureStarted = false
67 this.taskFunctionsUsage = new Map<string, WorkerUsage>()
68 }
69
70 /** @inheritdoc */
71 public tasksQueueSize (): number {
72 return this.tasksQueue.size
73 }
74
75 /** @inheritdoc */
76 public enqueueTask (task: Task<Data>): number {
77 const tasksQueueSize = this.tasksQueue.push(task)
78 if (this.hasBackPressure() && !this.onBackPressureStarted) {
79 this.onBackPressureStarted = true
80 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
81 this.emit('backPressure', { workerId: this.info.id! })
82 this.onBackPressureStarted = false
83 }
84 return tasksQueueSize
85 }
86
87 /** @inheritdoc */
88 public unshiftTask (task: Task<Data>): number {
89 const tasksQueueSize = this.tasksQueue.unshift(task)
90 if (this.hasBackPressure() && !this.onBackPressureStarted) {
91 this.onBackPressureStarted = true
92 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
93 this.emit('backPressure', { workerId: this.info.id! })
94 this.onBackPressureStarted = false
95 }
96 return tasksQueueSize
97 }
98
99 /** @inheritdoc */
100 public dequeueTask (): Task<Data> | undefined {
101 return this.tasksQueue.shift()
102 }
103
104 /** @inheritdoc */
105 public popTask (): Task<Data> | undefined {
106 return this.tasksQueue.pop()
107 }
108
109 /** @inheritdoc */
110 public clearTasksQueue (): void {
111 this.tasksQueue.clear()
112 }
113
114 /** @inheritdoc */
115 public hasBackPressure (): boolean {
116 return this.tasksQueue.size >= this.tasksQueueBackPressureSize
117 }
118
119 /** @inheritdoc */
120 public resetUsage (): void {
121 this.usage = this.initWorkerUsage()
122 this.taskFunctionsUsage.clear()
123 }
124
125 /** @inheritdoc */
126 public async terminate (): Promise<void> {
127 const waitWorkerExit = new Promise<void>(resolve => {
128 this.registerOnceWorkerEventHandler('exit', () => {
129 resolve()
130 })
131 })
132 this.closeMessageChannel()
133 this.removeAllListeners()
134 switch (this.info.type) {
135 case WorkerTypes.thread:
136 await this.worker.terminate?.()
137 break
138 case WorkerTypes.cluster:
139 this.registerOnceWorkerEventHandler('disconnect', () => {
140 this.worker.kill?.()
141 })
142 this.worker.disconnect?.()
143 break
144 }
145 await waitWorkerExit
146 }
147
148 /** @inheritdoc */
149 public registerWorkerEventHandler (
150 event: string,
151 handler: EventHandler<Worker>
152 ): void {
153 this.worker.on(event, handler)
154 }
155
156 /** @inheritdoc */
157 public registerOnceWorkerEventHandler (
158 event: string,
159 handler: EventHandler<Worker>
160 ): void {
161 this.worker.once(event, handler)
162 }
163
164 /** @inheritdoc */
165 public getTaskFunctionWorkerUsage (name: string): WorkerUsage | undefined {
166 if (!Array.isArray(this.info.taskFunctionNames)) {
167 throw new Error(
168 `Cannot get task function worker usage for task function name '${name}' when task function names list is not yet defined`
169 )
170 }
171 if (
172 Array.isArray(this.info.taskFunctionNames) &&
173 this.info.taskFunctionNames.length < 3
174 ) {
175 throw new Error(
176 `Cannot get task function worker usage for task function name '${name}' when task function names list has less than 3 elements`
177 )
178 }
179 if (name === DEFAULT_TASK_NAME) {
180 name = this.info.taskFunctionNames[1]
181 }
182 if (!this.taskFunctionsUsage.has(name)) {
183 this.taskFunctionsUsage.set(name, this.initTaskFunctionWorkerUsage(name))
184 }
185 return this.taskFunctionsUsage.get(name)
186 }
187
188 /** @inheritdoc */
189 public deleteTaskFunctionWorkerUsage (name: string): boolean {
190 return this.taskFunctionsUsage.delete(name)
191 }
192
193 private closeMessageChannel (): void {
194 if (this.messageChannel != null) {
195 this.messageChannel.port1.unref()
196 this.messageChannel.port2.unref()
197 this.messageChannel.port1.close()
198 this.messageChannel.port2.close()
199 delete this.messageChannel
200 }
201 }
202
203 private initWorkerInfo (worker: Worker): WorkerInfo {
204 return {
205 id: getWorkerId(worker),
206 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
207 type: getWorkerType(worker)!,
208 dynamic: false,
209 ready: false,
210 stealing: false
211 }
212 }
213
214 private initWorkerUsage (): WorkerUsage {
215 const getTasksQueueSize = (): number => {
216 return this.tasksQueue.size
217 }
218 const getTasksQueueMaxSize = (): number => {
219 return this.tasksQueue.maxSize
220 }
221 return {
222 tasks: {
223 executed: 0,
224 executing: 0,
225 get queued (): number {
226 return getTasksQueueSize()
227 },
228 get maxQueued (): number {
229 return getTasksQueueMaxSize()
230 },
231 sequentiallyStolen: 0,
232 stolen: 0,
233 failed: 0
234 },
235 runTime: {
236 history: new CircularArray<number>()
237 },
238 waitTime: {
239 history: new CircularArray<number>()
240 },
241 elu: {
242 idle: {
243 history: new CircularArray<number>()
244 },
245 active: {
246 history: new CircularArray<number>()
247 }
248 }
249 }
250 }
251
252 private initTaskFunctionWorkerUsage (name: string): WorkerUsage {
253 const getTaskFunctionQueueSize = (): number => {
254 let taskFunctionQueueSize = 0
255 for (const task of this.tasksQueue) {
256 if (
257 (task.name === DEFAULT_TASK_NAME &&
258 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
259 name === this.info.taskFunctionNames![1]) ||
260 (task.name !== DEFAULT_TASK_NAME && name === task.name)
261 ) {
262 ++taskFunctionQueueSize
263 }
264 }
265 return taskFunctionQueueSize
266 }
267 return {
268 tasks: {
269 executed: 0,
270 executing: 0,
271 get queued (): number {
272 return getTaskFunctionQueueSize()
273 },
274 sequentiallyStolen: 0,
275 stolen: 0,
276 failed: 0
277 },
278 runTime: {
279 history: new CircularArray<number>()
280 },
281 waitTime: {
282 history: new CircularArray<number>()
283 },
284 elu: {
285 idle: {
286 history: new CircularArray<number>()
287 },
288 active: {
289 history: new CircularArray<number>()
290 }
291 }
292 }
293 }
294 }