1 import crypto from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import { readFileSync
} from
'node:fs'
4 import type { MessageValue
, PromiseResponseWrapper
} from
'../utility-types'
6 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
13 import { KillBehaviors
} from
'../worker/worker-options'
14 import { CircularArray
} from
'../circular-array'
15 import { Queue
} from
'../queue'
24 type TasksQueueOptions
,
37 WorkerChoiceStrategies
,
38 type WorkerChoiceStrategy
,
39 type WorkerChoiceStrategyOptions
40 } from
'./selection-strategies/selection-strategies-types'
41 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
45 readFileSync(new URL('../../package.json', import.meta
.url
), 'utf8')
46 ) as Record
<string, unknown
>
50 * Base class that implements some shared logic for all poolifier pools.
52 * @typeParam Worker - Type of worker which manages this pool.
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
56 export abstract class AbstractPool
<
57 Worker
extends IWorker
,
60 > implements IPool
<Worker
, Data
, Response
> {
62 public readonly workerNodes
: Array<WorkerNode
<Worker
, Data
>> = []
65 public readonly emitter
?: PoolEmitter
68 * The execution response promise map.
70 * - `key`: The message id of each submitted task.
71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
75 protected promiseResponseMap
: Map
<
77 PromiseResponseWrapper
<Worker
, Response
>
78 > = new Map
<string, PromiseResponseWrapper
<Worker
, Response
>>()
81 * Worker choice strategy context referencing a worker choice algorithm implementation.
83 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
90 * The start timestamp of the pool.
92 private readonly startTimestamp
95 * Constructs a new poolifier pool.
97 * @param numberOfWorkers - Number of workers that this pool should manage.
98 * @param filePath - Path to the worker file.
99 * @param opts - Options for the pool.
102 protected readonly numberOfWorkers
: number,
103 protected readonly filePath
: string,
104 protected readonly opts
: PoolOptions
<Worker
>
106 if (!this.isMain()) {
107 throw new Error('Cannot start a pool from a worker!')
109 this.checkNumberOfWorkers(this.numberOfWorkers
)
110 this.checkFilePath(this.filePath
)
111 this.checkPoolOptions(this.opts
)
113 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
114 this.executeTask
= this.executeTask
.bind(this)
115 this.enqueueTask
= this.enqueueTask
.bind(this)
116 this.checkAndEmitEvents
= this.checkAndEmitEvents
.bind(this)
118 if (this.opts
.enableEvents
=== true) {
119 this.emitter
= new PoolEmitter()
121 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
127 this.opts
.workerChoiceStrategy
,
128 this.opts
.workerChoiceStrategyOptions
133 while (this.workerNodes
.length
< this.numberOfWorkers
) {
134 this.createAndSetupWorker()
137 this.startTimestamp
= performance
.now()
140 private checkFilePath (filePath
: string): void {
143 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
145 throw new Error('Please specify a file with a worker implementation')
149 private checkNumberOfWorkers (numberOfWorkers
: number): void {
150 if (numberOfWorkers
== null) {
152 'Cannot instantiate a pool without specifying the number of workers'
154 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
156 'Cannot instantiate a pool with a non safe integer number of workers'
158 } else if (numberOfWorkers
< 0) {
159 throw new RangeError(
160 'Cannot instantiate a pool with a negative number of workers'
162 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
163 throw new Error('Cannot instantiate a fixed pool with no worker')
167 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
168 if (isPlainObject(opts
)) {
169 this.opts
.workerChoiceStrategy
=
170 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
171 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
172 this.opts
.workerChoiceStrategyOptions
=
173 opts
.workerChoiceStrategyOptions
??
174 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
175 this.checkValidWorkerChoiceStrategyOptions(
176 this.opts
.workerChoiceStrategyOptions
178 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
179 this.opts
.enableEvents
= opts
.enableEvents
?? true
180 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
181 if (this.opts
.enableTasksQueue
) {
182 this.checkValidTasksQueueOptions(
183 opts
.tasksQueueOptions
as TasksQueueOptions
185 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
186 opts
.tasksQueueOptions
as TasksQueueOptions
190 throw new TypeError('Invalid pool options: must be a plain object')
194 private checkValidWorkerChoiceStrategy (
195 workerChoiceStrategy
: WorkerChoiceStrategy
197 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
199 `Invalid worker choice strategy '${workerChoiceStrategy}'`
204 private checkValidWorkerChoiceStrategyOptions (
205 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
207 if (!isPlainObject(workerChoiceStrategyOptions
)) {
209 'Invalid worker choice strategy options: must be a plain object'
213 workerChoiceStrategyOptions
.weights
!= null &&
214 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
217 'Invalid worker choice strategy options: must have a weight for each worker node'
221 workerChoiceStrategyOptions
.measurement
!= null &&
222 !Object.values(Measurements
).includes(
223 workerChoiceStrategyOptions
.measurement
227 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
232 private checkValidTasksQueueOptions (
233 tasksQueueOptions
: TasksQueueOptions
235 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
236 throw new TypeError('Invalid tasks queue options: must be a plain object')
239 tasksQueueOptions
?.concurrency
!= null &&
240 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
243 'Invalid worker tasks concurrency: must be an integer'
247 tasksQueueOptions
?.concurrency
!= null &&
248 tasksQueueOptions
.concurrency
<= 0
251 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
257 public get
info (): PoolInfo
{
262 minSize
: this.minSize
,
263 maxSize
: this.maxSize
,
264 ...(this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
265 .runTime
.aggregate
&&
266 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
267 .waitTime
.aggregate
&& { utilization
: round(this.utilization
) }),
268 workerNodes
: this.workerNodes
.length
,
269 idleWorkerNodes
: this.workerNodes
.reduce(
270 (accumulator
, workerNode
) =>
271 workerNode
.usage
.tasks
.executing
=== 0
276 busyWorkerNodes
: this.workerNodes
.reduce(
277 (accumulator
, workerNode
) =>
278 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
281 executedTasks
: this.workerNodes
.reduce(
282 (accumulator
, workerNode
) =>
283 accumulator
+ workerNode
.usage
.tasks
.executed
,
286 executingTasks
: this.workerNodes
.reduce(
287 (accumulator
, workerNode
) =>
288 accumulator
+ workerNode
.usage
.tasks
.executing
,
291 queuedTasks
: this.workerNodes
.reduce(
292 (accumulator
, workerNode
) =>
293 accumulator
+ workerNode
.usage
.tasks
.queued
,
296 maxQueuedTasks
: this.workerNodes
.reduce(
297 (accumulator
, workerNode
) =>
298 accumulator
+ workerNode
.usage
.tasks
.maxQueued
,
301 failedTasks
: this.workerNodes
.reduce(
302 (accumulator
, workerNode
) =>
303 accumulator
+ workerNode
.usage
.tasks
.failed
,
/**
 * Gets the pool run time, i.e. the time elapsed since the start timestamp
 * of the pool was recorded.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
319 * Gets the approximate pool utilization.
321 * @returns The pool utilization.
323 private get
utilization (): number {
324 const poolRunTimeCapacity
= this.runTime
* this.maxSize
325 const totalTasksRunTime
= this.workerNodes
.reduce(
326 (accumulator
, workerNode
) =>
327 accumulator
+ workerNode
.usage
.runTime
.aggregate
,
330 const totalTasksWaitTime
= this.workerNodes
.reduce(
331 (accumulator
, workerNode
) =>
332 accumulator
+ workerNode
.usage
.waitTime
.aggregate
,
335 return (totalTasksRunTime
+ totalTasksWaitTime
) / poolRunTimeCapacity
341 * If it is `'dynamic'`, it provides the `max` property.
343 protected abstract get
type (): PoolType
346 * Gets the worker type.
348 protected abstract get
worker (): WorkerType
353 protected abstract get
minSize (): number
358 protected abstract get
maxSize (): number
/**
 * Gets the worker given its id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  // `find` yields the matching worker node (or `undefined`); unwrap it to the
  // worker it holds so the returned value matches the declared return type.
  return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
    ?.worker
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  // The key is the position of the node holding this exact worker instance.
  for (const [nodeKey, node] of this.workerNodes.entries()) {
    if (node.worker === worker) {
      return nodeKey
    }
  }
  return -1
}
384 public setWorkerChoiceStrategy (
385 workerChoiceStrategy
: WorkerChoiceStrategy
,
386 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
388 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
389 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
390 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
391 this.opts
.workerChoiceStrategy
393 if (workerChoiceStrategyOptions
!= null) {
394 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
396 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
397 this.setWorkerNodeTasksUsage(
399 this.getWorkerUsage(workerNodeKey
)
401 this.setWorkerStatistics(workerNode
.worker
)
406 public setWorkerChoiceStrategyOptions (
407 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
409 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
410 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
411 this.workerChoiceStrategyContext
.setOptions(
412 this.opts
.workerChoiceStrategyOptions
417 public enableTasksQueue (
419 tasksQueueOptions
?: TasksQueueOptions
421 if (this.opts
.enableTasksQueue
=== true && !enable
) {
422 this.flushTasksQueues()
424 this.opts
.enableTasksQueue
= enable
425 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
/**
 * Sets the tasks queue options of the pool.
 *
 * When the tasks queue is enabled, the given options are validated and the
 * built options are stored in the pool options. Otherwise any previously
 * stored tasks queue options are removed from the pool options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = builtOptions
    return
  }
  // Tasks queue disabled: drop stale options, if any.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
/**
 * Builds the tasks queue options, applying the default value where needed.
 *
 * @param tasksQueueOptions - The tasks queue options; may be nullish at runtime.
 * @returns Tasks queue options whose `concurrency` defaults to `1` when not provided.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes has reached the pool
 * maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
457 * Whether the pool is busy or not.
459 * The pool busyness boolean status.
461 protected abstract get
busy (): boolean
464 * Whether worker nodes are executing at least one task.
466 * @returns Worker nodes busyness boolean status.
468 protected internalBusy (): boolean {
470 this.workerNodes
.findIndex(workerNode
=> {
471 return workerNode
.usage
.tasks
.executing
=== 0
477 public async execute (data
?: Data
, name
?: string): Promise
<Response
> {
478 const timestamp
= performance
.now()
479 const workerNodeKey
= this.chooseWorkerNode()
480 const submittedTask
: Task
<Data
> = {
482 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
483 data
: data
?? ({} as Data
),
485 id
: crypto
.randomUUID()
487 const res
= new Promise
<Response
>((resolve
, reject
) => {
488 this.promiseResponseMap
.set(submittedTask
.id
as string, {
491 worker
: this.workerNodes
[workerNodeKey
].worker
495 this.opts
.enableTasksQueue
=== true &&
497 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
498 ((this.opts
.tasksQueueOptions
as TasksQueueOptions
)
499 .concurrency
as number))
501 this.enqueueTask(workerNodeKey
, submittedTask
)
503 this.executeTask(workerNodeKey
, submittedTask
)
505 this.checkAndEmitEvents()
506 // eslint-disable-next-line @typescript-eslint/return-await
511 public async destroy (): Promise
<void> {
513 this.workerNodes
.map(async (workerNode
, workerNodeKey
) => {
514 this.flushTasksQueue(workerNodeKey
)
515 // FIXME: wait for tasks to be finished
516 const workerExitPromise
= new Promise
<void>(resolve
=> {
517 workerNode
.worker
.on('exit', () => {
521 await this.destroyWorker(workerNode
.worker
)
522 await workerExitPromise
528 * Terminates the given worker.
530 * @param worker - A worker within `workerNodes`.
532 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
535 * Setup hook to execute code before worker nodes are created in the abstract constructor.
540 protected setupHook (): void {
541 // Intentionally empty
545 * Should return whether the worker is the main worker or not.
547 protected abstract isMain (): boolean
550 * Hook executed before the worker task execution.
553 * @param workerNodeKey - The worker node key.
554 * @param task - The task to execute.
556 protected beforeTaskExecutionHook (
557 workerNodeKey
: number,
560 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
561 ++workerUsage
.tasks
.executing
562 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
566 * Hook executed after the worker task execution.
569 * @param worker - The worker.
570 * @param message - The received message.
572 protected afterTaskExecutionHook (
574 message
: MessageValue
<Response
>
576 const workerUsage
= this.workerNodes
[this.getWorkerNodeKey(worker
)].usage
577 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
578 this.updateRunTimeWorkerUsage(workerUsage
, message
)
579 this.updateEluWorkerUsage(workerUsage
, message
)
582 private updateTaskStatisticsWorkerUsage (
583 workerUsage
: WorkerUsage
,
584 message
: MessageValue
<Response
>
586 const workerTaskStatistics
= workerUsage
.tasks
587 --workerTaskStatistics
.executing
588 ++workerTaskStatistics
.executed
589 if (message
.taskError
!= null) {
590 ++workerTaskStatistics
.failed
594 private updateRunTimeWorkerUsage (
595 workerUsage
: WorkerUsage
,
596 message
: MessageValue
<Response
>
599 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
602 workerUsage
.runTime
.aggregate
+= message
.taskPerformance
?.runTime
?? 0
604 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
606 workerUsage
.tasks
.executed
!== 0
608 workerUsage
.runTime
.average
=
609 workerUsage
.runTime
.aggregate
/
610 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
613 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
615 message
.taskPerformance
?.runTime
!= null
617 workerUsage
.runTime
.history
.push(message
.taskPerformance
.runTime
)
618 workerUsage
.runTime
.median
= median(workerUsage
.runTime
.history
)
623 private updateWaitTimeWorkerUsage (
624 workerUsage
: WorkerUsage
,
627 const timestamp
= performance
.now()
628 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
630 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
633 workerUsage
.waitTime
.aggregate
+= taskWaitTime
?? 0
635 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
637 workerUsage
.tasks
.executed
!== 0
639 workerUsage
.waitTime
.average
=
640 workerUsage
.waitTime
.aggregate
/
641 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
644 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
648 workerUsage
.waitTime
.history
.push(taskWaitTime
)
649 workerUsage
.waitTime
.median
= median(workerUsage
.waitTime
.history
)
654 private updateEluWorkerUsage (
655 workerUsage
: WorkerUsage
,
656 message
: MessageValue
<Response
>
659 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
662 if (workerUsage
.elu
!= null && message
.taskPerformance
?.elu
!= null) {
663 workerUsage
.elu
.idle
.aggregate
+= message
.taskPerformance
.elu
.idle
664 workerUsage
.elu
.active
.aggregate
+= message
.taskPerformance
.elu
.active
665 workerUsage
.elu
.utilization
=
666 (workerUsage
.elu
.utilization
+
667 message
.taskPerformance
.elu
.utilization
) /
669 } else if (message
.taskPerformance
?.elu
!= null) {
670 workerUsage
.elu
.idle
.aggregate
= message
.taskPerformance
.elu
.idle
671 workerUsage
.elu
.active
.aggregate
= message
.taskPerformance
.elu
.active
672 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
675 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
677 workerUsage
.tasks
.executed
!== 0
679 const executedTasks
=
680 workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
681 workerUsage
.elu
.idle
.average
=
682 workerUsage
.elu
.idle
.aggregate
/ executedTasks
683 workerUsage
.elu
.active
.average
=
684 workerUsage
.elu
.active
.aggregate
/ executedTasks
687 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
689 message
.taskPerformance
?.elu
!= null
691 workerUsage
.elu
.idle
.history
.push(message
.taskPerformance
.elu
.idle
)
692 workerUsage
.elu
.active
.history
.push(message
.taskPerformance
.elu
.active
)
693 workerUsage
.elu
.idle
.median
= median(workerUsage
.elu
.idle
.history
)
694 workerUsage
.elu
.active
.median
= median(workerUsage
.elu
.active
.history
)
700 * Chooses a worker node for the next task.
702 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
704 * @returns The worker node key
706 private chooseWorkerNode (): number {
707 if (this.shallCreateDynamicWorker()) {
708 const worker
= this.createAndSetupDynamicWorker()
710 this.workerChoiceStrategyContext
.getStrategyPolicy().useDynamicWorker
712 return this.getWorkerNodeKey(worker
)
715 return this.workerChoiceStrategyContext
.execute()
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * `internalBusy()` reports the worker nodes as busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
728 * Sends a message to the given worker.
730 * @param worker - The worker which should receive the message.
731 * @param message - The message.
733 protected abstract sendToWorker (
735 message
: MessageValue
<Data
>
739 * Registers a listener callback on the given worker.
741 * @param worker - The worker which should register a listener.
742 * @param listener - The message listener callback.
744 private registerWorkerMessageListener
<Message
extends Data
| Response
>(
746 listener
: (message
: MessageValue
<Message
>) => void
748 worker
.on('message', listener
as MessageHandler
<Worker
>)
752 * Creates a new worker.
754 * @returns Newly created worker.
756 protected abstract createWorker (): Worker
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 *
 * This implementation registers the pool worker listener on the worker messages.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
}
770 * Creates a new worker and sets it up completely in the pool worker nodes.
772 * @returns New, completely set up worker.
774 protected createAndSetupWorker (): Worker
{
775 const worker
= this.createWorker()
777 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
778 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
779 worker
.on('error', error
=> {
780 if (this.emitter
!= null) {
781 this.emitter
.emit(PoolEvents
.error
, error
)
783 if (this.opts
.enableTasksQueue
=== true) {
784 const workerNodeKey
= this.getWorkerNodeKey(worker
)
785 while (this.tasksQueueSize(workerNodeKey
) > 0) {
786 let targetWorkerNodeKey
: number = workerNodeKey
787 let minQueuedTasks
= Infinity
788 for (const [workerNodeId
, workerNode
] of this.workerNodes
.entries()) {
790 workerNodeId
!== workerNodeKey
&&
791 workerNode
.usage
.tasks
.queued
=== 0
793 targetWorkerNodeKey
= workerNodeId
797 workerNodeId
!== workerNodeKey
&&
798 workerNode
.usage
.tasks
.queued
< minQueuedTasks
800 minQueuedTasks
= workerNode
.usage
.tasks
.queued
801 targetWorkerNodeKey
= workerNodeId
806 this.dequeueTask(workerNodeKey
) as Task
<Data
>
810 if (this.opts
.restartWorkerOnError
=== true) {
811 this.createAndSetupWorker()
814 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
815 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
816 worker
.once('exit', () => {
817 this.removeWorkerNode(worker
)
820 this.pushWorkerNode(worker
)
822 this.setWorkerStatistics(worker
)
824 this.afterWorkerSetup(worker
)
830 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
832 * @returns New, completely set up dynamic worker.
834 protected createAndSetupDynamicWorker (): Worker
{
835 const worker
= this.createAndSetupWorker()
836 this.registerWorkerMessageListener(worker
, message
=> {
837 const workerNodeKey
= this.getWorkerNodeKey(worker
)
839 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
840 (message
.kill
!= null &&
841 ((this.opts
.enableTasksQueue
=== false &&
842 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
=== 0) ||
843 (this.opts
.enableTasksQueue
=== true &&
844 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
=== 0 &&
845 this.tasksQueueSize(workerNodeKey
) === 0)))
847 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
848 void (this.destroyWorker(worker
) as Promise
<void>)
/**
 * Builds the listener registered for each worker message.
 *
 * A message carrying both `workerId` and `started` is handled as a worker
 * started message; otherwise a message carrying an `id` is handled as a task
 * execution response. Any other message is ignored by this listener.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      this.handleTaskExecutionResponse(message)
    }
  }
}
871 private handleWorkerStartedMessage (message
: MessageValue
<Response
>): void {
872 // Worker started message received
873 const worker
= this.getWorkerById(message
.workerId
as number)
874 if (worker
!= null) {
875 this.workerNodes
[this.getWorkerNodeKey(worker
)].info
.started
=
876 message
.started
as boolean
879 `Worker started message received from unknown worker '${
880 message.workerId as number
886 private handleTaskExecutionResponse (message
: MessageValue
<Response
>): void {
887 const promiseResponse
= this.promiseResponseMap
.get(message
.id
as string)
888 if (promiseResponse
!= null) {
889 if (message
.taskError
!= null) {
890 if (this.emitter
!= null) {
891 this.emitter
.emit(PoolEvents
.taskError
, message
.taskError
)
893 promiseResponse
.reject(message
.taskError
.message
)
895 promiseResponse
.resolve(message
.data
as Response
)
897 this.afterTaskExecutionHook(promiseResponse
.worker
, message
)
898 this.promiseResponseMap
.delete(message
.id
as string)
899 const workerNodeKey
= this.getWorkerNodeKey(promiseResponse
.worker
)
901 this.opts
.enableTasksQueue
=== true &&
902 this.tasksQueueSize(workerNodeKey
) > 0
906 this.dequeueTask(workerNodeKey
) as Task
<Data
>
909 this.workerChoiceStrategyContext
.update(workerNodeKey
)
913 private checkAndEmitEvents (): void {
914 if (this.emitter
!= null) {
916 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
918 if (this.type === PoolTypes
.dynamic
&& this.full
) {
919 this.emitter
?.emit(PoolEvents
.full
, this.info
)
925 * Sets the given worker node its tasks usage in the pool.
927 * @param workerNode - The worker node.
928 * @param workerUsage - The worker usage.
930 private setWorkerNodeTasksUsage (
931 workerNode
: WorkerNode
<Worker
, Data
>,
932 workerUsage
: WorkerUsage
934 workerNode
.usage
= workerUsage
938 * Pushes the given worker in the pool worker nodes.
940 * @param worker - The worker.
941 * @returns The worker nodes length.
943 private pushWorkerNode (worker
: Worker
): number {
944 this.workerNodes
.push({
946 info
: { id
: this.getWorkerId(worker
), started
: true },
947 usage
: this.getWorkerUsage(),
948 tasksQueue
: new Queue
<Task
<Data
>>()
950 const workerNodeKey
= this.getWorkerNodeKey(worker
)
951 this.setWorkerNodeTasksUsage(
952 this.workerNodes
[workerNodeKey
],
953 this.getWorkerUsage(workerNodeKey
)
955 return this.workerNodes
.length
959 * Gets the worker id.
961 * @param worker - The worker.
962 * @returns The worker id.
964 private getWorkerId (worker
: Worker
): number | undefined {
965 if (this.worker
=== WorkerTypes
.thread
) {
966 return worker
.threadId
967 } else if (this.worker
=== WorkerTypes
.cluster
) {
973 // * Sets the given worker in the pool worker nodes.
975 // * @param workerNodeKey - The worker node key.
976 // * @param worker - The worker.
977 // * @param workerInfo - The worker info.
978 // * @param workerUsage - The worker usage.
979 // * @param tasksQueue - The worker task queue.
981 // private setWorkerNode (
982 // workerNodeKey: number,
984 // workerInfo: WorkerInfo,
985 // workerUsage: WorkerUsage,
986 // tasksQueue: Queue<Task<Data>>
988 // this.workerNodes[workerNodeKey] = {
991 // usage: workerUsage,
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
/**
 * Executes the given task on the given worker node: runs the before task
 * execution hook, then sends the task to the worker of that node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
/**
 * Enqueues the given task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue` call (presumably the new queue size; confirm against `Queue`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1030 private flushTasksQueue (workerNodeKey
: number): void {
1031 while (this.tasksQueueSize(workerNodeKey
) > 0) {
1034 this.dequeueTask(workerNodeKey
) as Task
<Data
>
1037 this.workerNodes
[workerNodeKey
].tasksQueue
.clear()
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  for (let nodeKey = 0; nodeKey < this.workerNodes.length; nodeKey++) {
    this.flushTasksQueue(nodeKey)
  }
}
1046 private setWorkerStatistics (worker
: Worker
): void {
1047 this.sendToWorker(worker
, {
1050 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1052 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1058 private getWorkerUsage (workerNodeKey
?: number): WorkerUsage
{
1059 const getTasksQueueSize
= (workerNodeKey
?: number): number => {
1060 return workerNodeKey
!= null ? this.tasksQueueSize(workerNodeKey
) : 0
1062 const getTasksMaxQueueSize
= (workerNodeKey
?: number): number => {
1063 return workerNodeKey
!= null ? this.tasksMaxQueueSize(workerNodeKey
) : 0
1069 get
queued (): number {
1070 return getTasksQueueSize(workerNodeKey
)
1072 get
maxQueued (): number {
1073 return getTasksMaxQueueSize(workerNodeKey
)
1081 history
: new CircularArray()
1087 history
: new CircularArray()
1094 history
: new CircularArray()
1100 history
: new CircularArray()