refactor: cleanup task function default name handling
[poolifier.git] / src / worker / abstract-worker.ts
1 import { AsyncResource } from 'node:async_hooks'
2 import type { Worker } from 'node:cluster'
3 import type { MessagePort } from 'node:worker_threads'
4 import { performance } from 'node:perf_hooks'
5 import type {
6 MessageValue,
7 Task,
8 TaskPerformance,
9 WorkerStatistics
10 } from '../utility-types'
11 import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 isAsyncFunction,
15 isPlainObject
16 } from '../utils'
17 import { KillBehaviors, type WorkerOptions } from './worker-options'
18 import type {
19 TaskAsyncFunction,
20 TaskFunction,
21 TaskFunctionOperationResult,
22 TaskFunctions,
23 TaskSyncFunction
24 } from './task-functions'
25 import {
26 checkTaskFunctionName,
27 checkValidTaskFunctionEntry,
28 checkValidWorkerOptions
29 } from './utils'
30
/**
 * Default maximum idle time of a worker, used when `maxInactiveTime` is not provided.
 */
const DEFAULT_MAX_INACTIVE_TIME = 60000
/**
 * Default worker options, merged under the user supplied options.
 *
 * The object is frozen because it is shared: it is the default value of the
 * constructor `opts` parameter, so an accidental mutation through one worker
 * instance would otherwise silently change the defaults of every other one.
 */
const DEFAULT_WORKER_OPTIONS: WorkerOptions = Object.freeze({
  /**
   * The kill behavior option on this worker or its default value.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * The maximum time to keep this worker active while idle.
   * The pool automatically checks and terminates this worker when the time expires.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
  /**
   * The function to call when the worker is killed.
   */
  killHandler: EMPTY_FUNCTION
})
47
48 /**
49 * Base class that implements some shared logic for all poolifier workers.
50 *
51 * @typeParam MainWorker - Type of main worker.
52 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
53 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
54 */
55 export abstract class AbstractWorker<
56 MainWorker extends Worker | MessagePort,
57 Data = unknown,
58 Response = unknown
59 > extends AsyncResource {
60 /**
61 * Identifier of this worker; incoming messages whose `workerId` differs from it are rejected.
62 */
63 protected abstract id: number
64 /**
65 * Task functions registered on this worker, keyed by name; the `DEFAULT_TASK_NAME` key aliases the default task function.
66 */
67 protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
68 /**
69 * `performance.now()` timestamp set when the activity check starts and refreshed when a task finishes while the check is running.
70 */
71 protected lastTaskTimestamp!: number
72 /**
73 * Performance statistics computation requirements, assigned when a `statistics` message is received.
74 */
75 protected statistics!: WorkerStatistics
76 /**
77 * Timer handle of the periodic worker activity check; unset while the check is stopped.
78 */
79 protected activeInterval?: NodeJS.Timeout
/**
 * Builds a new poolifier worker: validates the task function(s) and the
 * options, then, when the instance is not the main worker, starts listening
 * for the ready message on the main worker reference.
 *
 * @param type - The type of async event, forwarded to the `AsyncResource` constructor.
 * @param isMain - Whether this is the main worker or not.
 * @param mainWorker - Reference to main worker.
 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
 * @param opts - Options for the worker.
 * @throws Error if `isMain` is `null` or `undefined`; errors raised by the task function(s) and options checks propagate.
 */
public constructor (
  type: string,
  protected readonly isMain: boolean,
  private readonly mainWorker: MainWorker,
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
  protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
  super(type)
  if (this.isMain == null) {
    throw new Error('isMain parameter is mandatory')
  }
  this.checkTaskFunctions(taskFunctions)
  this.checkWorkerOptions(this.opts)
  if (this.isMain) {
    // The main worker does not wait for a ready message
    return
  }
  // Should be once() but Node.js on windows has a bug that prevents it from working
  this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
}
107
/**
 * Validates the given worker options, then stores a fresh object made of the
 * default options overridden by the given ones.
 *
 * @param opts - The worker options to check and apply.
 */
private checkWorkerOptions (opts: WorkerOptions): void {
  checkValidWorkerOptions(opts)
  // Later sources win, so user supplied values override the defaults
  this.opts = Object.assign({}, DEFAULT_WORKER_OPTIONS, opts)
}
112
/**
 * Validates the `taskFunctions` constructor parameter and fills the task
 * function registry from it.
 *
 * A single function is registered under both the default reserved name and
 * its own name (`'fn1'` when it has no usable name). For a plain object, every
 * entry is registered under its key and the first entry is also registered
 * under the default reserved name. Every function is bound to this worker.
 *
 * @param taskFunctions - The task function(s) parameter that should be checked.
 * @throws Error if the parameter is `null`/`undefined` or an empty object.
 * @throws TypeError if the parameter is neither a function nor a plain object.
 */
private checkTaskFunctions (
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
): void {
  if (taskFunctions == null) {
    throw new Error('taskFunctions parameter is mandatory')
  }
  const registry = new Map<string, TaskFunction<Data, Response>>()
  this.taskFunctions = registry
  if (typeof taskFunctions === 'function') {
    const bound = taskFunctions.bind(this)
    const hasUsableName =
      typeof taskFunctions.name === 'string' &&
      taskFunctions.name.trim().length > 0
    registry.set(DEFAULT_TASK_NAME, bound)
    registry.set(hasUsableName ? taskFunctions.name : 'fn1', bound)
    return
  }
  if (!isPlainObject(taskFunctions)) {
    throw new TypeError(
      'taskFunctions parameter is not a function or a plain object'
    )
  }
  const entries = Object.entries(taskFunctions)
  if (entries.length === 0) {
    throw new Error('taskFunctions parameter object is empty')
  }
  entries.forEach(([name, fn], index) => {
    checkValidTaskFunctionEntry<Data, Response>(name, fn)
    const bound = fn.bind(this)
    if (index === 0) {
      // The first entry is the default task function
      registry.set(DEFAULT_TASK_NAME, bound)
    }
    registry.set(name, bound)
  })
}
155
/**
 * Tells whether a task function is registered under the given name.
 *
 * @param name - The name of the task function to check.
 * @returns A result whose `status` is the registry lookup outcome; when the name check fails, `status` is `false` and `error` holds the raised error.
 */
public hasTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
  } catch (cause) {
    return { status: false, error: cause as Error }
  }
  const status = this.taskFunctions.has(name)
  return { status }
}
170
/**
 * Registers a task function on the worker, bound to the worker instance.
 * An already registered task function with the same name is replaced; when
 * the replaced function was the default one, the default entry is updated too.
 * The updated list of names is then sent to the main worker.
 *
 * @param name - The name of the task function to add.
 * @param fn - The task function to add.
 * @returns `{ status: true }` on success, otherwise `{ status: false, error }` with the raised error.
 */
public addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot add a task function with the default reserved name'
      )
    }
    if (typeof fn !== 'function') {
      throw new TypeError('fn parameter is not a function')
    }
    const bound = fn.bind(this)
    const replacesDefault =
      this.taskFunctions.get(name) ===
      this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (replacesDefault) {
      // Keep the default entry aliased to the function registered under `name`
      this.taskFunctions.set(DEFAULT_TASK_NAME, bound)
    }
    this.taskFunctions.set(name, bound)
    this.sendTaskFunctionNamesToMainWorker()
    return { status: true }
  } catch (cause) {
    return { status: false, error: cause as Error }
  }
}
207
/**
 * Unregisters a task function from the worker and sends the resulting list
 * of names to the main worker. Neither the default reserved name nor the
 * task function currently used as default can be removed.
 *
 * @param name - The name of the task function to remove.
 * @returns A result whose `status` tells whether the task function existed and was removed; on failure `status` is `false` and `error` holds the raised error.
 */
public removeTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot remove the task function with the default reserved name'
      )
    }
    const isDefault =
      this.taskFunctions.get(name) ===
      this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (isDefault) {
      throw new Error(
        'Cannot remove the task function used as the default task function'
      )
    }
    const removed = this.taskFunctions.delete(name)
    this.sendTaskFunctionNamesToMainWorker()
    return { status: removed }
  } catch (cause) {
    return { status: false, error: cause as Error }
  }
}
237
/**
 * Lists the names of the worker's task functions.
 *
 * @returns The default reserved name first, then the name of the task function it aliases, then the remaining names in registration order.
 */
