1 import type { Worker
} from
'node:cluster'
2 import type { MessagePort
} from
'node:worker_threads'
4 import { performance
} from
'node:perf_hooks'
9 TaskFunctionProperties
,
12 } from
'../utility-types.js'
17 TaskFunctionOperationResult
,
20 } from
'./task-functions.js'
23 buildTaskFunctionProperties
,
29 import { AbortError
} from
'./abort-error.js'
31 checkTaskFunctionName
,
32 checkValidTaskFunctionObjectEntry
,
33 checkValidWorkerOptions
,
35 import { KillBehaviors
, type WorkerOptions
} from
'./worker-options.js'
/**
 * Default maximum inactive time of a worker, in milliseconds.
 */
const DEFAULT_MAX_INACTIVE_TIME = 60_000
/**
 * Worker options applied when the caller does not provide its own.
 * The object is (shallow) frozen so it cannot be altered at runtime.
 */
const DEFAULT_WORKER_OPTIONS: Readonly<WorkerOptions> = Object.freeze({
  /**
   * Kill behavior used by this worker unless another one is configured.
   */
  killBehavior: KillBehaviors.SOFT,
  /**
   * Function invoked when the worker is killed.
   */
  killHandler: EMPTY_FUNCTION,
  /**
   * Longest time this worker is kept alive while idle.
   * The pool automatically checks and terminates this worker when the time expires.
   */
  maxInactiveTime: DEFAULT_MAX_INACTIVE_TIME,
})
55 * Base class that implements some shared logic for all poolifier workers.
56 * @template MainWorker - Type of main worker.
57 * @template Data - Type of data this worker receives from pool's execution. This can only be structured-cloneable data.
58 * @template Response - Type of response the worker sends back to the main worker. This can only be structured-cloneable data.
60 export abstract class AbstractWorker
<
61 MainWorker
extends MessagePort
| Worker
,
/**
 * Timer handle of the periodic worker activity check.
 * It is `undefined` while no activity check is running.
 */
protected activeInterval?: NodeJS.Timeout

/**
 * Worker id. Incoming messages must carry this id to be accepted.
 */
protected abstract readonly id: number

/**
 * Timestamp of the last task processed by this worker.
 */
protected lastTaskTimestamp!: number

/**
 * Performance statistics computation requirements.
 */
protected statistics?: WorkerStatistics

/**
 * Abort callbacks of the running abortable tasks, keyed by task id.
 * A callback is invoked when the task operation 'abort' is received for its task.
 */
protected taskAbortFunctions: Map<
  `${string}-${string}-${string}-${string}-${string}`,
  () => void
>

/**
 * Task function object(s) processed by the worker when the pool's `execute` method is invoked,
 * keyed by task function name.
 */
protected taskFunctions!: Map<string, TaskFunctionObject<Data, Response>>
/**
 * Builds a new poolifier worker: validates and registers the task function(s),
 * validates the options and, when not running as the main worker, waits for the
 * ready message of the main worker.
 * @param isMain - Whether this is the main worker or not.
 * @param mainWorker - Reference to main worker.
 * @param taskFunctions - Task function(s) processed by the worker when the pool's `execute` method is invoked. The first function is the default function.
 * @param opts - Options for the worker.
 */
public constructor (
  protected readonly isMain: boolean | undefined,
  private readonly mainWorker: MainWorker | null | undefined,
  taskFunctions: TaskFunction<Data, Response> | TaskFunctions<Data, Response>,
  protected opts: WorkerOptions = DEFAULT_WORKER_OPTIONS
) {
  if (this.isMain == null) {
    throw new Error('isMain parameter is mandatory')
  }
  this.checkTaskFunctions(taskFunctions)
  this.taskAbortFunctions = new Map<
    `${string}-${string}-${string}-${string}-${string}`,
    () => void
  >()
  this.checkWorkerOptions(this.opts)
  // Only a non-main worker listens for the ready message of the main worker.
  if (!this.isMain) {
    const onReady = this.handleReadyMessage.bind(this)
    this.getMainWorker().once('message', onReady)
  }
}
/**
 * Adds a task function to the worker.
 * If a task function with the same name already exists, it is replaced; when the
 * replaced one is also the default task function, the default entry is replaced too.
 * The object passed as `fn` is never modified: a copy holding the task function
 * bound to this worker is registered instead.
 * @param name - The name of the task function to add.
 * @param fn - The task function to add.
 * @returns Whether the task function was added or not.
 */
public addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot add a task function with the default reserved name'
      )
    }
    const candidate: TaskFunctionObject<Data, Response> =
      typeof fn === 'function' ? { taskFunction: fn } : fn
    checkValidTaskFunctionObjectEntry<Data, Response>(name, candidate)
    // Register a copy so the caller's object is not rebound in place.
    const fnObj: TaskFunctionObject<Data, Response> = {
      ...candidate,
      taskFunction: candidate.taskFunction.bind(this),
    }
    // Keep the default entry pointing at the same object as its named entry.
    if (
      this.taskFunctions.get(name) ===
      this.taskFunctions.get(DEFAULT_TASK_NAME)
    ) {
      this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
    }
    this.taskFunctions.set(name, fnObj)
    this.sendTaskFunctionsPropertiesToMainWorker()
    return { status: true }
  } catch (error) {
    return { error: error as Error, status: false }
  }
}
/**
 * Tells whether a task function is registered under the given name.
 * An invalid name is reported through the `error` field with a `false` status.
 * @param name - The name of the task function to check.
 * @returns Whether the worker has a task function with the given name or not.
 */
public hasTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
  } catch (cause) {
    return { error: cause as Error, status: false }
  }
  const found = this.taskFunctions.has(name)
  return { status: found }
}
/**
 * Lists the properties of the worker's task functions.
 * The result starts with the entry of the default reserved name, followed by the
 * entry of the named task function sharing the default task function object,
 * then every other task function in registration order.
 * @returns The properties of the worker's task functions.
 */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  const defaultFnObj = this.taskFunctions.get(DEFAULT_TASK_NAME)
  const entries = [...this.taskFunctions]
  // First named entry that is the very same object as the default one.
  const aliasEntry = entries.find(
    ([key, value]) => key !== DEFAULT_TASK_NAME && value === defaultFnObj
  )
  const defaultTaskFunctionName =
    aliasEntry != null ? aliasEntry[0] : DEFAULT_TASK_NAME
  const otherProperties: TaskFunctionProperties[] = entries
    .filter(
      ([key]) => key !== DEFAULT_TASK_NAME && key !== defaultTaskFunctionName
    )
    .map(([key, value]) => buildTaskFunctionProperties(key, value))
  return [
    buildTaskFunctionProperties(DEFAULT_TASK_NAME, defaultFnObj),
    buildTaskFunctionProperties(
      defaultTaskFunctionName,
      this.taskFunctions.get(defaultTaskFunctionName)
    ),
    ...otherProperties,
  ]
}
/**
 * Removes a task function from the worker.
 * The default reserved name and the task function currently used as default
 * cannot be removed; such attempts are reported through the `error` field.
 * @param name - The name of the task function to remove.
 * @returns Whether the task function existed and was removed or not.
 */
public removeTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot remove the task function with the default reserved name'
      )
    }
    const target = this.taskFunctions.get(name)
    const currentDefault = this.taskFunctions.get(DEFAULT_TASK_NAME)
    if (target === currentDefault) {
      throw new Error(
        'Cannot remove the task function used as the default task function'
      )
    }
    const removed = this.taskFunctions.delete(name)
    this.sendTaskFunctionsPropertiesToMainWorker()
    return { status: removed }
  } catch (cause) {
    return { error: cause as Error, status: false }
  }
}
/**
 * Makes an already registered task function the default one.
 * The default reserved name itself and unknown names are rejected through the
 * `error` field.
 * @param name - The name of the task function to use as default task function.
 * @returns Whether the default task function was set or not.
 */
