build: switch from prettier to rome as code formatter
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import type { Worker } from 'node:cluster'
3 import type { MessagePort } from 'node:worker_threads'
4 import { performance } from 'node:perf_hooks'
5 import type {
6 MessageValue,
7 Task,
8 TaskPerformance,
9 WorkerStatistics
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 isAsyncFunction,
15 isPlainObject
16 } from '../utils'
17 import {
18 type KillBehavior,
19 KillBehaviors,
20 type WorkerOptions
21 } from './worker-options'
22 import type {
23 TaskAsyncFunction,
24 TaskFunction,
25 TaskFunctions,
26 TaskSyncFunction
27 } from './task-functions'
28
29 const DEFAULT_MAX_INACTIVE_TIME = 60000
30 const DEFAULT_KILL_BEHAVIOR: KillBehavior = KillBehaviors.SOFT
31
32 /**
33 * Base class that implements some shared logic for all poolifier workers.
34 *
35 * @typeParam MainWorker - Type of main worker.
36 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
37 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
38 */
39 export abstract class AbstractWorker<
40 MainWorker extends Worker | MessagePort,
41 Data = unknown,
42 Response = unknown
43 > extends AsyncResource {
44 /**
45 * Worker id.
46 */
47 protected abstract id: number
48 /**
49 * Task function(s) processed by the worker when the pool's `execution` function is invoked.
50 */
51 protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
52 /**
53 * Timestamp of the last task processed by this worker.
54 */
55 protected lastTaskTimestamp!: number
56 /**
57 * Performance statistics computation requirements.
58 */
59 protected statistics!: WorkerStatistics
60 /**
61 * Handler id of the `activeInterval` worker activity check.
62 */
63 protected activeInterval?: NodeJS.Timeout
64 /**
65 * Constructs a new poolifier worker.
66 *
67 * @param type - The type of async event.
68 * @param isMain - Whether this is the main worker or not.
69 * @param mainWorker - Reference to main worker.
70 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
71 * @param opts - Options for the worker.
72 */
73 public constructor (
74 type: string,
75 protected readonly isMain: boolean,
76 private readonly mainWorker: MainWorker,
77 taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
78 protected readonly opts: WorkerOptions = {
79 /**
80 * The kill behavior option on this worker or its default value.
81 */
82 killBehavior: DEFAULT_KILL_BEHAVIOR,
83 /**
84 * The maximum time to keep this worker active while idle.
85 * The pool automatically checks and terminates this worker when the time expires.
86 */
87 maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME
88 }
89 ) {
90 super(type)
91 this.checkWorkerOptions(this.opts)
92 this.checkTaskFunctions(taskFunctions)
93 if (!this.isMain) {
94 this.getMainWorker()?.on('message', this.handleReadyMessage.bind(this))
95 }
96 }
97
98 private checkWorkerOptions (opts: WorkerOptions): void {
99 this.opts.killBehavior = opts.killBehavior ?? DEFAULT_KILL_BEHAVIOR
100 this.opts.maxInactiveTime =
101 opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
102 delete this.opts.async
103 }
104
105 /**
106 * Checks if the `taskFunctions` parameter is passed to the constructor.
107 *
108 * @param taskFunctions - The task function(s) parameter that should be checked.
109 */
110 private checkTaskFunctions (
111 taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
112 ): void {
113 if (taskFunctions == null) {
114 throw new Error('taskFunctions parameter is mandatory')
115 }
116 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
117 if (typeof taskFunctions === 'function') {
118 const boundFn = taskFunctions.bind(this)
119 this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
120 this.taskFunctions.set(
121 typeof taskFunctions.name === 'string' &&
122 taskFunctions.name.trim().length > 0
123 ? taskFunctions.name
124 : 'fn1',
125 boundFn
126 )
127 } else if (isPlainObject(taskFunctions)) {
128 let firstEntry = true
129 for (const [name, fn] of Object.entries(taskFunctions)) {
130 if (typeof name !== 'string') {
131 throw new TypeError(
132 'A taskFunctions parameter object key is not a string'
133 )
134 }
135 if (typeof fn !== 'function') {
136 throw new TypeError(
137 'A taskFunctions parameter object value is not a function'
138 )
139 }
140 const boundFn = fn.bind(this)
141 if (firstEntry) {
142 this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
143 firstEntry = false
144 }
145 this.taskFunctions.set(name, boundFn)
146 }
147 if (firstEntry) {
148 throw new Error('taskFunctions parameter object is empty')
149 }
150 } else {
151 throw new TypeError(
152 'taskFunctions parameter is not a function or a plain object'
153 )
154 }
155 }
156
157 /**
158 * Checks if the worker has a task function with the given name.
159 *
160 * @param name - The name of the task function to check.
161 * @returns Whether the worker has a task function with the given name or not.
162 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string.
163 */
164 public hasTaskFunction (name: string): boolean {
165 if (typeof name !== 'string') {
166 throw new TypeError('name parameter is not a string')
167 }
168 return this.taskFunctions.has(name)
169 }
170
171 /**
172 * Adds a task function to the worker.
173 * If a task function with the same name already exists, it is replaced.
174 *
175 * @param name - The name of the task function to add.
176 * @param fn - The task function to add.
177 * @returns Whether the task function was added or not.
178 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string.
179 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
180 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
181 */
182 public addTaskFunction (
183 name: string,
184 fn: TaskFunction<Data, Response>
185 ): boolean {
186 if (typeof name !== 'string') {
187 throw new TypeError('name parameter is not a string')
188 }
189 if (name === DEFAULT_TASK_NAME) {
190 throw new Error(
191 'Cannot add a task function with the default reserved name'
192 )
193 }
194 if (typeof fn !== 'function') {
195 throw new TypeError('fn parameter is not a function')
196 }
197 try {
198 const boundFn = fn.bind(this)
199 if (
200 this.taskFunctions.get(name) ===
201 this.taskFunctions.get(DEFAULT_TASK_NAME)
202 ) {
203 this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
204 }
205 this.taskFunctions.set(name, boundFn)
206 return true
207 } catch {
208 return false
209 }
210 }
211
212 /**
213 * Removes a task function from the worker.
214 *
215 * @param name - The name of the task function to remove.
216 * @returns Whether the task function existed and was removed or not.
217 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string.
218 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
219 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function.
220 */
221 public removeTaskFunction (name: string): boolean {
222 if (typeof name !== 'string') {
223 throw new TypeError('name parameter is not a string')
224 }
225 if (name === DEFAULT_TASK_NAME) {
226 throw new Error(
227 'Cannot remove the task function with the default reserved name'
228 )
229 }
230 if (
231 this.taskFunctions.get(name) === this.taskFunctions.get(DEFAULT_TASK_NAME)
232 ) {
233 throw new Error(
234 'Cannot remove the task function used as the default task function'
235 )
236 }
237 return this.taskFunctions.delete(name)
238 }
239
240 /**
241 * Lists the names of the worker's task functions.
242 *
243 * @returns The names of the worker's task functions.
244 */
245 public listTaskFunctions (): string[] {
246 return [...this.taskFunctions.keys()]
247 }
248
249 /**
250 * Sets the default task function to use in the worker.
251 *
252 * @param name - The name of the task function to use as default task function.
253 * @returns Whether the default task function was set or not.
254 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string.
255 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
256 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function.
257 */
258 public setDefaultTaskFunction (name: string): boolean {
259 if (typeof name !== 'string') {
260 throw new TypeError('name parameter is not a string')
261 }
262 if (name === DEFAULT_TASK_NAME) {
263 throw new Error(
264 'Cannot set the default task function reserved name as the default task function'
265 )
266 }
267 if (!this.taskFunctions.has(name)) {
268 throw new Error(
269 'Cannot set the default task function to a non-existing task function'
270 )
271 }
272 try {
273 this.taskFunctions.set(
274 DEFAULT_TASK_NAME,
275 this.taskFunctions.get(name) as TaskFunction<Data, Response>
276 )
277 return true
278 } catch {
279 return false
280 }
281 }
282
283 /**
284 * Handles the ready message sent by the main worker.
285 *
286 * @param message - The ready message.
287 */
288 protected abstract handleReadyMessage (message: MessageValue<Data>): void
289
290 /**
291 * Worker message listener.
292 *
293 * @param message - The received message.
294 */
295 protected messageListener (message: MessageValue<Data>): void {
296 if (message.workerId != null && message.workerId !== this.id) {
297 throw new Error('Message worker id does not match worker id')
298 } else if (message.workerId === this.id) {
299 if (message.statistics != null) {
300 // Statistics message received
301 this.statistics = message.statistics
302 } else if (message.checkActive != null) {
303 // Check active message received
304 !this.isMain && message.checkActive
305 ? this.startCheckActive()
306 : this.stopCheckActive()
307 } else if (message.taskId != null && message.data != null) {
308 // Task message received
309 this.run(message)
310 } else if (message.kill === true) {
311 // Kill message received
312 this.handleKillMessage(message)
313 }
314 }
315 }
316
317 /**
318 * Handles a kill message sent by the main worker.
319 *
320 * @param message - The kill message.
321 */
322 protected handleKillMessage (message: MessageValue<Data>): void {
323 !this.isMain && this.stopCheckActive()
324 this.emitDestroy()
325 }
326
327 /**
328 * Starts the worker check active interval.
329 */
330 private startCheckActive (): void {
331 this.lastTaskTimestamp = performance.now()
332 this.activeInterval = setInterval(
333 this.checkActive.bind(this),
334 (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
335 )
336 }
337
338 /**
339 * Stops the worker check active interval.
340 */
341 private stopCheckActive (): void {
342 if (this.activeInterval != null) {
343 clearInterval(this.activeInterval)
344 delete this.activeInterval
345 }
346 }
347
348 /**
349 * Checks if the worker should be terminated, because its living too long.
350 */
351 private checkActive (): void {
352 if (
353 performance.now() - this.lastTaskTimestamp >
354 (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
355 ) {
356 this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
357 }
358 }
359
360 /**
361 * Returns the main worker.
362 *
363 * @returns Reference to the main worker.
364 */
365 protected getMainWorker (): MainWorker {
366 if (this.mainWorker == null) {
367 throw new Error('Main worker not set')
368 }
369 return this.mainWorker
370 }
371
372 /**
373 * Sends a message to main worker.
374 *
375 * @param message - The response message.
376 */
377 protected abstract sendToMainWorker (
378 message: MessageValue<Response, Data>
379 ): void
380
381 /**
382 * Handles an error and convert it to a string so it can be sent back to the main worker.
383 *
384 * @param e - The error raised by the worker.
385 * @returns The error message.
386 */
387 protected handleError (e: Error | string): string {
388 return e instanceof Error ? e.message : e
389 }
390
391 /**
392 * Runs the given task.
393 *
394 * @param task - The task to execute.
395 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
396 */
397 protected run (task: Task<Data>): void {
398 if (this.isMain) {
399 throw new Error('Cannot run a task in the main worker')
400 }
401 const fn = this.getTaskFunction(task.name)
402 if (isAsyncFunction(fn)) {
403 this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
404 } else {
405 this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
406 }
407 }
408
409 /**
410 * Runs the given task function synchronously.
411 *
412 * @param fn - Task function that will be executed.
413 * @param task - Input data for the task function.
414 */
415 protected runSync (
416 fn: TaskSyncFunction<Data, Response>,
417 task: Task<Data>
418 ): void {
419 try {
420 let taskPerformance = this.beginTaskPerformance(task.name)
421 const res = fn(task.data)
422 taskPerformance = this.endTaskPerformance(taskPerformance)
423 this.sendToMainWorker({
424 data: res,
425 taskPerformance,
426 workerId: this.id,
427 taskId: task.taskId
428 })
429 } catch (e) {
430 const errorMessage = this.handleError(e as Error | string)
431 this.sendToMainWorker({
432 taskError: {
433 name: task.name ?? DEFAULT_TASK_NAME,
434 message: errorMessage,
435 data: task.data
436 },
437 workerId: this.id,
438 taskId: task.taskId
439 })
440 } finally {
441 this.updateLastTaskTimestamp()
442 }
443 }
444
445 /**
446 * Runs the given task function asynchronously.
447 *
448 * @param fn - Task function that will be executed.
449 * @param task - Input data for the task function.
450 */
451 protected runAsync (
452 fn: TaskAsyncFunction<Data, Response>,
453 task: Task<Data>
454 ): void {
455 let taskPerformance = this.beginTaskPerformance(task.name)
456 fn(task.data)
457 .then((res) => {
458 taskPerformance = this.endTaskPerformance(taskPerformance)
459 this.sendToMainWorker({
460 data: res,
461 taskPerformance,
462 workerId: this.id,
463 taskId: task.taskId
464 })
465 return null
466 })
467 .catch((e) => {
468 const errorMessage = this.handleError(e as Error | string)
469 this.sendToMainWorker({
470 taskError: {
471 name: task.name ?? DEFAULT_TASK_NAME,
472 message: errorMessage,
473 data: task.data
474 },
475 workerId: this.id,
476 taskId: task.taskId
477 })
478 })
479 .finally(() => {
480 this.updateLastTaskTimestamp()
481 })
482 .catch(EMPTY_FUNCTION)
483 }
484
485 /**
486 * Gets the task function with the given name.
487 *
488 * @param name - Name of the task function that will be returned.
489 * @returns The task function.
490 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
491 */
492 private getTaskFunction (name?: string): TaskFunction<Data, Response> {
493 name = name ?? DEFAULT_TASK_NAME
494 const fn = this.taskFunctions.get(name)
495 if (fn == null) {
496 throw new Error(`Task function '${name}' not found`)
497 }
498 return fn
499 }
500
501 private beginTaskPerformance (name?: string): TaskPerformance {
502 this.checkStatistics()
503 return {
504 name: name ?? DEFAULT_TASK_NAME,
505 timestamp: performance.now(),
506 ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
507 }
508 }
509
510 private endTaskPerformance (
511 taskPerformance: TaskPerformance
512 ): TaskPerformance {
513 this.checkStatistics()
514 return {
515 ...taskPerformance,
516 ...(this.statistics.runTime && {
517 runTime: performance.now() - taskPerformance.timestamp
518 }),
519 ...(this.statistics.elu && {
520 elu: performance.eventLoopUtilization(taskPerformance.elu)
521 })
522 }
523 }
524
525 private checkStatistics (): void {
526 if (this.statistics == null) {
527 throw new Error('Performance statistics computation requirements not set')
528 }
529 }
530
531 private updateLastTaskTimestamp (): void {
532 if (!this.isMain && this.activeInterval != null) {
533 this.lastTaskTimestamp = performance.now()
534 }
535 }
536 }