fix: fix worker options handling
[poolifier.git] / src / worker / abstract-worker.ts
1 import type { Worker } from 'node:cluster'
2 import type { MessagePort } from 'node:worker_threads'
3 import { performance } from 'node:perf_hooks'
4 import type {
5 MessageValue,
6 Task,
7 TaskPerformance,
8 WorkerStatistics
9 } from '../utility-types'
10 import {
11 DEFAULT_TASK_NAME,
12 EMPTY_FUNCTION,
13 isAsyncFunction,
14 isPlainObject
15 } from '../utils'
16 import { KillBehaviors, type WorkerOptions } from './worker-options'
17 import type {
18 TaskAsyncFunction,
19 TaskFunction,
20 TaskFunctionOperationResult,
21 TaskFunctions,
22 TaskSyncFunction
23 } from './task-functions'
24 import {
25 checkTaskFunctionName,
26 checkValidTaskFunctionEntry,
27 checkValidWorkerOptions
28 } from './utils'
29
/**
 * Default maximum inactive time, in milliseconds (one minute).
 */
const DEFAULT_MAX_INACTIVE_TIME = 60 * 1000
/**
 * Worker options used as the base that user supplied options are merged over.
 */
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
  /**
   * Default kill behavior of the worker.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * Default maximum time this worker is kept active while idle.
   * The pool automatically checks and terminates this worker when the time expires.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
  /**
   * Default function called when the worker is killed: does nothing.
   */
  killHandler: EMPTY_FUNCTION
}
46
/**
 * Base class that implements some shared logic for all poolifier workers.
 *
 * It keeps the registry of task functions, dispatches the messages received
 * from the main worker, runs tasks (synchronously or asynchronously) and
 * reports results, errors and performance measurements back.
 *
 * @typeParam MainWorker - Type of main worker.
 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
 */
export abstract class AbstractWorker<
  MainWorker extends Worker | MessagePort,
  Data = unknown,
  Response = unknown
