feat: improve events emission
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
6 isPlainObject,
7 median
8 } from '../utils'
9 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
10 import { CircularArray } from '../circular-array'
11 import { Queue } from '../queue'
12 import {
13 type IPool,
14 PoolEmitter,
15 PoolEvents,
16 type PoolOptions,
17 PoolType,
18 type TasksQueueOptions
19 } from './pool'
20 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
21 import {
22 WorkerChoiceStrategies,
23 type WorkerChoiceStrategy,
24 type WorkerChoiceStrategyOptions
25 } from './selection-strategies/selection-strategies-types'
26 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
27
28 /**
29 * Base class that implements some shared logic for all poolifier pools.
30 *
31 * @typeParam Worker - Type of worker which manages this pool.
32 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
33 * @typeParam Response - Type of execution response. This can only be serializable data.
34 */
35 export abstract class AbstractPool<
36 Worker extends IWorker,
37 Data = unknown,
38 Response = unknown
39 > implements IPool<Worker, Data, Response> {
40 /** @inheritDoc */
41 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
42
43 /** @inheritDoc */
44 public readonly emitter?: PoolEmitter
45
46 /**
47 * The execution response promise map.
48 *
49 * - `key`: The message id of each submitted task.
50 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
51 *
52 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
53 */
54 protected promiseResponseMap: Map<
55 string,
56 PromiseResponseWrapper<Worker, Response>
57 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
58
59 /**
60 * Worker choice strategy context referencing a worker choice algorithm implementation.
61 *
62 * Default to a round robin algorithm.
63 */
64 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
65 Worker,
66 Data,
67 Response
68 >
69
/**
 * Creates the pool and spawns its initial workers.
 *
 * Validates the arguments, runs the setup hook, creates `numberOfWorkers` workers and then
 * instantiates the optional event emitter and the worker choice strategy context.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` returns `false` or if one of the argument checks fails.
 */
public constructor (
  public readonly numberOfWorkers: number,
  public readonly filePath: string,
  public readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; checkPoolOptions() also writes the default values into `opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Rebind these methods to the pool instance.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  this.setupHook()

  // Spawn exactly `numberOfWorkers` workers.
  let workersLeftToCreate = this.numberOfWorkers
  while (workersLeftToCreate > 0) {
    this.createAndSetupWorker()
    workersLeftToCreate--
  }

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )
}
113
/**
 * Ensures that a usable worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined`, not a string, or a blank string.
 */
private checkFilePath (filePath: string): void {
  // The static type is `string`, but untyped callers can pass anything at runtime:
  // reject every non-string value (this includes `null` and `undefined`) up front
  // instead of letting it slip through to worker creation.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
122
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is missing, or is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
140
/**
 * Validates the pool options and stores the resolved values (defaults included) in `this.opts`.
 *
 * @param opts - Options given to the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const requestedTasksQueueOptions =
      opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      requestedTasksQueueOptions
    )
  }
}
167
/**
 * Ensures the given strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  const isKnown = knownStrategies.includes(workerChoiceStrategy)
  if (!isKnown) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
177
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given and their count differs from the pool `size`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  // Weights are optional; when present there must be one entry per `size` unit.
  if (weights != null && Object.keys(weights).length !== this.size) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
195
/**
 * Validates the tasks queue options.
 *
 * @param tasksQueueOptions - The options to validate; `null`/`undefined` is accepted.
 * @throws TypeError if the options are given but are not a plain object.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isProvided = tasksQueueOptions != null
  if (isProvided && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An absent concurrency never satisfies `<= 0`, so it is let through.
  const requestedConcurrency = tasksQueueOptions?.concurrency as number
  if (requestedConcurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency '${requestedConcurrency}'`
    )
  }
}
210
// Provided by the concrete pool; this class compares it against PoolType.FIXED and PoolType.DYNAMIC.
/** @inheritDoc */
public abstract get type (): PoolType
213
// Provided by the concrete pool; used here as the expected number of strategy weights and reported in pool events.
/** @inheritDoc */
public abstract get size (): number
216
/**
 * Total of the `running` task counters over all the worker nodes.
 */
private get numberOfRunningTasks (): number {
  let runningTasks = 0
  for (const { tasksUsage } of this.workerNodes) {
    runningTasks += tasksUsage.running
  }
  return runningTasks
}
226
/**
 * Total of the tasks queue sizes over all the worker nodes.
 *
 * Always `0` when the tasks queue is explicitly disabled.
 */
private get numberOfQueuedTasks (): number {
  if (this.opts.enableTasksQueue === false) {
    return 0
  }
  let queuedTasks = 0
  for (const { tasksQueue } of this.workerNodes) {
    queuedTasks += tasksQueue.size
  }
  return queuedTasks
}
239
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
251
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy

  // Every worker node restarts from zeroed statistics under the new strategy.
  this.workerNodes.forEach(workerNode => {
    const zeroedTasksUsage: TasksUsage = {
      run: 0,
      running: 0,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0
    }
    this.setWorkerNodeTasksUsage(workerNode, zeroedTasksUsage)
  })

  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )

  // Options are optional: only apply them when given.
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
}
281
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first, then record the options on the pool and forward them to the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
292
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the queued tasks first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
304
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  // Without an enabled tasks queue there is nothing to configure: drop any stored options.
  if (this.opts.enableTasksQueue !== true) {
    delete this.opts.tasksQueueOptions
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
315
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The requested options; may be `null`/`undefined` at runtime.
 * @returns A new options object holding only the resolved `concurrency`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
323
/**
 * Pool filling status, supplied by the concrete pool.
 *
 * `true` when the pool is full, `false` otherwise.
 */
protected abstract get full (): boolean
330
/**
 * Pool busyness status, supplied by the concrete pool.
 *
 * `true` when the pool is busy, `false` otherwise.
 */
protected abstract get busy (): boolean
337
/**
 * Tells whether no worker node is idle.
 *
 * @returns `true` if no worker node has a `running` counter equal to `0`, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.tasksUsage.running !== 0
  )
}
345
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Timestamp the submission before any worker selection work happens.
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    submissionTimestamp,
    id: taskId
  }

  // Register the resolve/reject callbacks under the task id; the worker listener settles them.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })

  // With the tasks queue enabled, queue the task when the pool is busy or
  // the chosen worker node already runs `concurrency` tasks.
  const mustEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].tasksUsage.running >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }

  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
380
/** @inheritDoc */
public async destroy (): Promise<void> {
  // For each worker node: flush its tasks queue, then destroy its worker.
  const pendingShutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(pendingShutdowns)
}
390
/**
 * Shuts down the given worker. Implemented by the concrete pool.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
397
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
407
/**
 * Should return whether the current execution context is the main one or not.
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
412
/**
 * Hook run just before a task is sent to a worker: counts one more running task on the worker node.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 */
protected beforeTaskExecutionHook (workerNodeKey: number): void {
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  tasksUsage.running++
}
422
/**
 * Hook run once a task response has been received: updates the tasks usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  // One task less running, one more task run.
  tasksUsage.running--
  tasksUsage.run++
  if (message.error != null) {
    tasksUsage.error++
  }
  this.updateRunTimeTasksUsage(tasksUsage, message)
  this.updateWaitTimeTasksUsage(tasksUsage, message)
}
444
/**
 * Updates the run time statistics of a worker node, limited to those the current
 * worker choice strategy declares as required.
 *
 * @param workerTasksUsage - The tasks usage to update.
 * @param message - The received message; a missing `runTime` counts as `0` in the total.
 */
private updateRunTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  if (!this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
    return
  }
  workerTasksUsage.runTime += message.runTime ?? 0

  // Average: skipped while no task has been run to avoid dividing by zero.
  const averageWanted =
    this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
    workerTasksUsage.run !== 0
  if (averageWanted) {
    workerTasksUsage.avgRunTime =
      workerTasksUsage.runTime / workerTasksUsage.run
  }

  // Median: only fed with run times actually reported by the worker.
  const medianWanted =
    this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
    message.runTime != null
  if (medianWanted) {
    workerTasksUsage.runTimeHistory.push(message.runTime as number)
    workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
  }
}
467
/**
 * Updates the wait time statistics of a worker node, limited to those the current
 * worker choice strategy declares as required.
 *
 * @param workerTasksUsage - The tasks usage to update.
 * @param message - The received message; a missing `waitTime` counts as `0` in the total.
 */
private updateWaitTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  if (!this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
    return
  }
  workerTasksUsage.waitTime += message.waitTime ?? 0

  // Average: skipped while no task has been run to avoid dividing by zero.
  const averageWanted =
    this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
    workerTasksUsage.run !== 0
  if (averageWanted) {
    workerTasksUsage.avgWaitTime =
      workerTasksUsage.waitTime / workerTasksUsage.run
  }

  // Median: only fed with wait times actually reported by the worker.
  const medianWanted =
    this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
    message.waitTime != null
  if (medianWanted) {
    workerTasksUsage.waitTimeHistory.push(message.waitTime as number)
    workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
  }
}
490
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic pool that is not full and has no idle worker node gets a new worker, which is then chosen.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
protected chooseWorkerNode (): number {
  const canGrow =
    this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()
  if (!canGrow) {
    return this.workerChoiceStrategyContext.execute()
  }

  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    // A hard kill always destroys the worker; any other kill message only
    // does so once the worker node has no running task.
    const mustDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
    if (!mustDestroy) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    this.flushTasksQueue(currentWorkerNodeKey)
    void (this.destroyWorker(workerCreated) as Promise<void>)
  })
  return this.getWorkerNodeKey(workerCreated)
}
520
/**
 * Delivers a message to the given worker. Implemented by the concrete pool.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
531
/**
 * Attaches a message listener callback to the given worker. Implemented by the concrete pool.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
541
/**
 * Creates and returns a new worker. Implemented by the concrete pool.
 */
