107d72384166053e5783f121ef14d0a7abbe69ca
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import type { Worker } from 'node:cluster'
3 import type { MessagePort } from 'node:worker_threads'
4 import { performance } from 'node:perf_hooks'
5 import type {
6 MessageValue,
7 Task,
8 TaskPerformance,
9 WorkerStatistics
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 isAsyncFunction,
15 isPlainObject
16 } from '../utils'
17 import { KillBehaviors, type WorkerOptions } from './worker-options'
18 import type {
19 TaskAsyncFunction,
20 TaskFunction,
21 TaskFunctionOperationReturnType,
22 TaskFunctions,
23 TaskSyncFunction
24 } from './task-functions'
25
/**
 * Default maximum time, in milliseconds, a worker may stay idle before it asks to be killed.
 */
const DEFAULT_MAX_INACTIVE_TIME = 60000
/**
 * Default worker options.
 * This single object is used as the constructor default for every worker, so
 * it is frozen to guarantee that no worker can alter the defaults of the others.
 */
const DEFAULT_WORKER_OPTIONS: WorkerOptions = Object.freeze({
  /**
   * The kill behavior option on this worker or its default value.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * The maximum time to keep this worker active while idle.
   * The pool automatically checks and terminates this worker when the time expires.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
  /**
   * The function to call when the worker is killed.
   */
  killHandler: EMPTY_FUNCTION
})
42
43 /**
44 * Base class that implements some shared logic for all poolifier workers.
45 *
46 * @typeParam MainWorker - Type of main worker.
47 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
48 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
49 */
50 export abstract class AbstractWorker<
51 MainWorker extends Worker | MessagePort,
52 Data = unknown,
53 Response = unknown
54 > extends AsyncResource {
55 /**
56 * Worker id.
57 */
58 protected abstract id: number
59 /**
60 * Task function(s) processed by the worker when the pool's `execution` function is invoked.
61 */
62 protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
63 /**
64 * Timestamp of the last task processed by this worker.
65 */
66 protected lastTaskTimestamp!: number
67 /**
68 * Performance statistics computation requirements.
69 */
70 protected statistics!: WorkerStatistics
71 /**
72 * Handler id of the `activeInterval` worker activity check.
73 */
74 protected activeInterval?: NodeJS.Timeout
/**
 * Builds a poolifier worker: registers the task function(s), merges the
 * options with the defaults and, when the worker is not the main one, starts
 * listening for the ready message of the main worker.
 *
 * @param type - The type of async event.
 * @param isMain - Whether this is the main worker or not.
 * @param mainWorker - Reference to main worker.
 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
 * @param opts - Options for the worker.
 */
public constructor (
  type: string,
  protected readonly isMain: boolean,
  private readonly mainWorker: MainWorker,
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
  protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
  super(type)
  if (this.isMain == null) {
    throw new Error('isMain parameter is mandatory')
  }
  this.checkTaskFunctions(taskFunctions)
  this.checkWorkerOptions(this.opts)
  if (this.isMain) {
    // The main worker has no parent to listen to.
    return
  }
  const readyMessageListener = this.handleReadyMessage.bind(this)
  this.getMainWorker().on('message', readyMessageListener)
}
101
/**
 * Stores the worker options as the given options layered over the defaults.
 * The `async` key is stripped from the stored options.
 *
 * @param opts - The worker options to merge with the defaults.
 */
private checkWorkerOptions (opts: WorkerOptions): void {
  const mergedOpts: WorkerOptions = { ...DEFAULT_WORKER_OPTIONS, ...opts }
  delete mergedOpts.async
  this.opts = mergedOpts
}
106
/**
 * Validates one entry of a task functions object.
 *
 * @param name - The entry key, expected to be a non blank string.
 * @param fn - The entry value, expected to be a function.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the key is not a string, is blank, or the value is not a function.
 */
private checkValidTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): void {
  if (typeof name !== 'string') {
    throw new TypeError(
      'A taskFunctions parameter object key is not a string'
    )
  }
  // name is known to be a string from here on.
  const isBlank = name.trim().length === 0
  if (isBlank) {
    throw new TypeError(
      'A taskFunctions parameter object key is an empty string'
    )
  }
  if (typeof fn !== 'function') {
    throw new TypeError(
      'A taskFunctions parameter object value is not a function'
    )
  }
}
127
/**
 * Validates the `taskFunctions` constructor parameter and fills the worker
 * task functions map from it. Every function is bound to the worker.
 * A single function is registered under the default task name and under its
 * own name (`fn1` when it has none). For an object, each entry is registered
 * under its key and the first entry is also registered as the default.
 *
 * @param taskFunctions - The task function(s) parameter that should be checked.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the parameter is missing, is an empty object, or is neither a function nor a plain object.
 */
