refactor: factor out worker public API input sanity checks
[poolifier.git] / src / worker / abstract-worker.ts
CommitLineData
fc3e6586 1import { AsyncResource } from 'node:async_hooks'
6677a3d3 2import type { Worker } from 'node:cluster'
fc3e6586 3import type { MessagePort } from 'node:worker_threads'
d715b7bc
JB
4import { performance } from 'node:perf_hooks'
5import type {
6 MessageValue,
5c4d16da 7 Task,
d715b7bc
JB
8 TaskPerformance,
9 WorkerStatistics
10} from '../utility-types'
ff128cc9
JB
11import {
12 DEFAULT_TASK_NAME,
13 EMPTY_FUNCTION,
14 isAsyncFunction,
15 isPlainObject
16} from '../utils'
d38d0e30 17import { KillBehaviors, type WorkerOptions } from './worker-options'
b6b32453 18import type {
82ea6492
JB
19 TaskAsyncFunction,
20 TaskFunction,
b6b32453 21 TaskFunctions,
82ea6492
JB
22 TaskSyncFunction
23} from './task-functions'
4c35177b 24
/**
 * Default maximum time, in milliseconds, a worker may stay idle.
 */
const DEFAULT_MAX_INACTIVE_TIME = 60000
/**
 * Worker options used when none are given; also the base over which given options are merged.
 */
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
  /**
   * Kill behavior applied to the worker unless overridden.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * Maximum idle time before the worker reports itself for termination.
   * The worker checks it periodically while the activity check is running.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
  /**
   * Function invoked when the worker receives a kill message. No-op by default.
   */
  killHandler: EMPTY_FUNCTION
}
c97c7edb 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier workers.
729c563d 44 *
38e795c1 45 * @typeParam MainWorker - Type of main worker.
e102732c
JB
46 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
47 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractWorker<
6677a3d3 50 MainWorker extends Worker | MessagePort,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c97c7edb 53> extends AsyncResource {
/**
 * Identifier of this worker. Incoming messages must carry the same id.
 */
protected abstract id: number
/**
 * Registered task functions, keyed by task function name.
 */
protected taskFunctions!: Map<string, TaskFunction<Data, Response>>
/**
 * `performance.now()` value recorded when the last task finished or when the activity check started.
 */
protected lastTaskTimestamp!: number
/**
 * Which performance statistics have to be computed for each task.
 */
protected statistics!: WorkerStatistics
/**
 * Timer handle of the periodic worker activity check, unset while the check is not running.
 */
protected activeInterval?: NodeJS.Timeout
/**
 * Builds a new poolifier worker: merges the options, registers the task function(s) and,
 * for a non-main worker, starts listening for the ready message on the main worker.
 *
 * @param type - The type of async event.
 * @param isMain - Whether this is the main worker or not.
 * @param mainWorker - Reference to main worker.
 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execution` function is invoked. The first function is the default function.
 * @param opts - Options for the worker.
 */
public constructor (
  type: string,
  protected readonly isMain: boolean,
  private readonly mainWorker: MainWorker,
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
  protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
  super(type)
  this.checkWorkerOptions(this.opts)
  this.checkTaskFunctions(taskFunctions)
  if (!this.isMain) {
    // Only a non-main worker has a parent to receive the ready message from.
    const parent = this.getMainWorker()
    parent?.on('message', this.handleReadyMessage.bind(this))
  }
}
97
/**
 * Stores the given options merged over the default worker options, without the `async` option.
 *
 * @param opts - The worker options given to the constructor.
 */
private checkWorkerOptions (opts: WorkerOptions): void {
  const mergedOpts: WorkerOptions = Object.assign(
    {},
    DEFAULT_WORKER_OPTIONS,
    opts
  )
  // The `async` option is dropped from the stored options.
  delete mergedOpts.async
  this.opts = mergedOpts
}
102
/**
 * Checks the `taskFunctions` parameter passed to the constructor and registers the task function(s).
 *
 * A single function is registered under the default task function name and under its own name
 * (`fn1` if it has no usable name). A plain object registers each value under its key, the first
 * entry also being registered under the default task function name. Every function is bound to
 * this worker before registration.
 *
 * @param taskFunctions - The task function(s) parameter that should be checked.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `taskFunctions` parameter is not set or is an empty object.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `taskFunctions` parameter is neither a function nor a plain object, if an object key is an empty string or if an object value is not a function.
 */
private checkTaskFunctions (
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>
): void {
  if (taskFunctions == null) {
    throw new Error('taskFunctions parameter is mandatory')
  }
  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
  if (typeof taskFunctions === 'function') {
    const boundFn = taskFunctions.bind(this)
    this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
    this.taskFunctions.set(
      typeof taskFunctions.name === 'string' &&
        taskFunctions.name.trim().length > 0
        ? taskFunctions.name
        : 'fn1',
      boundFn
    )
  } else if (isPlainObject(taskFunctions)) {
    let firstEntry = true
    // Object.entries() only yields string keys, so no key type check is needed.
    for (const [name, fn] of Object.entries(taskFunctions)) {
      if (name.trim().length === 0) {
        throw new TypeError(
          'A taskFunctions parameter object key is an empty string'
        )
      }
      if (typeof fn !== 'function') {
        throw new TypeError(
          'A taskFunctions parameter object value is not a function'
        )
      }
      const boundFn = fn.bind(this)
      if (firstEntry) {
        // The first entry doubles as the default task function.
        this.taskFunctions.set(DEFAULT_TASK_NAME, boundFn)
        firstEntry = false
      }
      this.taskFunctions.set(name, boundFn)
    }
    if (firstEntry) {
      throw new Error('taskFunctions parameter object is empty')
    }
  } else {
    throw new TypeError(
      'taskFunctions parameter is not a function or a plain object'
    )
  }
}
159
968a2e8c
JB
/**
 * Tells whether a task function is registered under the given name.
 *
 * @param name - The name of the task function to check.
 * @returns Whether the worker has a task function with the given name or not.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
 */
public hasTaskFunction (name: string): boolean {
  this.checkTaskFunctionName(name)
  const isRegistered = this.taskFunctions.has(name)
  return isRegistered
}
171
/**
 * Registers a task function on the worker, bound to the worker.
 * An existing task function with the same name is replaced; if that one was the default
 * task function, the default is replaced too. The updated task function names are then
 * sent to the main worker.
 *
 * @param name - The name of the task function to add.
 * @param fn - The task function to add.
 * @returns Whether the task function was added or not.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `fn` parameter is not a function.
 */
public addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): boolean {
  this.checkTaskFunctionName(name)
  if (name === DEFAULT_TASK_NAME) {
    throw new Error(
      'Cannot add a task function with the default reserved name'
    )
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn parameter is not a function')
  }
  try {
    const workerBoundFn = fn.bind(this)
    const currentFn = this.taskFunctions.get(name)
    const defaultFn = this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (currentFn === defaultFn) {
      // The replaced function was the default one: keep the default in sync.
      this.taskFunctions.set(DEFAULT_TASK_NAME, workerBoundFn)
    }
    this.taskFunctions.set(name, workerBoundFn)
    this.sendTaskFunctionsListToMainWorker()
    return true
  } catch {
    return false
  }
}
211
/**
 * Unregisters a task function from the worker and sends the resulting
 * task function names to the main worker.
 *
 * @param name - The name of the task function to remove.
 * @returns Whether the task function existed and was removed or not.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the task function used as default task function.
 */
