1 import crypto from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { MessageValue
, PromiseResponseWrapper
} from
'../utility-types'
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
11 import { KillBehaviors
, isKillBehavior
} from
'../worker/worker-options'
12 import { CircularArray
} from
'../circular-array'
13 import { Queue
} from
'../queue'
22 type TasksQueueOptions
,
35 WorkerChoiceStrategies
,
36 type WorkerChoiceStrategy
,
37 type WorkerChoiceStrategyOptions
38 } from
'./selection-strategies/selection-strategies-types'
39 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
42 * Base class that implements some shared logic for all poolifier pools.
44 * @typeParam Worker - Type of worker which manages this pool.
45 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
46 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 export abstract class AbstractPool
<
49 Worker
extends IWorker
,
52 > implements IPool
<Worker
, Data
, Response
> {
54 public readonly workerNodes
: Array<WorkerNode
<Worker
, Data
>> = []
57 public readonly emitter
?: PoolEmitter
60 * The execution response promise map.
62 * - `key`: The message id of each submitted task.
63 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
65 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
67 protected promiseResponseMap
: Map
<
69 PromiseResponseWrapper
<Worker
, Response
>
70 > = new Map
<string, PromiseResponseWrapper
<Worker
, Response
>>()
73 * Worker choice strategy context referencing a worker choice algorithm implementation.
75 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
82 * The start timestamp of the pool.
84 private readonly startTimestamp
87 * Constructs a new poolifier pool.
89 * @param numberOfWorkers - Number of workers that this pool should manage.
90 * @param filePath - Path to the worker file.
91 * @param opts - Options for the pool.
94 protected readonly numberOfWorkers
: number,
95 protected readonly filePath
: string,
96 protected readonly opts
: PoolOptions
<Worker
>
99 throw new Error('Cannot start a pool from a worker!')
101 this.checkNumberOfWorkers(this.numberOfWorkers
)
102 this.checkFilePath(this.filePath
)
103 this.checkPoolOptions(this.opts
)
105 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
106 this.executeTask
= this.executeTask
.bind(this)
107 this.enqueueTask
= this.enqueueTask
.bind(this)
108 this.checkAndEmitEvents
= this.checkAndEmitEvents
.bind(this)
110 if (this.opts
.enableEvents
=== true) {
111 this.emitter
= new PoolEmitter()
113 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
119 this.opts
.workerChoiceStrategy
,
120 this.opts
.workerChoiceStrategyOptions
125 while (this.workerNodes
.length
< this.numberOfWorkers
) {
126 this.createAndSetupWorker()
129 this.startTimestamp
= performance
.now()
132 private checkFilePath (filePath
: string): void {
135 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
137 throw new Error('Please specify a file with a worker implementation')
141 private checkNumberOfWorkers (numberOfWorkers
: number): void {
142 if (numberOfWorkers
== null) {
144 'Cannot instantiate a pool without specifying the number of workers'
146 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
148 'Cannot instantiate a pool with a non safe integer number of workers'
150 } else if (numberOfWorkers
< 0) {
151 throw new RangeError(
152 'Cannot instantiate a pool with a negative number of workers'
154 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
155 throw new Error('Cannot instantiate a fixed pool with no worker')
159 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
160 if (isPlainObject(opts
)) {
161 this.opts
.workerChoiceStrategy
=
162 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
163 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
164 this.opts
.workerChoiceStrategyOptions
=
165 opts
.workerChoiceStrategyOptions
??
166 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
167 this.checkValidWorkerChoiceStrategyOptions(
168 this.opts
.workerChoiceStrategyOptions
170 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
171 this.opts
.enableEvents
= opts
.enableEvents
?? true
172 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
173 if (this.opts
.enableTasksQueue
) {
174 this.checkValidTasksQueueOptions(
175 opts
.tasksQueueOptions
as TasksQueueOptions
177 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
178 opts
.tasksQueueOptions
as TasksQueueOptions
182 throw new TypeError('Invalid pool options: must be a plain object')
186 private checkValidWorkerChoiceStrategy (
187 workerChoiceStrategy
: WorkerChoiceStrategy
189 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
191 `Invalid worker choice strategy '${workerChoiceStrategy}'`
196 private checkValidWorkerChoiceStrategyOptions (
197 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
199 if (!isPlainObject(workerChoiceStrategyOptions
)) {
201 'Invalid worker choice strategy options: must be a plain object'
205 workerChoiceStrategyOptions
.weights
!= null &&
206 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
209 'Invalid worker choice strategy options: must have a weight for each worker node'
213 workerChoiceStrategyOptions
.measurement
!= null &&
214 !Object.values(Measurements
).includes(
215 workerChoiceStrategyOptions
.measurement
219 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
224 private checkValidTasksQueueOptions (
225 tasksQueueOptions
: TasksQueueOptions
227 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
228 throw new TypeError('Invalid tasks queue options: must be a plain object')
231 tasksQueueOptions
?.concurrency
!= null &&
232 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
235 'Invalid worker tasks concurrency: must be an integer'
239 tasksQueueOptions
?.concurrency
!= null &&
240 tasksQueueOptions
.concurrency
<= 0
243 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
/**
 * Whether at least one worker node has not been flagged as started yet.
 *
 * @returns `true` if any worker node `info.started` flag is falsy, `false` otherwise.
 */
private get starting (): boolean {
  const everyWorkerNodeStarted = this.workerNodes.every(
    ({ info }) => info.started
  )
  return !everyWorkerNodeStarted
}
/**
 * Whether at least one worker node has been flagged as started.
 *
 * @returns `true` if any worker node `info.started` flag is truthy, `false` otherwise.
 */
private get started (): boolean {
  for (const { info } of this.workerNodes) {
    if (info.started) {
      return true
    }
  }
  return false
}
257 public get
info (): PoolInfo
{
261 minSize
: this.minSize
,
262 maxSize
: this.maxSize
,
263 utilization
: round(this.utilization
),
264 workerNodes
: this.workerNodes
.length
,
265 idleWorkerNodes
: this.workerNodes
.reduce(
266 (accumulator
, workerNode
) =>
267 workerNode
.usage
.tasks
.executing
=== 0
272 busyWorkerNodes
: this.workerNodes
.reduce(
273 (accumulator
, workerNode
) =>
274 workerNode
.usage
.tasks
.executing
> 0 ? accumulator
+ 1 : accumulator
,
277 executedTasks
: this.workerNodes
.reduce(
278 (accumulator
, workerNode
) =>
279 accumulator
+ workerNode
.usage
.tasks
.executed
,
282 executingTasks
: this.workerNodes
.reduce(
283 (accumulator
, workerNode
) =>
284 accumulator
+ workerNode
.usage
.tasks
.executing
,
287 queuedTasks
: this.workerNodes
.reduce(
288 (accumulator
, workerNode
) =>
289 accumulator
+ workerNode
.usage
.tasks
.queued
,
292 maxQueuedTasks
: this.workerNodes
.reduce(
293 (accumulator
, workerNode
) =>
294 accumulator
+ workerNode
.usage
.tasks
.maxQueued
,
297 failedTasks
: this.workerNodes
.reduce(
298 (accumulator
, workerNode
) =>
299 accumulator
+ workerNode
.usage
.tasks
.failed
,
/**
 * Gets the pool run time, i.e. the time elapsed since `startTimestamp`.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
/**
 * Gets the approximate pool utilization.
 *
 * The summed worker nodes tasks run time and wait time aggregates are divided
 * by the pool run time capacity (pool run time multiplied by the pool maximum size).
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  // Both totals are accumulated separately, in worker node order.
  let totalTasksRunTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime = totalTasksRunTime + usage.runTime.aggregate
  }
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksWaitTime = totalTasksWaitTime + usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
337 * If it is `'dynamic'`, it provides the `max` property.
339 protected abstract get
type (): PoolType
342 * Gets the worker type.
344 protected abstract get
worker (): WorkerType
349 protected abstract get
minSize (): number
354 protected abstract get
maxSize (): number
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose `info.id` matches, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
380 public setWorkerChoiceStrategy (
381 workerChoiceStrategy
: WorkerChoiceStrategy
,
382 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
384 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
385 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
386 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
387 this.opts
.workerChoiceStrategy
389 if (workerChoiceStrategyOptions
!= null) {
390 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
392 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
393 this.setWorkerNodeTasksUsage(
395 this.getWorkerUsage(workerNodeKey
)
397 this.setWorkerStatistics(workerNode
.worker
)
/**
 * Validates the given worker choice strategy options, stores them in the pool
 * options and forwards them to the worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
413 public enableTasksQueue (
415 tasksQueueOptions
?: TasksQueueOptions
417 if (this.opts
.enableTasksQueue
=== true && !enable
) {
418 this.flushTasksQueues()
420 this.opts
.enableTasksQueue
= enable
421 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
/**
 * Sets the tasks queue options.
 *
 * When the tasks queue is enabled, the given options are validated and stored
 * after being completed with defaults. Otherwise any previously stored tasks
 * queue options are removed from the pool options.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    const builtTasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = builtTasksQueueOptions
    return
  }
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
/**
 * Builds tasks queue options from the given ones, filling in defaults.
 *
 * @param tasksQueueOptions - The tasks queue options to complete.
 * @returns Tasks queue options whose `concurrency` defaults to `1` when not provided.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the pool maximum size.
 *
 * @returns The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
453 * Whether the pool is busy or not.
455 * The pool busyness boolean status.
457 protected abstract get
busy (): boolean
460 * Whether worker nodes are executing at least one task.
462 * @returns Worker nodes busyness boolean status.
464 protected internalBusy (): boolean {
466 this.workerNodes
.findIndex(workerNode
=> {
467 return workerNode
.usage
.tasks
.executing
=== 0
473 public async execute (data
?: Data
, name
?: string): Promise
<Response
> {
474 const timestamp
= performance
.now()
475 const workerNodeKey
= this.chooseWorkerNode()
476 const submittedTask
: Task
<Data
> = {
478 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
479 data
: data
?? ({} as Data
),
481 id
: crypto
.randomUUID()
483 const res
= new Promise
<Response
>((resolve
, reject
) => {
484 this.promiseResponseMap
.set(submittedTask
.id
as string, {
487 worker
: this.workerNodes
[workerNodeKey
].worker
491 this.opts
.enableTasksQueue
=== true &&
493 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
>=
494 ((this.opts
.tasksQueueOptions
as TasksQueueOptions
)
495 .concurrency
as number))
497 this.enqueueTask(workerNodeKey
, submittedTask
)
499 this.executeTask(workerNodeKey
, submittedTask
)
501 this.checkAndEmitEvents()
502 // eslint-disable-next-line @typescript-eslint/return-await
507 public async destroy (): Promise
<void> {
509 this.workerNodes
.map(async (workerNode
, workerNodeKey
) => {
510 this.flushTasksQueue(workerNodeKey
)
511 // FIXME: wait for tasks to be finished
512 await this.destroyWorker(workerNode
.worker
)
518 * Terminates the given worker.
520 * @param worker - A worker within `workerNodes`.
522 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
525 * Setup hook to execute code before worker nodes are created in the abstract constructor.
530 protected setupHook (): void {
531 // Intentionally empty
535 * Should return whether the worker is the main worker or not.
537 protected abstract isMain (): boolean
540 * Hook executed before the worker task execution.
543 * @param workerNodeKey - The worker node key.
544 * @param task - The task to execute.
546 protected beforeTaskExecutionHook (
547 workerNodeKey
: number,
550 const workerUsage
= this.workerNodes
[workerNodeKey
].usage
551 ++workerUsage
.tasks
.executing
552 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
556 * Hook executed after the worker task execution.
559 * @param worker - The worker.
560 * @param message - The received message.
562 protected afterTaskExecutionHook (
564 message
: MessageValue
<Response
>
566 const workerUsage
= this.workerNodes
[this.getWorkerNodeKey(worker
)].usage
567 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
568 this.updateRunTimeWorkerUsage(workerUsage
, message
)
569 this.updateEluWorkerUsage(workerUsage
, message
)
572 private updateTaskStatisticsWorkerUsage (
573 workerUsage
: WorkerUsage
,
574 message
: MessageValue
<Response
>
576 const workerTaskStatistics
= workerUsage
.tasks
577 --workerTaskStatistics
.executing
578 ++workerTaskStatistics
.executed
579 if (message
.taskError
!= null) {
580 ++workerTaskStatistics
.failed
584 private updateRunTimeWorkerUsage (
585 workerUsage
: WorkerUsage
,
586 message
: MessageValue
<Response
>
589 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
592 workerUsage
.runTime
.aggregate
+= message
.taskPerformance
?.runTime
?? 0
594 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
596 workerUsage
.tasks
.executed
!== 0
598 workerUsage
.runTime
.average
=
599 workerUsage
.runTime
.aggregate
/
600 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
603 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
605 message
.taskPerformance
?.runTime
!= null
607 workerUsage
.runTime
.history
.push(message
.taskPerformance
.runTime
)
608 workerUsage
.runTime
.median
= median(workerUsage
.runTime
.history
)
613 private updateWaitTimeWorkerUsage (
614 workerUsage
: WorkerUsage
,
617 const timestamp
= performance
.now()
618 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
620 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
623 workerUsage
.waitTime
.aggregate
+= taskWaitTime
?? 0
625 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
627 workerUsage
.tasks
.executed
!== 0
629 workerUsage
.waitTime
.average
=
630 workerUsage
.waitTime
.aggregate
/
631 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
634 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
638 workerUsage
.waitTime
.history
.push(taskWaitTime
)
639 workerUsage
.waitTime
.median
= median(workerUsage
.waitTime
.history
)
644 private updateEluWorkerUsage (
645 workerUsage
: WorkerUsage
,
646 message
: MessageValue
<Response
>
649 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
652 if (workerUsage
.elu
!= null && message
.taskPerformance
?.elu
!= null) {
653 workerUsage
.elu
.idle
.aggregate
+= message
.taskPerformance
.elu
.idle
654 workerUsage
.elu
.active
.aggregate
+= message
.taskPerformance
.elu
.active
655 workerUsage
.elu
.utilization
=
656 (workerUsage
.elu
.utilization
+
657 message
.taskPerformance
.elu
.utilization
) /
659 } else if (message
.taskPerformance
?.elu
!= null) {
660 workerUsage
.elu
.idle
.aggregate
= message
.taskPerformance
.elu
.idle
661 workerUsage
.elu
.active
.aggregate
= message
.taskPerformance
.elu
.active
662 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
665 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
667 workerUsage
.tasks
.executed
!== 0
669 const executedTasks
=
670 workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
671 workerUsage
.elu
.idle
.average
=
672 workerUsage
.elu
.idle
.aggregate
/ executedTasks
673 workerUsage
.elu
.active
.average
=
674 workerUsage
.elu
.active
.aggregate
/ executedTasks
677 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
679 message
.taskPerformance
?.elu
!= null
681 workerUsage
.elu
.idle
.history
.push(message
.taskPerformance
.elu
.idle
)
682 workerUsage
.elu
.active
.history
.push(message
.taskPerformance
.elu
.active
)
683 workerUsage
.elu
.idle
.median
= median(workerUsage
.elu
.idle
.history
)
684 workerUsage
.elu
.active
.median
= median(workerUsage
.elu
.active
.history
)
690 * Chooses a worker node for the next task.
692 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
694 * @returns The worker node key
696 private chooseWorkerNode (): number {
697 if (this.shallCreateDynamicWorker()) {
698 const worker
= this.createAndSetupDynamicWorker()
700 this.workerChoiceStrategyContext
.getStrategyPolicy().useDynamicWorker
702 return this.getWorkerNodeKey(worker
)
705 return this.workerChoiceStrategyContext
.execute()
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and `internalBusy()` reports `true`.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
718 * Sends a message to the given worker.
720 * @param worker - The worker which should receive the message.
721 * @param message - The message.
723 protected abstract sendToWorker (
725 message
: MessageValue
<Data
>
729 * Registers a listener callback on the given worker.
731 * @param worker - The worker which should register a listener.
732 * @param listener - The message listener callback.
734 private registerWorkerMessageListener
<Message
extends Data
| Response
>(
736 listener
: (message
: MessageValue
<Message
>) => void
738 worker
.on('message', listener
as MessageHandler
<Worker
>)
742 * Creates a new worker.
744 * @returns Newly created worker.
746 protected abstract createWorker (): Worker
/**
 * Hook invoked with a newly created worker.
 * It registers the pool worker message listener on that worker.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
}
760 * Creates a new worker and sets it up completely in the pool worker nodes.
762 * @returns New, completely set up worker.
764 protected createAndSetupWorker (): Worker
{
765 const worker
= this.createWorker()
767 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
768 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
769 worker
.on('error', error
=> {
770 if (this.emitter
!= null) {
771 this.emitter
.emit(PoolEvents
.error
, error
)
773 if (this.opts
.restartWorkerOnError
=== true && !this.starting
) {
774 this.createAndSetupWorker()
777 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
778 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
779 worker
.once('exit', () => {
780 this.removeWorkerNode(worker
)
783 this.pushWorkerNode(worker
)
785 this.setWorkerStatistics(worker
)
787 this.afterWorkerSetup(worker
)
793 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
795 * @returns New, completely set up dynamic worker.
797 protected createAndSetupDynamicWorker (): Worker
{
798 const worker
= this.createAndSetupWorker()
799 this.registerWorkerMessageListener(worker
, message
=> {
800 const workerNodeKey
= this.getWorkerNodeKey(worker
)
802 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
803 (message
.kill
!= null &&
804 ((this.opts
.enableTasksQueue
=== false &&
805 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
=== 0) ||
806 (this.opts
.enableTasksQueue
=== true &&
807 this.workerNodes
[workerNodeKey
].usage
.tasks
.executing
=== 0 &&
808 this.tasksQueueSize(workerNodeKey
) === 0)))
810 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
811 void (this.destroyWorker(worker
) as Promise
<void>)
818 * This function is the listener registered for each worker message.
820 * @returns The listener function to execute when a message is received from a worker.
822 protected workerListener (): (message
: MessageValue
<Response
>) => void {
824 if (message
.workerId
!= null && message
.started
!= null) {
825 // Worker started message received
826 const worker
= this.getWorkerById(message
.workerId
)
827 if (worker
!= null) {
828 this.workerNodes
[this.getWorkerNodeKey(worker
)].info
.started
=
831 throw new Error('Worker started message received from unknown worker')
833 } else if (message
.id
!= null) {
834 // Task execution response received
835 const promiseResponse
= this.promiseResponseMap
.get(message
.id
)
836 if (promiseResponse
!= null) {
837 if (message
.taskError
!= null) {
838 if (this.emitter
!= null) {
839 this.emitter
.emit(PoolEvents
.taskError
, message
.taskError
)
841 promiseResponse
.reject(message
.taskError
.message
)
843 promiseResponse
.resolve(message
.data
as Response
)
845 this.afterTaskExecutionHook(promiseResponse
.worker
, message
)
846 this.promiseResponseMap
.delete(message
.id
)
847 const workerNodeKey
= this.getWorkerNodeKey(promiseResponse
.worker
)
849 this.opts
.enableTasksQueue
=== true &&
850 this.tasksQueueSize(workerNodeKey
) > 0
854 this.dequeueTask(workerNodeKey
) as Task
<Data
>
857 this.workerChoiceStrategyContext
.update(workerNodeKey
)
863 private checkAndEmitEvents (): void {
864 if (this.emitter
!= null) {
866 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
868 if (this.type === PoolTypes
.dynamic
&& this.full
) {
869 this.emitter
?.emit(PoolEvents
.full
, this.info
)
/**
 * Assigns the given worker usage to the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  Object.assign(workerNode, { usage: workerUsage })
}
888 * Pushes the given worker in the pool worker nodes.
890 * @param worker - The worker.
891 * @returns The worker nodes length.
893 private pushWorkerNode (worker
: Worker
): number {
894 this.workerNodes
.push({
896 info
: { id
: this.getWorkerId(worker
), started
: false },
897 usage
: this.getWorkerUsage(),
898 tasksQueue
: new Queue
<Task
<Data
>>()
900 const workerNodeKey
= this.getWorkerNodeKey(worker
)
901 this.setWorkerNodeTasksUsage(
902 this.workerNodes
[workerNodeKey
],
903 this.getWorkerUsage(workerNodeKey
)
905 return this.workerNodes
.length
909 * Gets the worker id.
911 * @param worker - The worker.
912 * @returns The worker id.
914 private getWorkerId (worker
: Worker
): number | undefined {
915 if (this.worker
=== WorkerTypes
.thread
) {
916 return worker
.threadId
917 } else if (this.worker
=== WorkerTypes
.cluster
) {
923 // * Sets the given worker in the pool worker nodes.
925 // * @param workerNodeKey - The worker node key.
926 // * @param worker - The worker.
927 // * @param workerInfo - The worker info.
928 // * @param workerUsage - The worker usage.
929 // * @param tasksQueue - The worker task queue.
931 // private setWorkerNode (
932 // workerNodeKey: number,
934 // workerInfo: WorkerInfo,
935 // workerUsage: WorkerUsage,
936 // tasksQueue: Queue<Task<Data>>
938 // this.workerNodes[workerNodeKey] = {
941 // usage: workerUsage,
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // Unknown worker: nothing to remove.
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
/**
 * Runs the before task execution hook, then sends the task to the worker
 * of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
/**
 * Enqueues the task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the tasks queue returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
/**
 * Gets the tasks queue `maxSize` of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and dispatched, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue is empty. A counter compared against the live queue
  // size must not be used here: each dequeue shrinks the size while the counter
  // grows, so the loop would stop after roughly half of the tasks and the
  // remaining ones would be silently dropped by clear() below.
  // NOTE(review): the call wrapping the dequeued task is not visible in the
  // reviewed excerpt; it is assumed to be executeTask() - confirm.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  // The length is re-read on each iteration, like the array iterator does.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
998 private setWorkerStatistics (worker
: Worker
): void {
999 this.sendToWorker(worker
, {
1002 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1004 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
1010 private getWorkerUsage (workerNodeKey
?: number): WorkerUsage
{
1011 const getTasksQueueSize
= (workerNodeKey
?: number): number => {
1012 return workerNodeKey
!= null ? this.tasksQueueSize(workerNodeKey
) : 0
1014 const getTasksMaxQueueSize
= (workerNodeKey
?: number): number => {
1015 return workerNodeKey
!= null ? this.tasksMaxQueueSize(workerNodeKey
) : 0
1021 get
queued (): number {
1022 return getTasksQueueSize(workerNodeKey
)
1024 get
maxQueued (): number {
1025 return getTasksMaxQueueSize(workerNodeKey
)
1033 history
: new CircularArray()
1039 history
: new CircularArray()
1046 history
: new CircularArray()
1052 history
: new CircularArray()