public setDefaultTaskFunction (name: string): TaskFunctionOperationResult {
  try {
    checkTaskFunctionName(name)
    if (name === DEFAULT_TASK_NAME) {
      throw new Error(
        'Cannot set the default task function reserved name as the default task function'
      )
    }
    const registered = this.taskFunctions.has(name)
    if (!registered) {
      throw new Error(
        'Cannot set the default task function to a non-existing task function'
      )
    }
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const fnObj = this.taskFunctions.get(name)!
    this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
    this.sendTaskFunctionsPropertiesToMainWorker()
    return { status: true }
  } catch (cause) {
    return { error: cause as Error, status: false }
  }
}
/**
 * Gives access to the main worker reference received at construction.
 * @returns Reference to the main worker.
 * @throws {Error} If the main worker is not set.
 */
protected getMainWorker (): MainWorker {
  const { mainWorker } = this
  if (mainWorker != null) {
    return mainWorker
  }
  throw new Error('Main worker not set')
}
278 * Handles a worker error.
279 * @param error - The error raised by the worker.
280 * @returns The worker error object.
282 protected abstract handleError (error
: Error): {
/**
 * Handles a kill message sent by the main worker: stops the activity check,
 * runs the configured kill handler and reports `'success'` or `'failure'` to
 * the main worker. A promise returned by the kill handler is awaited before reporting.
 * @param message - The kill message.
 */
protected handleKillMessage (message: MessageValue<Data>): void {
  this.stopCheckActive()
  const reportSuccess = (): undefined => {
    this.sendToMainWorker({ kill: 'success' })
    return undefined
  }
  const reportFailure = (): void => {
    this.sendToMainWorker({ kill: 'failure' })
  }
  try {
    const outcome = this.opts.killHandler?.()
    if (!(outcome instanceof Promise)) {
      reportSuccess()
      return
    }
    outcome.then(reportSuccess).catch(reportFailure)
  } catch {
    reportFailure()
  }
}
/**
 * Handles the ready message sent by the main worker.
 * Implemented by each concrete worker type.
 * @param message - The ready message.
 */
protected abstract handleReadyMessage (message: MessageValue<Data>): void
/**
 * Handles a task function operation message: performs the requested operation
 * on the worker's task functions and sends the outcome back to the main worker.
 * @param message - The task function operation message.
 * @throws {Error} If the message carries no task function properties, or if an
 * add operation carries no task function source.
 */
protected handleTaskFunctionOperationMessage (
  message: MessageValue<Data>
): void {
  const { taskFunction, taskFunctionOperation, taskFunctionProperties } =
    message
  if (taskFunctionProperties == null) {
    throw new Error(
      'Cannot handle task function operation message without task function properties'
    )
  }
  const { name, priority, strategy, workerNodeKeys } = taskFunctionProperties
  let response: TaskFunctionOperationResult
  switch (taskFunctionOperation) {
    case 'add': {
      if (typeof taskFunction !== 'string') {
        throw new Error(
          `Cannot handle task function operation ${taskFunctionOperation} message without task function`
        )
      }
      // NOTE(review): the task function is rebuilt by evaluating source text
      // carried by the message; the message origin must therefore be trusted.
      // eslint-disable-next-line @typescript-eslint/no-implied-eval, no-new-func, @typescript-eslint/no-unsafe-call
      const rebuilt = new Function(
        `return (${taskFunction})`
      )() as TaskFunction<Data, Response>
      response = this.addTaskFunction(name, {
        taskFunction: rebuilt,
        ...(priority != null && { priority }),
        ...(strategy != null && { strategy }),
        ...(workerNodeKeys != null && { workerNodeKeys }),
      })
      break
    }
    case 'default':
      response = this.setDefaultTaskFunction(name)
      break
    case 'remove':
      response = this.removeTaskFunction(name)
      break
    default:
      response = {
        error: new Error(
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          `Unknown task function operation: ${taskFunctionOperation!}`
        ),
        status: false,
      }
      break
  }
  const { error, status } = response
  // A worker error is attached only for a failed operation carrying an error.
  const failed = !status && error != null
  this.sendToMainWorker({
    taskFunctionOperation,
    taskFunctionOperationStatus: status,
    taskFunctionProperties,
    ...(failed && {
      workerError: {
        name,
        ...this.handleError(error),
      },
    }),
  })
}
/**
 * Worker message listener: validates the worker id of the message, then
 * dispatches it according to its content. The first matching kind wins, in this
 * order: statistics, check active, task function operation, task, task abort, kill.
 * @param message - The received message.
 */
protected messageListener (message: MessageValue<Data>): void {
  this.checkMessageWorkerId(message)
  const {
    checkActive,
    data,
    kill,
    statistics,
    taskFunctionOperation,
    taskId,
    taskOperation,
  } = message
  if (statistics != null) {
    // Statistics message received
    this.statistics = statistics
    return
  }
  if (checkActive != null) {
    // Check active message received
    if (checkActive) {
      this.startCheckActive()
    } else {
      this.stopCheckActive()
    }
    return
  }
  if (taskFunctionOperation != null) {
    // Task function operation message received
    this.handleTaskFunctionOperationMessage(message)
    return
  }
  if (taskId != null && data != null) {
    // Task message received
    this.run(message)
    return
  }
  if (taskOperation === 'abort' && taskId != null) {
    // Abort task operation message received
    const abortTask = this.taskAbortFunctions.get(taskId)
    abortTask?.()
    return
  }
  if (kill === true) {
    // Kill message received
    this.handleKillMessage(message)
  }
}
/**
 * Runs the given task with the task function registered under the task name,
 * or under the default reserved name when the task has no name.
 * An unknown task function is reported to the main worker as a worker error.
 * @param task - The task to execute.
 */
protected readonly run = (task: Task<Data>): void => {
  const { abortable, data, name, taskId } = task
  const taskFunctionName = name ?? DEFAULT_TASK_NAME
  const fnObj = this.taskFunctions.get(taskFunctionName)
  if (fnObj == null || !this.taskFunctions.has(taskFunctionName)) {
    this.sendToMainWorker({
      taskId,
      workerError: {
        data,
        name,
        ...this.handleError(
          new Error(`Task function '${taskFunctionName}' not found`)
        ),
      },
    })
    return
  }
  const fn: TaskFunction<Data, Response> =
    abortable === true
      ? // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.getAbortableTaskFunction(taskFunctionName, taskId!)
      : fnObj.taskFunction
  if (isAsyncFunction(fn)) {
    this.runAsync(fn as TaskAsyncFunction<Data, Response>, task)
    return
  }
  this.runSync(fn as TaskSyncFunction<Data, Response>, task)
}
/**
 * Runs the given task function asynchronously and sends either its result with
 * the task performance, or a worker error, to the main worker.
 * Once settled, the last task timestamp is refreshed and the abort function of
 * an abortable task is discarded.
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected readonly runAsync = (
  fn: TaskAsyncFunction<Data, Response>,
  task: Task<Data>
): void => {
  const { abortable, data, name, taskId } = task
  let taskPerformance = this.beginTaskPerformance(name)
  const onFulfilled = (res: Response): undefined => {
    taskPerformance = this.endTaskPerformance(taskPerformance)
    this.sendToMainWorker({
      data: res,
      taskId,
      taskPerformance,
    })
    return undefined
  }
  const onRejected = (error: unknown): void => {
    this.sendToMainWorker({
      taskId,
      workerError: {
        data,
        name,
        ...this.handleError(error as Error),
      },
    })
  }
  const onSettled = (): void => {
    this.updateLastTaskTimestamp()
    if (abortable === true) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.taskAbortFunctions.delete(taskId!)
    }
  }
  fn(data)
    .then(onFulfilled)
    .catch(onRejected)
    .finally(onSettled)
    // Whatever the handlers above may throw is deliberately ignored.
    .catch(EMPTY_FUNCTION)
}
/**
 * Runs the given task function synchronously and sends either its result with
 * the task performance, or a worker error, to the main worker.
 * In every case the last task timestamp is refreshed and the abort function of
 * an abortable task is discarded.
 * @param fn - Task function that will be executed.
 * @param task - Input data for the task function.
 */
protected readonly runSync = (
  fn: TaskSyncFunction<Data, Response>,
  task: Task<Data>
): void => {
  const { abortable, data, name, taskId } = task
  try {
    const started = this.beginTaskPerformance(name)
    const res = fn(data)
    const taskPerformance = this.endTaskPerformance(started)
    this.sendToMainWorker({ data: res, taskId, taskPerformance })
  } catch (cause) {
    const workerError = {
      data,
      name,
      ...this.handleError(cause as Error),
    }
    this.sendToMainWorker({ taskId, workerError })
  } finally {
    this.updateLastTaskTimestamp()
    if (abortable === true) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.taskAbortFunctions.delete(taskId!)
    }
  }
}
/**
 * Sends the current list of task functions properties to the main worker.
 */