public removeTaskFunction (name: string): boolean {
  this.checkTaskFunctionName(name)
  if (name === DEFAULT_TASK_NAME) {
    throw new Error(
      'Cannot remove the task function with the default reserved name'
    )
  }
  const candidateFn = this.taskFunctions.get(name)
  const defaultFn = this.taskFunctions.get(DEFAULT_TASK_NAME)
  if (candidateFn === defaultFn) {
    throw new Error(
      'Cannot remove the task function used as the default task function'
    )
  }
  const removed = this.taskFunctions.delete(name)
  this.sendTaskFunctionsListToMainWorker()
  return removed
}
239
/**
 * Lists the names of the worker's task functions.
 * The default reserved name comes first, then the name of the task function
 * currently used as default, then the remaining names.
 *
 * @returns The names of the worker's task functions.
 */
public listTaskFunctions (): string[] {
  const names: string[] = Array.from(this.taskFunctions.keys())
  const defaultFn = this.taskFunctions.get(DEFAULT_TASK_NAME)
  // First non-reserved name registered with the same function as the default entry.
  const defaultTaskFunctionName: string =
    names.find(
      (name) =>
        name !== DEFAULT_TASK_NAME && this.taskFunctions.get(name) === defaultFn
    ) ?? DEFAULT_TASK_NAME
  const reservedName = names.find(
    (name) => name === DEFAULT_TASK_NAME
  ) as string
  const otherNames = names.filter(
    (name) => name !== DEFAULT_TASK_NAME && name !== defaultTaskFunctionName
  )
  return [reservedName, defaultTaskFunctionName, ...otherNames]
}
265
/**
 * Sets the default task function to use in the worker and sends the updated
 * list of task function names to the main worker.
 *
 * @param name - The name of the task function to use as default task function.
 * @returns Whether the default task function was set or not.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is the default task function reserved name.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the `name` parameter is a non-existing task function.
 */
