refactor: limit pool internals public exposure
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
6 isPlainObject,
7 median
8 } from '../utils'
9 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
10 import { CircularArray } from '../circular-array'
11 import { Queue } from '../queue'
12 import {
13 type IPool,
14 PoolEmitter,
15 PoolEvents,
16 type PoolInfo,
17 type PoolOptions,
18 type PoolType,
19 PoolTypes,
20 type TasksQueueOptions,
21 type WorkerType
22 } from './pool'
23 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
24 import {
25 WorkerChoiceStrategies,
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
28 } from './selection-strategies/selection-strategies-types'
29 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
30
31 /**
32 * Base class that implements some shared logic for all poolifier pools.
33 *
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
36 * @typeParam Response - Type of execution response. This can only be serializable data.
37 */
38 export abstract class AbstractPool<
39 Worker extends IWorker,
40 Data = unknown,
41 Response = unknown
42 > implements IPool<Worker, Data, Response> {
43 /** @inheritDoc */
44 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
45
46 /** @inheritDoc */
47 public readonly emitter?: PoolEmitter
48
49 /**
50 * The execution response promise map.
51 *
52 * - `key`: The message id of each submitted task.
53 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
54 *
55 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
56 */
57 protected promiseResponseMap: Map<
58 string,
59 PromiseResponseWrapper<Worker, Response>
60 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
61
62 /**
63 * Worker choice strategy context referencing a worker choice algorithm implementation.
64 *
65 * Default to a round robin algorithm.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
73 /**
74 * Constructs a new poolifier pool.
75 *
76 * @param numberOfWorkers - Number of workers that this pool should manage.
77 * @param filePath - Path to the worker file.
78 * @param opts - Options for the pool.
79 */
80 public constructor (
81 protected readonly numberOfWorkers: number,
82 protected readonly filePath: string,
83 protected readonly opts: PoolOptions<Worker>
84 ) {
85 if (!this.isMain()) {
86 throw new Error('Cannot start a pool from a worker!')
87 }
88 this.checkNumberOfWorkers(this.numberOfWorkers)
89 this.checkFilePath(this.filePath)
90 this.checkPoolOptions(this.opts)
91
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
96
97 this.setupHook()
98
99 for (let i = 1; i <= this.numberOfWorkers; i++) {
100 this.createAndSetupWorker()
101 }
102
103 if (this.opts.enableEvents === true) {
104 this.emitter = new PoolEmitter()
105 }
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
115 }
116
117 private checkFilePath (filePath: string): void {
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
132 throw new TypeError(
133 'Cannot instantiate a pool with a non safe integer number of workers'
134 )
135 } else if (numberOfWorkers < 0) {
136 throw new RangeError(
137 'Cannot instantiate a pool with a negative number of workers'
138 )
139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
168 }
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
175 throw new Error(
176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
177 )
178 }
179 }
180
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
189 if (
190 workerChoiceStrategyOptions.weights != null &&
191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
197 }
198
199 private checkValidTasksQueueOptions (
200 tasksQueueOptions: TasksQueueOptions
201 ): void {
202 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
203 throw new TypeError('Invalid tasks queue options: must be a plain object')
204 }
205 if ((tasksQueueOptions?.concurrency as number) <= 0) {
206 throw new Error(
207 `Invalid worker tasks concurrency '${
208 tasksQueueOptions.concurrency as number
209 }'`
210 )
211 }
212 }
213
214 /** @inheritDoc */
215 public get info (): PoolInfo {
216 return {
217 type: this.type,
218 worker: this.worker,
219 minSize: this.minSize,
220 maxSize: this.maxSize,
221 workerNodes: this.workerNodes.length,
222 idleWorkerNodes: this.workerNodes.reduce(
223 (accumulator, workerNode) =>
224 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
225 0
226 ),
227 busyWorkerNodes: this.workerNodes.reduce(
228 (accumulator, workerNode) =>
229 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
230 0
231 ),
232 runningTasks: this.workerNodes.reduce(
233 (accumulator, workerNode) =>
234 accumulator + workerNode.tasksUsage.running,
235 0
236 ),
237 queuedTasks: this.workerNodes.reduce(
238 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
239 0
240 ),
241 maxQueuedTasks: this.workerNodes.reduce(
242 (accumulator, workerNode) =>
243 accumulator + workerNode.tasksQueue.maxSize,
244 0
245 )
246 }
247 }
248
249 /**
250 * Pool type.
251 *
252 * If it is `'dynamic'`, it provides the `max` property.
253 */
254 protected abstract get type (): PoolType
255
256 /**
257 * Gets the worker type.
258 */
259 protected abstract get worker (): WorkerType
260
261 /**
262 * Pool minimum size.
263 */
264 protected abstract get minSize (): number
265
266 /**
267 * Pool maximum size.
268 */
269 protected abstract get maxSize (): number
270
271 /**
272 * Gets the given worker its worker node key.
273 *
274 * @param worker - The worker.
275 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
276 */
277 private getWorkerNodeKey (worker: Worker): number {
278 return this.workerNodes.findIndex(
279 workerNode => workerNode.worker === worker
280 )
281 }
282
283 /** @inheritDoc */
284 public setWorkerChoiceStrategy (
285 workerChoiceStrategy: WorkerChoiceStrategy,
286 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
287 ): void {
288 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
289 this.opts.workerChoiceStrategy = workerChoiceStrategy
290 for (const workerNode of this.workerNodes) {
291 this.setWorkerNodeTasksUsage(workerNode, {
292 run: 0,
293 running: 0,
294 runTime: 0,
295 runTimeHistory: new CircularArray(),
296 avgRunTime: 0,
297 medRunTime: 0,
298 waitTime: 0,
299 waitTimeHistory: new CircularArray(),
300 avgWaitTime: 0,
301 medWaitTime: 0,
302 error: 0
303 })
304 }
305 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
306 this.opts.workerChoiceStrategy
307 )
308 if (workerChoiceStrategyOptions != null) {
309 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
310 }
311 }
312
313 /** @inheritDoc */
314 public setWorkerChoiceStrategyOptions (
315 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
316 ): void {
317 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
318 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
319 this.workerChoiceStrategyContext.setOptions(
320 this.opts.workerChoiceStrategyOptions
321 )
322 }
323
324 /** @inheritDoc */
325 public enableTasksQueue (
326 enable: boolean,
327 tasksQueueOptions?: TasksQueueOptions
328 ): void {
329 if (this.opts.enableTasksQueue === true && !enable) {
330 this.flushTasksQueues()
331 }
332 this.opts.enableTasksQueue = enable
333 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
334 }
335
336 /** @inheritDoc */
337 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
338 if (this.opts.enableTasksQueue === true) {
339 this.checkValidTasksQueueOptions(tasksQueueOptions)
340 this.opts.tasksQueueOptions =
341 this.buildTasksQueueOptions(tasksQueueOptions)
342 } else {
343 delete this.opts.tasksQueueOptions
344 }
345 }
346
347 private buildTasksQueueOptions (
348 tasksQueueOptions: TasksQueueOptions
349 ): TasksQueueOptions {
350 return {
351 concurrency: tasksQueueOptions?.concurrency ?? 1
352 }
353 }
354
355 /**
356 * Whether the pool is full or not.
357 *
358 * The pool filling boolean status.
359 */
360 protected abstract get full (): boolean
361
362 /**
363 * Whether the pool is busy or not.
364 *
365 * The pool busyness boolean status.
366 */
367 protected abstract get busy (): boolean
368
369 protected internalBusy (): boolean {
370 return (
371 this.workerNodes.findIndex(workerNode => {
372 return workerNode.tasksUsage.running === 0
373 }) === -1
374 )
375 }
376
377 /** @inheritDoc */
378 public async execute (data?: Data, name?: string): Promise<Response> {
379 const submissionTimestamp = performance.now()
380 const workerNodeKey = this.chooseWorkerNode()
381 const submittedTask: Task<Data> = {
382 name,
383 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
384 data: data ?? ({} as Data),
385 submissionTimestamp,
386 id: crypto.randomUUID()
387 }
388 const res = new Promise<Response>((resolve, reject) => {
389 this.promiseResponseMap.set(submittedTask.id as string, {
390 resolve,
391 reject,
392 worker: this.workerNodes[workerNodeKey].worker
393 })
394 })
395 if (
396 this.opts.enableTasksQueue === true &&
397 (this.busy ||
398 this.workerNodes[workerNodeKey].tasksUsage.running >=
399 ((this.opts.tasksQueueOptions as TasksQueueOptions)
400 .concurrency as number))
401 ) {
402 this.enqueueTask(workerNodeKey, submittedTask)
403 } else {
404 this.executeTask(workerNodeKey, submittedTask)
405 }
406 this.workerChoiceStrategyContext.update(workerNodeKey)
407 this.checkAndEmitEvents()
408 // eslint-disable-next-line @typescript-eslint/return-await
409 return res
410 }
411
412 /** @inheritDoc */
413 public async destroy (): Promise<void> {
414 await Promise.all(
415 this.workerNodes.map(async (workerNode, workerNodeKey) => {
416 this.flushTasksQueue(workerNodeKey)
417 // FIXME: wait for tasks to be finished
418 await this.destroyWorker(workerNode.worker)
419 })
420 )
421 }
422
423 /**
424 * Shutdowns the given worker.
425 *
426 * @param worker - A worker within `workerNodes`.
427 */
428 protected abstract destroyWorker (worker: Worker): void | Promise<void>
429
430 /**
431 * Setup hook to execute code before worker node are created in the abstract constructor.
432 * Can be overridden
433 *
434 * @virtual
435 */
436 protected setupHook (): void {
437 // Intentionally empty
438 }
439
440 /**
441 * Should return whether the worker is the main worker or not.
442 */
443 protected abstract isMain (): boolean
444
445 /**
446 * Hook executed before the worker task execution.
447 * Can be overridden.
448 *
449 * @param workerNodeKey - The worker node key.
450 */
451 protected beforeTaskExecutionHook (workerNodeKey: number): void {
452 ++this.workerNodes[workerNodeKey].tasksUsage.running
453 }
454
455 /**
456 * Hook executed after the worker task execution.
457 * Can be overridden.
458 *
459 * @param worker - The worker.
460 * @param message - The received message.
461 */
462 protected afterTaskExecutionHook (
463 worker: Worker,
464 message: MessageValue<Response>
465 ): void {
466 const workerTasksUsage =
467 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
468 --workerTasksUsage.running
469 ++workerTasksUsage.run
470 if (message.error != null) {
471 ++workerTasksUsage.error
472 }
473 this.updateRunTimeTasksUsage(workerTasksUsage, message)
474 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
475 }
476
477 private updateRunTimeTasksUsage (
478 workerTasksUsage: TasksUsage,
479 message: MessageValue<Response>
480 ): void {
481 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
482 workerTasksUsage.runTime += message.runTime ?? 0
483 if (
484 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
485 workerTasksUsage.run !== 0
486 ) {
487 workerTasksUsage.avgRunTime =
488 workerTasksUsage.runTime / workerTasksUsage.run
489 }
490 if (
491 this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
492 message.runTime != null
493 ) {
494 workerTasksUsage.runTimeHistory.push(message.runTime)
495 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
496 }
497 }
498 }
499
500 private updateWaitTimeTasksUsage (
501 workerTasksUsage: TasksUsage,
502 message: MessageValue<Response>
503 ): void {
504 if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
505 workerTasksUsage.waitTime += message.waitTime ?? 0
506 if (
507 this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
508 workerTasksUsage.run !== 0
509 ) {
510 workerTasksUsage.avgWaitTime =
511 workerTasksUsage.waitTime / workerTasksUsage.run
512 }
513 if (
514 this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
515 message.waitTime != null
516 ) {
517 workerTasksUsage.waitTimeHistory.push(message.waitTime)
518 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
519 }
520 }
521 }
522
523 /**
524 * Chooses a worker node for the next task.
525 *
526 * The default worker choice strategy uses a round robin algorithm to distribute the load.
527 *
528 * @returns The worker node key
529 */
530 protected chooseWorkerNode (): number {
531 let workerNodeKey: number
532 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
533 const workerCreated = this.createAndSetupWorker()
534 this.registerWorkerMessageListener(workerCreated, message => {
535 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
536 if (
537 isKillBehavior(KillBehaviors.HARD, message.kill) ||
538 (message.kill != null &&
539 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
540 ) {
541 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
542 this.flushTasksQueue(currentWorkerNodeKey)
543 // FIXME: wait for tasks to be finished
544 void (this.destroyWorker(workerCreated) as Promise<void>)
545 }
546 })
547 workerNodeKey = this.getWorkerNodeKey(workerCreated)
548 } else {
549 workerNodeKey = this.workerChoiceStrategyContext.execute()
550 }
551 return workerNodeKey
552 }
553
554 /**
555 * Sends a message to the given worker.
556 *
557 * @param worker - The worker which should receive the message.
558 * @param message - The message.
559 */
560 protected abstract sendToWorker (
561 worker: Worker,
562 message: MessageValue<Data>
563 ): void
564
565 /**
566 * Registers a listener callback on the given worker.
567 *
568 * @param worker - The worker which should register a listener.
569 * @param listener - The message listener callback.
570 */
571 protected abstract registerWorkerMessageListener<
572 Message extends Data | Response
573 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
574
575 /**
576 * Returns a newly created worker.
577 */
578 protected abstract createWorker (): Worker
579
580 /**
581 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
582 *
583 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
584 *
585 * @param worker - The newly created worker.
586 */
587 protected abstract afterWorkerSetup (worker: Worker): void
588
589 /**
590 * Creates a new worker and sets it up completely in the pool worker nodes.
591 *
592 * @returns New, completely set up worker.
593 */
594 protected createAndSetupWorker (): Worker {
595 const worker = this.createWorker()
596
597 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
598 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
599 worker.on('error', error => {
600 if (this.emitter != null) {
601 this.emitter.emit(PoolEvents.error, error)
602 }
603 })
604 if (this.opts.restartWorkerOnError === true) {
605 worker.on('error', () => {
606 this.createAndSetupWorker()
607 })
608 }
609 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
610 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
611 worker.once('exit', () => {
612 this.removeWorkerNode(worker)
613 })
614
615 this.pushWorkerNode(worker)
616
617 this.afterWorkerSetup(worker)
618
619 return worker
620 }
621
622 /**
623 * This function is the listener registered for each worker message.
624 *
625 * @returns The listener function to execute when a message is received from a worker.
626 */
627 protected workerListener (): (message: MessageValue<Response>) => void {
628 return message => {
629 if (message.id != null) {
630 // Task execution response received
631 const promiseResponse = this.promiseResponseMap.get(message.id)
632 if (promiseResponse != null) {
633 if (message.error != null) {
634 promiseResponse.reject(message.error)
635 if (this.emitter != null) {
636 this.emitter.emit(PoolEvents.taskError, {
637 error: message.error,
638 errorData: message.errorData
639 })
640 }
641 } else {
642 promiseResponse.resolve(message.data as Response)
643 }
644 this.afterTaskExecutionHook(promiseResponse.worker, message)
645 this.promiseResponseMap.delete(message.id)
646 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
647 if (
648 this.opts.enableTasksQueue === true &&
649 this.tasksQueueSize(workerNodeKey) > 0
650 ) {
651 this.executeTask(
652 workerNodeKey,
653 this.dequeueTask(workerNodeKey) as Task<Data>
654 )
655 }
656 }
657 }
658 }
659 }
660
661 private checkAndEmitEvents (): void {
662 if (this.emitter != null) {
663 if (this.busy) {
664 this.emitter?.emit(PoolEvents.busy, this.info)
665 }
666 if (this.type === PoolTypes.dynamic && this.full) {
667 this.emitter?.emit(PoolEvents.full, this.info)
668 }
669 }
670 }
671
672 /**
673 * Sets the given worker node its tasks usage in the pool.
674 *
675 * @param workerNode - The worker node.
676 * @param tasksUsage - The worker node tasks usage.
677 */
678 private setWorkerNodeTasksUsage (
679 workerNode: WorkerNode<Worker, Data>,
680 tasksUsage: TasksUsage
681 ): void {
682 workerNode.tasksUsage = tasksUsage
683 }
684
685 /**
686 * Pushes the given worker in the pool worker nodes.
687 *
688 * @param worker - The worker.
689 * @returns The worker nodes length.
690 */
691 private pushWorkerNode (worker: Worker): number {
692 return this.workerNodes.push({
693 worker,
694 tasksUsage: {
695 run: 0,
696 running: 0,
697 runTime: 0,
698 runTimeHistory: new CircularArray(),
699 avgRunTime: 0,
700 medRunTime: 0,
701 waitTime: 0,
702 waitTimeHistory: new CircularArray(),
703 avgWaitTime: 0,
704 medWaitTime: 0,
705 error: 0
706 },
707 tasksQueue: new Queue<Task<Data>>()
708 })
709 }
710
711 /**
712 * Sets the given worker in the pool worker nodes.
713 *
714 * @param workerNodeKey - The worker node key.
715 * @param worker - The worker.
716 * @param tasksUsage - The worker tasks usage.
717 * @param tasksQueue - The worker task queue.
718 */
719 private setWorkerNode (
720 workerNodeKey: number,
721 worker: Worker,
722 tasksUsage: TasksUsage,
723 tasksQueue: Queue<Task<Data>>
724 ): void {
725 this.workerNodes[workerNodeKey] = {
726 worker,
727 tasksUsage,
728 tasksQueue
729 }
730 }
731
732 /**
733 * Removes the given worker from the pool worker nodes.
734 *
735 * @param worker - The worker.
736 */
737 private removeWorkerNode (worker: Worker): void {
738 const workerNodeKey = this.getWorkerNodeKey(worker)
739 if (workerNodeKey !== -1) {
740 this.workerNodes.splice(workerNodeKey, 1)
741 this.workerChoiceStrategyContext.remove(workerNodeKey)
742 }
743 }
744
745 private executeTask (workerNodeKey: number, task: Task<Data>): void {
746 this.beforeTaskExecutionHook(workerNodeKey)
747 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
748 }
749
750 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
751 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
752 }
753
754 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
755 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
756 }
757
758 private tasksQueueSize (workerNodeKey: number): number {
759 return this.workerNodes[workerNodeKey].tasksQueue.size
760 }
761
762 private flushTasksQueue (workerNodeKey: number): void {
763 if (this.tasksQueueSize(workerNodeKey) > 0) {
764 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
765 this.executeTask(
766 workerNodeKey,
767 this.dequeueTask(workerNodeKey) as Task<Data>
768 )
769 }
770 }
771 }
772
773 private flushTasksQueues (): void {
774 for (const [workerNodeKey] of this.workerNodes.entries()) {
775 this.flushTasksQueue(workerNodeKey)
776 }
777 }
778 }