protected sendTaskFunctionsPropertiesToMainWorker (): void {
  const taskFunctionsProperties = this.listTaskFunctionsProperties()
  this.sendToMainWorker({ taskFunctionsProperties })
}
/**
 * Sends a message to main worker.
 * Implemented by each concrete worker type.
 * @param message - The response message.
 */
protected abstract sendToMainWorker (
  message: MessageValue<Response, Data>
): void
/**
 * Starts the performance measurement of a task.
 * @param name - The task function name; the default reserved name is used when omitted.
 * @returns The task performance holding the start timestamp and, when required
 * by the statistics requirements, the event loop utilization at start.
 * @throws {Error} If the performance statistics computation requirements are not set.
 */
private beginTaskPerformance (name?: string): TaskPerformance {
  const { statistics } = this
  if (statistics == null) {
    throw new Error('Performance statistics computation requirements not set')
  }
  const taskName = name ?? DEFAULT_TASK_NAME
  const timestamp = performance.now()
  return {
    name: taskName,
    timestamp,
    ...(statistics.elu && {
      elu: performance.eventLoopUtilization(),
    }),
  }
}
/**
 * Checks whether the worker has been idle for longer than its maximum inactive
 * time and, if so, sends its kill behavior to the main worker.
 */
private checkActive (): void {
  const idleTime = performance.now() - this.lastTaskTimestamp
  const maxInactiveTime =
    this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME
  if (idleTime > maxInactiveTime) {
    this.sendToMainWorker({ kill: this.opts.killBehavior })
  }
}
/**
 * Ensures the message is addressed to this worker.
 * @param message - The message to check.
 * @throws {Error} If the message worker id is not set or does not match the worker id.
 */