private checkTaskFunctions (
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
): void {
  if (taskFunctions == null) {
    throw new Error('taskFunctions parameter is mandatory')
  }
  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
  if (typeof taskFunctions === 'function') {
    const hasUsableName =
      typeof taskFunctions.name === 'string' &&
      taskFunctions.name.trim().length > 0
    const boundFn = taskFunctions.bind(this)
    this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
    this.taskFunctions.set(hasUsableName ? taskFunctions.name : 'fn1', boundFn)
    return
  }
  if (!isPlainObject(taskFunctions)) {
    throw new TypeError(
      'taskFunctions parameter is not a function or a plain object'
    )
  }
  const entries = Object.entries(taskFunctions)
  entries.forEach(([name, fn], index) => {
    this.checkValidTaskFunction(name, fn)
    const boundFn = fn.bind(this)
    if (index === 0) {
      // The first declared task function doubles as the default one.
      this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
    }
    this.taskFunctions.set(name, boundFn)
  })
  if (entries.length === 0) {
    throw new Error('taskFunctions parameter object is empty')
  }
}
170
/**
 * Tells whether a task function is registered under the given name.
 * An invalid name is reported through the returned `error` instead of being thrown.
 *
 * @param name - The name of the task function to check.
 * @returns Whether the worker has a task function with the given name or not.
 */
public hasTaskFunction (name: string): TaskFunctionOperationReturnType {
  try {
    this.checkTaskFunctionName(name)
  } catch (error) {
    return { status: false, error: error as Error }
  }
  const status = this.taskFunctions.has(name)
  return { status }
}
185
/**
 * Registers a task function, bound to the worker, under the given name.
 * An existing task function with the same name is replaced; when that
 * function was the default one, the default is replaced too.
 * The main worker is then sent the updated task function names.
 *
 * @param name - The name of the task function to add.
 * @param fn - The task function to add.
 * @returns Whether the task function was added or not. On failure `error` holds the reason.
 */
public addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): TaskFunctionOperationReturnType {
  try {
    this.checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot add a task function with the default reserved name'
      )
    }
    if (typeof fn !== 'function') {
      throw new TypeError('fn parameter is not a function')
    }
    const boundFn = fn.bind(this)
    const replacesDefault =
      this.taskFunctions.get(name) ===
      this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (replacesDefault) {
      this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
    }
    this.taskFunctions.set(name, boundFn)
    this.sendTaskFunctionNamesToMainWorker()
    return { status: true }
  } catch (error) {
    return { status: false, error: error as Error }
  }
}
222
/**
 * Unregisters the task function stored under the given name.
 * The default reserved name and the task function currently used as default
 * cannot be removed. The main worker is then sent the task function names.
 *
 * @param name - The name of the task function to remove.
 * @returns Whether the task function existed and was removed or not. On refusal `error` holds the reason.
 */
public removeTaskFunction (name: string): TaskFunctionOperationReturnType {
  try {
    this.checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot remove the task function with the default reserved name'
      )
    }
    const isDefaultTaskFunction =
      this.taskFunctions.get(name) ===
      this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (isDefaultTaskFunction) {
      throw new Error(
        'Cannot remove the task function used as the default task function'
      )
    }
    const status = this.taskFunctions.delete(name)
    this.sendTaskFunctionNamesToMainWorker()
    return { status }
  } catch (error) {
    return { status: false, error: error as Error }
  }
}
252
/**
 * Lists the names of the worker's task functions in a fixed order:
 * the default reserved name first, then the name of the task function
 * currently used as default, then every other name in registration order.
 *
 * @returns The names of the worker's task functions.
 */
