1 import type { Worker
} from
'node:cluster'
2 import type { MessagePort
} from
'node:worker_threads'
4 import { performance
} from
'node:perf_hooks'
9 TaskFunctionProperties
,
12 } from
'../utility-types.js'
17 TaskFunctionOperationResult
,
20 } from
'./task-functions.js'
23 buildTaskFunctionProperties
,
29 import { AbortError
} from
'./abort-error.js'
31 checkTaskFunctionName
,
32 checkValidTaskFunctionObjectEntry
,
33 checkValidWorkerOptions
,
35 import { KillBehaviors
, type WorkerOptions
} from
'./worker-options.js'
37 const DEFAULT_MAX_INACTIVE_TIME
= 60000
38 const DEFAULT_WORKER_OPTIONS
: Readonly
<WorkerOptions
> = Object.freeze({
40 * The kill behavior option on this worker or its default value.
42 killBehavior
: KillBehaviors
.SOFT
,
44 * The function to call when the worker is killed.
46 killHandler
: EMPTY_FUNCTION
,
48 * The maximum time to keep this worker active while idle.
49 * The pool automatically checks and terminates this worker when the time expires.
51 maxInactiveTime
: DEFAULT_MAX_INACTIVE_TIME
,
55 * Base class that implements some shared logic for all poolifier workers.
56 * @typeParam MainWorker - Type of main worker.
57 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
58 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
60 export abstract class AbstractWorker
<
61 MainWorker
extends MessagePort
| Worker
,
66 * Handler id of the `activeInterval` worker activity check.
68 protected activeInterval
?: NodeJS
.Timeout
72 protected abstract readonly id
: number
74 * Timestamp of the last task processed by this worker.
76 protected lastTaskTimestamp
!: number
79 * Performance statistics computation requirements.
81 protected statistics
?: WorkerStatistics
84 * Task abort functions processed by the worker when task operation 'abort' is received.
86 protected taskAbortFunctions
: Map
<
87 `${string}-${string}-${string}-${string}-${string}`,
92 * Task function object(s) processed by the worker when the pool's `execute` method is invoked.
94 protected taskFunctions
!: Map
<string, TaskFunctionObject
<Data
, Response
>>
97 * Constructs a new poolifier worker.
98 * @param isMain - Whether this is the main worker or not.
99 * @param mainWorker - Reference to main worker.
100 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked. The first function is the default function.
101 * @param opts - Options for the worker.
104 protected readonly isMain
: boolean | undefined,
105 private readonly mainWorker
: MainWorker
| null | undefined,
106 taskFunctions
: TaskFunction
<Data
, Response
> | TaskFunctions
<Data
, Response
>,
107 protected opts
: WorkerOptions
= DEFAULT_WORKER_OPTIONS
109 if (this.isMain
== null) {
110 throw new Error('isMain parameter is mandatory')
112 this.checkTaskFunctions(taskFunctions
)
113 this.taskAbortFunctions
= new Map
<
114 `${string}-${string}-${string}-${string}-${string}`,
117 this.checkWorkerOptions(this.opts
)
119 this.getMainWorker().once('message', this.handleReadyMessage
.bind(this))
124 * Adds a task function to the worker.
125 * If a task function with the same name already exists, it is replaced.
126 * @param name - The name of the task function to add.
127 * @param fn - The task function to add.
128 * @returns Whether the task function was added or not.
130 public addTaskFunction (
132 fn
: TaskFunction
<Data
, Response
> | TaskFunctionObject
<Data
, Response
>
133 ): TaskFunctionOperationResult
{
135 checkTaskFunctionName(name
)
136 if (name
=== DEFAULT_TASK_NAME
) {
138 'Cannot add a task function with the default reserved name'
141 if (typeof fn
=== 'function') {
142 fn
= { taskFunction
: fn
} satisfies TaskFunctionObject
<Data
, Response
>
144 checkValidTaskFunctionObjectEntry
<Data
, Response
>(name
, fn
)
145 fn
.taskFunction
= fn
.taskFunction
.bind(this)
147 this.taskFunctions
.get(name
) ===
148 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
150 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fn
)
152 this.taskFunctions
.set(name
, fn
)
153 this.sendTaskFunctionsPropertiesToMainWorker()
154 return { status: true }
156 return { error
: error
as Error, status: false }
161 * Checks if the worker has a task function with the given name.
162 * @param name - The name of the task function to check.
163 * @returns Whether the worker has a task function with the given name or not.
165 public hasTaskFunction (name
: string): TaskFunctionOperationResult
{
167 checkTaskFunctionName(name
)
169 return { error
: error
as Error, status: false }
171 return { status: this.taskFunctions
.has(name
) }
175 * Lists the properties of the worker's task functions.
176 * @returns The properties of the worker's task functions.
178 public listTaskFunctionsProperties (): TaskFunctionProperties
[] {
179 let defaultTaskFunctionName
= DEFAULT_TASK_NAME
180 for (const [name
, fnObj
] of this.taskFunctions
) {
182 name
!== DEFAULT_TASK_NAME
&&
183 fnObj
=== this.taskFunctions
.get(DEFAULT_TASK_NAME
)
185 defaultTaskFunctionName
= name
189 const taskFunctionsProperties
: TaskFunctionProperties
[] = []
190 for (const [name
, fnObj
] of this.taskFunctions
) {
191 if (name
=== DEFAULT_TASK_NAME
|| name
=== defaultTaskFunctionName
) {
194 taskFunctionsProperties
.push(buildTaskFunctionProperties(name
, fnObj
))
197 buildTaskFunctionProperties(
199 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
201 buildTaskFunctionProperties(
202 defaultTaskFunctionName
,
203 this.taskFunctions
.get(defaultTaskFunctionName
)
205 ...taskFunctionsProperties
,
210 * Removes a task function from the worker.
211 * @param name - The name of the task function to remove.
212 * @returns Whether the task function existed and was removed or not.
214 public removeTaskFunction (name
: string): TaskFunctionOperationResult
{
216 checkTaskFunctionName(name
)
217 if (name
=== DEFAULT_TASK_NAME
) {
219 'Cannot remove the task function with the default reserved name'
223 this.taskFunctions
.get(name
) ===
224 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
227 'Cannot remove the task function used as the default task function'
230 const deleteStatus
= this.taskFunctions
.delete(name
)
231 this.sendTaskFunctionsPropertiesToMainWorker()
232 return { status: deleteStatus
}
234 return { error
: error
as Error, status: false }
239 * Sets the default task function to use in the worker.
240 * @param name - The name of the task function to use as default task function.
241 * @returns Whether the default task function was set or not.
243 public setDefaultTaskFunction (name
: string): TaskFunctionOperationResult
{
245 checkTaskFunctionName(name
)
246 if (name
=== DEFAULT_TASK_NAME
) {
248 'Cannot set the default task function reserved name as the default task function'
251 if (!this.taskFunctions
.has(name
)) {
253 'Cannot set the default task function to a non-existing task function'
256 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
257 this.taskFunctions
.set(DEFAULT_TASK_NAME
, this.taskFunctions
.get(name
)!)
258 this.sendTaskFunctionsPropertiesToMainWorker()
259 return { status: true }
261 return { error
: error
as Error, status: false }
266 * Returns the main worker.
267 * @returns Reference to the main worker.
268 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
270 protected getMainWorker (): MainWorker
{
271 if (this.mainWorker
== null) {
272 throw new Error('Main worker not set')
274 return this.mainWorker
278 * Handles a worker error.
279 * @param error - The error raised by the worker.
280 * @returns The worker error object.
282 protected abstract handleError (error
: Error): {
290 * Handles a kill message sent by the main worker.
291 * @param message - The kill message.
293 protected handleKillMessage (message
: MessageValue
<Data
>): void {
294 this.stopCheckActive()
295 if (isAsyncFunction(this.opts
.killHandler
)) {
296 ;(this.opts
.killHandler
as () => Promise
<void>)()
298 this.sendToMainWorker({ kill
: 'success' })
302 this.sendToMainWorker({ kill
: 'failure' })
306 ;(this.opts
.killHandler
as (() => void) | undefined)?.()
307 this.sendToMainWorker({ kill
: 'success' })
309 this.sendToMainWorker({ kill
: 'failure' })
315 * Handles the ready message sent by the main worker.
316 * @param message - The ready message.
318 protected abstract handleReadyMessage (message
: MessageValue
<Data
>): void
320 protected handleTaskFunctionOperationMessage (
321 message
: MessageValue
<Data
>
323 const { taskFunction
, taskFunctionOperation
, taskFunctionProperties
} =
325 if (taskFunctionProperties
== null) {
327 'Cannot handle task function operation message without task function properties'
330 let response
: TaskFunctionOperationResult
331 switch (taskFunctionOperation
) {
333 if (typeof taskFunction
!== 'string') {
335 `Cannot handle task function operation ${taskFunctionOperation} message without task function`
338 response
= this.addTaskFunction(taskFunctionProperties
.name
, {
339 // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func, @typescript-eslint/no-unsafe-call
340 taskFunction
: new Function(
341 `return (${taskFunction})`
342 )() as TaskFunction
<Data
, Response
>,
343 ...(taskFunctionProperties
.priority
!= null && {
344 priority
: taskFunctionProperties
.priority
,
346 ...(taskFunctionProperties
.strategy
!= null && {
347 strategy
: taskFunctionProperties
.strategy
,
352 response
= this.setDefaultTaskFunction(taskFunctionProperties
.name
)
355 response
= this.removeTaskFunction(taskFunctionProperties
.name
)
360 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
361 `Unknown task function operation: ${taskFunctionOperation!}`
367 const { error
, status } = response
368 this.sendToMainWorker({
369 taskFunctionOperation
,
370 taskFunctionOperationStatus
: status,
371 taskFunctionProperties
,
375 name
: taskFunctionProperties
.name
,
376 ...this.handleError(error
),
383 * Worker message listener.
384 * @param message - The received message.
386 protected messageListener (message
: MessageValue
<Data
>): void {
387 this.checkMessageWorkerId(message
)
393 taskFunctionOperation
,
397 if (statistics
!= null) {
398 // Statistics message received
399 this.statistics
= statistics
400 } else if (checkActive
!= null) {
401 // Check active message received
402 checkActive
? this.startCheckActive() : this.stopCheckActive()
403 } else if (taskFunctionOperation
!= null) {
404 // Task function operation message received
405 this.handleTaskFunctionOperationMessage(message
)
406 } else if (taskId
!= null && data
!= null) {
407 // Task message received
409 } else if (taskOperation
=== 'abort' && taskId
!= null) {
410 // Abort task operation message received
411 if (this.taskAbortFunctions
.has(taskId
)) {
412 this.taskAbortFunctions
.get(taskId
)?.()
414 } else if (kill
=== true) {
415 // Kill message received
416 this.handleKillMessage(message
)
421 * Runs the given task.
422 * @param task - The task to execute.
424 protected readonly run
= (task
: Task
<Data
>): void => {
425 const { abortable
, data
, name
, taskId
} = task
426 const taskFunctionName
= name
?? DEFAULT_TASK_NAME
427 if (!this.taskFunctions
.has(taskFunctionName
)) {
428 this.sendToMainWorker({
434 new Error(`Task function '${taskFunctionName}' not found`)
440 let fn
: TaskFunction
<Data
, Response
>
441 if (abortable
=== true) {
442 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
443 fn
= this.getAbortableTaskFunction(taskFunctionName
, taskId
!)
445 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
446 fn
= this.taskFunctions
.get(taskFunctionName
)!.taskFunction
448 if (isAsyncFunction(fn
)) {
449 this.runAsync(fn
as TaskAsyncFunction
<Data
, Response
>, task
)
451 this.runSync(fn
as TaskSyncFunction
<Data
, Response
>, task
)
456 * Runs the given task function asynchronously.
457 * @param fn - Task function that will be executed.
458 * @param task - The task to run, holding the input data for the task function.
460 protected readonly runAsync
= (
461 fn
: TaskAsyncFunction
<Data
, Response
>,
464 const { abortable
, data
, name
, taskId
} = task
465 let taskPerformance
= this.beginTaskPerformance(name
)
468 taskPerformance
= this.endTaskPerformance(taskPerformance
)
469 this.sendToMainWorker({
476 .catch((error
: unknown
) => {
477 this.sendToMainWorker({
482 ...this.handleError(error
as Error),
487 this.updateLastTaskTimestamp()
488 if (abortable
=== true) {
489 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
490 this.taskAbortFunctions
.delete(taskId
!)
493 .catch(EMPTY_FUNCTION
)
497 * Runs the given task function synchronously.
498 * @param fn - Task function that will be executed.
499 * @param task - The task to run, holding the input data for the task function.
501 protected readonly runSync
= (
502 fn
: TaskSyncFunction
<Data
, Response
>,
505 const { abortable
, data
, name
, taskId
} = task
507 let taskPerformance
= this.beginTaskPerformance(name
)
509 taskPerformance
= this.endTaskPerformance(taskPerformance
)
510 this.sendToMainWorker({
516 this.sendToMainWorker({
521 ...this.handleError(error
as Error),
525 this.updateLastTaskTimestamp()
526 if (abortable
=== true) {
527 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
528 this.taskAbortFunctions
.delete(taskId
!)
534 * Sends task functions properties to the main worker.
536 protected sendTaskFunctionsPropertiesToMainWorker (): void {
537 this.sendToMainWorker({
538 taskFunctionsProperties
: this.listTaskFunctionsProperties(),
543 * Sends a message to main worker.
544 * @param message - The response message.
546 protected abstract sendToMainWorker (
547 message
: MessageValue
<Response
, Data
>
550 private beginTaskPerformance (name
?: string): TaskPerformance
{
551 if (this.statistics
== null) {
552 throw new Error('Performance statistics computation requirements not set')
555 name
: name
?? DEFAULT_TASK_NAME
,
556 timestamp
: performance
.now(),
557 ...(this.statistics
.elu
&& {
558 elu
: performance
.eventLoopUtilization(),
564 * Checks if the worker should be terminated because it has been inactive for too long.
566 private checkActive (): void {
568 performance
.now() - this.lastTaskTimestamp
>
569 (this.opts
.maxInactiveTime
?? DEFAULT_MAX_INACTIVE_TIME
)
571 this.sendToMainWorker({ kill
: this.opts
.killBehavior
})
576 * Checks if the message worker id is set and matches the worker id.
577 * @param message - The message to check.
578 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
580 private checkMessageWorkerId (message
: MessageValue
<Data
>): void {
581 if (message
.workerId
== null) {
583 `Message worker id is not set: ${JSON.stringify(message)}`
586 if (message
.workerId
!== this.id
) {
588 `Message worker id ${message.workerId.toString()} does not match the worker id ${this.id.toString()}: ${JSON.stringify(message)}`
594 * Checks if the `taskFunctions` parameter is passed to the constructor and valid.
595 * @param taskFunctions - The task function(s) parameter that should be checked.
597 private checkTaskFunctions (
599 | TaskFunction
<Data
, Response
>
600 | TaskFunctions
<Data
, Response
>
603 if (taskFunctions
== null) {
604 throw new Error('taskFunctions parameter is mandatory')
606 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
607 if (typeof taskFunctions
=== 'function') {
608 const fnObj
= { taskFunction
: taskFunctions
.bind(this) }
609 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fnObj
)
610 this.taskFunctions
.set(
611 typeof taskFunctions
.name
=== 'string' &&
612 taskFunctions
.name
.trim().length
> 0
617 } else if (isPlainObject(taskFunctions
)) {
618 let firstEntry
= true
619 for (let [name
, fnObj
] of Object.entries(taskFunctions
)) {
620 if (typeof fnObj
=== 'function') {
621 fnObj
= { taskFunction
: fnObj
} satisfies TaskFunctionObject
<
626 checkValidTaskFunctionObjectEntry
<Data
, Response
>(name
, fnObj
)
627 fnObj
.taskFunction
= fnObj
.taskFunction
.bind(this)
629 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fnObj
)
632 this.taskFunctions
.set(name
, fnObj
)
635 throw new Error('taskFunctions parameter object is empty')
639 'taskFunctions parameter is not a function or a plain object'
644 private checkWorkerOptions (opts
: WorkerOptions
): void {
645 checkValidWorkerOptions(opts
)
646 this.opts
= { ...DEFAULT_WORKER_OPTIONS
, ...opts
}
649 private endTaskPerformance (
650 taskPerformance
: TaskPerformance
652 if (this.statistics
== null) {
653 throw new Error('Performance statistics computation requirements not set')
657 ...(this.statistics
.runTime
&& {
658 runTime
: performance
.now() - taskPerformance
.timestamp
,
660 ...(this.statistics
.elu
&& {
661 elu
: performance
.eventLoopUtilization(taskPerformance
.elu
),
667 * Gets abortable task function.
668 * An abortable promise is built to permit the task to be aborted.
669 * @param name - The name of the task.
670 * @param taskId - The task id.
671 * @returns The abortable task function.
673 private getAbortableTaskFunction (
675 taskId
: `${string}-${string}-${string}-${string}-${string}`
676 ): TaskAsyncFunction
<Data
, Response
> {
677 return async (data
?: Data
): Promise
<Response
> =>
678 await new Promise
<Response
>(
679 (resolve
, reject
: (reason
?: unknown
) => void) => {
680 this.taskAbortFunctions
.set(taskId
, () => {
681 reject(new AbortError(`Task '${name}' id '${taskId}' aborted`))
683 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
684 const taskFunction
= this.taskFunctions
.get(name
)!.taskFunction
685 if (isAsyncFunction(taskFunction
)) {
686 ;(taskFunction
as TaskAsyncFunction
<Data
, Response
>)(data
)
690 resolve((taskFunction
as TaskSyncFunction
<Data
, Response
>)(data
))
697 * Starts the worker check active interval.
699 private startCheckActive (): void {
700 this.lastTaskTimestamp
= performance
.now()
701 this.activeInterval
= setInterval(
702 this.checkActive
.bind(this),
703 (this.opts
.maxInactiveTime
?? DEFAULT_MAX_INACTIVE_TIME
) / 2
705 this.activeInterval
.unref()
709 * Stops the worker check active interval.
711 private stopCheckActive (): void {
712 if (this.activeInterval
!= null) {
713 clearInterval(this.activeInterval
)
714 this.activeInterval
= undefined
718 private updateLastTaskTimestamp (): void {
719 if (this.activeInterval
!= null) {
720 this.lastTaskTimestamp
= performance
.now()