private checkMessageWorkerId (message: MessageValue<Data>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error(
      `Message worker id is not set: ${JSON.stringify(message)}`
    )
  }
  if (workerId === this.id) {
    return
  }
  throw new Error(
    `Message worker id ${workerId.toString()} does not match the worker id ${this.id.toString()}: ${JSON.stringify(message)}`
  )
}
/**
 * Checks if the `taskFunctions` parameter is passed to the constructor and valid,
 * then registers the task function(s) bound to this worker.
 * A single function is registered under the default reserved name and under its
 * own name (`'fn1'` when it has no usable name). With an object, every entry is
 * registered under its key and the first entry also becomes the default.
 * The objects supplied by the caller are never modified: copies are registered.
 * @param taskFunctions - The task function(s) parameter that should be checked.
 * @throws {Error} If the parameter is missing or is an empty object.
 * @throws {TypeError} If the parameter is neither a function nor a plain object.
 */
private checkTaskFunctions (
  taskFunctions:
    | TaskFunction<Data, Response>
    | TaskFunctions<Data, Response>
    | undefined
): void {
  if (taskFunctions == null) {
    throw new Error('taskFunctions parameter is mandatory')
  }
  this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
  if (typeof taskFunctions === 'function') {
    const fnObj = { taskFunction: taskFunctions.bind(this) }
    this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
    this.taskFunctions.set(
      typeof taskFunctions.name === 'string' &&
        taskFunctions.name.trim().length > 0
        ? taskFunctions.name
        : 'fn1',
      fnObj
    )
  } else if (isPlainObject(taskFunctions)) {
    let firstEntry = true
    for (const [name, entry] of Object.entries(taskFunctions)) {
      const candidate: TaskFunctionObject<Data, Response> =
        typeof entry === 'function' ? { taskFunction: entry } : entry
      checkValidTaskFunctionObjectEntry<Data, Response>(name, candidate)
      // Register a copy so the caller's object is not rebound in place.
      const fnObj: TaskFunctionObject<Data, Response> = {
        ...candidate,
        taskFunction: candidate.taskFunction.bind(this),
      }
      if (firstEntry) {
        // The first entry is also the default task function.
        this.taskFunctions.set(DEFAULT_TASK_NAME, fnObj)
        firstEntry = false
      }
      this.taskFunctions.set(name, fnObj)
    }
    if (firstEntry) {
      throw new Error('taskFunctions parameter object is empty')
    }
  } else {
    throw new TypeError(
      'taskFunctions parameter is not a function or a plain object'
    )
  }
}
/**
 * Validates the given worker options, then stores them merged over the default
 * worker options.
 * @param opts - The worker options to check.
 */