public listTaskFunctionNames (): string[] {
  const names: string[] = Array.from(this.taskFunctions.keys())
  const defaultTaskFunction = this.taskFunctions.get(DEFAULT_TASK_NAME)
  // First named entry sharing its function with the default entry, if any.
  const defaultEntry = Array.from(this.taskFunctions).find(
    ([name, fn]) => name !== DEFAULT_TASK_NAME && fn === defaultTaskFunction
  )
  const defaultTaskFunctionName: string =
    defaultEntry != null ? defaultEntry[0] : DEFAULT_TASK_NAME
  const otherNames = names.filter(
    name => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
  )
  return [
    names[names.indexOf(DEFAULT_TASK_NAME)],
    defaultTaskFunctionName,
    ...otherNames
  ]
}
278
/**
 * Sets the default task function to use in the worker.
 * As when adding or removing a task function, the main worker is sent the
 * task function names afterwards: the default task function name is part of
 * that list, so it changes whenever the default does.
 *
 * @param name - The name of the task function to use as default task function.
 * @returns Whether the default task function was set or not. On failure `error` holds the reason.
 */
public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType {
  try {
    this.checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot set the default task function reserved name as the default task function'
      )
    }
    if (!this.taskFunctions.has(name)) {
      throw new Error(
        'Cannot set the default task function to a non-existing task function'
      )
    }
    this.taskFunctions.set(
      DEFAULT_TASK_NAME,
      this.taskFunctions.get(name) as TaskFunction<Data, Response>
    )
    // Keep the main worker's view of the task function names in sync.
    this.sendTaskFunctionNamesToMainWorker()
    return { status: true }
  } catch (error) {
    return { status: false, error: error as Error }
  }
}
307
/**
 * Validates a task function name.
 *
 * @param name - The name to validate.
 * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the name is not a string or is blank.
 */
private checkTaskFunctionName (name: string): void {
  if (typeof name !== 'string') {
    throw new TypeError('name parameter is not a string')
  }
  // name is known to be a string from here on.
  const isBlank = name.trim().length === 0
  if (isBlank) {
    throw new TypeError('name parameter is an empty string')
  }
}
316
/**
 * Handles the ready message sent by the main worker.
 * The constructor registers this method as the `message` listener of the
 * main worker when this worker is not the main one.
 *
 * @param message - The ready message.
 */
protected abstract handleReadyMessage (message: MessageValue<Data>): void
323
/**
 * Worker message listener.
 * After checking the message worker id, the message is dispatched on the
 * first matching field, in this order: `statistics`, `checkActive`,
 * `taskFunctionOperation`, task (`taskId` and `data`), `kill`.
 * A message matching none of them is ignored.
 *
 * @param message - The received message.
 */
protected messageListener (message: MessageValue<Data>): void {
  this.checkMessageWorkerId(message)
  const { statistics, checkActive, taskFunctionOperation, taskId, data, kill } =
    message
  if (statistics != null) {
    // Statistics message received
    this.statistics = statistics
    return
  }
  if (checkActive != null) {
    // Check active message received
    if (checkActive) {
      this.startCheckActive()
    } else {
      this.stopCheckActive()
    }
    return
  }
  if (taskFunctionOperation != null) {
    // Task function operation message received
    this.handleTaskFunctionOperationMessage(message)
    return
  }
  if (taskId != null && data != null) {
    // Task message received
    this.run(message)
    return
  }
  if (kill === true) {
    // Kill message received
    this.handleKillMessage(message)
  }
}
348
/**
 * Handles a task function operation message (`add`, `remove` or `default`)
 * and reports the outcome to the main worker.
 * Any failure is reported as a failed operation rather than thrown from the
 * message listener: an unknown operation, or a task function source that
 * cannot be evaluated.
 * The `workerError` field is only attached when the operation failed with an error.
 *
 * @param message - The task function operation message.
 */
protected handleTaskFunctionOperationMessage (
  message: MessageValue<Data>
): void {
  const { taskFunctionOperation, taskFunction, taskFunctionName } = message
  let response: TaskFunctionOperationReturnType
  if (taskFunctionOperation === 'add') {
    try {
      // NOTE(review): the received task function source is evaluated as code;
      // this is only safe as long as the messages come from a trusted main worker.
      // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
      const fn = new Function(`return ${taskFunction as string}`)() as TaskFunction<Data, Response>
      response = this.addTaskFunction(taskFunctionName as string, fn)
    } catch (error) {
      // Malformed task function source: report it instead of crashing the listener.
      response = { status: false, error: error as Error }
    }
  } else if (taskFunctionOperation === 'remove') {
    response = this.removeTaskFunction(taskFunctionName as string)
  } else if (taskFunctionOperation === 'default') {
    response = this.setDefaultTaskFunction(taskFunctionName as string)
  } else {
    response = {
      status: false,
      error: new Error(
        `Unknown task function operation '${String(taskFunctionOperation)}'`
      )
    }
  }
  this.sendToMainWorker({
    taskFunctionOperation,
    taskFunctionOperationStatus: response.status,
    ...(!response.status &&
      response.error != null && {
      workerError: {
        name: taskFunctionName as string,
        message: this.handleError(response.error as Error | string)
      }
    }),
    workerId: this.id
  })
}
378
/**
 * Handles a kill message sent by the main worker.
 * Stops the activity check, runs the `killHandler` option (awaiting it when
 * it is an async function), reports `success` or `failure` to the main
 * worker and finally emits the async resource destroy event.
 *
 * @param message - The kill message.
 */