public listTaskFunctionNames (): string[] {
  const names = Array.from(this.taskFunctions.keys())
  const defaultFn = this.taskFunctions.get(DEFAULT_TASK_NAME)
  // First non reserved entry sharing its function with the default entry
  const aliased = [...this.taskFunctions].find(
    ([name, fn]) => name !== DEFAULT_TASK_NAME && fn === defaultFn
  )
  const defaultTaskFunctionName: string =
    aliased != null ? aliased[0] : DEFAULT_TASK_NAME
  const remaining = names.filter(
    name => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
  )
  return [
    names.find(name => name === DEFAULT_TASK_NAME) as string,
    defaultTaskFunctionName,
    ...remaining
  ]
}
263
/**
 * Makes an already registered task function the default one, then sends the
 * list of task function names to the main worker.
 *
 * @param name - The name of the task function to use as default task function.
 * @returns `{ status: true }` on success, otherwise `{ status: false, error }` with the raised error.
 */
public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot set the default task function reserved name as the default task function'
      )
    }
    if (!this.taskFunctions.has(name)) {
      throw new Error(
        'Cannot set the default task function to a non-existing task function'
      )
    }
    const target = this.taskFunctions.get(name) as TaskFunction<Data, Response>
    this.taskFunctions.set(DEFAULT_TASK_NAME, target)
    this.sendTaskFunctionNamesToMainWorker()
    return { status: true }
  } catch (cause) {
    return { status: false, error: cause as Error }
  }
}
293
294 /**
295 * Handles the ready message sent by the main worker. The constructor registers it as the `message` listener of the main worker reference when `isMain` is false.
296 *
297 * @param message - The ready message.
298 */
299 protected abstract handleReadyMessage (message: MessageValue<Data>): void
300
/**
 * Worker message listener: checks the message worker id, then dispatches the
 * message to the first matching handler (statistics, activity check, task
 * function operation, task, kill). Other messages are ignored.
 *
 * @param message - The received message.
 */
protected messageListener (message: MessageValue<Data>): void {
  this.checkMessageWorkerId(message)
  const {
    statistics,
    checkActive,
    taskFunctionOperation,
    taskId,
    data,
    kill
  } = message
  if (statistics != null) {
    // Statistics message received
    this.statistics = statistics
    return
  }
  if (checkActive != null) {
    // Check active message received
    if (checkActive) {
      this.startCheckActive()
    } else {
      this.stopCheckActive()
    }
    return
  }
  if (taskFunctionOperation != null) {
    // Task function operation message received
    this.handleTaskFunctionOperationMessage(message)
    return
  }
  if (taskId != null && data != null) {
    // Task message received
    this.run(message)
    return
  }
  if (kill === true) {
    // Kill message received
    this.handleKillMessage(message)
  }
}
325
/**
 * Handles a task function operation message: runs the requested operation
 * (`add`, `remove` or `default`) and sends its outcome back to the main worker.
 *
 * For `add`, the task function is rebuilt from its source text carried by the
 * message. A source text that cannot be evaluated is reported as a failed
 * operation instead of throwing out of the message listener, so a response is
 * always sent.
 *
 * @param message - The task function operation message.
 */
protected handleTaskFunctionOperationMessage (
  message: MessageValue<Data>
): void {
  const { taskFunctionOperation, taskFunctionName, taskFunction } = message
  let response: TaskFunctionOperationResult
  switch (taskFunctionOperation) {
    case 'add': {
      let fn: TaskFunction<Data, Response>
      try {
        // NOTE(review): this evaluates source text taken from the message; it
        // must only ever come from the trusted main worker.
        // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
        fn = new Function(`return ${taskFunction as string}`)() as TaskFunction<
        Data,
        Response
        >
      } catch (error) {
        // Invalid source text (e.g. SyntaxError): report it as a failed operation
        response = { status: false, error: error as Error }
        break
      }
      response = this.addTaskFunction(taskFunctionName as string, fn)
      break
    }
    case 'remove':
      response = this.removeTaskFunction(taskFunctionName as string)
      break
    case 'default':
      response = this.setDefaultTaskFunction(taskFunctionName as string)
      break
    default:
      response = { status: false, error: new Error('Unknown task operation') }
      break
  }
  this.sendToMainWorker({
    taskFunctionOperation,
    taskFunctionOperationStatus: response.status,
    taskFunctionName,
    // Attach the error details only when the operation failed
    ...(!response.status &&
      response.error != null && {
      workerError: {
        name: taskFunctionName as string,
        message: this.handleError(response.error as Error | string)
      }
    })
  })
}
365
/**
 * Handles a kill message sent by the main worker: stops the activity check,
 * runs the configured kill handler (awaiting it when it is an async function),
 * reports `success` or `failure` to the main worker and finally emits the
 * async resource destroy event.
 *
 * @param _message - The kill message (unused).
 */
