5088f3d90957bd276756c120d2496e8ffe01bb5a
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
6 isPlainObject,
7 median
8 } from '../utils'
9 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
10 import { CircularArray } from '../circular-array'
11 import { Queue } from '../queue'
12 import {
13 type IPool,
14 PoolEmitter,
15 PoolEvents,
16 type PoolInfo,
17 type PoolOptions,
18 type PoolType,
19 PoolTypes,
20 type TasksQueueOptions,
21 type WorkerType
22 } from './pool'
23 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
24 import {
25 WorkerChoiceStrategies,
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
28 } from './selection-strategies/selection-strategies-types'
29 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
30
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   *
   * Default to a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates its arguments, runs the setup hook, creates the initial
   * worker nodes, then instantiates the event emitter (when enabled) and the
   * worker choice strategy context.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   * @throws Error if called from a worker, or if an argument is invalid.
   */
  public constructor (
    protected readonly numberOfWorkers: number,
    protected readonly filePath: string,
    protected readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    // Bind the methods so they keep the pool as `this` when detached.
    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath - The worker file path to check.
   * @throws Error if the path is `null`/`undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is usable for this pool.
   *
   * @param numberOfWorkers - The number of workers to check.
   * @throws Error if the number is missing, or is zero for a fixed pool.
   * @throws TypeError if the number is not a safe integer.
   * @throws RangeError if the number is negative.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Validates the pool options and fills in the defaults, in place, on
   * `this.opts`.
   *
   * @param opts - The pool options to check.
   * @throws TypeError if the options are not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (!isPlainObject(opts)) {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
    this.opts.workerChoiceStrategyOptions =
      opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
    this.checkValidWorkerChoiceStrategyOptions(
      this.opts.workerChoiceStrategyOptions
    )
    this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
    this.opts.enableEvents = opts.enableEvents ?? true
    this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
    if (this.opts.enableTasksQueue) {
      this.checkValidTasksQueueOptions(
        opts.tasksQueueOptions as TasksQueueOptions
      )
      this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
        opts.tasksQueueOptions as TasksQueueOptions
      )
    }
  }

  /**
   * Checks that the given value is one of the known worker choice strategies.
   *
   * @param workerChoiceStrategy - The worker choice strategy to check.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Checks the worker choice strategy options.
   *
   * @param workerChoiceStrategyOptions - The options to check.
   * @throws TypeError if the options are not a plain object.
   * @throws Error if weights are given but their count differs from the pool maximum size.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  /**
   * Checks the tasks queue options.
   *
   * An unset (`null`/`undefined`) concurrency is accepted: the default is
   * applied by `buildTasksQueueOptions()`.
   *
   * @param tasksQueueOptions - The tasks queue options to check.
   * @throws TypeError if the options are not a plain object, or if the concurrency is not a safe integer.
   * @throws Error if the concurrency is lower than or equal to zero.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    const concurrency = tasksQueueOptions?.concurrency
    if (concurrency == null) {
      return
    }
    // NaN or a fractional value would silently break the
    // `running >= concurrency` comparison done in execute().
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
    }
  }

  /** @inheritDoc */
  public get info (): PoolInfo {
    return {
      type: this.type,
      worker: this.worker,
      minSize: this.minSize,
      maxSize: this.maxSize,
      workerNodes: this.workerNodes.length,
      idleWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
        0
      ),
      busyWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
        0
      ),
      runningTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.tasksUsage.running,
        0
      ),
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.tasksQueue.maxSize,
        0
      )
    }
  }

  /**
   * Pool type.
   *
   * If it is `'dynamic'`, it provides the `max` property.
   */
  protected abstract get type (): PoolType

  /**
   * Gets the worker type.
   */
  protected abstract get worker (): WorkerType

  /**
   * Pool minimum size.
   */
  protected abstract get minSize (): number

  /**
   * Pool maximum size.
   */
  protected abstract get maxSize (): number

  /**
   * Gets the worker node key of the given worker.
   *
   * @param worker - The worker.
   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    for (const workerNode of this.workerNodes) {
      this.setWorkerNodeTasksUsage(workerNode, {
        ran: 0,
        // Keep the in-flight tasks count: each of those tasks will still
        // decrement it on completion, so resetting it to zero would drive
        // the counter negative.
        running: workerNode.tasksUsage.running,
        runTime: 0,
        runTimeHistory: new CircularArray(),
        avgRunTime: 0,
        medRunTime: 0,
        waitTime: 0,
        waitTimeHistory: new CircularArray(),
        avgWaitTime: 0,
        medWaitTime: 0,
        error: 0
      })
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /** @inheritDoc */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    // Queued tasks must be handed to the workers before the queue is disabled.
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /** @inheritDoc */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the effective tasks queue options, defaulting the concurrency to `1`.
   *
   * @param tasksQueueOptions - The user provided tasks queue options, possibly unset.
   * @returns The tasks queue options with the defaults applied.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status.
   */
  protected get full (): boolean {
    return this.workerNodes.length >= this.maxSize
  }

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Whether no worker node is idle.
   *
   * @returns `true` if no worker node has a running tasks count of zero, `false` otherwise.
   */
  protected internalBusy (): boolean {
    return !this.workerNodes.some(
      workerNode => workerNode.tasksUsage.running === 0
    )
  }

  /** @inheritDoc */
  public async execute (data?: Data, name?: string): Promise<Response> {
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      submissionTimestamp,
      id: crypto.randomUUID()
    }
    // Register the resolve/reject callbacks before the task is dispatched so
    // that the worker response can always be matched by its id.
    const res = new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(submittedTask.id as string, {
        resolve,
        reject,
        worker: this.workerNodes[workerNodeKey].worker
      })
    })
    if (
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].tasksUsage.running >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    ) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
    this.checkAndEmitEvents()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        // FIXME: wait for tasks to be finished
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Shutdowns the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to execute code before worker nodes are created in the abstract constructor.
   * Can be overridden.
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected beforeTaskExecutionHook (workerNodeKey: number): void {
    ++this.workerNodes[workerNodeKey].tasksUsage.running
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * @param worker - The worker.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerTasksUsage =
      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
    --workerTasksUsage.running
    ++workerTasksUsage.ran
    if (message.error != null) {
      ++workerTasksUsage.error
    }
    this.updateRunTimeTasksUsage(workerTasksUsage, message)
    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
  }

  /**
   * Updates the run time statistics that the current worker choice strategy requires.
   *
   * @param workerTasksUsage - The worker tasks usage to update.
   * @param message - The received message carrying the task run time.
   */
  private updateRunTimeTasksUsage (
    workerTasksUsage: TasksUsage,
    message: MessageValue<Response>
  ): void {
    const requiredStatistics =
      this.workerChoiceStrategyContext.getRequiredStatistics()
    if (!requiredStatistics.runTime) {
      return
    }
    workerTasksUsage.runTime += message.runTime ?? 0
    if (requiredStatistics.avgRunTime && workerTasksUsage.ran !== 0) {
      workerTasksUsage.avgRunTime =
        workerTasksUsage.runTime / workerTasksUsage.ran
    }
    if (requiredStatistics.medRunTime && message.runTime != null) {
      workerTasksUsage.runTimeHistory.push(message.runTime)
      workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
    }
  }

  /**
   * Updates the wait time statistics that the current worker choice strategy requires.
   *
   * @param workerTasksUsage - The worker tasks usage to update.
   * @param message - The received message carrying the task wait time.
   */
  private updateWaitTimeTasksUsage (
    workerTasksUsage: TasksUsage,
    message: MessageValue<Response>
  ): void {
    const requiredStatistics =
      this.workerChoiceStrategyContext.getRequiredStatistics()
    if (!requiredStatistics.waitTime) {
      return
    }
    workerTasksUsage.waitTime += message.waitTime ?? 0
    if (requiredStatistics.avgWaitTime && workerTasksUsage.ran !== 0) {
      workerTasksUsage.avgWaitTime =
        workerTasksUsage.waitTime / workerTasksUsage.ran
    }
    if (requiredStatistics.medWaitTime && message.waitTime != null) {
      workerTasksUsage.waitTimeHistory.push(message.waitTime)
      workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * When the pool is dynamic, not full and has no idle worker node, a new
   * worker is created and chosen. Otherwise the choice is delegated to the
   * worker choice strategy context.
   *
   * @returns The worker node key
   */
  protected chooseWorkerNode (): number {
    let workerNodeKey: number
    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
      const workerCreated = this.createAndSetupWorker()
      this.registerWorkerMessageListener(workerCreated, message => {
        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
        if (
          isKillBehavior(KillBehaviors.HARD, message.kill) ||
          (message.kill != null &&
            this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
        ) {
          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
          this.flushTasksQueue(currentWorkerNodeKey)
          // FIXME: wait for tasks to be finished
          void (this.destroyWorker(workerCreated) as Promise<void>)
        }
      })
      workerNodeKey = this.getWorkerNodeKey(workerCreated)
    } else {
      workerNodeKey = this.workerChoiceStrategyContext.execute()
    }
    return workerNodeKey
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
   *
   * @param worker - The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.error, error)
      }
    })
    if (this.opts.restartWorkerOnError === true) {
      worker.on('error', () => {
        this.createAndSetupWorker()
      })
    }
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker message.
   *
   * On a task response it settles the matching promise, updates the worker
   * tasks usage and, when the tasks queue is enabled, starts the next task
   * queued on that worker node.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id == null) {
        return
      }
      // Task execution response received
      const promiseResponse = this.promiseResponseMap.get(message.id)
      if (promiseResponse == null) {
        return
      }
      if (message.error != null) {
        promiseResponse.reject(message.error)
        if (this.emitter != null) {
          this.emitter.emit(PoolEvents.taskError, {
            error: message.error,
            errorData: message.errorData
          })
        }
      } else {
        promiseResponse.resolve(message.data as Response)
      }
      this.afterTaskExecutionHook(promiseResponse.worker, message)
      this.promiseResponseMap.delete(message.id)
      const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
      if (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) > 0
      ) {
        this.executeTask(
          workerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
    }
  }

  /**
   * Emits the `busy` and `full` pool events when the emitter exists and the
   * matching condition holds.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter == null) {
      return
    }
    if (this.busy) {
      this.emitter.emit(PoolEvents.busy, this.info)
    }
    if (this.type === PoolTypes.dynamic && this.full) {
      this.emitter.emit(PoolEvents.full, this.info)
    }
  }

  /**
   * Sets the tasks usage of the given worker node.
   *
   * @param workerNode - The worker node.
   * @param tasksUsage - The worker node tasks usage.
   */
  private setWorkerNodeTasksUsage (
    workerNode: WorkerNode<Worker, Data>,
    tasksUsage: TasksUsage
  ): void {
    workerNode.tasksUsage = tasksUsage
  }

  /**
   * Pushes the given worker in the pool worker nodes, with zeroed tasks usage
   * and an empty tasks queue.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    return this.workerNodes.push({
      worker,
      tasksUsage: {
        ran: 0,
        running: 0,
        runTime: 0,
        runTimeHistory: new CircularArray(),
        avgRunTime: 0,
        medRunTime: 0,
        waitTime: 0,
        waitTimeHistory: new CircularArray(),
        avgWaitTime: 0,
        medWaitTime: 0,
        error: 0
      },
      tasksQueue: new Queue<Task<Data>>()
    })
  }

  /**
   * Sets the given worker in the pool worker nodes.
   *
   * @param workerNodeKey - The worker node key.
   * @param worker - The worker.
   * @param tasksUsage - The worker tasks usage.
   * @param tasksQueue - The worker task queue.
   */
  private setWorkerNode (
    workerNodeKey: number,
    worker: Worker,
    tasksUsage: TasksUsage,
    tasksQueue: Queue<Task<Data>>
  ): void {
    this.workerNodes[workerNodeKey] = {
      worker,
      tasksUsage,
      tasksQueue
    }
  }

  /**
   * Removes the given worker from the pool worker nodes.
   * Does nothing if the worker is not found.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Runs the before-execution hook, then sends the task to the worker of the
   * given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey)
    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
  }

  /**
   * Adds the task to the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the queue `enqueue()` call.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
  }

  /**
   * Removes the next task from the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, or `undefined` as returned by the queue.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
  }

  /**
   * Gets the tasks queue size of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The number of queued tasks.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.size
  }

  /**
   * Executes every task queued on the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    // The queue shrinks on each dequeue, so loop on its live size: an
    // incrementing index compared against that size stops halfway through.
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }

  /**
   * Executes every task queued on every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }
}