1 import crypto from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { MessageValue
, PromiseResponseWrapper
} from
'../utility-types'
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
10 import { KillBehaviors
, isKillBehavior
} from
'../worker/worker-options'
11 import { CircularArray
} from
'../circular-array'
12 import { Queue
} from
'../queue'
21 type TasksQueueOptions
,
24 import type { IWorker
, Task
, WorkerNode
, WorkerUsage
} from
'./worker'
27 WorkerChoiceStrategies
,
28 type WorkerChoiceStrategy
,
29 type WorkerChoiceStrategyOptions
30 } from
'./selection-strategies/selection-strategies-types'
31 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
34 * Base class that implements some shared logic for all poolifier pools.
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
38 * @typeParam Response - Type of execution response. This can only be serializable data.
40 export abstract class AbstractPool
<
41 Worker
extends IWorker
,
44 > implements IPool
<Worker
, Data
, Response
> {
/**
 * Worker nodes currently registered in the pool.
 */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/**
 * Pool event emitter. Only instantiated by the constructor when the
 * `enableEvents` pool option is `true`.
 */
public readonly emitter?: PoolEmitter

/**
 * The execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry bound to the message id
 * is looked up to settle the matching promise.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal task routing methods to the
 * instance, builds the worker choice strategy context and finally creates
 * `numberOfWorkers` workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation, each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` stable when these methods are handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
/**
 * Ensures a worker file path has been provided.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish, or zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
/**
 * Validates the pool options and fills `this.opts` with defaults for the
 * options that were not provided.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin unless told otherwise.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is on.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(
    `Invalid worker choice strategy '${workerChoiceStrategy}'`
  )
}
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if
 * `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  if (workerChoiceStrategyOptions.weights != null) {
    const weightsCount = Object.keys(workerChoiceStrategyOptions.weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (workerChoiceStrategyOptions.measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(workerChoiceStrategyOptions.measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }
}
/**
 * Validates tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or if
 * `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
    )
  }
}
/**
 * Snapshot of the pool state: type, worker type, sizes, number of idle and
 * busy worker nodes, and task counters summed over all worker nodes.
 *
 * A worker node counts as idle when it has no executing task and as busy
 * otherwise.
 */
public get info (): PoolInfo {
  type Node = WorkerNode<Worker, Data>
  // Number of worker nodes matching the predicate.
  const countNodes = (predicate: (workerNode: Node) => boolean): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )
  // Sum of one task counter over all worker nodes.
  const sumTasks = (pick: (workerNode: Node) => number): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countNodes(
      workerNode => workerNode.workerUsage.tasks.executing === 0
    ),
    busyWorkerNodes: countNodes(
      workerNode => workerNode.workerUsage.tasks.executing > 0
    ),
    executedTasks: sumTasks(
      workerNode => workerNode.workerUsage.tasks.executed
    ),
    executingTasks: sumTasks(
      workerNode => workerNode.workerUsage.tasks.executing
    ),
    queuedTasks: sumTasks(workerNode => workerNode.workerUsage.tasks.queued),
    maxQueuedTasks: sumTasks(
      workerNode => workerNode.workerUsage.tasks.maxQueued
    ),
    failedTasks: sumTasks(workerNode => workerNode.workerUsage.tasks.failed)
  }
}
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * Gets the worker type.
 */
protected abstract get worker (): WorkerType

/**
 * Minimum size of the pool, reported as `minSize` by `info`.
 */
protected abstract get minSize (): number

/**
 * Maximum size of the pool. The pool is considered full once the number of
 * worker nodes reaches this value.
 */
protected abstract get maxSize (): number
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
/**
 * Switches the pool to another worker choice strategy.
 *
 * After the switch, the usage statistics of every worker node are replaced
 * by a fresh usage object and the statistics requirements of the new
 * strategy are sent to every worker.
 *
 * @param workerChoiceStrategy - The worker choice strategy to use.
 * @param workerChoiceStrategyOptions - Optional strategy options, applied when not nullish.
 * @throws Error if the strategy is unknown.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
/**
 * Validates, stores and forwards new worker choice strategy options to the
 * worker choice strategy context.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
/**
 * Enables or disables the worker tasks queue.
 *
 * When a previously enabled queue gets disabled, the queues of all worker
 * nodes are flushed first.
 *
 * @param enable - Whether to enable the tasks queue.
 * @param tasksQueueOptions - Optional tasks queue options.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
/**
 * Sets the tasks queue options.
 *
 * With the tasks queue enabled the options are validated and stored with
 * defaults applied. With the queue disabled any stored options are removed.
 *
 * @param tasksQueueOptions - The tasks queue options.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
/**
 * Builds tasks queue options with defaults applied.
 *
 * @param tasksQueueOptions - User provided tasks queue options, possibly nullish.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, defined by each pool implementation.
 */
protected abstract get busy (): boolean
/**
 * Whether every worker node is executing at least one task.
 *
 * Also `true` when the pool has no worker node at all.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.workerUsage.tasks.executing !== 0
  )
}
/**
 * Submits a task to the pool.
 *
 * A worker node is chosen, the task promise callbacks are registered under a
 * fresh task id, and the task is either queued on the worker node or sent to
 * its worker straight away.
 *
 * @param data - The task input data. Defaults to an empty object when nullish.
 * @param name - Optional task name, forwarded within the task message.
 * @returns Promise settled when the worker answers for this task.
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue only when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs `concurrency` tasks.
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
/**
 * Shuts the pool down: flushes the tasks queue of each worker node, then
 * terminates its worker. Resolves once all workers have been terminated.
 */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
