refactor: convert helper to arrow function
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import crypto from 'node:crypto'
2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
6 isPlainObject,
7 median
8} from '../utils'
9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
10import { CircularArray } from '../circular-array'
11import { Queue } from '../queue'
12import {
13 type IPool,
14 PoolEmitter,
15 PoolEvents,
16 type PoolInfo,
17 type PoolOptions,
18 type PoolType,
19 PoolTypes,
20 type TasksQueueOptions,
21 type WorkerType
22} from './pool'
23import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
24import {
25 WorkerChoiceStrategies,
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
28} from './selection-strategies/selection-strategies-types'
29import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
30
31/**
32 * Base class that implements some shared logic for all poolifier pools.
33 *
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
36 * @typeParam Response - Type of execution response. This can only be serializable data.
37 */
38export abstract class AbstractPool<
39 Worker extends IWorker,
40 Data = unknown,
41 Response = unknown
42> implements IPool<Worker, Data, Response> {
43 /** @inheritDoc */
44 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
45
46 /** @inheritDoc */
47 public readonly emitter?: PoolEmitter
48
49 /**
50 * The execution response promise map.
51 *
52 * - `key`: The message id of each submitted task.
53 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
54 *
55 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
56 */
57 protected promiseResponseMap: Map<
58 string,
59 PromiseResponseWrapper<Worker, Response>
60 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
61
62 /**
63 * Worker choice strategy context referencing a worker choice algorithm implementation.
64 *
65 * Default to a round robin algorithm.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
73 /**
74 * Constructs a new poolifier pool.
75 *
76 * @param numberOfWorkers - Number of workers that this pool should manage.
77 * @param filePath - Path to the worker file.
78 * @param opts - Options for the pool.
79 */
80 public constructor (
81 public readonly numberOfWorkers: number,
82 public readonly filePath: string,
83 public readonly opts: PoolOptions<Worker>
84 ) {
85 if (!this.isMain()) {
86 throw new Error('Cannot start a pool from a worker!')
87 }
88 this.checkNumberOfWorkers(this.numberOfWorkers)
89 this.checkFilePath(this.filePath)
90 this.checkPoolOptions(this.opts)
91
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
96
97 this.setupHook()
98
99 for (let i = 1; i <= this.numberOfWorkers; i++) {
100 this.createAndSetupWorker()
101 }
102
103 if (this.opts.enableEvents === true) {
104 this.emitter = new PoolEmitter()
105 }
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
115 }
116
117 private checkFilePath (filePath: string): void {
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
132 throw new TypeError(
133 'Cannot instantiate a pool with a non safe integer number of workers'
134 )
135 } else if (numberOfWorkers < 0) {
136 throw new RangeError(
137 'Cannot instantiate a pool with a negative number of workers'
138 )
139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
168 }
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
175 throw new Error(
176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
177 )
178 }
179 }
180
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
189 if (
190 workerChoiceStrategyOptions.weights != null &&
191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
197 }
198
199 private checkValidTasksQueueOptions (
200 tasksQueueOptions: TasksQueueOptions
201 ): void {
202 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
203 throw new TypeError('Invalid tasks queue options: must be a plain object')
204 }
205 if ((tasksQueueOptions?.concurrency as number) <= 0) {
206 throw new Error(
207 `Invalid worker tasks concurrency '${
208 tasksQueueOptions.concurrency as number
209 }'`
210 )
211 }
212 }
213
214 /** @inheritDoc */
215 public abstract get type (): PoolType
216
217 /** @inheritDoc */
218 public get info (): PoolInfo {
219 return {
220 type: this.type,
221 worker: this.worker,
222 minSize: this.minSize,
223 maxSize: this.maxSize,
224 workerNodes: this.workerNodes.length,
225 idleWorkerNodes: this.workerNodes.reduce(
226 (accumulator, workerNode) =>
227 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
228 0
229 ),
230 busyWorkerNodes: this.workerNodes.reduce(
231 (accumulator, workerNode) =>
232 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
233 0
234 ),
235 runningTasks: this.workerNodes.reduce(
236 (accumulator, workerNode) =>
237 accumulator + workerNode.tasksUsage.running,
238 0
239 ),
240 queuedTasks: this.workerNodes.reduce(
241 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
242 0
243 ),
244 maxQueuedTasks: this.workerNodes.reduce(
245 (accumulator, workerNode) =>
246 accumulator + workerNode.tasksQueue.maxSize,
247 0
248 )
249 }
250 }
251
252 /**
253 * Gets the worker type.
254 */
255 protected abstract get worker (): WorkerType
256
257 /**
258 * Pool minimum size.
259 */
260 protected abstract get minSize (): number
261
262 /**
263 * Pool maximum size.
264 */
265 protected abstract get maxSize (): number
266
267 /**
268 * Gets the given worker its worker node key.
269 *
270 * @param worker - The worker.
271 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
272 */
273 private getWorkerNodeKey (worker: Worker): number {
274 return this.workerNodes.findIndex(
275 workerNode => workerNode.worker === worker
276 )
277 }
278
279 /** @inheritDoc */
280 public setWorkerChoiceStrategy (
281 workerChoiceStrategy: WorkerChoiceStrategy,
282 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
283 ): void {
284 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
285 this.opts.workerChoiceStrategy = workerChoiceStrategy
286 for (const workerNode of this.workerNodes) {
287 this.setWorkerNodeTasksUsage(workerNode, {
288 run: 0,
289 running: 0,
290 runTime: 0,
291 runTimeHistory: new CircularArray(),
292 avgRunTime: 0,
293 medRunTime: 0,
294 waitTime: 0,
295 waitTimeHistory: new CircularArray(),
296 avgWaitTime: 0,
297 medWaitTime: 0,
298 error: 0
299 })
300 }
301 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
302 this.opts.workerChoiceStrategy
303 )
304 if (workerChoiceStrategyOptions != null) {
305 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
306 }
307 }
308
309 /** @inheritDoc */
310 public setWorkerChoiceStrategyOptions (
311 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
312 ): void {
313 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
314 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
315 this.workerChoiceStrategyContext.setOptions(
316 this.opts.workerChoiceStrategyOptions
317 )
318 }
319
320 /** @inheritDoc */
321 public enableTasksQueue (
322 enable: boolean,
323 tasksQueueOptions?: TasksQueueOptions
324 ): void {
325 if (this.opts.enableTasksQueue === true && !enable) {
326 this.flushTasksQueues()
327 }
328 this.opts.enableTasksQueue = enable
329 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
330 }
331
332 /** @inheritDoc */
333 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
334 if (this.opts.enableTasksQueue === true) {
335 this.checkValidTasksQueueOptions(tasksQueueOptions)
336 this.opts.tasksQueueOptions =
337 this.buildTasksQueueOptions(tasksQueueOptions)
338 } else {
339 delete this.opts.tasksQueueOptions
340 }
341 }
342
343 private buildTasksQueueOptions (
344 tasksQueueOptions: TasksQueueOptions
345 ): TasksQueueOptions {
346 return {
347 concurrency: tasksQueueOptions?.concurrency ?? 1
348 }
349 }
350
351 /**
352 * Whether the pool is full or not.
353 *
354 * The pool filling boolean status.
355 */
356 protected abstract get full (): boolean
357
358 /**
359 * Whether the pool is busy or not.
360 *
361 * The pool busyness boolean status.
362 */
363 protected abstract get busy (): boolean
364
365 protected internalBusy (): boolean {
366 return (
367 this.workerNodes.findIndex(workerNode => {
368 return workerNode.tasksUsage.running === 0
369 }) === -1
370 )
371 }
372
373 /** @inheritDoc */
374 public async execute (data?: Data, name?: string): Promise<Response> {
375 const submissionTimestamp = performance.now()
376 const workerNodeKey = this.chooseWorkerNode()
377 const submittedTask: Task<Data> = {
378 name,
379 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
380 data: data ?? ({} as Data),
381 submissionTimestamp,
382 id: crypto.randomUUID()
383 }
384 const res = new Promise<Response>((resolve, reject) => {
385 this.promiseResponseMap.set(submittedTask.id as string, {
386 resolve,
387 reject,
388 worker: this.workerNodes[workerNodeKey].worker
389 })
390 })
391 if (
392 this.opts.enableTasksQueue === true &&
393 (this.busy ||
394 this.workerNodes[workerNodeKey].tasksUsage.running >=
395 ((this.opts.tasksQueueOptions as TasksQueueOptions)
396 .concurrency as number))
397 ) {
398 this.enqueueTask(workerNodeKey, submittedTask)
399 } else {
400 this.executeTask(workerNodeKey, submittedTask)
401 }
402 this.workerChoiceStrategyContext.update(workerNodeKey)
403 this.checkAndEmitEvents()
404 // eslint-disable-next-line @typescript-eslint/return-await
405 return res
406 }
407
408 /** @inheritDoc */
409 public async destroy (): Promise<void> {
410 await Promise.all(
411 this.workerNodes.map(async (workerNode, workerNodeKey) => {
412 this.flushTasksQueue(workerNodeKey)
413 await this.destroyWorker(workerNode.worker)
414 })
415 )
416 }
417
418 /**
419 * Shutdowns the given worker.
420 *
421 * @param worker - A worker within `workerNodes`.
422 */
423 protected abstract destroyWorker (worker: Worker): void | Promise<void>
424
425 /**
426 * Setup hook to execute code before worker node are created in the abstract constructor.
427 * Can be overridden
428 *
429 * @virtual
430 */
431 protected setupHook (): void {
432 // Intentionally empty
433 }
434
435 /**
436 * Should return whether the worker is the main worker or not.
437 */
438 protected abstract isMain (): boolean
439
440 /**
441 * Hook executed before the worker task execution.
442 * Can be overridden.
443 *
444 * @param workerNodeKey - The worker node key.
445 */
446 protected beforeTaskExecutionHook (workerNodeKey: number): void {
447 ++this.workerNodes[workerNodeKey].tasksUsage.running
448 }
449
450 /**
451 * Hook executed after the worker task execution.
452 * Can be overridden.
453 *
454 * @param worker - The worker.
455 * @param message - The received message.
456 */
457 protected afterTaskExecutionHook (
458 worker: Worker,
459 message: MessageValue<Response>
460 ): void {
461 const workerTasksUsage =
462 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
463 --workerTasksUsage.running
464 ++workerTasksUsage.run
465 if (message.error != null) {
466 ++workerTasksUsage.error
467 }
468 this.updateRunTimeTasksUsage(workerTasksUsage, message)
469 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
470 }
471
472 private updateRunTimeTasksUsage (
473 workerTasksUsage: TasksUsage,
474 message: MessageValue<Response>
475 ): void {
476 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
477 workerTasksUsage.runTime += message.runTime ?? 0
478 if (
479 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
480 workerTasksUsage.run !== 0
481 ) {
482 workerTasksUsage.avgRunTime =
483 workerTasksUsage.runTime / workerTasksUsage.run
484 }
485 if (
486 this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
487 message.runTime != null
488 ) {
489 workerTasksUsage.runTimeHistory.push(message.runTime)
490 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
491 }
492 }
493 }
494
495 private updateWaitTimeTasksUsage (
496 workerTasksUsage: TasksUsage,
497 message: MessageValue<Response>
498 ): void {
499 if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
500 workerTasksUsage.waitTime += message.waitTime ?? 0
501 if (
502 this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
503 workerTasksUsage.run !== 0
504 ) {
505 workerTasksUsage.avgWaitTime =
506 workerTasksUsage.waitTime / workerTasksUsage.run
507 }
508 if (
509 this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
510 message.waitTime != null
511 ) {
512 workerTasksUsage.waitTimeHistory.push(message.waitTime)
513 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
514 }
515 }
516 }
517
518 /**
519 * Chooses a worker node for the next task.
520 *
521 * The default worker choice strategy uses a round robin algorithm to distribute the load.
522 *
523 * @returns The worker node key
524 */
525 protected chooseWorkerNode (): number {
526 let workerNodeKey: number
527 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
528 const workerCreated = this.createAndSetupWorker()
529 this.registerWorkerMessageListener(workerCreated, message => {
530 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
531 if (
532 isKillBehavior(KillBehaviors.HARD, message.kill) ||
533 (message.kill != null &&
534 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
535 ) {
536 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
537 this.flushTasksQueue(currentWorkerNodeKey)
538 void (this.destroyWorker(workerCreated) as Promise<void>)
539 }
540 })
541 workerNodeKey = this.getWorkerNodeKey(workerCreated)
542 } else {
543 workerNodeKey = this.workerChoiceStrategyContext.execute()
544 }
545 return workerNodeKey
546 }
547
548 /**
549 * Sends a message to the given worker.
550 *
551 * @param worker - The worker which should receive the message.
552 * @param message - The message.
553 */
554 protected abstract sendToWorker (
555 worker: Worker,
556 message: MessageValue<Data>
557 ): void
558
559 /**
560 * Registers a listener callback on the given worker.
561 *
562 * @param worker - The worker which should register a listener.
563 * @param listener - The message listener callback.
564 */
565 protected abstract registerWorkerMessageListener<
566 Message extends Data | Response
567 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
568
569 /**
570 * Returns a newly created worker.
571 */
572 protected abstract createWorker (): Worker
573
574 /**
575 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
576 *
577 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
578 *
579 * @param worker - The newly created worker.
580 */
581 protected abstract afterWorkerSetup (worker: Worker): void
582
583 /**
584 * Creates a new worker and sets it up completely in the pool worker nodes.
585 *
586 * @returns New, completely set up worker.
587 */
588 protected createAndSetupWorker (): Worker {
589 const worker = this.createWorker()
590
591 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
592 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
593 worker.on('error', error => {
594 if (this.emitter != null) {
595 this.emitter.emit(PoolEvents.error, error)
596 }
597 })
598 if (this.opts.restartWorkerOnError === true) {
599 worker.on('error', () => {
600 this.createAndSetupWorker()
601 })
602 }
603 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
604 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
605 worker.once('exit', () => {
606 this.removeWorkerNode(worker)
607 })
608
609 this.pushWorkerNode(worker)
610
611 this.afterWorkerSetup(worker)
612
613 return worker
614 }
615
616 /**
617 * This function is the listener registered for each worker message.
618 *
619 * @returns The listener function to execute when a message is received from a worker.
620 */
621 protected workerListener (): (message: MessageValue<Response>) => void {
622 return message => {
623 if (message.id != null) {
624 // Task execution response received
625 const promiseResponse = this.promiseResponseMap.get(message.id)
626 if (promiseResponse != null) {
627 if (message.error != null) {
628 promiseResponse.reject(message.error)
629 if (this.emitter != null) {
630 this.emitter.emit(PoolEvents.taskError, {
631 error: message.error,
632 errorData: message.errorData
633 })
634 }
635 } else {
636 promiseResponse.resolve(message.data as Response)
637 }
638 this.afterTaskExecutionHook(promiseResponse.worker, message)
639 this.promiseResponseMap.delete(message.id)
640 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
641 if (
642 this.opts.enableTasksQueue === true &&
643 this.tasksQueueSize(workerNodeKey) > 0
644 ) {
645 this.executeTask(
646 workerNodeKey,
647 this.dequeueTask(workerNodeKey) as Task<Data>
648 )
649 }
650 }
651 }
652 }
653 }
654
655 private checkAndEmitEvents (): void {
656 if (this.emitter != null) {
657 if (this.busy) {
658 this.emitter?.emit(PoolEvents.busy, this.info)
659 }
660 if (this.type === PoolTypes.dynamic && this.full) {
661 this.emitter?.emit(PoolEvents.full, this.info)
662 }
663 }
664 }
665
666 /**
667 * Sets the given worker node its tasks usage in the pool.
668 *
669 * @param workerNode - The worker node.
670 * @param tasksUsage - The worker node tasks usage.
671 */
672 private setWorkerNodeTasksUsage (
673 workerNode: WorkerNode<Worker, Data>,
674 tasksUsage: TasksUsage
675 ): void {
676 workerNode.tasksUsage = tasksUsage
677 }
678
679 /**
680 * Pushes the given worker in the pool worker nodes.
681 *
682 * @param worker - The worker.
683 * @returns The worker nodes length.
684 */
685 private pushWorkerNode (worker: Worker): number {
686 return this.workerNodes.push({
687 worker,
688 tasksUsage: {
689 run: 0,
690 running: 0,
691 runTime: 0,
692 runTimeHistory: new CircularArray(),
693 avgRunTime: 0,
694 medRunTime: 0,
695 waitTime: 0,
696 waitTimeHistory: new CircularArray(),
697 avgWaitTime: 0,
698 medWaitTime: 0,
699 error: 0
700 },
701 tasksQueue: new Queue<Task<Data>>()
702 })
703 }
704
705 /**
706 * Sets the given worker in the pool worker nodes.
707 *
708 * @param workerNodeKey - The worker node key.
709 * @param worker - The worker.
710 * @param tasksUsage - The worker tasks usage.
711 * @param tasksQueue - The worker task queue.
712 */
713 private setWorkerNode (
714 workerNodeKey: number,
715 worker: Worker,
716 tasksUsage: TasksUsage,
717 tasksQueue: Queue<Task<Data>>
718 ): void {
719 this.workerNodes[workerNodeKey] = {
720 worker,
721 tasksUsage,
722 tasksQueue
723 }
724 }
725
726 /**
727 * Removes the given worker from the pool worker nodes.
728 *
729 * @param worker - The worker.
730 */
731 private removeWorkerNode (worker: Worker): void {
732 const workerNodeKey = this.getWorkerNodeKey(worker)
733 if (workerNodeKey !== -1) {
734 this.workerNodes.splice(workerNodeKey, 1)
735 this.workerChoiceStrategyContext.remove(workerNodeKey)
736 }
737 }
738
739 private executeTask (workerNodeKey: number, task: Task<Data>): void {
740 this.beforeTaskExecutionHook(workerNodeKey)
741 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
742 }
743
744 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
745 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
746 }
747
748 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
749 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
750 }
751
752 private tasksQueueSize (workerNodeKey: number): number {
753 return this.workerNodes[workerNodeKey].tasksQueue.size
754 }
755
756 private flushTasksQueue (workerNodeKey: number): void {
757 if (this.tasksQueueSize(workerNodeKey) > 0) {
758 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
759 this.executeTask(
760 workerNodeKey,
761 this.dequeueTask(workerNodeKey) as Task<Data>
762 )
763 }
764 }
765 }
766
767 private flushTasksQueues (): void {
768 for (const [workerNodeKey] of this.workerNodes.entries()) {
769 this.flushTasksQueue(workerNodeKey)
770 }
771 }
772}