1 import type { Worker
} from
'node:cluster'
2 import type { MessagePort
} from
'node:worker_threads'
4 import { performance
} from
'node:perf_hooks'
9 TaskFunctionProperties
,
12 } from
'../utility-types.js'
17 TaskFunctionOperationResult
,
20 } from
'./task-functions.js'
23 buildTaskFunctionProperties
,
29 import { AbortError
} from
'./abort-error.js'
31 checkTaskFunctionName
,
32 checkValidTaskFunctionObjectEntry
,
33 checkValidWorkerOptions
,
35 import { KillBehaviors
, type WorkerOptions
} from
'./worker-options.js'
37 const DEFAULT_MAX_INACTIVE_TIME
= 60000
38 const DEFAULT_WORKER_OPTIONS
: Readonly
<WorkerOptions
> = Object.freeze({
40 * The kill behavior option on this worker or its default value.
42 killBehavior
: KillBehaviors
.SOFT
,
44 * The function to call when the worker is killed.
46 killHandler
: EMPTY_FUNCTION
,
48 * The maximum time to keep this worker active while idle.
49 * The pool automatically checks and terminates this worker when the time expires.
51 maxInactiveTime
: DEFAULT_MAX_INACTIVE_TIME
,
55 * Base class that implements some shared logic for all poolifier workers.
56 * @typeParam MainWorker - Type of main worker.
57 * @typeParam Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
58 * @typeParam Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
60 export abstract class AbstractWorker
<
61 MainWorker
extends MessagePort
| Worker
,
66 * Handler id of the `activeInterval` worker activity check.
68 protected activeInterval
?: NodeJS
.Timeout
72 protected abstract readonly id
: number
74 * Timestamp of the last task processed by this worker.
76 protected lastTaskTimestamp
!: number
79 * Performance statistics computation requirements.
81 protected statistics
?: WorkerStatistics
84 * Task abort functions processed by the worker when task operation 'abort' is received.
86 protected taskAbortFunctions
: Map
<
87 `${string}-${string}-${string}-${string}-${string}`,
92 * Task function object(s) processed by the worker when the pool's `execute` method is invoked.
94 protected taskFunctions
!: Map
<string, TaskFunctionObject
<Data
, Response
>>
97 * Constructs a new poolifier worker.
98 * @param isMain - Whether this is the main worker or not.
99 * @param mainWorker - Reference to main worker.
100 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked. The first function is the default function.
101 * @param opts - Options for the worker.
104 protected readonly isMain
: boolean | undefined,
105 private readonly mainWorker
: MainWorker
| null | undefined,
106 taskFunctions
: TaskFunction
<Data
, Response
> | TaskFunctions
<Data
, Response
>,
107 protected opts
: WorkerOptions
= DEFAULT_WORKER_OPTIONS
109 if (this.isMain
== null) {
110 throw new Error('isMain parameter is mandatory')
112 this.checkTaskFunctions(taskFunctions
)
113 this.taskAbortFunctions
= new Map
<
114 `${string}-${string}-${string}-${string}-${string}`,
117 this.checkWorkerOptions(this.opts
)
119 this.getMainWorker().once('message', this.handleReadyMessage
.bind(this))
124 * Adds a task function to the worker.
125 * If a task function with the same name already exists, it is replaced.
126 * @param name - The name of the task function to add.
127 * @param fn - The task function to add.
128 * @returns Whether the task function was added or not.
130 public addTaskFunction (
132 fn
: TaskFunction
<Data
, Response
> | TaskFunctionObject
<Data
, Response
>
133 ): TaskFunctionOperationResult
{
135 checkTaskFunctionName(name
)
136 if (name
=== DEFAULT_TASK_NAME
) {
138 'Cannot add a task function with the default reserved name'
141 if (typeof fn
=== 'function') {
142 fn
= { taskFunction
: fn
} satisfies TaskFunctionObject
<Data
, Response
>
144 checkValidTaskFunctionObjectEntry
<Data
, Response
>(name
, fn
)
145 fn
.taskFunction
= fn
.taskFunction
.bind(this)
147 this.taskFunctions
.get(name
) ===
148 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
150 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fn
)
152 this.taskFunctions
.set(name
, fn
)
153 this.sendTaskFunctionsPropertiesToMainWorker()
154 return { status: true }
156 return { error
: error
as Error, status: false }
161 * Checks if the worker has a task function with the given name.
162 * @param name - The name of the task function to check.
163 * @returns Whether the worker has a task function with the given name or not.
165 public hasTaskFunction (name
: string): TaskFunctionOperationResult
{
167 checkTaskFunctionName(name
)
169 return { error
: error
as Error, status: false }
171 return { status: this.taskFunctions
.has(name
) }
175 * Lists the properties of the worker's task functions.
176 * @returns The properties of the worker's task functions.
178 public listTaskFunctionsProperties (): TaskFunctionProperties
[] {
179 let defaultTaskFunctionName
= DEFAULT_TASK_NAME
180 for (const [name
, fnObj
] of this.taskFunctions
) {
182 name
!== DEFAULT_TASK_NAME
&&
183 fnObj
=== this.taskFunctions
.get(DEFAULT_TASK_NAME
)
185 defaultTaskFunctionName
= name
189 const taskFunctionsProperties
: TaskFunctionProperties
[] = []
190 for (const [name
, fnObj
] of this.taskFunctions
) {
191 if (name
=== DEFAULT_TASK_NAME
|| name
=== defaultTaskFunctionName
) {
194 taskFunctionsProperties
.push(buildTaskFunctionProperties(name
, fnObj
))
197 buildTaskFunctionProperties(
199 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
201 buildTaskFunctionProperties(
202 defaultTaskFunctionName
,
203 this.taskFunctions
.get(defaultTaskFunctionName
)
205 ...taskFunctionsProperties
,
210 * Removes a task function from the worker.
211 * @param name - The name of the task function to remove.
212 * @returns Whether the task function existed and was removed or not.
214 public removeTaskFunction (name
: string): TaskFunctionOperationResult
{
216 checkTaskFunctionName(name
)
217 if (name
=== DEFAULT_TASK_NAME
) {
219 'Cannot remove the task function with the default reserved name'
223 this.taskFunctions
.get(name
) ===
224 this.taskFunctions
.get(DEFAULT_TASK_NAME
)
227 'Cannot remove the task function used as the default task function'
230 const deleteStatus
= this.taskFunctions
.delete(name
)
231 this.sendTaskFunctionsPropertiesToMainWorker()
232 return { status: deleteStatus
}
234 return { error
: error
as Error, status: false }
239 * Sets the default task function to use in the worker.
240 * @param name - The name of the task function to use as default task function.
241 * @returns Whether the default task function was set or not.
243 public setDefaultTaskFunction (name
: string): TaskFunctionOperationResult
{
245 checkTaskFunctionName(name
)
246 if (name
=== DEFAULT_TASK_NAME
) {
248 'Cannot set the default task function reserved name as the default task function'
251 if (!this.taskFunctions
.has(name
)) {
253 'Cannot set the default task function to a non-existing task function'
256 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
257 this.taskFunctions
.set(DEFAULT_TASK_NAME
, this.taskFunctions
.get(name
)!)
258 this.sendTaskFunctionsPropertiesToMainWorker()
259 return { status: true }
261 return { error
: error
as Error, status: false }
266 * Returns the main worker.
267 * @returns Reference to the main worker.
268 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the main worker is not set.
270 protected getMainWorker (): MainWorker
{
271 if (this.mainWorker
== null) {
272 throw new Error('Main worker not set')
274 return this.mainWorker
278 * Handles a worker error.
279 * @param error - The error raised by the worker.
280 * @returns The worker error object.
282 protected abstract handleError (error
: Error): {
290 * Handles a kill message sent by the main worker.
291 * @param message - The kill message.
293 protected handleKillMessage (message
: MessageValue
<Data
>): void {
294 this.stopCheckActive()
295 if (isAsyncFunction(this.opts
.killHandler
)) {
296 ;(this.opts
.killHandler
as () => Promise
<void>)()
298 this.sendToMainWorker({ kill
: 'success' })
302 this.sendToMainWorker({ kill
: 'failure' })
306 ;(this.opts
.killHandler
as (() => void) | undefined)?.()
307 this.sendToMainWorker({ kill
: 'success' })
309 this.sendToMainWorker({ kill
: 'failure' })
315 * Handles the ready message sent by the main worker.
316 * @param message - The ready message.
318 protected abstract handleReadyMessage (message
: MessageValue
<Data
>): void
320 protected handleTaskFunctionOperationMessage (
321 message
: MessageValue
<Data
>
323 const { taskFunction
, taskFunctionOperation
, taskFunctionProperties
} =
325 if (taskFunctionProperties
== null) {
327 'Cannot handle task function operation message without task function properties'
330 let response
: TaskFunctionOperationResult
331 switch (taskFunctionOperation
) {
333 if (typeof taskFunction
!== 'string') {
335 `Cannot handle task function operation ${taskFunctionOperation} message without task function`
338 response
= this.addTaskFunction(taskFunctionProperties
.name
, {
339 // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func, @typescript-eslint/no-unsafe-call
340 taskFunction
: new Function(
341 `return ${taskFunction}`
342 )() as TaskFunction
<Data
, Response
>,
343 ...(taskFunctionProperties
.priority
!= null && {
344 priority
: taskFunctionProperties
.priority
,
346 ...(taskFunctionProperties
.strategy
!= null && {
347 strategy
: taskFunctionProperties
.strategy
,
352 response
= this.setDefaultTaskFunction(taskFunctionProperties
.name
)
355 response
= this.removeTaskFunction(taskFunctionProperties
.name
)
359 error
: new Error('Unknown task operation'),
364 const { error
, status } = response
365 this.sendToMainWorker({
366 taskFunctionOperation
,
367 taskFunctionOperationStatus
: status,
368 taskFunctionProperties
,
372 name
: taskFunctionProperties
.name
,
373 ...this.handleError(error
),
380 * Worker message listener.
381 * @param message - The received message.
383 protected messageListener (message
: MessageValue
<Data
>): void {
384 this.checkMessageWorkerId(message
)
390 taskFunctionOperation
,
394 if (statistics
!= null) {
395 // Statistics message received
396 this.statistics
= statistics
397 } else if (checkActive
!= null) {
398 // Check active message received
399 checkActive
? this.startCheckActive() : this.stopCheckActive()
400 } else if (taskFunctionOperation
!= null) {
401 // Task function operation message received
402 this.handleTaskFunctionOperationMessage(message
)
403 } else if (taskId
!= null && data
!= null) {
404 // Task message received
406 } else if (taskOperation
=== 'abort' && taskId
!= null) {
407 // Abort task operation message received
408 if (this.taskAbortFunctions
.has(taskId
)) {
409 this.taskAbortFunctions
.get(taskId
)?.()
411 } else if (kill
=== true) {
412 // Kill message received
413 this.handleKillMessage(message
)
418 * Runs the given task.
419 * @param task - The task to execute.
421 protected readonly run
= (task
: Task
<Data
>): void => {
422 const { abortable
, data
, name
, taskId
} = task
423 const taskFunctionName
= name
?? DEFAULT_TASK_NAME
424 if (!this.taskFunctions
.has(taskFunctionName
)) {
425 this.sendToMainWorker({
431 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
432 new Error(`Task function '${name!}' not found`)
438 let fn
: TaskFunction
<Data
, Response
>
439 if (abortable
=== true) {
440 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
441 fn
= this.getAbortableTaskFunction(taskFunctionName
, taskId
!)
443 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
444 fn
= this.taskFunctions
.get(taskFunctionName
)!.taskFunction
446 if (isAsyncFunction(fn
)) {
447 this.runAsync(fn
as TaskAsyncFunction
<Data
, Response
>, task
)
449 this.runSync(fn
as TaskSyncFunction
<Data
, Response
>, task
)
454 * Runs the given task function asynchronously.
455 * @param fn - Task function that will be executed.
456 * @param task - Input data for the task function.
458 protected readonly runAsync
= (
459 fn
: TaskAsyncFunction
<Data
, Response
>,
462 const { abortable
, data
, name
, taskId
} = task
463 let taskPerformance
= this.beginTaskPerformance(name
)
466 taskPerformance
= this.endTaskPerformance(taskPerformance
)
467 this.sendToMainWorker({
474 .catch((error
: unknown
) => {
475 this.sendToMainWorker({
480 ...this.handleError(error
as Error),
485 this.updateLastTaskTimestamp()
486 if (abortable
=== true) {
487 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
488 this.taskAbortFunctions
.delete(taskId
!)
491 .catch(EMPTY_FUNCTION
)
495 * Runs the given task function synchronously.
496 * @param fn - Task function that will be executed.
497 * @param task - Input data for the task function.
499 protected readonly runSync
= (
500 fn
: TaskSyncFunction
<Data
, Response
>,
503 const { abortable
, data
, name
, taskId
} = task
505 let taskPerformance
= this.beginTaskPerformance(name
)
507 taskPerformance
= this.endTaskPerformance(taskPerformance
)
508 this.sendToMainWorker({
514 this.sendToMainWorker({
519 ...this.handleError(error
as Error),
523 this.updateLastTaskTimestamp()
524 if (abortable
=== true) {
525 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
526 this.taskAbortFunctions
.delete(taskId
!)
532 * Sends task functions properties to the main worker.
534 protected sendTaskFunctionsPropertiesToMainWorker (): void {
535 this.sendToMainWorker({
536 taskFunctionsProperties
: this.listTaskFunctionsProperties(),
541 * Sends a message to main worker.
542 * @param message - The response message.
544 protected abstract sendToMainWorker (
545 message
: MessageValue
<Response
, Data
>
548 private beginTaskPerformance (name
?: string): TaskPerformance
{
549 if (this.statistics
== null) {
550 throw new Error('Performance statistics computation requirements not set')
553 name
: name
?? DEFAULT_TASK_NAME
,
554 timestamp
: performance
.now(),
555 ...(this.statistics
.elu
&& {
556 elu
: performance
.eventLoopUtilization(),
562 * Checks if the worker should be terminated, because its living too long.
564 private checkActive (): void {
566 performance
.now() - this.lastTaskTimestamp
>
567 (this.opts
.maxInactiveTime
?? DEFAULT_MAX_INACTIVE_TIME
)
569 this.sendToMainWorker({ kill
: this.opts
.killBehavior
})
574 * Check if the message worker id is set and matches the worker id.
575 * @param message - The message to check.
576 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the message worker id is not set or does not match the worker id.
578 private checkMessageWorkerId (message
: MessageValue
<Data
>): void {
579 if (message
.workerId
== null) {
581 `Message worker id is not set: ${JSON.stringify(message)}`
584 if (message
.workerId
!== this.id
) {
586 `Message worker id ${message.workerId.toString()} does not match the worker id ${this.id.toString()}: ${JSON.stringify(message)}`
592 * Checks if the `taskFunctions` parameter is passed to the constructor and valid.
593 * @param taskFunctions - The task function(s) parameter that should be checked.
595 private checkTaskFunctions (
597 | TaskFunction
<Data
, Response
>
598 | TaskFunctions
<Data
, Response
>
601 if (taskFunctions
== null) {
602 throw new Error('taskFunctions parameter is mandatory')
604 this.taskFunctions
= new Map
<string, TaskFunctionObject
<Data
, Response
>>()
605 if (typeof taskFunctions
=== 'function') {
606 const fnObj
= { taskFunction
: taskFunctions
.bind(this) }
607 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fnObj
)
608 this.taskFunctions
.set(
609 typeof taskFunctions
.name
=== 'string' &&
610 taskFunctions
.name
.trim().length
> 0
615 } else if (isPlainObject(taskFunctions
)) {
616 let firstEntry
= true
617 for (let [name
, fnObj
] of Object.entries(taskFunctions
)) {
618 if (typeof fnObj
=== 'function') {
619 fnObj
= { taskFunction
: fnObj
} satisfies TaskFunctionObject
<
624 checkValidTaskFunctionObjectEntry
<Data
, Response
>(name
, fnObj
)
625 fnObj
.taskFunction
= fnObj
.taskFunction
.bind(this)
627 this.taskFunctions
.set(DEFAULT_TASK_NAME
, fnObj
)
630 this.taskFunctions
.set(name
, fnObj
)
633 throw new Error('taskFunctions parameter object is empty')
637 'taskFunctions parameter is not a function or a plain object'
642 private checkWorkerOptions (opts
: WorkerOptions
): void {
643 checkValidWorkerOptions(opts
)
644 this.opts
= { ...DEFAULT_WORKER_OPTIONS
, ...opts
}
647 private endTaskPerformance (
648 taskPerformance
: TaskPerformance
650 if (this.statistics
== null) {
651 throw new Error('Performance statistics computation requirements not set')
655 ...(this.statistics
.runTime
&& {
656 runTime
: performance
.now() - taskPerformance
.timestamp
,
658 ...(this.statistics
.elu
&& {
659 elu
: performance
.eventLoopUtilization(taskPerformance
.elu
),
665 * Gets abortable task function.
666 * An abortable promise is built to permit the task to be aborted.
667 * @param name - The name of the task.
668 * @param taskId - The task id.
669 * @returns The abortable task function.
671 private getAbortableTaskFunction (
673 taskId
: `${string}-${string}-${string}-${string}-${string}`
674 ): TaskAsyncFunction
<Data
, Response
> {
675 return async (data
?: Data
): Promise
<Response
> =>
676 await new Promise
<Response
>(
677 (resolve
, reject
: (reason
?: unknown
) => void) => {
678 this.taskAbortFunctions
.set(taskId
, () => {
679 reject(new AbortError(`Task '${name}' id '${taskId}' aborted`))
681 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
682 const taskFunction
= this.taskFunctions
.get(name
)!.taskFunction
683 if (isAsyncFunction(taskFunction
)) {
684 ;(taskFunction
as TaskAsyncFunction
<Data
, Response
>)(data
)
688 resolve((taskFunction
as TaskSyncFunction
<Data
, Response
>)(data
))
695 * Starts the worker check active interval.
697 private startCheckActive (): void {
698 this.lastTaskTimestamp
= performance
.now()
699 this.activeInterval
= setInterval(
700 this.checkActive
.bind(this),
701 (this.opts
.maxInactiveTime
?? DEFAULT_MAX_INACTIVE_TIME
) / 2
706 * Stops the worker check active interval.
708 private stopCheckActive (): void {
709 if (this.activeInterval
!= null) {
710 clearInterval(this.activeInterval
)
711 delete this.activeInterval
715 private updateLastTaskTimestamp (): void {
716 if (this.activeInterval
!= null) {
717 this.lastTaskTimestamp
= performance
.now()