public setDefaultTaskFunction (name: string): boolean {
  this.checkTaskFunctionName(name)
  if (name === DEFAULT_TASK_NAME) {
    throw new Error(
      'Cannot set the default task function reserved name as the default task function'
    )
  }
  if (!this.taskFunctions.has(name)) {
    throw new Error(
      'Cannot set the default task function to a non-existing task function'
    )
  }
  try {
    this.taskFunctions.set(
      DEFAULT_TASK_NAME,
      this.taskFunctions.get(name) as TaskFunction<Data, Response>
    )
    // The list order depends on the default task function, so the main worker
    // must be told, as it is when adding or removing a task function.
    this.sendTaskFunctionsListToMainWorker()
    return true
  } catch {
    return false
  }
}
297
d5e3c4ff
JB
/**
 * Checks that a task function name is a non-blank string.
 *
 * @param name - The task function name to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the `name` parameter is not a string or an empty string.
 */
private checkTaskFunctionName (name: string): void {
  if (typeof name !== 'string') {
    throw new TypeError('name parameter is not a string')
  }
  const isBlank = name.trim().length === 0
  if (isBlank) {
    throw new TypeError('name parameter is an empty string')
  }
}
306
a038b517
JB
/**
 * Handles the ready message sent by the main worker.
 * For a non-main worker, the constructor registers this method as a `message` listener on the main worker.
 *
 * @param message - The ready message.
 */
protected abstract handleReadyMessage (message: MessageValue<Data>): void
313
aee46736
JB
/**
 * Dispatches a received message to its handler.
 * The first matching kind wins, in this order: statistics, check active, task, kill.
 * A message matching none of them is ignored.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
 */
protected messageListener (message: MessageValue<Data>): void {
  this.checkMessageWorkerId(message)
  if (message.statistics != null) {
    // Statistics message received
    this.statistics = message.statistics
    return
  }
  if (message.checkActive != null) {
    // Check active message received
    if (message.checkActive) {
      this.startCheckActive()
    } else {
      this.stopCheckActive()
    }
    return
  }
  if (message.taskId != null && message.data != null) {
    // Task message received
    this.run(message)
    return
  }
  if (message.kill === true) {
    // Kill message received
    this.handleKillMessage(message)
  }
}
335
984dc9c8
JB
/**
 * Handles a kill message sent by the main worker: stops the activity check,
 * runs the configured kill handler, reports `success` or `failure` to the
 * main worker and finally emits the destroy event of the async resource.
 *
 * @param message - The kill message.
 */
