1 import { randomUUID
} from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { existsSync
} from
'node:fs'
4 import { type TransferListItem
} from
'node:worker_threads'
7 PromiseResponseWrapper
,
9 } from
'../utility-types'
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
18 updateMeasurementStatistics
20 import { KillBehaviors
} from
'../worker/worker-options'
29 type TasksQueueOptions
40 type MeasurementStatisticsRequirements
,
42 WorkerChoiceStrategies
,
43 type WorkerChoiceStrategy
,
44 type WorkerChoiceStrategyOptions
45 } from
'./selection-strategies/selection-strategies-types'
46 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
47 import { version
} from
'./version'
48 import { WorkerNode
} from
'./worker-node'
51 * Base class that implements some shared logic for all poolifier pools.
53 * @typeParam Worker - Type of worker which manages this pool.
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
57 export abstract class AbstractPool
<
58 Worker
extends IWorker
,
61 > implements IPool
<Worker
, Data
, Response
> {
63 public readonly workerNodes
: Array<IWorkerNode
<Worker
, Data
>> = []
66 public readonly emitter
?: PoolEmitter
69 * The task execution response promise map.
71 * - `key`: The message id of each submitted task.
72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
76 protected promiseResponseMap
: Map
<string, PromiseResponseWrapper
<Response
>> =
77 new Map
<string, PromiseResponseWrapper
<Response
>>()
80 * Worker choice strategy context referencing a worker choice algorithm implementation.
82 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
89 * Whether the pool is starting or not.
91 private readonly starting
: boolean
93 * The start timestamp of the pool.
95 private readonly startTimestamp
98 * Constructs a new poolifier pool.
100 * @param numberOfWorkers - Number of workers that this pool should manage.
101 * @param filePath - Path to the worker file.
102 * @param opts - Options for the pool.
105 protected readonly numberOfWorkers
: number,
106 protected readonly filePath
: string,
107 protected readonly opts
: PoolOptions
<Worker
>
109 if (!this.isMain()) {
110 throw new Error('Cannot start a pool from a worker!')
112 this.checkNumberOfWorkers(this.numberOfWorkers
)
113 this.checkFilePath(this.filePath
)
114 this.checkPoolOptions(this.opts
)
116 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
117 this.executeTask
= this.executeTask
.bind(this)
118 this.enqueueTask
= this.enqueueTask
.bind(this)
119 this.dequeueTask
= this.dequeueTask
.bind(this)
120 this.checkAndEmitEvents
= this.checkAndEmitEvents
.bind(this)
122 if (this.opts
.enableEvents
=== true) {
123 this.emitter
= new PoolEmitter()
125 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
131 this.opts
.workerChoiceStrategy
,
132 this.opts
.workerChoiceStrategyOptions
139 this.starting
= false
141 this.startTimestamp
= performance
.now()
144 private checkFilePath (filePath
: string): void {
147 typeof filePath
!== 'string' ||
148 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
150 throw new Error('Please specify a file with a worker implementation')
152 if (!existsSync(filePath
)) {
153 throw new Error(`Cannot find the worker file '${filePath}'`)
157 private checkNumberOfWorkers (numberOfWorkers
: number): void {
158 if (numberOfWorkers
== null) {
160 'Cannot instantiate a pool without specifying the number of workers'
162 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
164 'Cannot instantiate a pool with a non safe integer number of workers'
166 } else if (numberOfWorkers
< 0) {
167 throw new RangeError(
168 'Cannot instantiate a pool with a negative number of workers'
170 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
171 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
175 protected checkDynamicPoolSize (min
: number, max
: number): void {
176 if (this.type === PoolTypes
.dynamic
) {
179 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
181 } else if (!Number.isSafeInteger(max
)) {
183 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
185 } else if (min
> max
) {
186 throw new RangeError(
187 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
189 } else if (max
=== 0) {
190 throw new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
193 } else if (min
=== max
) {
194 throw new RangeError(
195 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
201 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
202 if (isPlainObject(opts
)) {
203 this.opts
.workerChoiceStrategy
=
204 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
205 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
206 this.opts
.workerChoiceStrategyOptions
=
207 opts
.workerChoiceStrategyOptions
??
208 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
209 this.checkValidWorkerChoiceStrategyOptions(
210 this.opts
.workerChoiceStrategyOptions
212 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
213 this.opts
.enableEvents
= opts
.enableEvents
?? true
214 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
215 if (this.opts
.enableTasksQueue
) {
216 this.checkValidTasksQueueOptions(
217 opts
.tasksQueueOptions
as TasksQueueOptions
219 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
220 opts
.tasksQueueOptions
as TasksQueueOptions
224 throw new TypeError('Invalid pool options: must be a plain object')
228 private checkValidWorkerChoiceStrategy (
229 workerChoiceStrategy
: WorkerChoiceStrategy
231 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
233 `Invalid worker choice strategy '${workerChoiceStrategy}'`
238 private checkValidWorkerChoiceStrategyOptions (
239 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
241 if (!isPlainObject(workerChoiceStrategyOptions
)) {
243 'Invalid worker choice strategy options: must be a plain object'
247 workerChoiceStrategyOptions
.weights
!= null &&
248 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
251 'Invalid worker choice strategy options: must have a weight for each worker node'
255 workerChoiceStrategyOptions
.measurement
!= null &&
256 !Object.values(Measurements
).includes(
257 workerChoiceStrategyOptions
.measurement
261 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
266 private checkValidTasksQueueOptions (
267 tasksQueueOptions
: TasksQueueOptions
269 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
270 throw new TypeError('Invalid tasks queue options: must be a plain object')
273 tasksQueueOptions
?.concurrency
!= null &&
274 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
277 'Invalid worker tasks concurrency: must be an integer'
281 tasksQueueOptions
?.concurrency
!= null &&
282 tasksQueueOptions
.concurrency
<= 0
285 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
290 private startPool (): void {
292 this.workerNodes
.reduce(
293 (accumulator
, workerNode
) =>
294 !workerNode
.info
.dynamic
? accumulator
+ 1 : accumulator
,
296 ) < this.numberOfWorkers
298 this.createAndSetupWorkerNode()
303 public get
info (): PoolInfo
{
309 strategy
: this.opts
.workerChoiceStrategy
as WorkerChoiceStrategy
,
310 minSize
: this.minSize
,
311 maxSize
: this.maxSize
,
312 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
313 .runTime
.aggregate
&&
314 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
315 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
316 workerNodes
: this.workerNodes
.length
,
317 idleWorkerNodes
: this.workerNodes
.reduce(
318 (accumulator
, workerNode
) =>
319 workerNode
.usage
.tasks
.executing
=== 0
324 busyWorkerNodes
: this.workerNodes
.reduce(
325 (accumulator
, workerNode
) =>
326 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
329 executedTasks
: this.workerNodes
.reduce(
330 (accumulator
, workerNode
) =>
331 accumulator
+ workerNode
.usage
.tasks
.executed
,
334 executingTasks
: this.workerNodes
.reduce(
335 (accumulator
, workerNode
) =>
336 accumulator
+ workerNode
.usage
.tasks
.executing
,
339 ...(this.opts
.enableTasksQueue
=== true && {
340 queuedTasks
: this.workerNodes
.reduce(
341 (accumulator
, workerNode
) =>
342 accumulator
+ workerNode
.usage
.tasks
.queued
,
346 ...(this.opts
.enableTasksQueue
=== true && {
347 maxQueuedTasks
: this.workerNodes
.reduce(
348 (accumulator
, workerNode
) =>
349 accumulator
+ (workerNode
.usage
.tasks
?.maxQueued
?? 0),
353 failedTasks
: this.workerNodes
.reduce(
354 (accumulator
, workerNode
) =>
355 accumulator
+ workerNode
.usage
.tasks
.failed
,
358 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
359 .runTime
.aggregate
&& {
363 ...this.workerNodes
.map(
364 workerNode
=> workerNode
.usage
.runTime
?.minimum
?? Infinity
370 ...this.workerNodes
.map(
371 workerNode
=> workerNode
.usage
.runTime
?.maximum
?? -Infinity
376 this.workerNodes
.reduce(
377 (accumulator
, workerNode
) =>
378 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
381 this.workerNodes
.reduce(
382 (accumulator
, workerNode
) =>
383 accumulator
+ (workerNode
.usage
.tasks
?.executed
?? 0),
387 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
391 this.workerNodes
.map(
392 workerNode
=> workerNode
.usage
.runTime
?.median
?? 0
399 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
400 .waitTime
.aggregate
&& {
404 ...this.workerNodes
.map(
405 workerNode
=> workerNode
.usage
.waitTime
?.minimum
?? Infinity
411 ...this.workerNodes
.map(
412 workerNode
=> workerNode
.usage
.waitTime
?.maximum
?? -Infinity
417 this.workerNodes
.reduce(
418 (accumulator
, workerNode
) =>
419 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
422 this.workerNodes
.reduce(
423 (accumulator
, workerNode
) =>
424 accumulator
+ (workerNode
.usage
.tasks
?.executed
?? 0),
428 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
429 .waitTime
.median
&& {
432 this.workerNodes
.map(
433 workerNode
=> workerNode
.usage
.waitTime
?.median
?? 0
444 * The pool readiness boolean status.
446 private get
ready (): boolean {
448 this.workerNodes
.reduce(
449 (accumulator
, workerNode
) =>
450 !workerNode
.info
.dynamic
&& workerNode
.info
.ready
459 * The approximate pool utilization.
461 * @returns The pool utilization.
463 private get
utilization (): number {
464 const poolTimeCapacity
=
465 (performance
.now() - this.startTimestamp
) * this.maxSize
466 const totalTasksRunTime
= this.workerNodes
.reduce(
467 (accumulator
, workerNode
) =>
468 accumulator
+ (workerNode
.usage
.runTime
?.aggregate
?? 0),
471 const totalTasksWaitTime
= this.workerNodes
.reduce(
472 (accumulator
, workerNode
) =>
473 accumulator
+ (workerNode
.usage
.waitTime
?.aggregate
?? 0),
476 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolTimeCapacity
482 * If it is `'dynamic'`, it provides the `max` property.
484 protected abstract get
type (): PoolType
489 protected abstract get
worker (): WorkerType
492 * The pool minimum size.
494 protected abstract get
minSize (): number
497 * The pool maximum size.
499 protected abstract get
maxSize (): number
502 * Checks if the worker id sent in the received message from a worker is valid.
504 * @param message - The received message.
505 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 private checkMessageWorkerId (message
: MessageValue
<Response
>): void {
509 message
.workerId
!= null &&
510 this.getWorkerNodeKeyByWorkerId(message
.workerId
) === -1
513 `Worker message received from unknown worker '${message.workerId}'`
519 * Gets the given worker its worker node key.
521 * @param worker - The worker.
522 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
524 private getWorkerNodeKeyByWorker (worker
: Worker
): number {
525 return this.workerNodes
.findIndex(
526 workerNode
=> workerNode
.worker
=== worker
531 * Gets the worker node key given its worker id.
533 * @param workerId - The worker id.
534 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
536 private getWorkerNodeKeyByWorkerId (workerId
: number): number {
537 return this.workerNodes
.findIndex(
538 workerNode
=> workerNode
.info
.id
=== workerId
543 public setWorkerChoiceStrategy (
544 workerChoiceStrategy
: WorkerChoiceStrategy
,
545 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
547 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
548 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
549 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
550 this.opts
.workerChoiceStrategy
552 if (workerChoiceStrategyOptions
!= null) {
553 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
555 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
556 workerNode
.resetUsage()
557 this.sendWorkerStatisticsMessageToWorker(workerNodeKey
)
562 public setWorkerChoiceStrategyOptions (
563 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
565 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
566 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
567 this.workerChoiceStrategyContext
.setOptions(
568 this.opts
.workerChoiceStrategyOptions
573 public enableTasksQueue (
575 tasksQueueOptions
?: TasksQueueOptions
577 if (this.opts
.enableTasksQueue
=== true && !enable
) {
578 this.flushTasksQueues()
580 this.opts
.enableTasksQueue
= enable
581 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
585 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
586 if (this.opts
.enableTasksQueue
=== true) {
587 this.checkValidTasksQueueOptions(tasksQueueOptions
)
588 this.opts
.tasksQueueOptions
=
589 this.buildTasksQueueOptions(tasksQueueOptions
)
590 } else if (this.opts
.tasksQueueOptions
!= null) {
591 delete this.opts
.tasksQueueOptions
595 private buildTasksQueueOptions (
596 tasksQueueOptions
: TasksQueueOptions
597 ): TasksQueueOptions
{
599 concurrency
: tasksQueueOptions
?.concurrency
?? 1
604 * Whether the pool is full or not.
606 * The pool filling boolean status.
608 protected get
full (): boolean {
609 return this.workerNodes
.length
>= this.maxSize
613 * Whether the pool is busy or not.
615 * The pool busyness boolean status.
617 protected abstract get
busy (): boolean
620 * Whether worker nodes are executing concurrently their tasks quota or not.
622 * @returns Worker nodes busyness boolean status.
624 protected internalBusy (): boolean {
625 if (this.opts
.enableTasksQueue
=== true) {
627 this.workerNodes
.findIndex(
629 workerNode
.info
.ready
&&
630 workerNode
.usage
.tasks
.executing
<
631 (this.opts
.tasksQueueOptions
?.concurrency
as number)
636 this.workerNodes
.findIndex(
638 workerNode
.info
.ready
&& workerNode
.usage
.tasks
.executing
=== 0
645 public async execute (
648 transferList
?: TransferListItem
[]
649 ): Promise
<Response
> {
650 return await new Promise
<Response
>((resolve
, reject
) => {
651 if (name
!= null && typeof name
!== 'string') {
652 reject(new TypeError('name argument must be a string'))
654 if (transferList
!= null && !Array.isArray(transferList
)) {
655 reject(new TypeError('transferList argument must be an array'))
657 const timestamp
= performance
.now()
658 const workerNodeKey
= this.chooseWorkerNode()
659 const task
: Task
<Data
> = {
660 name
: name
?? DEFAULT_TASK_NAME
,
661 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
662 data
: data
?? ({} as Data
),
665 workerId
: this.getWorkerInfo(workerNodeKey
).id
as number,
668 this.promiseResponseMap
.set(task
.taskId
as string, {
674 this.opts
.enableTasksQueue
=== false ||
675 (this.opts
.enableTasksQueue
=== true &&
676 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
677 (this.opts
.tasksQueueOptions
?.concurrency
as number))
679 this.executeTask(workerNodeKey
, task
)
681 this.enqueueTask(workerNodeKey
, task
)
683 this.checkAndEmitEvents()
688 public async destroy (): Promise
<void> {
690 this.workerNodes
.map(async (_
, workerNodeKey
) => {
691 await this.destroyWorkerNode(workerNodeKey
)
697 * Terminates the worker node given its worker node key.
699 * @param workerNodeKey - The worker node key.
701 protected abstract destroyWorkerNode (workerNodeKey
: number): Promise
<void>
704 * Setup hook to execute code before worker nodes are created in the abstract constructor.
709 protected setupHook (): void {
710 // Intentionally empty
714 * Should return whether the worker is the main worker or not.
716 protected abstract isMain (): boolean
719 * Hook executed before the worker task execution.
722 * @param workerNodeKey - The worker node key.
723 * @param task - The task to execute.
725 protected beforeTaskExecutionHook (
726 workerNodeKey
: number,
729 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
730 ++workerUsage
.tasks
.executing
731 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
732 const taskWorkerUsage
= this.workerNodes
[workerNodeKey
].getTaskWorkerUsage(
735 ++taskWorkerUsage
.tasks
.executing
736 this.updateWaitTimeWorkerUsage(taskWorkerUsage
, task
)
740 * Hook executed after the worker task execution.
743 * @param workerNodeKey - The worker node key.
744 * @param message - The received message.
746 protected afterTaskExecutionHook (
747 workerNodeKey
: number,
748 message
: MessageValue
<Response
>
750 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
751 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
752 this.updateRunTimeWorkerUsage(workerUsage
, message
)
753 this.updateEluWorkerUsage(workerUsage
, message
)
754 const taskWorkerUsage
= this.workerNodes
[workerNodeKey
].getTaskWorkerUsage(
755 message
.taskPerformance
?.name
?? DEFAULT_TASK_NAME
757 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage
, message
)
758 this.updateRunTimeWorkerUsage(taskWorkerUsage
, message
)
759 this.updateEluWorkerUsage(taskWorkerUsage
, message
)
762 private updateTaskStatisticsWorkerUsage (
763 workerUsage
: WorkerUsage
,
764 message
: MessageValue
<Response
>
766 const workerTaskStatistics
= workerUsage
.tasks
767 --workerTaskStatistics
.executing
768 if (message
.taskError
== null) {
769 ++workerTaskStatistics
.executed
771 ++workerTaskStatistics
.failed
775 private updateRunTimeWorkerUsage (
776 workerUsage
: WorkerUsage
,
777 message
: MessageValue
<Response
>
779 updateMeasurementStatistics(
781 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
,
782 message
.taskPerformance
?.runTime
?? 0,
783 workerUsage
.tasks
.executed
787 private updateWaitTimeWorkerUsage (
788 workerUsage
: WorkerUsage
,
791 const timestamp
= performance
.now()
792 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
793 updateMeasurementStatistics(
794 workerUsage
.waitTime
,
795 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
,
797 workerUsage
.tasks
.executed
801 private updateEluWorkerUsage (
802 workerUsage
: WorkerUsage
,
803 message
: MessageValue
<Response
>
805 const eluTaskStatisticsRequirements
: MeasurementStatisticsRequirements
=
806 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
807 updateMeasurementStatistics(
808 workerUsage
.elu
.active
,
809 eluTaskStatisticsRequirements
,
810 message
.taskPerformance
?.elu
?.active
?? 0,
811 workerUsage
.tasks
.executed
813 updateMeasurementStatistics(
814 workerUsage
.elu
.idle
,
815 eluTaskStatisticsRequirements
,
816 message
.taskPerformance
?.elu
?.idle
?? 0,
817 workerUsage
.tasks
.executed
819 if (eluTaskStatisticsRequirements
.aggregate
) {
820 if (message
.taskPerformance
?.elu
!= null) {
821 if (workerUsage
.elu
.utilization
!= null) {
822 workerUsage
.elu
.utilization
=
823 (workerUsage
.elu
.utilization
+
824 message
.taskPerformance
.elu
.utilization
) /
827 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
834 * Chooses a worker node for the next task.
836 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
838 * @returns The chosen worker node key
840 private chooseWorkerNode (): number {
841 if (this.shallCreateDynamicWorker()) {
842 const workerNodeKey
= this.createAndSetupDynamicWorkerNode()
844 this.workerChoiceStrategyContext
.getStrategyPolicy().useDynamicWorker
849 return this.workerChoiceStrategyContext
.execute()
853 * Conditions for dynamic worker creation.
855 * @returns Whether to create a dynamic worker or not.
857 private shallCreateDynamicWorker (): boolean {
858 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
862 * Sends a message to worker given its worker node key.
864 * @param workerNodeKey - The worker node key.
865 * @param message - The message.
866 * @param transferList - The optional array of transferable objects.
868 protected abstract sendToWorker (
869 workerNodeKey
: number,
870 message
: MessageValue
<Data
>,
871 transferList
?: TransferListItem
[]
875 * Creates a new worker.
877 * @returns Newly created worker.
879 protected abstract createWorker (): Worker
882 * Creates a new, completely set up worker node.
884 * @returns New, completely set up worker node key.
886 protected createAndSetupWorkerNode (): number {
887 const worker
= this.createWorker()
889 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
890 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
891 worker
.on('error', error
=> {
892 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
893 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
894 workerInfo
.ready
= false
895 this.workerNodes
[workerNodeKey
].closeChannel()
896 this.emitter
?.emit(PoolEvents
.error
, error
)
897 if (this.opts
.restartWorkerOnError
=== true && !this.starting
) {
898 if (workerInfo
.dynamic
) {
899 this.createAndSetupDynamicWorkerNode()
901 this.createAndSetupWorkerNode()
904 if (this.opts
.enableTasksQueue
=== true) {
905 this.redistributeQueuedTasks(workerNodeKey
)
908 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
909 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
910 worker
.once('exit', () => {
911 this.removeWorkerNode(worker
)
914 const workerNodeKey
= this.addWorkerNode(worker
)
916 this.afterWorkerNodeSetup(workerNodeKey
)
922 * Creates a new, completely set up dynamic worker node.
924 * @returns New, completely set up dynamic worker node key.
926 protected createAndSetupDynamicWorkerNode (): number {
927 const workerNodeKey
= this.createAndSetupWorkerNode()
928 this.registerWorkerMessageListener(workerNodeKey
, message
=> {
929 const localWorkerNodeKey
= this.getWorkerNodeKeyByWorkerId(
932 const workerUsage
= this.workerNodes
[localWorkerNodeKey
].usage
933 // Kill message received from worker
935 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
936 (message
.kill
!= null &&
937 ((this.opts
.enableTasksQueue
=== false &&
938 workerUsage
.tasks
.executing
=== 0) ||
939 (this.opts
.enableTasksQueue
=== true &&
940 workerUsage
.tasks
.executing
=== 0 &&
941 this.tasksQueueSize(localWorkerNodeKey
) === 0)))
943 this.destroyWorkerNode(localWorkerNodeKey
).catch(EMPTY_FUNCTION
)
946 const workerInfo
= this.getWorkerInfo(workerNodeKey
)
947 this.sendToWorker(workerNodeKey
, {
949 workerId
: workerInfo
.id
as number
951 workerInfo
.dynamic
= true
952 if (this.workerChoiceStrategyContext
.getStrategyPolicy().useDynamicWorker
) {
953 workerInfo
.ready
= true
959 * Registers a listener callback on the worker given its worker node key.
961 * @param workerNodeKey - The worker node key.
962 * @param listener - The message listener callback.
964 protected abstract registerWorkerMessageListener
<
965 Message
extends Data
| Response
967 workerNodeKey
: number,
968 listener
: (message
: MessageValue
<Message
>) => void
972 * Method hooked up after a worker node has been newly created.
975 * @param workerNodeKey - The newly created worker node key.
977 protected afterWorkerNodeSetup (workerNodeKey
: number): void {
978 // Listen to worker messages.
979 this.registerWorkerMessageListener(workerNodeKey
, this.workerListener())
980 // Send the startup message to worker.
981 this.sendStartupMessageToWorker(workerNodeKey
)
982 // Send the worker statistics message to worker.
983 this.sendWorkerStatisticsMessageToWorker(workerNodeKey
)
987 * Sends the startup message to worker given its worker node key.
989 * @param workerNodeKey - The worker node key.
991 protected abstract sendStartupMessageToWorker (workerNodeKey
: number): void
994 * Sends the worker statistics message to worker given its worker node key.
996 * @param workerNodeKey - The worker node key.
998 private sendWorkerStatisticsMessageToWorker (workerNodeKey
: number): void {
999 this.sendToWorker(workerNodeKey
, {
1002 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1004 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1007 workerId
: this.getWorkerInfo(workerNodeKey
).id
as number
1011 private redistributeQueuedTasks (workerNodeKey
: number): void {
1012 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1013 let targetWorkerNodeKey
: number = workerNodeKey
1014 let minQueuedTasks
= Infinity
1015 let executeTask
= false
1016 for (const [workerNodeId
, workerNode
] of this.workerNodes
.entries()) {
1017 const workerInfo
= this.getWorkerInfo(workerNodeId
)
1019 workerNodeId
!== workerNodeKey
&&
1021 workerNode
.usage
.tasks
.queued
=== 0
1024 this.workerNodes
[workerNodeId
].usage
.tasks
.executing
<
1025 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1029 targetWorkerNodeKey
= workerNodeId
1033 workerNodeId
!== workerNodeKey
&&
1035 workerNode
.usage
.tasks
.queued
< minQueuedTasks
1037 minQueuedTasks
= workerNode
.usage
.tasks
.queued
1038 targetWorkerNodeKey
= workerNodeId
1043 targetWorkerNodeKey
,
1044 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1048 targetWorkerNodeKey
,
1049 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1056 * This method is the listener registered for each worker message.
1058 * @returns The listener function to execute when a message is received from a worker.
1060 protected workerListener (): (message
: MessageValue
<Response
>) => void {
1062 this.checkMessageWorkerId(message
)
1063 if (message
.ready
!= null) {
1064 // Worker ready response received from worker
1065 this.handleWorkerReadyResponse(message
)
1066 } else if (message
.taskId
!= null) {
1067 // Task execution response received from worker
1068 this.handleTaskExecutionResponse(message
)
1073 private handleWorkerReadyResponse (message
: MessageValue
<Response
>): void {
1075 this.getWorkerNodeKeyByWorkerId(message
.workerId
)
1076 ).ready
= message
.ready
as boolean
1077 if (this.emitter
!= null && this.ready
) {
1078 this.emitter
.emit(PoolEvents
.ready
, this.info
)
1082 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
1083 const promiseResponse
= this.promiseResponseMap
.get(
1084 message
.taskId
as string
1086 if (promiseResponse
!= null) {
1087 if (message
.taskError
!= null) {
1088 this.emitter
?.emit(PoolEvents
.taskError
, message
.taskError
)
1089 promiseResponse
.reject(message
.taskError
.message
)
1091 promiseResponse
.resolve(message
.data
as Response
)
1093 const workerNodeKey
= promiseResponse
.workerNodeKey
1094 this.afterTaskExecutionHook(workerNodeKey
, message
)
1095 this.promiseResponseMap
.delete(message
.taskId
as string)
1097 this.opts
.enableTasksQueue
=== true &&
1098 this.tasksQueueSize(workerNodeKey
) > 0 &&
1099 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
<
1100 (this.opts
.tasksQueueOptions
?.concurrency
as number)
1104 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1107 this.workerChoiceStrategyContext
.update(workerNodeKey
)
1111 private checkAndEmitEvents (): void {
1112 if (this.emitter
!= null) {
1114 this.emitter
.emit(PoolEvents
.busy
, this.info
)
1116 if (this.type === PoolTypes
.dynamic
&& this.full
) {
1117 this.emitter
.emit(PoolEvents
.full
, this.info
)
1123 * Gets the worker information given its worker node key.
1125 * @param workerNodeKey - The worker node key.
1126 * @returns The worker information.
1128 protected getWorkerInfo (workerNodeKey
: number): WorkerInfo
{
1129 return this.workerNodes
[workerNodeKey
].info
1133 * Adds the given worker in the pool worker nodes.
1135 * @param worker - The worker.
1136 * @returns The added worker node key.
1137 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1139 private addWorkerNode (worker
: Worker
): number {
1140 const workerNode
= new WorkerNode
<Worker
, Data
>(worker
, this.worker
)
1141 // Flag the worker node as ready at pool startup.
1142 if (this.starting
) {
1143 workerNode
.info
.ready
= true
1145 this.workerNodes
.push(workerNode
)
1146 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1147 if (workerNodeKey
=== -1) {
1148 throw new Error('Worker node not found')
1150 return workerNodeKey
1154 * Removes the given worker from the pool worker nodes.
1156 * @param worker - The worker.
1158 private removeWorkerNode (worker
: Worker
): void {
1159 const workerNodeKey
= this.getWorkerNodeKeyByWorker(worker
)
1160 if (workerNodeKey
!== -1) {
1161 this.workerNodes
.splice(workerNodeKey
, 1)
1162 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
1167 * Executes the given task on the worker given its worker node key.
1169 * @param workerNodeKey - The worker node key.
1170 * @param task - The task to execute.
1172 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
1173 this.beforeTaskExecutionHook(workerNodeKey
, task
)
1177 this.worker
=== WorkerTypes
.thread
&& task
.transferList
!= null
1183 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
1184 return this.workerNodes
[workerNodeKey
].enqueueTask(task
)
1187 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
1188 return this.workerNodes
[workerNodeKey
].dequeueTask()
1191 private tasksQueueSize (workerNodeKey
: number): number {
1192 return this.workerNodes
[workerNodeKey
].tasksQueueSize()
1195 protected flushTasksQueue (workerNodeKey
: number): void {
1196 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1199 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1202 this.workerNodes
[workerNodeKey
].clearTasksQueue()
1205 private flushTasksQueues (): void {
1206 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
1207 this.flushTasksQueue(workerNodeKey
)