1 import crypto from
'node:crypto'
2 import { performance
} from
'node:perf_hooks'
3 import type { MessageValue
, PromiseResponseWrapper
} from
'../utility-types'
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
,
10 import { KillBehaviors
, isKillBehavior
} from
'../worker/worker-options'
11 import { CircularArray
} from
'../circular-array'
12 import { Queue
} from
'../queue'
21 type TasksQueueOptions
,
24 import type { IWorker
, Task
, WorkerNode
, WorkerUsage
} from
'./worker'
27 WorkerChoiceStrategies
,
28 type WorkerChoiceStrategy
,
29 type WorkerChoiceStrategyOptions
30 } from
'./selection-strategies/selection-strategies-types'
31 import { WorkerChoiceStrategyContext
} from
'./selection-strategies/worker-choice-strategy-context'
34 * Base class that implements some shared logic for all poolifier pools.
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
38 * @typeParam Response - Type of execution response. This can only be serializable data.
40 export abstract class AbstractPool
<
41 Worker
extends IWorker
,
44 > implements IPool
<Worker
, Data
, Response
> {
46 public readonly workerNodes
: Array<WorkerNode
<Worker
, Data
>> = []
49 public readonly emitter
?: PoolEmitter
52 * The execution response promise map.
54 * - `key`: The message id of each submitted task.
55 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
57 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
59 protected promiseResponseMap
: Map
<
61 PromiseResponseWrapper
<Worker
, Response
>
62 > = new Map
<string, PromiseResponseWrapper
<Worker
, Response
>>()
65 * Worker choice strategy context referencing a worker choice algorithm implementation.
67 protected workerChoiceStrategyContext
: WorkerChoiceStrategyContext
<
74 * Constructs a new poolifier pool.
76 * @param numberOfWorkers - Number of workers that this pool should manage.
77 * @param filePath - Path to the worker file.
78 * @param opts - Options for the pool.
81 protected readonly numberOfWorkers
: number,
82 protected readonly filePath
: string,
83 protected readonly opts
: PoolOptions
<Worker
>
86 throw new Error('Cannot start a pool from a worker!')
88 this.checkNumberOfWorkers(this.numberOfWorkers
)
89 this.checkFilePath(this.filePath
)
90 this.checkPoolOptions(this.opts
)
92 this.chooseWorkerNode
= this.chooseWorkerNode
.bind(this)
93 this.executeTask
= this.executeTask
.bind(this)
94 this.enqueueTask
= this.enqueueTask
.bind(this)
95 this.checkAndEmitEvents
= this.checkAndEmitEvents
.bind(this)
97 if (this.opts
.enableEvents
=== true) {
98 this.emitter
= new PoolEmitter()
100 this.workerChoiceStrategyContext
= new WorkerChoiceStrategyContext
<
106 this.opts
.workerChoiceStrategy
,
107 this.opts
.workerChoiceStrategyOptions
112 for (let i
= 1; i
<= this.numberOfWorkers
; i
++) {
113 this.createAndSetupWorker()
117 private checkFilePath (filePath
: string): void {
120 (typeof filePath
=== 'string' && filePath
.trim().length
=== 0)
122 throw new Error('Please specify a file with a worker implementation')
126 private checkNumberOfWorkers (numberOfWorkers
: number): void {
127 if (numberOfWorkers
== null) {
129 'Cannot instantiate a pool without specifying the number of workers'
131 } else if (!Number.isSafeInteger(numberOfWorkers
)) {
133 'Cannot instantiate a pool with a non safe integer number of workers'
135 } else if (numberOfWorkers
< 0) {
136 throw new RangeError(
137 'Cannot instantiate a pool with a negative number of workers'
139 } else if (this.type === PoolTypes
.fixed
&& numberOfWorkers
=== 0) {
140 throw new Error('Cannot instantiate a fixed pool with no worker')
144 private checkPoolOptions (opts
: PoolOptions
<Worker
>): void {
145 if (isPlainObject(opts
)) {
146 this.opts
.workerChoiceStrategy
=
147 opts
.workerChoiceStrategy
?? WorkerChoiceStrategies
.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts
.workerChoiceStrategy
)
149 this.opts
.workerChoiceStrategyOptions
=
150 opts
.workerChoiceStrategyOptions
??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts
.workerChoiceStrategyOptions
155 this.opts
.restartWorkerOnError
= opts
.restartWorkerOnError
?? true
156 this.opts
.enableEvents
= opts
.enableEvents
?? true
157 this.opts
.enableTasksQueue
= opts
.enableTasksQueue
?? false
158 if (this.opts
.enableTasksQueue
) {
159 this.checkValidTasksQueueOptions(
160 opts
.tasksQueueOptions
as TasksQueueOptions
162 this.opts
.tasksQueueOptions
= this.buildTasksQueueOptions(
163 opts
.tasksQueueOptions
as TasksQueueOptions
167 throw new TypeError('Invalid pool options: must be a plain object')
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy
: WorkerChoiceStrategy
174 if (!Object.values(WorkerChoiceStrategies
).includes(workerChoiceStrategy
)) {
176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
184 if (!isPlainObject(workerChoiceStrategyOptions
)) {
186 'Invalid worker choice strategy options: must be a plain object'
190 workerChoiceStrategyOptions
.weights
!= null &&
191 Object.keys(workerChoiceStrategyOptions
.weights
).length
!== this.maxSize
194 'Invalid worker choice strategy options: must have a weight for each worker node'
198 workerChoiceStrategyOptions
.measurement
!= null &&
199 !Object.values(Measurements
).includes(
200 workerChoiceStrategyOptions
.measurement
204 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
209 private checkValidTasksQueueOptions (
210 tasksQueueOptions
: TasksQueueOptions
212 if (tasksQueueOptions
!= null && !isPlainObject(tasksQueueOptions
)) {
213 throw new TypeError('Invalid tasks queue options: must be a plain object')
216 tasksQueueOptions
?.concurrency
!= null &&
217 !Number.isSafeInteger(tasksQueueOptions
.concurrency
)
220 'Invalid worker tasks concurrency: must be an integer'
224 tasksQueueOptions
?.concurrency
!= null &&
225 tasksQueueOptions
.concurrency
<= 0
228 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
234 public get
info (): PoolInfo
{
238 minSize
: this.minSize
,
239 maxSize
: this.maxSize
,
240 workerNodes
: this.workerNodes
.length
,
241 idleWorkerNodes
: this.workerNodes
.reduce(
242 (accumulator
, workerNode
) =>
243 workerNode
.workerUsage
.tasks
.executing
=== 0
248 busyWorkerNodes
: this.workerNodes
.reduce(
249 (accumulator
, workerNode
) =>
250 workerNode
.workerUsage
.tasks
.executing
> 0
255 executedTasks
: this.workerNodes
.reduce(
256 (accumulator
, workerNode
) =>
257 accumulator
+ workerNode
.workerUsage
.tasks
.executed
,
260 executingTasks
: this.workerNodes
.reduce(
261 (accumulator
, workerNode
) =>
262 accumulator
+ workerNode
.workerUsage
.tasks
.executing
,
265 queuedTasks
: this.workerNodes
.reduce(
266 (accumulator
, workerNode
) => accumulator
+ workerNode
.tasksQueue
.size
,
269 maxQueuedTasks
: this.workerNodes
.reduce(
270 (accumulator
, workerNode
) =>
271 accumulator
+ workerNode
.tasksQueue
.maxSize
,
274 failedTasks
: this.workerNodes
.reduce(
275 (accumulator
, workerNode
) =>
276 accumulator
+ workerNode
.workerUsage
.tasks
.failed
,
285 * If it is `'dynamic'`, it provides the `max` property.
287 protected abstract get
type (): PoolType
290 * Gets the worker type.
292 protected abstract get
worker (): WorkerType
297 protected abstract get
minSize (): number
302 protected abstract get
maxSize (): number
305 * Gets the given worker its worker node key.
307 * @param worker - The worker.
308 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
310 private getWorkerNodeKey (worker
: Worker
): number {
311 return this.workerNodes
.findIndex(
312 workerNode
=> workerNode
.worker
=== worker
317 public setWorkerChoiceStrategy (
318 workerChoiceStrategy
: WorkerChoiceStrategy
,
319 workerChoiceStrategyOptions
?: WorkerChoiceStrategyOptions
321 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy
)
322 this.opts
.workerChoiceStrategy
= workerChoiceStrategy
323 this.workerChoiceStrategyContext
.setWorkerChoiceStrategy(
324 this.opts
.workerChoiceStrategy
326 if (workerChoiceStrategyOptions
!= null) {
327 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
329 for (const [workerNodeKey
, workerNode
] of this.workerNodes
.entries()) {
330 this.setWorkerNodeTasksUsage(
332 this.getWorkerUsage(workerNodeKey
)
334 this.setWorkerStatistics(workerNode
.worker
)
339 public setWorkerChoiceStrategyOptions (
340 workerChoiceStrategyOptions
: WorkerChoiceStrategyOptions
342 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions
)
343 this.opts
.workerChoiceStrategyOptions
= workerChoiceStrategyOptions
344 this.workerChoiceStrategyContext
.setOptions(
345 this.opts
.workerChoiceStrategyOptions
350 public enableTasksQueue (
352 tasksQueueOptions
?: TasksQueueOptions
354 if (this.opts
.enableTasksQueue
=== true && !enable
) {
355 this.flushTasksQueues()
357 this.opts
.enableTasksQueue
= enable
358 this.setTasksQueueOptions(tasksQueueOptions
as TasksQueueOptions
)
362 public setTasksQueueOptions (tasksQueueOptions
: TasksQueueOptions
): void {
363 if (this.opts
.enableTasksQueue
=== true) {
364 this.checkValidTasksQueueOptions(tasksQueueOptions
)
365 this.opts
.tasksQueueOptions
=
366 this.buildTasksQueueOptions(tasksQueueOptions
)
367 } else if (this.opts
.tasksQueueOptions
!= null) {
368 delete this.opts
.tasksQueueOptions
372 private buildTasksQueueOptions (
373 tasksQueueOptions
: TasksQueueOptions
374 ): TasksQueueOptions
{
376 concurrency
: tasksQueueOptions
?.concurrency
?? 1
381 * Whether the pool is full or not.
383 * The pool filling boolean status.
385 protected get
full (): boolean {
386 return this.workerNodes
.length
>= this.maxSize
390 * Whether the pool is busy or not.
392 * The pool busyness boolean status.
394 protected abstract get
busy (): boolean
397 * Whether worker nodes are executing at least one task.
399 * @returns Worker nodes busyness boolean status.
401 protected internalBusy (): boolean {
403 this.workerNodes
.findIndex(workerNode
=> {
404 return workerNode
.workerUsage
.tasks
.executing
=== 0
410 public async execute (data
?: Data
, name
?: string): Promise
<Response
> {
411 const timestamp
= performance
.now()
412 const workerNodeKey
= this.chooseWorkerNode()
413 const submittedTask
: Task
<Data
> = {
415 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
416 data
: data
?? ({} as Data
),
418 id
: crypto
.randomUUID()
420 const res
= new Promise
<Response
>((resolve
, reject
) => {
421 this.promiseResponseMap
.set(submittedTask
.id
as string, {
424 worker
: this.workerNodes
[workerNodeKey
].worker
428 this.opts
.enableTasksQueue
=== true &&
430 this.workerNodes
[workerNodeKey
].workerUsage
.tasks
.executing
>=
431 ((this.opts
.tasksQueueOptions
as TasksQueueOptions
)
432 .concurrency
as number))
434 this.enqueueTask(workerNodeKey
, submittedTask
)
436 this.executeTask(workerNodeKey
, submittedTask
)
438 this.checkAndEmitEvents()
439 // eslint-disable-next-line @typescript-eslint/return-await
444 public async destroy (): Promise
<void> {
446 this.workerNodes
.map(async (workerNode
, workerNodeKey
) => {
447 this.flushTasksQueue(workerNodeKey
)
448 // FIXME: wait for tasks to be finished
449 await this.destroyWorker(workerNode
.worker
)
455 * Terminates the given worker.
457 * @param worker - A worker within `workerNodes`.
459 protected abstract destroyWorker (worker
: Worker
): void | Promise
<void>
462 * Setup hook to execute code before worker node are created in the abstract constructor.
467 protected setupHook (): void {
468 // Intentionally empty
472 * Should return whether the worker is the main worker or not.
474 protected abstract isMain (): boolean
477 * Hook executed before the worker task execution.
480 * @param workerNodeKey - The worker node key.
481 * @param task - The task to execute.
483 protected beforeTaskExecutionHook (
484 workerNodeKey
: number,
487 const workerUsage
= this.workerNodes
[workerNodeKey
].workerUsage
488 ++workerUsage
.tasks
.executing
489 this.updateWaitTimeWorkerUsage(workerUsage
, task
)
493 * Hook executed after the worker task execution.
496 * @param worker - The worker.
497 * @param message - The received message.
499 protected afterTaskExecutionHook (
501 message
: MessageValue
<Response
>
504 this.workerNodes
[this.getWorkerNodeKey(worker
)].workerUsage
505 this.updateTaskStatisticsWorkerUsage(workerUsage
, message
)
506 this.updateRunTimeWorkerUsage(workerUsage
, message
)
507 this.updateEluWorkerUsage(workerUsage
, message
)
510 private updateTaskStatisticsWorkerUsage (
511 workerUsage
: WorkerUsage
,
512 message
: MessageValue
<Response
>
514 const workerTaskStatistics
= workerUsage
.tasks
515 --workerTaskStatistics
.executing
516 ++workerTaskStatistics
.executed
517 if (message
.taskError
!= null) {
518 ++workerTaskStatistics
.failed
522 private updateRunTimeWorkerUsage (
523 workerUsage
: WorkerUsage
,
524 message
: MessageValue
<Response
>
527 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
530 workerUsage
.runTime
.aggregate
+= message
.taskPerformance
?.runTime
?? 0
532 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
534 workerUsage
.tasks
.executed
!== 0
536 workerUsage
.runTime
.average
=
537 workerUsage
.runTime
.aggregate
/
538 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
541 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().runTime
543 message
.taskPerformance
?.runTime
!= null
545 workerUsage
.runTime
.history
.push(message
.taskPerformance
.runTime
)
546 workerUsage
.runTime
.median
= median(workerUsage
.runTime
.history
)
551 private updateWaitTimeWorkerUsage (
552 workerUsage
: WorkerUsage
,
555 const timestamp
= performance
.now()
556 const taskWaitTime
= timestamp
- (task
.timestamp
?? timestamp
)
558 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().waitTime
561 workerUsage
.waitTime
.aggregate
+= taskWaitTime
?? 0
563 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
565 workerUsage
.tasks
.executed
!== 0
567 workerUsage
.waitTime
.average
=
568 workerUsage
.waitTime
.aggregate
/
569 (workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
)
572 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
576 workerUsage
.waitTime
.history
.push(taskWaitTime
)
577 workerUsage
.waitTime
.median
= median(workerUsage
.waitTime
.history
)
582 private updateEluWorkerUsage (
583 workerUsage
: WorkerUsage
,
584 message
: MessageValue
<Response
>
587 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
590 if (workerUsage
.elu
!= null && message
.taskPerformance
?.elu
!= null) {
591 workerUsage
.elu
.idle
.aggregate
+= message
.taskPerformance
.elu
.idle
592 workerUsage
.elu
.active
.aggregate
+= message
.taskPerformance
.elu
.active
593 workerUsage
.elu
.utilization
=
594 (workerUsage
.elu
.utilization
+
595 message
.taskPerformance
.elu
.utilization
) /
597 } else if (message
.taskPerformance
?.elu
!= null) {
598 workerUsage
.elu
.idle
.aggregate
= message
.taskPerformance
.elu
.idle
599 workerUsage
.elu
.active
.aggregate
= message
.taskPerformance
.elu
.active
600 workerUsage
.elu
.utilization
= message
.taskPerformance
.elu
.utilization
603 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
605 workerUsage
.tasks
.executed
!== 0
607 const executedTasks
=
608 workerUsage
.tasks
.executed
- workerUsage
.tasks
.failed
609 workerUsage
.elu
.idle
.average
=
610 workerUsage
.elu
.idle
.aggregate
/ executedTasks
611 workerUsage
.elu
.active
.average
=
612 workerUsage
.elu
.active
.aggregate
/ executedTasks
615 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements().elu
617 message
.taskPerformance
?.elu
!= null
619 workerUsage
.elu
.idle
.history
.push(message
.taskPerformance
.elu
.idle
)
620 workerUsage
.elu
.active
.history
.push(message
.taskPerformance
.elu
.active
)
621 workerUsage
.elu
.idle
.median
= median(workerUsage
.elu
.idle
.history
)
622 workerUsage
.elu
.active
.median
= median(workerUsage
.elu
.active
.history
)
628 * Chooses a worker node for the next task.
630 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
632 * @returns The worker node key
634 private chooseWorkerNode (): number {
635 if (this.shallCreateDynamicWorker()) {
636 const worker
= this.createAndSetupDynamicWorker()
638 this.workerChoiceStrategyContext
.getStrategyPolicy().useDynamicWorker
640 return this.getWorkerNodeKey(worker
)
643 return this.workerChoiceStrategyContext
.execute()
647 * Conditions for dynamic worker creation.
649 * @returns Whether to create a dynamic worker or not.
651 private shallCreateDynamicWorker (): boolean {
652 return this.type === PoolTypes
.dynamic
&& !this.full
&& this.internalBusy()
656 * Sends a message to the given worker.
658 * @param worker - The worker which should receive the message.
659 * @param message - The message.
661 protected abstract sendToWorker (
663 message
: MessageValue
<Data
>
667 * Registers a listener callback on the given worker.
669 * @param worker - The worker which should register a listener.
670 * @param listener - The message listener callback.
672 protected abstract registerWorkerMessageListener
<
673 Message
extends Data
| Response
674 >(worker
: Worker
, listener
: (message
: MessageValue
<Message
>) => void): void
677 * Creates a new worker.
679 * @returns Newly created worker.
681 protected abstract createWorker (): Worker
684 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
686 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
688 * @param worker - The newly created worker.
690 protected abstract afterWorkerSetup (worker
: Worker
): void
693 * Creates a new worker and sets it up completely in the pool worker nodes.
695 * @returns New, completely set up worker.
697 protected createAndSetupWorker (): Worker
{
698 const worker
= this.createWorker()
700 worker
.on('message', this.opts
.messageHandler
?? EMPTY_FUNCTION
)
701 worker
.on('error', this.opts
.errorHandler
?? EMPTY_FUNCTION
)
702 worker
.on('error', error
=> {
703 if (this.emitter
!= null) {
704 this.emitter
.emit(PoolEvents
.error
, error
)
706 if (this.opts
.restartWorkerOnError
=== true) {
707 this.createAndSetupWorker()
710 worker
.on('online', this.opts
.onlineHandler
?? EMPTY_FUNCTION
)
711 worker
.on('exit', this.opts
.exitHandler
?? EMPTY_FUNCTION
)
712 worker
.once('exit', () => {
713 this.removeWorkerNode(worker
)
716 this.pushWorkerNode(worker
)
718 this.setWorkerStatistics(worker
)
720 this.afterWorkerSetup(worker
)
726 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
728 * @returns New, completely set up dynamic worker.
730 protected createAndSetupDynamicWorker (): Worker
{
731 const worker
= this.createAndSetupWorker()
732 this.registerWorkerMessageListener(worker
, message
=> {
733 const workerNodeKey
= this.getWorkerNodeKey(worker
)
735 isKillBehavior(KillBehaviors
.HARD
, message
.kill
) ||
736 (message
.kill
!= null &&
737 ((this.opts
.enableTasksQueue
=== false &&
738 this.workerNodes
[workerNodeKey
].workerUsage
.tasks
.executing
===
740 (this.opts
.enableTasksQueue
=== true &&
741 this.workerNodes
[workerNodeKey
].workerUsage
.tasks
.executing
===
743 this.tasksQueueSize(workerNodeKey
) === 0)))
745 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
746 void (this.destroyWorker(worker
) as Promise
<void>)
753 * This function is the listener registered for each worker message.
755 * @returns The listener function to execute when a message is received from a worker.
757 protected workerListener (): (message
: MessageValue
<Response
>) => void {
759 if (message
.id
!= null) {
760 // Task execution response received
761 const promiseResponse
= this.promiseResponseMap
.get(message
.id
)
762 if (promiseResponse
!= null) {
763 if (message
.taskError
!= null) {
764 if (this.emitter
!= null) {
765 this.emitter
.emit(PoolEvents
.taskError
, message
.taskError
)
767 promiseResponse
.reject(message
.taskError
.message
)
769 promiseResponse
.resolve(message
.data
as Response
)
771 this.afterTaskExecutionHook(promiseResponse
.worker
, message
)
772 this.promiseResponseMap
.delete(message
.id
)
773 const workerNodeKey
= this.getWorkerNodeKey(promiseResponse
.worker
)
775 this.opts
.enableTasksQueue
=== true &&
776 this.tasksQueueSize(workerNodeKey
) > 0
780 this.dequeueTask(workerNodeKey
) as Task
<Data
>
783 this.workerChoiceStrategyContext
.update(workerNodeKey
)
789 private checkAndEmitEvents (): void {
790 if (this.emitter
!= null) {
792 this.emitter
?.emit(PoolEvents
.busy
, this.info
)
794 if (this.type === PoolTypes
.dynamic
&& this.full
) {
795 this.emitter
?.emit(PoolEvents
.full
, this.info
)
801 * Sets the given worker node its tasks usage in the pool.
803 * @param workerNode - The worker node.
804 * @param workerUsage - The worker usage.
806 private setWorkerNodeTasksUsage (
807 workerNode
: WorkerNode
<Worker
, Data
>,
808 workerUsage
: WorkerUsage
810 workerNode
.workerUsage
= workerUsage
814 * Pushes the given worker in the pool worker nodes.
816 * @param worker - The worker.
817 * @returns The worker nodes length.
819 private pushWorkerNode (worker
: Worker
): number {
820 this.workerNodes
.push({
822 workerUsage
: this.getWorkerUsage(),
823 tasksQueue
: new Queue
<Task
<Data
>>()
825 const workerNodeKey
= this.getWorkerNodeKey(worker
)
826 this.setWorkerNodeTasksUsage(
827 this.workerNodes
[workerNodeKey
],
828 this.getWorkerUsage(workerNodeKey
)
830 return this.workerNodes
.length
834 // * Sets the given worker in the pool worker nodes.
836 // * @param workerNodeKey - The worker node key.
837 // * @param worker - The worker.
838 // * @param workerUsage - The worker usage.
839 // * @param tasksQueue - The worker task queue.
841 // private setWorkerNode (
842 // workerNodeKey: number,
844 // workerUsage: WorkerUsage,
845 // tasksQueue: Queue<Task<Data>>
847 // this.workerNodes[workerNodeKey] = {
855 * Removes the given worker from the pool worker nodes.
857 * @param worker - The worker.
859 private removeWorkerNode (worker
: Worker
): void {
860 const workerNodeKey
= this.getWorkerNodeKey(worker
)
861 if (workerNodeKey
!== -1) {
862 this.workerNodes
.splice(workerNodeKey
, 1)
863 this.workerChoiceStrategyContext
.remove(workerNodeKey
)
867 private executeTask (workerNodeKey
: number, task
: Task
<Data
>): void {
868 this.beforeTaskExecutionHook(workerNodeKey
, task
)
869 this.sendToWorker(this.workerNodes
[workerNodeKey
].worker
, task
)
872 private enqueueTask (workerNodeKey
: number, task
: Task
<Data
>): number {
873 return this.workerNodes
[workerNodeKey
].tasksQueue
.enqueue(task
)
876 private dequeueTask (workerNodeKey
: number): Task
<Data
> | undefined {
877 return this.workerNodes
[workerNodeKey
].tasksQueue
.dequeue()
880 private tasksQueueSize (workerNodeKey
: number): number {
881 return this.workerNodes
[workerNodeKey
].tasksQueue
.size
884 private flushTasksQueue (workerNodeKey
: number): void {
885 if (this.tasksQueueSize(workerNodeKey
) > 0) {
886 for (let i
= 0; i
< this.tasksQueueSize(workerNodeKey
); i
++) {
889 this.dequeueTask(workerNodeKey
) as Task
<Data
>
895 private flushTasksQueues (): void {
896 for (const [workerNodeKey
] of this.workerNodes
.entries()) {
897 this.flushTasksQueue(workerNodeKey
)
901 private setWorkerStatistics (worker
: Worker
): void {
902 this.sendToWorker(worker
, {
905 this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
907 elu
: this.workerChoiceStrategyContext
.getTaskStatisticsRequirements()
913 private getWorkerUsage (workerNodeKey
?: number): WorkerUsage
{
914 const getTasksQueueSize
= (workerNodeKey
?: number): number => {
915 return workerNodeKey
!= null ? this.tasksQueueSize(workerNodeKey
) : 0
921 get
queued (): number {
922 return getTasksQueueSize(workerNodeKey
)
930 history
: new CircularArray()
936 history
: new CircularArray()
943 history
: new CircularArray()
949 history
: new CircularArray()