/**
 * Terminates the given worker.
 *
 * Implementations may terminate synchronously or return a promise.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 *
 * Does nothing by default.
 */
protected setupHook (): void {
  // Intentionally empty
}
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
/**
 * Hook executed before the worker task execution.
 *
 * Increments the executing tasks counter of the worker node and records the
 * task wait time.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
/**
 * Hook executed after the worker task execution.
 *
 * Updates the task counters, run time and event loop utilization statistics
 * of the worker node owning the given worker.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
/**
 * Updates the task counters once a task response has been received: one task
 * less executing, one more executed and, on task error, one more failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
/**
 * Updates the run time statistics of a worker node from a task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * run time aggregate. Average and median are refreshed only when the strategy
 * requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
  // `executed` also counts failed tasks, so the divisor can be zero when every
  // executed task failed. Skip the update instead of storing NaN/Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && message.taskPerformance?.runTime != null) {
    workerUsage.runTime.history.push(message.taskPerformance.runTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
/**
 * Updates the wait time statistics of a worker node when a task is about to
 * be executed.
 *
 * The wait time is the time elapsed between the task timestamp and now, or
 * zero when the task carries no timestamp. Nothing is recorded unless the
 * current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // `executed` also counts failed tasks, so the divisor can be zero when every
  // executed task failed. Skip the update instead of storing NaN/Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
/**
 * Updates the event loop utilization (ELU) statistics of a worker node from a
 * task response.
 *
 * Nothing is recorded unless the current worker choice strategy requires the
 * ELU aggregate. The stored utilization is the mean of the previous stored
 * value and the value reported for this task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  // The former fallback branch wrote into `workerUsage.elu` right after
  // establishing it was nullish, so it could only throw. It has been removed.
  if (workerUsage.elu != null && taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // `executed` also counts failed tasks, so the divisor can be zero when every
  // executed task failed. Skip the update instead of storing NaN/Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker has to be created, it is created first. The new
 * worker is picked directly if the strategy policy asks for dynamic workers
 * to be used. In every other case the choice is delegated to the worker
 * choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * all of its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
/**
 * Sends a message to the given worker.
 *
 * Used to dispatch tasks and to push statistics requirements to workers.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
/**
 * Registers a listener callback on the given worker.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
/**
 * Creates a new worker.
 *
 * The returned worker is not yet registered in the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * User provided handlers are attached first. On worker error the pool emits
 * an error event (when events are enabled) and, when `restartWorkerOnError`
 * is `true`, creates a replacement worker. On worker exit the worker node is
 * removed from the pool.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  // Register the worker, tell it which statistics to collect, then let the
  // concrete pool finish the setup.
  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * A message listener is registered on the worker. The worker is destroyed when
 * it sends a hard kill message, or any kill message while it has no executing
 * task and, with the tasks queue enabled, no queued task either.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const dynamicWorker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(dynamicWorker, message => {
    const workerNodeKey = this.getWorkerNodeKey(dynamicWorker)
    // Evaluated lazily: the worker node is only inspected for a non hard kill.
    const isIdleOnKill = (): boolean => {
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return (
          this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
        )
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (isKillBehavior(KillBehaviors.HARD, message.kill) || isIdleOnKill()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(dynamicWorker) as Promise<void>)
    }
  })
  return dynamicWorker
}
/**
 * Builds the listener registered for each worker message.
 *
 * For a message carrying a known task id, the listener settles the task
 * promise, updates the worker usage statistics, forgets the task, runs the
 * next queued task of that worker node if any, and notifies the worker choice
 * strategy context. Other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
/**
 * Emits the `busy` event when the pool is busy and the `full` event when a
 * dynamic pool is full. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
/**
 * Replaces the usage statistics object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  Object.assign(workerNode, { workerUsage })
}
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * The node is first pushed with a usage object not bound to any node key.
 * Once its key is known, the usage is replaced by one built for that key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const boundUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], boundUsage)
  return this.workerNodes.length
}
835 // * Sets the given worker in the pool worker nodes.
837 // * @param workerNodeKey - The worker node key.
838 // * @param worker - The worker.
839 // * @param workerUsage - The worker usage.
840 // * @param tasksQueue - The worker task queue.
842 // private setWorkerNode (
843 // workerNodeKey: number,
845 // workerUsage: WorkerUsage,
846 // tasksQueue: Queue<Task<Data>>
848 // this.workerNodes[workerNodeKey] = {
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
/**
 * Runs the pre-execution hook, then sends the task to the worker of the given
 * worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and sent for execution, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue reports empty. A counted loop bounded by the live
  // queue size stops halfway, because the size shrinks with each dequeue
  // while the counter grows, and the remaining tasks would be dropped by
  // `clear()` without ever being executed.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
/**
 * Tells the given worker which task statistics to collect, based on what the
 * current worker choice strategy requires.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: requirements.runTime.aggregate,
      elu: requirements.elu.aggregate
    }
  })
}
919 private getWorkerUsage (workerNodeKey
?: number): WorkerUsage
{
920 const getTasksQueueSize
= (workerNodeKey
?: number): number => {
921 return workerNodeKey
!= null ? this.tasksQueueSize(workerNodeKey
) : 0
923 const getTasksMaxQueueSize
= (workerNodeKey
?: number): number => {
924 return workerNodeKey
!= null ? this.tasksMaxQueueSize(workerNodeKey
) : 0
930 get
queued (): number {
931 return getTasksQueueSize(workerNodeKey
)
933 get
maxQueued (): number {
934 return getTasksMaxQueueSize(workerNodeKey
)
942 history
: new CircularArray()
948 history
: new CircularArray()
955 history
: new CircularArray()
961 history
: new CircularArray()