protected handleKillMessage (message: MessageValue<Data>): void {
  this.stopCheckActive()
  if (!isAsyncFunction(this.opts.killHandler)) {
    try {
      // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
      this.opts.killHandler?.() as void
      this.sendToMainWorker({ kill: 'success', workerId: this.id })
    } catch {
      this.sendToMainWorker({ kill: 'failure', workerId: this.id })
    } finally {
      this.emitDestroy()
    }
    return
  }
  const killHandlerPromise = this.opts.killHandler?.() as Promise<void>
  killHandlerPromise
    .then(() => {
      this.sendToMainWorker({ kill: 'success', workerId: this.id })
      return null
    })
    .catch(() => {
      this.sendToMainWorker({ kill: 'failure', workerId: this.id })
    })
    .finally(() => {
      this.emitDestroy()
    })
    // Nothing left to report to if the steps above fail themselves.
    .catch(EMPTY_FUNCTION)
}
411
/**
 * Ensures the message carries a worker id and that it is the id of this worker.
 *
 * @param message - The message to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
 */
private checkMessageWorkerId (message: MessageValue<Data>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Message worker id is not set')
  }
  if (workerId !== this.id) {
    throw new Error(
      `Message worker id ${workerId} does not match the worker id ${this.id}`
    )
  }
}
427
/**
 * Starts the worker check active interval.
 * The check runs every half of the `maxInactiveTime` option.
 * Any interval still running from a previous start is cleared first, so that
 * starting twice never leaves an orphan timer behind.
 */