protected handleKillMessage (message: MessageValue<Data>): void {
  this.stopCheckActive()
  const reportKill = (status: 'success' | 'failure'): void => {
    this.sendToMainWorker({ kill: status, workerId: this.id })
  }
  if (isAsyncFunction(this.opts.killHandler)) {
    (this.opts.killHandler?.() as Promise<void>)
      .then(() => {
        reportKill('success')
        return null
      })
      .catch(() => {
        reportKill('failure')
      })
      .finally(() => {
        this.emitDestroy()
      })
      .catch(EMPTY_FUNCTION)
  } else {
    try {
      // eslint-disable-next-line @typescript-eslint/no-invalid-void-type
      this.opts.killHandler?.() as void
      reportKill('success')
    } catch {
      reportKill('failure')
    } finally {
      this.emitDestroy()
    }
  }
}
368
9e746eec
JB
/**
 * Ensures that a message is addressed to this worker.
 *
 * @param message - The message to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
 */
private checkMessageWorkerId (message: MessageValue<Data>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Message worker id is not set')
  }
  if (workerId !== this.id) {
    throw new Error(
      `Message worker id ${workerId} does not match the worker id ${this.id}`
    )
  }
}
384
/**
 * Starts the worker check active interval.
 * The check runs every half of the maximum inactive time. An interval that is
 * already running is cleared first, so at most one timer exists at any time.
 */