protected abstract createWorker (): Worker
546
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
555
/**
 * Creates a worker, attaches the pool event handlers to it and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler,
    errorHandler,
    onlineHandler,
    exitHandler,
    restartWorkerOnError
  } = this.opts

  // User supplied handlers, replaced by a no-op when absent.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Worker errors are forwarded on the pool emitter when there is one.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
  })
  // Optionally spawn a new worker whenever one emits an error.
  if (restartWorkerOnError === true) {
    worker.on('error', () => {
      this.createAndSetupWorker()
    })
  }
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
588
/**
 * Builds the listener handling the task execution responses sent by the workers.
 *
 * The listener settles the promise registered under the message id, updates the tasks usage
 * and, when the tasks queue is enabled, runs the next queued task of the same worker node.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // Only messages carrying an id are task execution responses.
    if (message.id == null) {
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }

    if (message.error != null) {
      promiseResponse.reject(message.error)
      this.emitter?.emit(PoolEvents.taskError, {
        error: message.error,
        errorData: message.errorData
      })
    } else {
      promiseResponse.resolve(message.data as Response)
    }

    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)

    // The worker just finished a task: hand it the next queued one, if any.
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
  }
}
627
/**
 * Emits the `busy` and `full` pool events, with a snapshot of the pool counters, when they apply.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  const poolInfo = {
    size: this.size,
    workerNodes: this.workerNodes.length,
    runningTasks: this.numberOfRunningTasks,
    queuedTasks: this.numberOfQueuedTasks
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, poolInfo)
  }
  // The `full` event is only emitted by dynamic pools.
  if (this.type === PoolType.DYNAMIC && this.full) {
    emitter.emit(PoolEvents.full, poolInfo)
  }
}
644
/**
 * Replaces the tasks usage of the given worker node.
 *
 * @param workerNode - The worker node to update.
 * @param tasksUsage - The tasks usage to store on the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  tasksUsage: TasksUsage
): void {
  // The object is stored as is, not copied.
  workerNode.tasksUsage = tasksUsage
}
657
/**
 * Appends a worker node for the given worker, with zeroed tasks usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const newWorkerNode: WorkerNode<Worker, Data> = {
    worker,
    tasksUsage: {
      run: 0,
      running: 0,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0
    },
    tasksQueue: new Queue<Task<Data>>()
  }
  return this.workerNodes.push(newWorkerNode)
}
683
/**
 * Stores a worker node built from the given parts at the given key of the pool worker nodes.
 *
 * @param workerNodeKey - The worker node key.
 * @param worker - The worker.
 * @param tasksUsage - The worker tasks usage.
 * @param tasksQueue - The worker task queue.
 */
private setWorkerNode (
  workerNodeKey: number,
  worker: Worker,
  tasksUsage: TasksUsage,
  tasksQueue: Queue<Task<Data>>
): void {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    tasksUsage,
    tasksQueue
  }
  this.workerNodes[workerNodeKey] = workerNode
}
704
/**
 * Removes the worker node of the given worker from the pool and from the worker choice strategy context.
 * Does nothing when the worker is not in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
717
/**
 * Runs the before-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
722
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
726
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
730
/**
 * Reads the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
734
/**
 * Executes every task still waiting in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. Comparing an increasing counter against the
  // size while dequeuing would stop halfway and leave about half of the tasks queued.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
745
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
751 }