private startCheckActive (): void {
  this.stopCheckActive()
  this.lastTaskTimestamp = performance.now()
  this.activeInterval = setInterval(
    this.checkActive.bind(this),
    (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
  )
}
438
/**
 * Stops the worker check active interval, if one is running, and forgets its handle.
 */
private stopCheckActive (): void {
  if (this.activeInterval == null) {
    return
  }
  clearInterval(this.activeInterval)
  delete this.activeInterval
}
448
/**
 * Checks if the worker has been idle for longer than the `maxInactiveTime`
 * option and, if so, asks the main worker to kill it using the configured kill behavior.
 */
private checkActive (): void {
  const maxInactiveTime =
    this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
  const inactiveTime = performance.now() - this.lastTaskTimestamp
  if (inactiveTime > maxInactiveTime) {
    this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
  }
}
460
/**
 * Gives access to the main worker handed to the constructor.
 *
 * @returns Reference to the main worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
 */
protected getMainWorker (): MainWorker {
  const { mainWorker } = this
  if (mainWorker == null) {
    throw new Error('Main worker not set')
  }
  return mainWorker
}
473
/**
 * Sends a message to main worker.
 * This class uses it to report task results and errors, task function
 * operation outcomes, task function names and kill requests or statuses.
 *
 * @param message - The response message.
 */
protected abstract sendToMainWorker (
  message: MessageValue<Response, Data>
): void
482
/**
 * Sends the current list of task function names, together with the worker id, to the main worker.
 */
protected sendTaskFunctionNamesToMainWorker (): void {
  const taskFunctionNames = this.listTaskFunctionNames()
  this.sendToMainWorker({ taskFunctionNames, workerId: this.id })
}
492
/**
 * Converts an error raised by the worker to a string so it can be sent back to the main worker.
 *
 * @param error - The error raised by the worker.
 * @returns The message of an `Error` instance, otherwise the given value unchanged.
 */
protected handleError (error: Error | string): string {
  if (error instanceof Error) {
    return error.message
  }
  return error
}
502
/**
 * Runs the given task with the task function registered under the task name,
 * or with the default task function when the task has no name.
 * When no task function matches, a worker error is sent to the main worker
 * and nothing is executed.
 * The task function runs within the async scope of this resource, through
 * the async or the sync runner depending on its kind.
 *
 * @param task - The task to execute.
 */
protected run (task: Task<Data>): void {
  const { name, taskId, data } = task
  const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
  if (fn == null) {
    const workerError = {
      name: name as string,
      message: `Task function '${name as string}' not found`,
      data
    }
    this.sendToMainWorker({ workerError, workerId: this.id, taskId })
    return
  }
  if (isAsyncFunction(fn)) {
    this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
    return
  }
  this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
}
530
/**
 * Runs the given task function synchronously and sends its result, with the
 * task performance, to the main worker. Anything thrown on the way is sent
 * back as a worker error. The last task timestamp is refreshed in both cases.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runSync (
  fn: TaskSyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  try {
    const startedTaskPerformance = this.beginTaskPerformance(name)
    const res = fn(data)
    const taskPerformance = this.endTaskPerformance(startedTaskPerformance)
    this.sendToMainWorker({
      data: res,
      taskPerformance,
      workerId: this.id,
      taskId
    })
  } catch (error) {
    const workerError = {
      name: name as string,
      message: this.handleError(error as Error | string),
      data
    }
    this.sendToMainWorker({ workerError, workerId: this.id, taskId })
  } finally {
    this.updateLastTaskTimestamp()
  }
}
566
/**
 * Runs the given task function asynchronously. Once its promise settles, the
 * result with the task performance, or the rejection as a worker error, is
 * sent to the main worker and the last task timestamp is refreshed.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runAsync (
  fn: TaskAsyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  const startedTaskPerformance = this.beginTaskPerformance(name)
  const reportError = (error: unknown): void => {
    const workerError = {
      name: name as string,
      message: this.handleError(error as Error | string),
      data
    }
    this.sendToMainWorker({ workerError, workerId: this.id, taskId })
  }
  const refreshLastTaskTimestamp = (): void => {
    this.updateLastTaskTimestamp()
  }
  fn(data)
    .then(res => {
      const taskPerformance = this.endTaskPerformance(startedTaskPerformance)
      this.sendToMainWorker({
        data: res,
        taskPerformance,
        workerId: this.id,
        taskId
      })
      return null
    })
    .catch(reportError)
    .finally(refreshLastTaskTimestamp)
    // Nothing left to report to if the steps above fail themselves.
    .catch(EMPTY_FUNCTION)
}
606
/**
 * Opens the performance record of a task: its name, its start timestamp and,
 * when the statistics require it, the event loop utilization at start.
 *
 * @param name - The task name. Falls back to the default task name.
 * @returns The task performance record to hand to `endTaskPerformance`.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
 */
private beginTaskPerformance (name?: string): TaskPerformance {
  this.checkStatistics()
  const taskName = name ?? DEFAULT_TASK_NAME
  const timestamp = performance.now()
  return {
    name: taskName,
    timestamp,
    ...(this.statistics.elu ? { elu: performance.eventLoopUtilization() } : {})
  }
}
615
/**
 * Closes the performance record of a task. Depending on what the statistics
 * require, it adds the run time elapsed since the start timestamp and the
 * event loop utilization relative to the one recorded at start.
 *
 * @param taskPerformance - The record returned by `beginTaskPerformance`.
 * @returns A new, completed task performance record.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
 */
private endTaskPerformance (
  taskPerformance: TaskPerformance
): TaskPerformance {
  this.checkStatistics()
  const { runTime: needsRunTime, elu: needsElu } = this.statistics
  return {
    ...taskPerformance,
    ...(needsRunTime
      ? { runTime: performance.now() - taskPerformance.timestamp }
      : {}),
    ...(needsElu
      ? { elu: performance.eventLoopUtilization(taskPerformance.elu) }
      : {})
  }
}
630
/**
 * Ensures the performance statistics computation requirements have been received.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If they are not set.
 */
private checkStatistics (): void {
  const isStatisticsSet = this.statistics != null
  if (!isStatisticsSet) {
    throw new Error('Performance statistics computation requirements not set')
  }
}
636
/**
 * Records now as the time of the last processed task.
 * Does nothing while the activity check interval is not running.
 */
private updateLastTaskTimestamp (): void {
  if (this.activeInterval == null) {
    return
  }
  this.lastTaskTimestamp = performance.now()
}
642 }