Merge branch 'master' of github.com:poolifier/poolifier into feature/task-functions
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import type { Worker } from 'node:cluster'
3 import type { MessagePort } from 'node:worker_threads'
4 import { performance } from 'node:perf_hooks'
5 import type {
6 MessageValue,
7 Task,
8 TaskPerformance,
9 WorkerStatistics
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 isAsyncFunction,
15 isPlainObject
16 } from '../utils'
17 import { KillBehaviors, type WorkerOptions } from './worker-options'
18 import type {
19 TaskAsyncFunction,
20 TaskFunction,
21 TaskFunctionOperationReturnType,
22 TaskFunctions,
23 TaskSyncFunction
24 } from './task-functions'
25
// Default idle budget of a worker, in milliseconds (it is compared against
// `performance.now()` deltas and used as a `setInterval` delay below).
const DEFAULT_MAX_INACTIVE_TIME = 60 * 1000

// Options applied to a worker when the caller does not provide its own.
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
  // Kill behavior reported by this worker once it has been idle for too long.
  killBehavior: KillBehaviors.SOFT,
  // Maximum time this worker may stay idle before it reports itself to the
  // main worker for termination.
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
  // Hook invoked when the worker receives a kill message; no-op by default.
  killHandler: EMPTY_FUNCTION
}
42
/**
 * Base class that implements some shared logic for all poolifier workers.
 *
 * @typeParam MainWorker - Type of main worker.
 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
 */
export abstract class AbstractWorker<
  MainWorker extends Worker | MessagePort,
  Data = unknown,
  Response = unknown
