chore(deps-dev): bump tatami-ng to 0.6.0
[poolifier.git] / src / worker / abstract-worker.ts
CommitLineData
6677a3d3 1import type { Worker } from 'node:cluster'
ded253e2
JB
2import type { MessagePort } from 'node:worker_threads'
3
97231086
JB
4import { performance } from 'node:perf_hooks'
5
d715b7bc
JB
6import type {
7 MessageValue,
5c4d16da 8 Task,
31847469 9 TaskFunctionProperties,
d715b7bc 10 TaskPerformance,
3a502712 11 WorkerStatistics,
d35e5717 12} from '../utility-types.js'
b6b32453 13import type {
82ea6492
JB
14 TaskAsyncFunction,
15 TaskFunction,
31847469 16 TaskFunctionObject,
4e38fd21 17 TaskFunctionOperationResult,
b6b32453 18 TaskFunctions,
3a502712 19 TaskSyncFunction,
d35e5717 20} from './task-functions.js'
97231086
JB
21
22import {
23 buildTaskFunctionProperties,
24 DEFAULT_TASK_NAME,
25 EMPTY_FUNCTION,
26 isAsyncFunction,
27 isPlainObject,
28} from '../utils.js'
9a38f99e
JB
29import {
30 checkTaskFunctionName,
bcfb06ce 31 checkValidTaskFunctionObjectEntry,
3a502712 32 checkValidWorkerOptions,
d35e5717 33} from './utils.js'
ded253e2 34import { KillBehaviors, type WorkerOptions } from './worker-options.js'
4c35177b 35
/**
 * Default maximum inactivity time in milliseconds, used when the worker
 * options do not provide `maxInactiveTime`.
 */
const DEFAULT_MAX_INACTIVE_TIME = 60_000

/**
 * Default worker options. They are the constructor's default `opts` value and
 * the base that user-supplied options are merged over.
 */