private startCheckActive (): void {
  if (this.activeInterval != null) {
    // A previous start was not stopped: clear its timer, otherwise its handle
    // would be overwritten below and the timer could never be cleared.
    clearInterval(this.activeInterval)
  }
  this.lastTaskTimestamp = performance.now()
  this.activeInterval = setInterval(
    this.checkActive.bind(this),
    (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
  )
}
395
/**
 * Stops the worker check active interval, if one is running, and forgets its handle.
 */
private stopCheckActive (): void {
  if (this.activeInterval == null) {
    return
  }
  clearInterval(this.activeInterval)
  delete this.activeInterval
}
405
/**
 * Checks whether the worker has been inactive for longer than the maximum
 * inactive time and, if so, sends its kill behavior to the main worker.
 */
private checkActive (): void {
  const inactiveTime = performance.now() - this.lastTaskTimestamp
  const maxInactiveTime =
    this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
  if (inactiveTime > maxInactiveTime) {
    this.sendToMainWorker({ kill: this.opts.killBehavior, workerId: this.id })
  }
}
417
729c563d
S
/**
 * Gives access to the main worker.
 *
 * @returns Reference to the main worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
 */
protected getMainWorker (): MainWorker {
  const { mainWorker } = this
  if (mainWorker == null) {
    throw new Error('Main worker not set')
  }
  return mainWorker
}
c97c7edb 429
/**
 * Sends a message to the main worker.
 * Task results, task errors, kill status and the task function names all go through this method.
 *
 * @param message - The response message.
 */
protected abstract sendToMainWorker (
  message: MessageValue<Response, Data>
): void
c97c7edb 438
90d7d101
JB
/**
 * Sends the current list of task function names, along with the worker id, to the main worker.
 */
protected sendTaskFunctionsListToMainWorker (): void {
  const taskFunctions = this.listTaskFunctions()
  this.sendToMainWorker({ taskFunctions, workerId: this.id })
}
448
/**
 * Converts an error to a string so it can be sent back to the main worker.
 *
 * @param e - The error raised by the worker.
 * @returns The message of an `Error` instance, otherwise the value as given.
 */
protected handleError (e: Error | string): string {
  if (e instanceof Error) {
    return e.message
  }
  return e
}
458
5c4d16da
JB
/**
 * Runs the given task within the async scope of this resource, using the
 * asynchronous or the synchronous runner depending on the task function.
 *
 * @param task - The task to execute.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
 */
protected run (task: Task<Data>): void {
  const fn = this.getTaskFunction(task.name)
  if (!isAsyncFunction(fn)) {
    this.runInAsyncScope(this.runSync.bind(this), this, fn, task)
    return
  }
  this.runInAsyncScope(this.runAsync.bind(this), this, fn, task)
}
473
/**
 * Runs the given task function synchronously and sends its result, or the
 * error it raised, to the main worker.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runSync (
  fn: TaskSyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  try {
    const startedTaskPerformance = this.beginTaskPerformance(name)
    const res = fn(data)
    const taskPerformance = this.endTaskPerformance(startedTaskPerformance)
    this.sendToMainWorker({
      data: res,
      taskPerformance,
      workerId: this.id,
      taskId
    })
  } catch (e) {
    const message = this.handleError(e as Error | string)
    this.sendToMainWorker({
      taskError: {
        name: name ?? DEFAULT_TASK_NAME,
        message,
        data
      },
      workerId: this.id,
      taskId
    })
  } finally {
    // Success or failure, the worker has just been active.
    this.updateLastTaskTimestamp()
  }
}
510
/**
 * Runs the given task function asynchronously and sends its resolved value,
 * or its rejection reason, to the main worker.
 *
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected runAsync (
  fn: TaskAsyncFunction<Data, Response>,
  task: Task<Data>
): void {
  const { name, taskId, data } = task
  const startedTaskPerformance = this.beginTaskPerformance(name)
  fn(data)
    .then((res) => {
      const taskPerformance = this.endTaskPerformance(startedTaskPerformance)
      this.sendToMainWorker({
        data: res,
        taskPerformance,
        workerId: this.id,
        taskId
      })
      return null
    })
    .catch((e) => {
      const message = this.handleError(e as Error | string)
      this.sendToMainWorker({
        taskError: {
          name: name ?? DEFAULT_TASK_NAME,
          message,
          data
        },
        workerId: this.id,
        taskId
      })
    })
    .finally(() => {
      // Success or failure, the worker has just been active.
      this.updateLastTaskTimestamp()
    })
    .catch(EMPTY_FUNCTION)
}
ec8fd331 551
/**
 * Looks up the task function registered under the given name, or under the
 * default task function name when no name is given.
 *
 * @param name - Name of the task function that will be returned.
 * @returns The task function.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the task function is not found.
 */
private getTaskFunction (name?: string): TaskFunction<Data, Response> {
  const taskFunctionName = name ?? DEFAULT_TASK_NAME
  const fn = this.taskFunctions.get(taskFunctionName)
  if (fn == null) {
    throw new Error(`Task function '${taskFunctionName}' not found`)
  }
  return fn
}
62c15a68 567
/**
 * Starts the performance measurement of a task.
 *
 * @param name - The task function name; the default task function name is used when not given.
 * @returns The task name, the start timestamp and, if ELU statistics are required, the current event loop utilization.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
 */
private beginTaskPerformance (name?: string): TaskPerformance {
  this.checkStatistics()
  const timestamp = performance.now()
  const eluPart = this.statistics.elu
    ? { elu: performance.eventLoopUtilization() }
    : {}
  return {
    name: name ?? DEFAULT_TASK_NAME,
    timestamp,
    ...eluPart
  }
}
576
d9d31201
JB
/**
 * Completes the performance measurement of a task.
 *
 * @param taskPerformance - The measurement returned when the task started.
 * @returns The given measurement with, if required, the run time since the start timestamp and the event loop utilization relative to the starting one.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
 */
private endTaskPerformance (
  taskPerformance: TaskPerformance
): TaskPerformance {
  this.checkStatistics()
  const runTimePart = this.statistics.runTime
    ? { runTime: performance.now() - taskPerformance.timestamp }
    : {}
  const eluPart = this.statistics.elu
    ? { elu: performance.eventLoopUtilization(taskPerformance.elu) }
    : {}
  return {
    ...taskPerformance,
    ...runTimePart,
    ...eluPart
  }
}
8a970421
JB
591
/**
 * Ensures that the performance statistics computation requirements have been received.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
 */
private checkStatistics (): void {
  const requirementsMissing = this.statistics == null
  if (requirementsMissing) {
    throw new Error('Performance statistics computation requirements not set')
  }
}
c3f498b5
JB
597
/**
 * Records the current time as the last task timestamp, only while the activity check is running.
 */
private updateLastTaskTimestamp (): void {
  if (this.activeInterval == null) {
    return
  }
  this.lastTaskTimestamp = performance.now()
}
c97c7edb 603}