private checkWorkerOptions (opts: WorkerOptions): void {
  checkValidWorkerOptions(opts)
  const merged = { ...DEFAULT_WORKER_OPTIONS, ...opts }
  this.opts = merged
}
/**
 * Completes the performance measurement of a task.
 * @param taskPerformance - The task performance obtained when the task started.
 * @returns A copy of the task performance completed with the run time and/or the
 * event loop utilization, depending on the statistics requirements.
 * @throws {Error} If the performance statistics computation requirements are not set.
 */
private endTaskPerformance (
  taskPerformance: TaskPerformance
): TaskPerformance {
  const { statistics } = this
  if (statistics == null) {
    throw new Error('Performance statistics computation requirements not set')
  }
  return {
    ...taskPerformance,
    ...(statistics.runTime && {
      runTime: performance.now() - taskPerformance.timestamp,
    }),
    ...(statistics.elu && {
      // Utilization relative to the measurement taken when the task started.
      elu: performance.eventLoopUtilization(taskPerformance.elu),
    }),
  }
}
/**
 * Gets abortable task function.
 * The returned function wraps the registered task function in a promise whose
 * rejection callback is stored under the task id, so the task can be aborted.
 * @param name - The name of the task.
 * @param taskId - The task id.
 * @returns The abortable task function.
 */
private getAbortableTaskFunction (
  name: string,
  taskId: `${string}-${string}-${string}-${string}-${string}`
): TaskAsyncFunction<Data, Response> {
  // The wrapper is an `async` arrow function, as in the original.
  return async (data?: Data): Promise<Response> =>
    await new Promise<Response>(
      (resolve, reject: (reason?: unknown) => void) => {
        const abortTask = (): void => {
          reject(new AbortError(`Task '${name}' id '${taskId}' aborted`))
        }
        this.taskAbortFunctions.set(taskId, abortTask)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        const taskFunction = this.taskFunctions.get(name)!.taskFunction
        if (!isAsyncFunction(taskFunction)) {
          resolve((taskFunction as TaskSyncFunction<Data, Response>)(data))
          return
        }
        const pending = (taskFunction as TaskAsyncFunction<Data, Response>)(
          data
        )
        pending.then(resolve).catch(reject)
      }
    )
}
/**
 * Starts the worker check active interval.
 * The activity check runs every half of the maximum inactive time. An interval
 * that is already running is stopped first, so starting twice never leaves a
 * timer behind that could no longer be cleared.
 */
private startCheckActive (): void {
  // Clear a previous interval, otherwise its handle would be overwritten and lost.
  this.stopCheckActive()
  this.lastTaskTimestamp = performance.now()
  this.activeInterval = setInterval(
    this.checkActive.bind(this),
    (this.opts.maxInactiveTime ?? DEFAULT_MAX_INACTIVE_TIME) / 2
  )
  // Do not keep the process alive only because of the activity check.
  this.activeInterval.unref()
}
/**
 * Stops the worker check active interval, if one is running.
 */
private stopCheckActive (): void {
  const { activeInterval } = this
  if (activeInterval == null) {
    return
  }
  clearInterval(activeInterval)
  this.activeInterval = undefined
}
/**
 * Records the current time as the last task timestamp.
 * Nothing is recorded while no activity check interval is running.
 */
private updateLastTaskTimestamp (): void {
  if (this.activeInterval == null) {
    return
  }
  this.lastTaskTimestamp = performance.now()
}