const DEFAULT_WORKER_OPTIONS: WorkerOptions = {
  /**
   * Default kill behavior of this worker.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * Default kill handler: a no-op function called when the worker is killed.
   */
  killHandler: EMPTY_FUNCTION,
  /**
   * Default maximum time this worker may stay idle.
   * The pool automatically checks and terminates this worker when the time expires.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
}
c97c7edb 52
/**
 * Base class that implements some shared logic for all poolifier workers.
 * @typeParam MainWorker - Type of main worker.
 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
 */
export abstract class AbstractWorker<
  MainWorker extends MessagePort | Worker,
  Data = unknown,
  Response = unknown
> {
  /**
   * Handler id of the `activeInterval` worker activity check.
   */
  protected activeInterval?: NodeJS.Timeout

  /**
   * Worker id.
   */
  protected abstract id: number

  /**
   * Timestamp of the last task processed by this worker.
   */
  protected lastTaskTimestamp!: number

  /**
   * Runs the given task.
   *
   * The task function is looked up by the task name, falling back to the
   * default task function name when the task has no name. If no task function
   * is registered under that name, a worker error is sent to the main worker
   * instead of running anything.
   * @param task - The task to execute.
   */
  protected readonly run = (task: Task<Data>): void => {
    const { data, name, taskId } = task
    const taskFunctionName = name ?? DEFAULT_TASK_NAME
    const fn = this.taskFunctions.get(taskFunctionName)?.taskFunction
    if (fn == null) {
      // Report the name that was actually looked up, not the possibly undefined task name
      this.sendToMainWorker({
        taskId,
        workerError: {
          data,
          message: `Task function '${taskFunctionName}' not found`,
          name: taskFunctionName,
        },
      })
      return
    }
    if (isAsyncFunction(fn)) {
      this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
    } else {
      this.runSync(fn as TaskSyncFunction<Data, Response>, task)
    }
  }

  /**
   * Runs the given task function asynchronously.
   *
   * On fulfillment the result and the task performance are sent to the main
   * worker; on rejection a worker error is sent instead. The last task
   * timestamp is updated in both cases.
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected readonly runAsync = (
    fn: TaskAsyncFunction<Data, Response>,
    task: Task<Data>
  ): void => {
    const { data, name, taskId } = task
    let taskPerformance = this.beginTaskPerformance(name)
    fn(data)
      .then(res => {
        taskPerformance = this.endTaskPerformance(taskPerformance)
        this.sendToMainWorker({
          data: res,
          taskId,
          taskPerformance,
        })
        return undefined
      })
      .catch((error: unknown) => {
        this.sendToMainWorker({
          taskId,
          workerError: {
            data,
            message: this.handleError(error as Error | string),
            // Same fallback as the one used for the task performance name
            name: name ?? DEFAULT_TASK_NAME,
          },
        })
      })
      .finally(() => {
        this.updateLastTaskTimestamp()
      })
      // Swallow any error raised while reporting so the chain never rejects unhandled
      .catch(EMPTY_FUNCTION)
  }

  /**
   * Runs the given task function synchronously.
   *
   * The result and the task performance are sent to the main worker; any
   * error thrown while measuring or executing is sent as a worker error
   * instead. The last task timestamp is updated in both cases.
   * @param fn - Task function that will be executed.
   * @param task - Input data for the task function.
   */
  protected readonly runSync = (
    fn: TaskSyncFunction<Data, Response>,
    task: Task<Data>
  ): void => {
    const { data, name, taskId } = task
    try {
      let taskPerformance = this.beginTaskPerformance(name)
      const res = fn(data)
      taskPerformance = this.endTaskPerformance(taskPerformance)
      this.sendToMainWorker({
        data: res,
        taskId,
        taskPerformance,
      })
    } catch (error) {
      this.sendToMainWorker({
        taskId,
        workerError: {
          data,
          message: this.handleError(error as Error | string),
          // Same fallback as the one used for the task performance name
          name: name ?? DEFAULT_TASK_NAME,
        },
      })
    } finally {
      this.updateLastTaskTimestamp()
    }
  }

  /**
   * Performance statistics computation requirements.
   */
  protected statistics?: WorkerStatistics

  /**
   * Task function object(s) processed by the worker when the pool's `execute` method is invoked.
   */
  protected taskFunctions!: Map<string, TaskFunctionObject<Data, Response>>

  /**
   * Constructs a new poolifier worker.
   * @param isMain - Whether this is the main worker or not.
   * @param mainWorker - Reference to main worker.
   * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked. The first function is the default function.
   * @param opts - Options for the worker.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain` or `taskFunctions` is not provided.
   */
  public constructor (
    protected readonly isMain: boolean | undefined,
    private readonly mainWorker: MainWorker | null | undefined,
    taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
    protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
  ) {
    if (this.isMain == null) {
      throw new Error('isMain parameter is mandatory')
    }
    this.checkTaskFunctions(taskFunctions)
    this.checkWorkerOptions(this.opts)
    if (!this.isMain) {
      // Should be once() but Node.js on windows has a bug that prevents it from working
      this.getMainWorker().on('message', this.handleReadyMessage.bind(this))
    }
  }

  /**
   * Returns the main worker.
   * @returns Reference to the main worker.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
   */
  protected getMainWorker (): MainWorker {
    if (this.mainWorker == null) {
      throw new Error('Main worker not set')
    }
    return this.mainWorker
  }

  /**
   * Handles an error and converts it to a string so it can be sent back to the main worker.
   * @param error - The error raised by the worker.
   * @returns The error message.
   */
  protected handleError (error: Error | string): string {
    return error instanceof Error ? error.message : error
  }

  /**
   * Handles a kill message sent by the main worker.
   *
   * Stops the activity check, runs the configured kill handler (awaiting it
   * when it is asynchronous) and reports `success` or `failure` to the main
   * worker.
   * @param message - The kill message. Not read by this implementation.
   */
  protected handleKillMessage (message: MessageValue<Data>): void {
    this.stopCheckActive()
    if (isAsyncFunction(this.opts.killHandler)) {
      ;(this.opts.killHandler as () => Promise<void>)()
        .then(() => {
          this.sendToMainWorker({ kill: 'success' })
          return undefined
        })
        .catch(() => {
          this.sendToMainWorker({ kill: 'failure' })
        })
    } else {
      try {
        ;(this.opts.killHandler as (() => void) | undefined)?.()
        this.sendToMainWorker({ kill: 'success' })
      } catch {
        this.sendToMainWorker({ kill: 'failure' })
      }
    }
  }

  /**
   * Handles the ready message sent by the main worker.
   * @param message - The ready message.
   */
  protected abstract handleReadyMessage (message: MessageValue<Data>): void

  /**
   * Handles a task function operation message (`add`, `default` or `remove`)
   * and sends the operation status back to the main worker, with a worker
   * error when the operation failed.
   * @param message - The task function operation message.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message has no task function properties, or if an `add` operation has no task function source string.
   */
  protected handleTaskFunctionOperationMessage (
    message: MessageValue<Data>
  ): void {
    const { taskFunction, taskFunctionOperation, taskFunctionProperties } =
      message
    if (taskFunctionProperties == null) {
      throw new Error(
        'Cannot handle task function operation message without task function properties'
      )
    }
    let response: TaskFunctionOperationResult
    switch (taskFunctionOperation) {
      case 'add':
        if (typeof taskFunction !== 'string') {
          throw new Error(
            `Cannot handle task function operation ${taskFunctionOperation} message without task function`
          )
        }
        // NOTE(security): the task function source carried by the message is
        // evaluated with `new Function`. This is only safe as long as messages
        // can exclusively originate from the trusted main worker.
        response = this.addTaskFunction(taskFunctionProperties.name, {
          // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func, @typescript-eslint/no-unsafe-call
          taskFunction: new Function(
            `return ${taskFunction}`
          )() as TaskFunction<Data, Response>,
          ...(taskFunctionProperties.priority != null && {
            priority: taskFunctionProperties.priority,
          }),
          ...(taskFunctionProperties.strategy != null && {
            strategy: taskFunctionProperties.strategy,
          }),
        })
        break
      case 'default':
        response = this.setDefaultTaskFunction(taskFunctionProperties.name)
        break
      case 'remove':
        response = this.removeTaskFunction(taskFunctionProperties.name)
        break
      default:
        response = { error: new Error('Unknown task operation'), status: false }
        break
    }
    const { error, status } = response
    this.sendToMainWorker({
      taskFunctionOperation,
      taskFunctionOperationStatus: status,
      taskFunctionProperties,
      ...(!status &&
        error != null && {
        workerError: {
          message: this.handleError(error as Error | string),
          name: taskFunctionProperties.name,
        },
      }),
    })
  }

  /**
   * Worker message listener.
   *
   * Dispatches on the first matching message field, in this order:
   * statistics, check active, task function operation, task, kill.
   * @param message - The received message.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  protected messageListener (message: MessageValue<Data>): void {
    this.checkMessageWorkerId(message)
    const {
      checkActive,
      data,
      kill,
      statistics,
      taskFunctionOperation,
      taskId,
    } = message
    if (statistics != null) {
      // Statistics message received
      this.statistics = statistics
    } else if (checkActive != null) {
      // Check active message received
      checkActive ? this.startCheckActive() : this.stopCheckActive()
    } else if (taskFunctionOperation != null) {
      // Task function operation message received
      this.handleTaskFunctionOperationMessage(message)
    } else if (taskId != null && data != null) {
      // Task message received
      this.run(message)
    } else if (kill === true) {
      // Kill message received
      this.handleKillMessage(message)
    }
  }

  /**
   * Sends task functions properties to the main worker.
   */
  protected sendTaskFunctionsPropertiesToMainWorker (): void {
    this.sendToMainWorker({
      taskFunctionsProperties: this.listTaskFunctionsProperties(),
    })
  }

  /**
   * Sends a message to main worker.
   * @param message - The response message.
   */
  protected abstract sendToMainWorker (
    message: MessageValue<Response, Data>
  ): void

  /**
   * Starts the performance measurement of a task.
   * @param name - The task function name; the default task function name is used when omitted.
   * @returns The initial task performance (name, start timestamp and, when required by the statistics, the event loop utilization snapshot).
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
   */
  private beginTaskPerformance (name?: string): TaskPerformance {
    if (this.statistics == null) {
      throw new Error('Performance statistics computation requirements not set')
    }
    return {
      name: name ?? DEFAULT_TASK_NAME,
      timestamp: performance.now(),
      ...(this.statistics.elu && {
        elu: performance.eventLoopUtilization(),
      }),
    }
  }

  /**
   * Checks if the worker should be terminated because it has been inactive for too long.
   * When the time elapsed since the last task exceeds the maximum inactive
   * time, a kill message carrying the configured kill behavior is sent to the
   * main worker.
   */
  private checkActive (): void {
    if (
      performance.now() - this.lastTaskTimestamp >
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME)
    ) {
      this.sendToMainWorker({ kill: this.opts.killBehavior })
    }
  }

  /**
   * Check if the message worker id is set and matches the worker id.
   * @param message - The message to check.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
   */
  private checkMessageWorkerId (message: MessageValue<Data>): void {
    if (message.workerId == null) {
      throw new Error('Message worker id is not set')
    } else if (message.workerId !== this.id) {
      throw new Error(
        `Message worker id ${message.workerId.toString()} does not match the worker id ${this.id.toString()}`
      )
    }
  }

  /**
   * Checks if the `taskFunctions` parameter is passed to the constructor and valid,
   * then registers the task function(s). The first one is also registered
   * under the default task function name.
   *
   * Task functions are bound to this worker. Task function objects supplied
   * by the caller are copied, never modified in place.
   * @param taskFunctions - The task function(s) parameter that should be checked.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the parameter is missing or is an empty object.
   * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the parameter is neither a function nor a plain object.
   */
  private checkTaskFunctions (
    taskFunctions:
      | TaskFunction<Data, Response>
      | TaskFunctions<Data, Response>
      | undefined
  ): void {
    if (taskFunctions == null) {
      throw new Error('taskFunctions parameter is mandatory')
    }
    this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
    if (typeof taskFunctions === 'function') {
      // The same object is registered twice on purpose: the default task
      // function is identified by object identity with the named entry
      const fnObj = { taskFunction: taskFunctions.bind(this) }
      this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
      this.taskFunctions.set(
        typeof taskFunctions.name === 'string' &&
          taskFunctions.name.trim().length > 0
          ? taskFunctions.name
          : 'fn1',
        fnObj
      )
    } else if (isPlainObject(taskFunctions)) {
      let firstEntry = true
      for (const [name, fnOrFnObj] of Object.entries(taskFunctions)) {
        const fnObj: TaskFunctionObject<Data, Response> =
          typeof fnOrFnObj === 'function'
            ? { taskFunction: fnOrFnObj }
            : fnOrFnObj
        checkValidTaskFunctionObjectEntry<Data, Response>(name, fnObj)
        // Store a bound copy so the caller's object is left untouched
        const boundFnObj: TaskFunctionObject<Data, Response> = {
          ...fnObj,
          taskFunction: fnObj.taskFunction.bind(this),
        }
        if (firstEntry) {
          this.taskFunctions.set(DEFAULT_TASK_NAME, boundFnObj)
          firstEntry = false
        }
        this.taskFunctions.set(name, boundFnObj)
      }
      if (firstEntry) {
        throw new Error('taskFunctions parameter object is empty')
      }
    } else {
      throw new TypeError(
        'taskFunctions parameter is not a function or a plain object'
      )
    }
  }

  /**
   * Validates the worker options and merges them over the default worker options.
   * @param opts - The worker options to check.
   */
  private checkWorkerOptions (opts: WorkerOptions): void {
    checkValidWorkerOptions(opts)
    this.opts = { ...DEFAULT_WORKER_OPTIONS, ...opts }
  }

  /**
   * Ends the performance measurement of a task.
   * @param taskPerformance - The task performance returned when the measurement began.
   * @returns The task performance completed with the run time and/or the event loop utilization delta, depending on the statistics requirements.
   * @throws {@link https://nodejs.org/api/errors.html#class-error} If the performance statistics computation requirements are not set.
   */
  private endTaskPerformance (
    taskPerformance: TaskPerformance
  ): TaskPerformance {
    if (this.statistics == null) {
      throw new Error('Performance statistics computation requirements not set')
    }
    return {
      ...taskPerformance,
      ...(this.statistics.runTime && {
        runTime: performance.now() - taskPerformance.timestamp,
      }),
      ...(this.statistics.elu && {
        elu: performance.eventLoopUtilization(taskPerformance.elu),
      }),
    }
  }

  /**
   * Starts the worker check active interval.
   * The check runs every half of the maximum inactive time.
   */
  private startCheckActive (): void {
    // Clear any interval left by a previous start, otherwise its handle would
    // be overwritten and the timer could never be stopped
    this.stopCheckActive()
    this.lastTaskTimestamp = performance.now()
    this.activeInterval = setInterval(
      this.checkActive.bind(this),
      (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
    )
  }

  /**
   * Stops the worker check active interval.
   */
  private stopCheckActive (): void {
    if (this.activeInterval != null) {
      clearInterval(this.activeInterval)
      delete this.activeInterval
    }
  }

  /**
   * Updates the last task timestamp, only while the activity check is running.
   */
  private updateLastTaskTimestamp (): void {
    if (this.activeInterval != null) {
      this.lastTaskTimestamp = performance.now()
    }
  }

  /**
   * Adds a task function to the worker.
   * If a task function with the same name already exists, it is replaced.
   *
   * The task function is bound to this worker. A task function object
   * supplied by the caller is copied, never modified in place.
   * @param name - The name of the task function to add.
   * @param fn - The task function to add.
   * @returns Whether the task function was added or not.
   */
  public addTaskFunction (
    name: string,
    fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
  ): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot add a task function with the default reserved name'
        )
      }
      const fnObj: TaskFunctionObject<Data, Response> =
        typeof fn === 'function' ? { taskFunction: fn } : fn
      checkValidTaskFunctionObjectEntry<Data, Response>(name, fnObj)
      // Store a bound copy so the caller's object is left untouched
      const boundFnObj: TaskFunctionObject<Data, Response> = {
        ...fnObj,
        taskFunction: fnObj.taskFunction.bind(this),
      }
      // Replacing the task function currently used as default also replaces the default
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        this.taskFunctions.set(DEFAULT_TASK_NAME, boundFnObj)
      }
      this.taskFunctions.set(name, boundFnObj)
      this.sendTaskFunctionsPropertiesToMainWorker()
      return { status: true }
    } catch (error) {
      return { error: error as Error, status: false }
    }
  }

  /**
   * Checks if the worker has a task function with the given name.
   * @param name - The name of the task function to check.
   * @returns Whether the worker has a task function with the given name or not.
   */
  public hasTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
    } catch (error) {
      return { error: error as Error, status: false }
    }
    return { status: this.taskFunctions.has(name) }
  }

  /**
   * Lists the properties of the worker's task functions.
   * The default entry comes first, then the named task function used as
   * default, then the remaining task functions.
   * @returns The properties of the worker's task functions.
   */
  public listTaskFunctionsProperties (): TaskFunctionProperties[] {
    // Find the named task function that is the same object as the default entry
    let defaultTaskFunctionName = DEFAULT_TASK_NAME
    for (const [name, fnObj] of this.taskFunctions) {
      if (
        name !== DEFAULT_TASK_NAME &&
        fnObj === this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        defaultTaskFunctionName = name
        break
      }
    }
    const taskFunctionsProperties: TaskFunctionProperties[] = []
    for (const [name, fnObj] of this.taskFunctions) {
      if (name === DEFAULT_TASK_NAME || name === defaultTaskFunctionName) {
        continue
      }
      taskFunctionsProperties.push(buildTaskFunctionProperties(name, fnObj))
    }
    return [
      buildTaskFunctionProperties(
        DEFAULT_TASK_NAME,
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ),
      buildTaskFunctionProperties(
        defaultTaskFunctionName,
        this.taskFunctions.get(defaultTaskFunctionName)
      ),
      ...taskFunctionsProperties,
    ]
  }

  /**
   * Removes a task function from the worker.
   * The default reserved name and the task function currently used as
   * default cannot be removed.
   * @param name - The name of the task function to remove.
   * @returns Whether the task function existed and was removed or not.
   */
  public removeTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot remove the task function with the default reserved name'
        )
      }
      if (
        this.taskFunctions.get(name) ===
        this.taskFunctions.get(DEFAULT_TASK_NAME)
      ) {
        throw new Error(
          'Cannot remove the task function used as the default task function'
        )
      }
      const deleteStatus = this.taskFunctions.delete(name)
      this.sendTaskFunctionsPropertiesToMainWorker()
      return { status: deleteStatus }
    } catch (error) {
      return { error: error as Error, status: false }
    }
  }

  /**
   * Sets the default task function to use in the worker.
   * @param name - The name of the task function to use as default task function.
   * @returns Whether the default task function was set or not.
   */
  public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
    try {
      checkTaskFunctionName(name)
      if (name === DEFAULT_TASK_NAME) {
        throw new Error(
          'Cannot set the default task function reserved name as the default task function'
        )
      }
      const fnObj = this.taskFunctions.get(name)
      if (fnObj == null) {
        throw new Error(
          'Cannot set the default task function to a non-existing task function'
        )
      }
      this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
      this.sendTaskFunctionsPropertiesToMainWorker()
      return { status: true }
    } catch (error) {
      return { error: error as Error, status: false }
    }
  }
}