> extends AsyncResource {
  /**
   * Worker id.
   */
  protected abstract id: number
  /**
   * Task function(s) processed by the worker when the pool's `execution` function is invoked.
   */
  protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
  /**
   * Timestamp of the last task processed by this worker.
   */
  protected lastTaskTimestamp!: number
  /**
   * Performance statistics computation requirements.
   */
  protected statistics!: WorkerStatistics
  /**
   * Handler id of the `activeInterval` worker activity check.
   */
  protected activeInterval?: NodeJS.Timeout
  /**
   * Constructs a new poolifier worker.
   *
   * @param type - The type of async event.
   * @param isMain - Whether this is the main worker or not.
   * @param mainWorker - Reference to main worker.
   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
   * @param opts - Options for the worker.
   */
  public constructor (
    type: string,
    protected readonly isMain: boolean,
    private readonly mainWorker: MainWorker,
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
    protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
  ) {
    super(type)
    if (this.isMain == null) {
      throw new Error('isMain parameter is mandatory')
    }
    this.checkTaskFunctions(taskFunctions)
    this.checkWorkerOptions(this.opts)
    if (!this.isMain) {
      this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
    }
  }

  /**
   * Merges the given options over the default worker options, stores the
   * result in `this.opts` and removes the `async` key from it.
   *
   * @param opts - The worker options to merge.
   */
  private checkWorkerOptions (opts: WorkerOptions): void {
    this.opts = { ...DEFAULT_WORKER_OPTIONS, ...opts }
    delete this.opts.async
  }

  /**
   * Validates one entry of a `taskFunctions` parameter object.
   *
   * @param name - The entry key.
   * @param fn - The entry value.
   * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the key is not a non-empty string or the value is not a function.
   */
  private checkValidTaskFunction (
    name: string,
    fn: TaskFunction<Data, Response>
  ): void {
    if (typeof name !== 'string') {
      throw new TypeError(
        'A taskFunctions parameter object key is not a string'
      )
    }
    if (name.trim().length === 0) {
      throw new TypeError(
        'A taskFunctions parameter object key is an empty string'
      )
    }
    if (typeof fn !== 'function') {
      throw new TypeError(
        'A taskFunctions parameter object value is not a function'
      )
    }
  }

  /**
   * Checks the `taskFunctions` parameter passed to the constructor and
   * registers its function(s), bound to this worker, in `this.taskFunctions`.
   * The single function, or the first entry of the object, is also registered
   * under the default task name.
   *
   * @param taskFunctions - The task function(s) parameter that should be checked.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the parameter is missing, is an empty object, or is neither a function nor a plain object.
   */
  private checkTaskFunctions (
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
  ): void {
    if (taskFunctions == null) {
      throw new Error('taskFunctions parameter is mandatory')
    }
    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
    if (typeof taskFunctions === 'function') {
      const boundFn = taskFunctions.bind(this)
      this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
      // Anonymous functions are registered under the fallback name 'fn1'.
      this.taskFunctions.set(
        typeof taskFunctions.name === 'string' &&
          taskFunctions.name.trim().length > 0
          ? taskFunctions.name
          : 'fn1',
        boundFn
      )
    } else if (isPlainObject(taskFunctions)) {
      let firstEntry = true
      for (const [name, fn] of Object.entries(taskFunctions)) {
        this.checkValidTaskFunction(name, fn)
        const boundFn = fn.bind(this)
        if (firstEntry) {
          this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
          firstEntry = false
        }
        this.taskFunctions.set(name, boundFn)
      }
      if (firstEntry) {
        throw new Error('taskFunctions parameter object is empty')
      }
    } else {
      throw new TypeError(
        'taskFunctions parameter is not a function or a plain object'
      )
    }
  }

  /**
   * Checks if the worker has a task function with the given name.
   *
   * @param name - The name of the task function to check.
   * @returns Whether the worker has a task function with the given name or not.
   */
  public hasTaskFunction (name: string): TaskFunctionOperationReturnType {
    try {
      this.checkTaskFunctionName(name)
    } catch (error) {
      return { status: false, error: error as Error }
    }
    return { status: this.taskFunctions.has(name) }
  }

  /**
   * Adds a task function to the worker.
   * If a task function with the same name already exists, it is replaced.
   *
   * @param name - The name of the task function to add.
   * @param fn - The task function to add.
   * @returns Whether the task function was added or not.
   */
  public addTaskFunction (
    name: string,
    fn: TaskFunction<Data, Response>
  ): TaskFunctionOperationReturnType {
    try {
      this.checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot add a task function with the default reserved name'
        )
      }
      if (typeof fn !== 'function') {
        throw new TypeError('fn parameter is not a function')
      }
      const boundFn = fn.bind(this)
      // Replacing the function currently used as default also replaces the
      // default entry, so both names keep pointing at the same function.
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
      }
      this.taskFunctions.set(name, boundFn)
      this.sendTaskFunctionNamesToMainWorker()
      return { status: true }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Removes a task function from the worker.
   *
   * @param name - The name of the task function to remove.
   * @returns Whether the task function existed and was removed or not.
   */
  public removeTaskFunction (name: string): TaskFunctionOperationReturnType {
    try {
      this.checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot remove the task function with the default reserved name'
        )
      }
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        throw new Error(
          'Cannot remove the task function used as the default task function'
        )
      }
      const deleteStatus = this.taskFunctions.delete(name)
      this.sendTaskFunctionNamesToMainWorker()
      return { status: deleteStatus }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Lists the names of the worker's task functions.
   * The default reserved name comes first, then the name of the task function
   * used as default, then the remaining names.
   *
   * @returns The names of the worker's task functions.
   */
  public listTaskFunctionNames (): string[] {
    const names: string[] = [...this.taskFunctions.keys()]
    // The default entry is registered by the constructor and cannot be removed
    // through removeTaskFunction(), so it is looked up once, outside the loop.
    const defaultTaskFunction = this.taskFunctions.get(DEFAULT_TASK_NAME)
    let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
    for (const [name, fn] of this.taskFunctions) {
      if (name !== DEFAULT_TASK_NAME && fn === defaultTaskFunction) {
        defaultTaskFunctionName = name
        break
      }
    }
    return [
      DEFAULT_TASK_NAME,
      defaultTaskFunctionName,
      ...names.filter(
        name => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
      )
    ]
  }

  /**
   * Sets the default task function to use in the worker.
   *
   * @param name - The name of the task function to use as default task function.
   * @returns Whether the default task function was set or not.
   */
  public setDefaultTaskFunction (name: string): TaskFunctionOperationReturnType {
    try {
      this.checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot set the default task function reserved name as the default task function'
        )
      }
      if (!this.taskFunctions.has(name)) {
        throw new Error(
          'Cannot set the default task function to a non-existing task function'
        )
      }
      this.taskFunctions.set(
        DEFAULT_TASK_NAME,
        this.taskFunctions.get(name) as TaskFunction<Data, Response>
      )
      // The default function name is part of the ordering returned by
      // listTaskFunctionNames(), so notify the main worker like add/remove do.
      this.sendTaskFunctionNamesToMainWorker()
      return { status: true }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Checks that a task function name is a non-empty string.
   *
   * @param name - The task function name to check.
   * @throws {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/TypeError} If the name is not a string or is blank.
   */
  private checkTaskFunctionName (name: string): void {
    if (typeof name !== 'string') {
      throw new TypeError('name parameter is not a string')
    }
    if (name.trim().length === 0) {
      throw new TypeError('name parameter is an empty string')
    }
  }

  /**
   * Handles the ready message sent by the main worker.
   *
   * @param message - The ready message.
   */
  protected abstract handleReadyMessage (message: MessageValue<Data>): void

  /**
   * Worker message listener.
   * Dispatches on the first matching message field, in this order: statistics,
   * check active, task function operation, task, kill.
   *
   * @param message - The received message.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  protected messageListener (message: MessageValue<Data>): void {
    this.checkMessageWorkerId(message)
    if (message.statistics != null) {
      // Statistics message received
      this.statistics = message.statistics
    } else if (message.checkActive != null) {
      // Check active message received
      message.checkActive ? this.startCheckActive() : this.stopCheckActive()
    } else if (message.taskFunctionOperation != null) {
      // Task function operation message received
      this.handleTaskFunctionOperationMessage(message)
    } else if (message.taskId != null && message.data != null) {
      // Task message received
      this.run(message)
    } else if (message.kill === true) {
      // Kill message received
      this.handleKillMessage(message)
    }
  }

  /**
   * Handles a task function operation message (`add`, `remove` or `default`)
   * and reports the operation status to the main worker. An unknown operation
   * or an exception raised while handling it is reported as a failure, and
   * `workerError` is only attached to failure reports.
   *
   * @param message - The task function operation message.
   */
  protected handleTaskFunctionOperationMessage (
    message: MessageValue<Data>
  ): void {
    const { taskFunctionOperation, taskFunction, taskFunctionName } = message
    let response: TaskFunctionOperationReturnType
    try {
      switch (taskFunctionOperation) {
        case 'add':
          response = this.addTaskFunction(
            taskFunctionName as string,
            // NOTE(review): the function source is evaluated as code; it must
            // only ever come from the trusted main worker.
            // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
            new Function(`return ${taskFunction as string}`)() as TaskFunction<
            Data,
            Response
            >
          )
          break
        case 'remove':
          response = this.removeTaskFunction(taskFunctionName as string)
          break
        case 'default':
          response = this.setDefaultTaskFunction(taskFunctionName as string)
          break
        default:
          response = {
            status: false,
            error: new Error('Unknown task function operation')
          }
          break
      }
    } catch (error) {
      // E.g. a SyntaxError raised while evaluating the task function source.
      response = { status: false, error: error as Error }
    }
    this.sendToMainWorker({
      taskFunctionOperation,
      taskFunctionOperationStatus: response.status,
      ...(!response.status &&
        response.error != null && {
        workerError: {
          name: taskFunctionName as string,
          message: this.handleError(response.error as Error | string)
        }
      })
    })
  }

  /**
   * Handles a kill message sent by the main worker.
   * Stops the activity check, runs the configured kill handler and reports
   * `success` or `failure` to the main worker, then emits the async resource
   * destroy event.
   *
   * @param message - The kill message.
   */
  protected handleKillMessage (message: MessageValue<Data>): void {
    this.stopCheckActive()
    if (isAsyncFunction(this.opts.killHandler)) {
      (this.opts.killHandler?.() as Promise<void>)
        .then(() => {
          this.sendToMainWorker({ kill: 'success' })
          return null
        })
        .catch(() => {
          this.sendToMainWorker({ kill: 'failure' })
        })
        .finally(() => {
          this.emitDestroy()
        })
        .catch(EMPTY_FUNCTION)
    } else {
      try {
        // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
        this.opts.killHandler?.() as void
        this.sendToMainWorker({ kill: 'success' })
      } catch {
        this.sendToMainWorker({ kill: 'failure' })
      } finally {
        this.emitDestroy()
      }
    }
  }

  /**
   * Check if the message worker id is set and matches the worker id.
   *
   * @param message - The message to check.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  private checkMessageWorkerId (message: MessageValue<Data>): void {
    if (message.workerId == null) {
      throw new Error('Message worker id is not set')
    }
    if (message.workerId !== this.id) {
      throw new Error(
        `Message worker id ${message.workerId} does not match the worker id ${this.id}`
      )
    }
  }

  /**
   * Starts the worker check active interval.
   * Any interval that is already running is cleared first so that repeated
   * calls never leave more than one timer alive.
   */
  private startCheckActive (): void {
    this.stopCheckActive()
    this.lastTaskTimestamp = performance.now()
    // Check twice per inactivity period.
    this.activeInterval = setInterval(
      this.checkActive.bind(this),
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
    )
  }

  /**
   * Stops the worker check active interval.
   */
  private stopCheckActive (): void {
    if (this.activeInterval != null) {
      clearInterval(this.activeInterval)
      delete this.activeInterval
    }
  }

  /**
   * Checks if the worker has been idle for longer than the maximum inactive
   * time and, if so, sends its kill behavior to the main worker.
   */
  private checkActive (): void {
    if (
      performance.now() - this.lastTaskTimestamp >
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
    ) {
      this.sendToMainWorker({ kill: this.opts.killBehavior })
    }
  }

  /**
   * Returns the main worker.
   *
   * @returns Reference to the main worker.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
   */
  protected getMainWorker (): MainWorker {
    if (this.mainWorker == null) {
      throw new Error('Main worker not set')
    }
    return this.mainWorker
  }

  /**
   * Sends a message to main worker.
   *
   * @param message - The response message.
   */
  protected abstract sendToMainWorker (
    message: MessageValue<Response, Data>
  ): void

  /**
   * Sends task function names to the main worker.
   */
  protected sendTaskFunctionNamesToMainWorker (): void {
    this.sendToMainWorker({
      taskFunctionNames: this.listTaskFunctionNames()
    })
  }

  /**
   * Handles an error and convert it to a string so it can be sent back to the main worker.
   *
   * @param error - The error raised by the worker.
   * @returns The error message.
   */
  protected handleError (error: Error | string): string {
    return error instanceof Error ? error.message : error
  }

  /**
   * Runs the given task.
   * If no task function is registered under the task name, a worker error is
   * sent to the main worker instead of running anything.
   *
   * @param task - The task to execute.
   */
  protected run (task: Task<Data>): void {
    const { name, taskId, data } = task
    const fn = this.taskFunctions.get(name ?? DEFAULT_TASK_NAME)
    if (fn == null) {
      this.sendToMainWorker({
        workerError: {
          name: name as string,
          message: `Task function '${name as string}' not found`,
          data
        },
        taskId
      })
      return
    }
    if (isAsyncFunction(fn)) {
      this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
    } else {
      this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
    }
  }

  /**
   * Runs the given task function synchronously.
   * The result, or the error raised while running, is sent to the main worker.
   *
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected runSync (
    fn: TaskSyncFunction<Data, Response>,
    task: Task<Data>
  ): void {
    const { name, taskId, data } = task
    try {
      let taskPerformance = this.beginTaskPerformance(name)
      const res = fn(data)
      taskPerformance = this.endTaskPerformance(taskPerformance)
      this.sendToMainWorker({
        data: res,
        taskPerformance,
        taskId
      })
    } catch (error) {
      this.sendToMainWorker({
        workerError: {
          name: name as string,
          message: this.handleError(error as Error | string),
          data
        },
        taskId
      })
    } finally {
      this.updateLastTaskTimestamp()
    }
  }

  /**
   * Runs the given task function asynchronously.
   * The result, or the error raised while running, is sent to the main worker.
   *
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected runAsync (
    fn: TaskAsyncFunction<Data, Response>,
    task: Task<Data>
  ): void {
    const { name, taskId, data } = task
    let taskPerformance: TaskPerformance
    try {
      taskPerformance = this.beginTaskPerformance(name)
    } catch (error) {
      // Same contract as runSync(): a failure to start the task is reported
      // to the main worker instead of escaping the message listener.
      this.sendToMainWorker({
        workerError: {
          name: name as string,
          message: this.handleError(error as Error | string),
          data
        },
        taskId
      })
      this.updateLastTaskTimestamp()
      return
    }
    fn(data)
      .then(res => {
        taskPerformance = this.endTaskPerformance(taskPerformance)
        this.sendToMainWorker({
          data: res,
          taskPerformance,
          taskId
        })
        return null
      })
      .catch(error => {
        this.sendToMainWorker({
          workerError: {
            name: name as string,
            message: this.handleError(error as Error | string),
            data
          },
          taskId
        })
      })
      .finally(() => {
        this.updateLastTaskTimestamp()
      })
      .catch(EMPTY_FUNCTION)
  }

  /**
   * Creates the task performance record at the start of a task.
   *
   * @param name - The task function name; the default task name is used when omitted.
   * @returns The task performance record, with the start timestamp and, when ELU statistics are required, the starting event loop utilization.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the statistics requirements are not set.
   */
  private beginTaskPerformance (name?: string): TaskPerformance {
    this.checkStatistics()
    return {
      name: name ?? DEFAULT_TASK_NAME,
      timestamp: performance.now(),
      ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
    }
  }

  /**
   * Completes the task performance record at the end of a task.
   *
   * @param taskPerformance - The record returned by `beginTaskPerformance()`.
   * @returns A copy of the record with the run time and/or the event loop utilization delta, depending on the statistics requirements.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the statistics requirements are not set.
   */
  private endTaskPerformance (
    taskPerformance: TaskPerformance
  ): TaskPerformance {
    this.checkStatistics()
    return {
      ...taskPerformance,
      ...(this.statistics.runTime && {
        runTime: performance.now() - taskPerformance.timestamp
      }),
      ...(this.statistics.elu && {
        elu: performance.eventLoopUtilization(taskPerformance.elu)
      })
    }
  }

  /**
   * Checks that the statistics requirements have been received.
   *
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the statistics requirements are not set.
   */
  private checkStatistics (): void {
    if (this.statistics == null) {
      throw new Error('Performance statistics computation requirements not set')
    }
  }

  /**
   * Refreshes the last task timestamp, only while the activity check is running.
   */
  private updateLastTaskTimestamp (): void {
    if (this.activeInterval != null) {
      this.lastTaskTimestamp = performance.now()
    }
  }
}