protected handleKillMessage (_message: MessageValue<Data>): void {
  const reportSuccess = (): void => {
    this.sendToMainWorker({ kill: 'success' })
  }
  const reportFailure = (): void => {
    this.sendToMainWorker({ kill: 'failure' })
  }
  const destroy = (): void => {
    this.emitDestroy()
  }
  this.stopCheckActive()
  if (!isAsyncFunction(this.opts.killHandler)) {
    try {
      // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
      this.opts.killHandler?.() as void
      reportSuccess()
    } catch {
      reportFailure()
    } finally {
      destroy()
    }
    return
  }
  (this.opts.killHandler?.() as Promise<void>)
    .then(() => {
      reportSuccess()
      return undefined
    })
    .catch(reportFailure)
    .finally(destroy)
    .catch(EMPTY_FUNCTION)
}
398
/**
 * Ensures the message carries a worker id equal to this worker id.
 *
 * @param message - The message to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
 */
private checkMessageWorkerId (message: MessageValue<Data>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Message worker id is not set')
  }
  if (workerId !== this.id) {
    throw new Error(
      `Message worker id ${workerId} does not match the worker id ${this.id}`
    )
  }
}
414
/**
 * Starts the worker check active interval.
 *
 * The check runs every half of the maximum inactive time. Any interval
 * already running is stopped first: without that, a repeated start request
 * would overwrite the stored handle and leave the previous timer running
 * with no way to clear it.
 */