> {
  /**
   * Worker id.
   */
  protected abstract id: number
  /**
   * Task function(s) processed by the worker when the pool's `execution` function is invoked.
   * The default task function is additionally registered under `DEFAULT_TASK_NAME`.
   */
  protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
  /**
   * Timestamp of the last task processed by this worker.
   */
  protected lastTaskTimestamp!: number
  /**
   * Performance statistics computation requirements.
   */
  protected statistics!: WorkerStatistics
  /**
   * Handler id of the `activeInterval` worker activity check.
   */
  protected activeInterval?: NodeJS.Timeout

  /**
   * Constructs a new poolifier worker.
   *
   * @param isMain - Whether this is the main worker or not.
   * @param mainWorker - Reference to main worker.
   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
   * @param opts - Options for the worker.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain` or `taskFunctions` is not provided, or if `taskFunctions` is an empty object.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `taskFunctions` is neither a function nor a plain object.
   */
  public constructor (
    protected readonly isMain: boolean,
    private readonly mainWorker: MainWorker,
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
    protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
  ) {
    if (this.isMain == null) {
      throw new Error('isMain parameter is mandatory')
    }
    this.checkTaskFunctions(taskFunctions)
    this.checkWorkerOptions(this.opts)
    if (!this.isMain) {
      // Should be once() but Node.js on windows has a bug that prevents it from working
      this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
    }
  }

  /**
   * Validates the given worker options and stores them merged over the default worker options.
   *
   * @param opts - The worker options to check.
   */
  private checkWorkerOptions (opts: WorkerOptions): void {
    checkValidWorkerOptions(opts)
    // Options that are not supplied fall back to their default value.
    this.opts = { ...DEFAULT_WORKER_OPTIONS, ...opts }
  }

  /**
   * Checks if the `taskFunctions` parameter is passed to the constructor and valid,
   * then registers the task function(s) bound to this worker.
   *
   * @param taskFunctions - The task function(s) parameter that should be checked.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the parameter is not provided or is an empty object.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the parameter is neither a function nor a plain object.
   */
  private checkTaskFunctions (
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
  ): void {
    if (taskFunctions == null) {
      throw new Error('taskFunctions parameter is mandatory')
    }
    this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
    if (typeof taskFunctions === 'function') {
      const boundFn = taskFunctions.bind(this)
      // A single function is both the default task function and a named one.
      this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
      this.taskFunctions.set(
        typeof taskFunctions.name === 'string' &&
          taskFunctions.name.trim().length > 0
          ? taskFunctions.name
          : 'fn1',
        boundFn
      )
    } else if (isPlainObject(taskFunctions)) {
      let firstEntry = true
      for (const [name, fn] of Object.entries(taskFunctions)) {
        checkValidTaskFunctionEntry<Data, Response>(name, fn)
        const boundFn = fn.bind(this)
        if (firstEntry) {
          // The first entry is also registered as the default task function.
          this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
          firstEntry = false
        }
        this.taskFunctions.set(name, boundFn)
      }
      if (firstEntry) {
        throw new Error('taskFunctions parameter object is empty')
      }
    } else {
      throw new TypeError(
        'taskFunctions parameter is not a function or a plain object'
      )
    }
  }

  /**
   * Checks if the worker has a task function with the given name.
   *
   * @param name - The name of the task function to check.
   * @returns Whether the worker has a task function with the given name or not.
   */
  public hasTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
    } catch (error) {
      return { status: false, error: error as Error }
    }
    return { status: this.taskFunctions.has(name) }
  }

  /**
   * Adds a task function to the worker.
   * If a task function with the same name already exists, it is replaced.
   *
   * @param name - The name of the task function to add.
   * @param fn - The task function to add.
   * @returns Whether the task function was added or not.
   */
  public addTaskFunction (
    name: string,
    fn: TaskFunction<Data, Response>
  ): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot add a task function with the default reserved name'
        )
      }
      if (typeof fn !== 'function') {
        throw new TypeError('fn parameter is not a function')
      }
      const boundFn = fn.bind(this)
      // When the replaced function is the current default one, keep the
      // default entry pointing at the replacement.
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
      }
      this.taskFunctions.set(name, boundFn)
      this.sendTaskFunctionNamesToMainWorker()
      return { status: true }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Removes a task function from the worker.
   * The task function currently used as the default one cannot be removed.
   *
   * @param name - The name of the task function to remove.
   * @returns Whether the task function existed and was removed or not.
   */
  public removeTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot remove the task function with the default reserved name'
        )
      }
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        throw new Error(
          'Cannot remove the task function used as the default task function'
        )
      }
      const deleteStatus = this.taskFunctions.delete(name)
      this.sendTaskFunctionNamesToMainWorker()
      return { status: deleteStatus }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Lists the names of the worker's task functions.
   * The default reserved name comes first, followed by the name of the task
   * function used as default, then the remaining names.
   *
   * @returns The names of the worker's task functions.
   */
  public listTaskFunctionNames (): string[] {
    const names: string[] = [...this.taskFunctions.keys()]
    let defaultTaskFunctionName: string = DEFAULT_TASK_NAME
    // Find the named entry that shares its function with the default entry.
    for (const [name, fn] of this.taskFunctions) {
      if (
        name !== DEFAULT_TASK_NAME &&
        fn === this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        defaultTaskFunctionName = name
        break
      }
    }
    return [
      names[names.indexOf(DEFAULT_TASK_NAME)],
      defaultTaskFunctionName,
      ...names.filter(
        name => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
      )
    ]
  }

  /**
   * Sets the default task function to use in the worker.
   *
   * @param name - The name of the task function to use as default task function.
   * @returns Whether the default task function was set or not.
   */
  public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot set the default task function reserved name as the default task function'
        )
      }
      if (!this.taskFunctions.has(name)) {
        throw new Error(
          'Cannot set the default task function to a non-existing task function'
        )
      }
      this.taskFunctions.set(
        DEFAULT_TASK_NAME,
        this.taskFunctions.get(name) as TaskFunction<Data, Response>
      )
      this.sendTaskFunctionNamesToMainWorker()
      return { status: true }
    } catch (error) {
      return { status: false, error: error as Error }
    }
  }

  /**
   * Handles the ready message sent by the main worker.
   *
   * @param message - The ready message.
   */
  protected abstract handleReadyMessage (message: MessageValue<Data>): void

  /**
   * Worker message listener.
   * Dispatches the received message to the matching handler; the first
   * matching kind wins, in this order: statistics, check active, task
   * function operation, task, kill.
   *
   * @param message - The received message.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  protected messageListener (message: MessageValue<Data>): void {
    this.checkMessageWorkerId(message)
    if (message.statistics != null) {
      // Statistics message received
      this.statistics = message.statistics
    } else if (message.checkActive != null) {
      // Check active message received
      if (message.checkActive) {
        this.startCheckActive()
      } else {
        this.stopCheckActive()
      }
    } else if (message.taskFunctionOperation != null) {
      // Task function operation message received
      this.handleTaskFunctionOperationMessage(message)
    } else if (message.taskId != null && message.data != null) {
      // Task message received
      this.run(message)
    } else if (message.kill === true) {
      // Kill message received
      this.handleKillMessage(message)
    }
  }

  /**
   * Handles a task function operation message ('add', 'remove' or 'default')
   * and sends the operation status back to the main worker.
   * A failed operation is reported with a `workerError`; it never throws.
   *
   * @param message - The task function operation message.
   */
  protected handleTaskFunctionOperationMessage (
    message: MessageValue<Data>
  ): void {
    const { taskFunctionOperation, taskFunctionName, taskFunction } = message
    let response!: TaskFunctionOperationResult
    switch (taskFunctionOperation) {
      case 'add':
        // NOTE(review): the task function source text received in the message
        // is evaluated as code; it must only ever come from a trusted sender.
        try {
          response = this.addTaskFunction(
            taskFunctionName as string,
            // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func
            new Function(`return ${taskFunction as string}`)() as TaskFunction<
            Data,
            Response
            >
          )
        } catch (error) {
          // Source text that fails to evaluate is reported as a failed
          // operation instead of throwing out of the message listener.
          response = { status: false, error: error as Error }
        }
        break
      case 'remove':
        response = this.removeTaskFunction(taskFunctionName as string)
        break
      case 'default':
        response = this.setDefaultTaskFunction(taskFunctionName as string)
        break
      default:
        response = { status: false, error: new Error('Unknown task operation') }
        break
    }
    this.sendToMainWorker({
      taskFunctionOperation,
      taskFunctionOperationStatus: response.status,
      taskFunctionName,
      // Attach the error details only when the operation failed.
      ...(!response.status &&
        response.error != null && {
        workerError: {
          name: taskFunctionName as string,
          message: this.handleError(response.error as Error | string)
        }
      })
    })
  }

  /**
   * Handles a kill message sent by the main worker.
   * Stops the activity check, runs the kill handler and reports
   * `'success'` or `'failure'` to the main worker.
   *
   * @param _message - The kill message.
   */
  protected handleKillMessage (_message: MessageValue<Data>): void {
    this.stopCheckActive()
    if (isAsyncFunction(this.opts.killHandler)) {
      (this.opts.killHandler?.() as Promise<void>)
        .then(() => {
          this.sendToMainWorker({ kill: 'success' })
          return undefined
        })
        .catch(() => {
          this.sendToMainWorker({ kill: 'failure' })
        })
    } else {
      try {
        // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
        this.opts.killHandler?.() as void
        this.sendToMainWorker({ kill: 'success' })
      } catch {
        this.sendToMainWorker({ kill: 'failure' })
      }
    }
  }

  /**
   * Check if the message worker id is set and matches the worker id.
   *
   * @param message - The message to check.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  private checkMessageWorkerId (message: MessageValue<Data>): void {
    if (message.workerId == null) {
      throw new Error('Message worker id is not set')
    } else if (message.workerId !== this.id) {
      throw new Error(
        `Message worker id ${message.workerId} does not match the worker id ${this.id}`
      )
    }
  }

  /**
   * Starts the worker check active interval.
   * The check runs every half of the maximum inactive time. A check that is
   * already running is stopped first, so at most one interval is ever active.
   */
  private startCheckActive (): void {
    // Without this, a repeated start would overwrite the handler id and leak
    // the previous interval, which could then never be cleared.
    this.stopCheckActive()
    this.lastTaskTimestamp = performance.now()
    this.activeInterval = setInterval(
      this.checkActive.bind(this),
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
    )
  }

  /**
   * Stops the worker check active interval, if one is running.
   */
  private stopCheckActive (): void {
    if (this.activeInterval != null) {
      clearInterval(this.activeInterval)
      delete this.activeInterval
    }
  }

  /**
   * Checks if the worker has been inactive for longer than the maximum
   * inactive time and, if so, sends a kill message carrying the configured
   * kill behavior to the main worker.
   */
  private checkActive (): void {
    if (
      performance.now() - this.lastTaskTimestamp >
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
    ) {
      this.sendToMainWorker({ kill: this.opts.killBehavior })
    }
  }

  /**
   * Returns the main worker.
   *
   * @returns Reference to the main worker.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
   */
  protected getMainWorker (): MainWorker {
    if (this.mainWorker == null) {
      throw new Error('Main worker not set')
    }
    return this.mainWorker
  }

  /**
   * Sends a message to main worker.
   *
   * @param message - The response message.
   */
  protected abstract sendToMainWorker (
    message: MessageValue<Response, Data>
  ): void

  /**
   * Sends task function names to the main worker.
   */
  protected sendTaskFunctionNamesToMainWorker (): void {
    this.sendToMainWorker({
      taskFunctionNames: this.listTaskFunctionNames()
    })
  }

  /**
   * Handles an error and convert it to a string so it can be sent back to the main worker.
   *
   * @param error - The error raised by the worker.
   * @returns The error message.
   */
  protected handleError (error: Error | string): string {
    return error instanceof Error ? error.message : error
  }

  /**
   * Runs the given task.
   * The task function is looked up by the task name, falling back to the
   * default task function when the task has no name. An unknown task function
   * is reported to the main worker as a `workerError`.
   *
   * @param task - The task to execute.
   */
  protected run (task: Task<Data>): void {
    const { name, taskId, data } = task
    const taskFunctionName = name ?? DEFAULT_TASK_NAME
    if (!this.taskFunctions.has(taskFunctionName)) {
      this.sendToMainWorker({
        workerError: {
          name: name as string,
          message: `Task function '${name as string}' not found`,
          data
        },
        taskId
      })
      return
    }
    const fn = this.taskFunctions.get(taskFunctionName)
    if (isAsyncFunction(fn)) {
      this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
    } else {
      this.runSync(fn as TaskSyncFunction<Data, Response>, task)
    }
  }

  /**
   * Runs the given task function synchronously.
   * The result (with its performance measurement) or the raised error is sent
   * to the main worker; the last task timestamp is updated in both cases.
   *
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected runSync (
    fn: TaskSyncFunction<Data, Response>,
    task: Task<Data>
  ): void {
    const { name, taskId, data } = task
    try {
      let taskPerformance = this.beginTaskPerformance(name)
      const res = fn(data)
      taskPerformance = this.endTaskPerformance(taskPerformance)
      this.sendToMainWorker({
        data: res,
        taskPerformance,
        taskId
      })
    } catch (error) {
      this.sendToMainWorker({
        workerError: {
          name: name as string,
          message: this.handleError(error as Error | string),
          data
        },
        taskId
      })
    } finally {
      this.updateLastTaskTimestamp()
    }
  }

  /**
   * Runs the given task function asynchronously.
   * The resolved result (with its performance measurement) or the rejection
   * error is sent to the main worker; the last task timestamp is updated in
   * both cases.
   *
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected runAsync (
    fn: TaskAsyncFunction<Data, Response>,
    task: Task<Data>
  ): void {
    const { name, taskId, data } = task
    let taskPerformance = this.beginTaskPerformance(name)
    fn(data)
      .then(res => {
        taskPerformance = this.endTaskPerformance(taskPerformance)
        this.sendToMainWorker({
          data: res,
          taskPerformance,
          taskId
        })
        return undefined
      })
      .catch(error => {
        this.sendToMainWorker({
          workerError: {
            name: name as string,
            message: this.handleError(error as Error | string),
            data
          },
          taskId
        })
      })
      .finally(() => {
        this.updateLastTaskTimestamp()
      })
      // Swallow anything thrown while reporting, so the chain never ends in
      // an unhandled rejection.
      .catch(EMPTY_FUNCTION)
  }

  /**
   * Starts the performance measurement of a task.
   *
   * @param name - The task function name; the default task name is used when omitted.
   * @returns The task performance holding the start timestamp and, when
   * required by the statistics, the starting event loop utilization.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
   */
  private beginTaskPerformance (name?: string): TaskPerformance {
    this.checkStatistics()
    return {
      name: name ?? DEFAULT_TASK_NAME,
      timestamp: performance.now(),
      ...(this.statistics.elu && { elu: performance.eventLoopUtilization() })
    }
  }

  /**
   * Completes the performance measurement of a task.
   *
   * @param taskPerformance - The task performance returned when the measurement was started.
   * @returns The task performance completed with the run time and/or the
   * event loop utilization since the start, depending on the statistics requirements.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
   */
  private endTaskPerformance (
    taskPerformance: TaskPerformance
  ): TaskPerformance {
    this.checkStatistics()
    return {
      ...taskPerformance,
      ...(this.statistics.runTime && {
        runTime: performance.now() - taskPerformance.timestamp
      }),
      ...(this.statistics.elu && {
        elu: performance.eventLoopUtilization(taskPerformance.elu)
      })
    }
  }

  /**
   * Checks that the performance statistics computation requirements are set.
   *
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
   */
  private checkStatistics (): void {
    if (this.statistics == null) {
      throw new Error('Performance statistics computation requirements not set')
    }
  }

  /**
   * Updates the last task timestamp, only while the activity check is running.
   */
  private updateLastTaskTimestamp (): void {
    if (this.activeInterval != null) {
      this.lastTaskTimestamp = performance.now()
    }
  }
}