private startCheckActive (): void {
  this.stopCheckActive()
  this.lastTaskTimestamp = performance.now()
  this.activeInterval = setInterval(
    this.checkActive.bind(this),
    (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
  )
}
425
/**
 * Stops the worker check active interval, if one is running, and forgets its handle.
 */
private stopCheckActive (): void {
  if (this.activeInterval == null) {
    return
  }
  clearInterval(this.activeInterval)
  delete this.activeInterval
}
435
/**
 * Asks the main worker to kill this worker, using the configured kill
 * behavior, when the time elapsed since the last task timestamp exceeds the
 * maximum inactive time.
 */
private checkActive (): void {
  const idleTime = performance.now() - this.lastTaskTimestamp
  const maxIdleTime = this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
  if (idleTime > maxIdleTime) {
    this.sendToMainWorker({ kill: this.opts.killBehavior })
  }
}
447
/**
 * Gives access to the main worker reference.
 *
 * @returns Reference to the main worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
 */
protected getMainWorker (): MainWorker {
  const { mainWorker } = this
  if (mainWorker == null) {
    throw new Error('Main worker not set')
  }
  return mainWorker
}
460
461 /**
462 * Sends a message to main worker. Implemented by subclasses; this class uses it for task results, worker errors, kill notifications and task function names.
463 *
464 * @param message - The response message.
465 */
466 protected abstract sendToMainWorker (
467 message: MessageValue<Response, Data>
468 ): void
469
/**
 * Sends the current list of task function names to the main worker.
 */
protected sendTaskFunctionNamesToMainWorker (): void {
  const taskFunctionNames = this.listTaskFunctionNames()
  this.sendToMainWorker({ taskFunctionNames })
}
478
/**
 * Converts an error to the message sent back to the main worker.
 *
 * @param error - The error raised by the worker.
 * @returns The `message` of an `Error` instance, otherwise the given value unchanged.
 */
protected handleError (error: Error | string): string {
  if (error instanceof Error) {
    return error.message
  }
  return error
}
488
/**
 * Runs the given task with the task function registered under the task name,
 * or under the default reserved name when the task has no name. An unknown
 * task function is reported to the main worker as a worker error.
 *
 * @param task - The task to execute.
 */
protected run (task: Task<Data>): void {
  const { name, taskId, data } = task
  const taskFunctionName = name ?? DEFAULT_TASK_NAME
  if (this.taskFunctions.has(taskFunctionName)) {
    const fn = this.taskFunctions.get(taskFunctionName)
    // Pick the runner matching the task function kind
    const runner = isAsyncFunction(fn)
      ? this.runAsync.bind(this)
      : this.runSync.bind(this)
    this.runInAsyncScope(runner, this, fn, task)
    return
  }
  this.sendToMainWorker({
    workerError: {
      name: name as string,
      message: `Task function '${name as string}' not found`,
      data
    },
    taskId
  })
}
515
/**
 * Executes a synchronous task function and sends either its result with the
 * task performance, or a worker error, to the main worker. The last task
 * timestamp is updated in every case.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runSync (
  fn: TaskSyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  try {
    const startedPerformance = this.beginTaskPerformance(name)
    const res = fn(data)
    const taskPerformance = this.endTaskPerformance(startedPerformance)
    this.sendToMainWorker({ data: res, taskPerformance, taskId })
  } catch (cause) {
    const workerError = {
      name: name as string,
      message: this.handleError(cause as Error | string),
      data
    }
    this.sendToMainWorker({ workerError, taskId })
  } finally {
    this.updateLastTaskTimestamp()
  }
}
549
/**
 * Runs the given task function asynchronously and sends either its result
 * with the task performance, or a worker error, to the main worker. The last
 * task timestamp is updated in every case.
 *
 * A failure to start the performance measurement (e.g. statistics
 * requirements not set) is reported as a worker error, as `runSync` does,
 * instead of being thrown out of the message listener and leaving the task
 * without any response.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runAsync (
  fn: TaskAsyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  const reportError = (error: unknown): void => {
    this.sendToMainWorker({
      workerError: {
        name: name as string,
        message: this.handleError(error as Error | string),
        data
      },
      taskId
    })
  }
  let taskPerformance: TaskPerformance
  try {
    taskPerformance = this.beginTaskPerformance(name)
  } catch (error) {
    try {
      reportError(error)
    } finally {
      this.updateLastTaskTimestamp()
    }
    return
  }
  fn(data)
    .then(res => {
      taskPerformance = this.endTaskPerformance(taskPerformance)
      this.sendToMainWorker({
        data: res,
        taskPerformance,
        taskId
      })
      return undefined
    })
    .catch(reportError)
    .finally(() => {
      this.updateLastTaskTimestamp()
    })
    // Errors raised while reporting are deliberately swallowed
    .catch(EMPTY_FUNCTION)
}
587
/**
 * Starts the performance measurement of a task.
 *
 * @param name - The task function name; the default reserved name is used when omitted.
 * @returns The task performance holding the start timestamp and, when ELU statistics are required, the current event loop utilization.
 * @throws Error if the statistics requirements are not set.
 */
private beginTaskPerformance (name?: string): TaskPerformance {
  this.checkStatistics()
  const started = {
    name: name ?? DEFAULT_TASK_NAME,
    timestamp: performance.now()
  }
  return this.statistics.elu
    ? { ...started, elu: performance.eventLoopUtilization() }
    : started
}
596
/**
 * Completes the performance measurement of a task.
 *
 * @param taskPerformance - The task performance returned when the measurement started.
 * @returns A copy of the task performance extended with the run time and/or the event loop utilization delta, depending on the statistics requirements.
 * @throws Error if the statistics requirements are not set.
 */
private endTaskPerformance (
  taskPerformance: TaskPerformance
): TaskPerformance {
  this.checkStatistics()
  const runTimePart = this.statistics.runTime
    ? { runTime: performance.now() - taskPerformance.timestamp }
    : {}
  const eluPart = this.statistics.elu
    ? { elu: performance.eventLoopUtilization(taskPerformance.elu) }
    : {}
  return { ...taskPerformance, ...runTimePart, ...eluPart }
}
611
/**
 * Ensures the performance statistics computation requirements are set.
 *
 * @throws Error if they are not set.
 */
private checkStatistics (): void {
  if (this.statistics != null) {
    return
  }
  throw new Error('Performance statistics computation requirements not set')
}
617
/**
 * Refreshes the last task timestamp, only while the activity check is running.
 */
private updateLastTaskTimestamp (): void {
  if (this.activeInterval == null) {
    return
  }
  this.lastTaskTimestamp = performance.now()
}
623 }