feat: restart worker in case of uncaught error
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
3 import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
6 isPlainObject,
7 median
8 } from '../utils'
9 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
10 import { CircularArray } from '../circular-array'
11 import { Queue } from '../queue'
12 import {
13 type IPool,
14 PoolEmitter,
15 PoolEvents,
16 type PoolOptions,
17 PoolType,
18 type TasksQueueOptions
19 } from './pool'
20 import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
21 import {
22 WorkerChoiceStrategies,
23 type WorkerChoiceStrategy,
24 type WorkerChoiceStrategyOptions
25 } from './selection-strategies/selection-strategies-types'
26 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
27
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<
    string,
    PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   *
   * Default to a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, runs the setup hook, creates `numberOfWorkers` workers,
   * then builds the event emitter (when events are enabled) and the worker choice strategy context.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   * @throws Error if the pool is not started from the main worker or if an argument is invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
      Worker,
      Data,
      Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath - Path to the worker file.
   * @throws Error if the path is `null`, `undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is valid for this pool.
   *
   * @param numberOfWorkers - Number of workers requested.
   * @throws Error if the number is missing, or is zero for a fixed pool.
   * @throws TypeError if the number is not a safe integer.
   * @throws RangeError if the number is negative.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Validates the pool options and fills in the defaults on `this.opts`:
   * round robin strategy, default strategy options, `restartWorkerOnError` and
   * `enableEvents` enabled, tasks queue disabled.
   *
   * @param opts - Options for the pool.
   * @throws TypeError if `opts` is not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (isPlainObject(opts)) {
      this.opts.workerChoiceStrategy =
        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
      this.opts.workerChoiceStrategyOptions =
        opts.workerChoiceStrategyOptions ??
        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
      this.checkValidWorkerChoiceStrategyOptions(
        this.opts.workerChoiceStrategyOptions
      )
      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
      this.opts.enableEvents = opts.enableEvents ?? true
      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
      if (this.opts.enableTasksQueue) {
        this.checkValidTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
      }
    } else {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
  }

  /**
   * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
   *
   * @param workerChoiceStrategy - The worker choice strategy to check.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Checks the worker choice strategy options.
   *
   * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
   * @throws TypeError if the options are not a plain object.
   * @throws Error if weights are given but their count differs from the pool size.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.size
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  /**
   * Checks the tasks queue options. `null`/`undefined` options are accepted.
   *
   * @param tasksQueueOptions - The tasks queue options to check.
   * @throws TypeError if the options are defined but not a plain object.
   * @throws Error if the concurrency is lower than or equal to zero.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    if ((tasksQueueOptions?.concurrency as number) <= 0) {
      throw new Error(
        `Invalid worker tasks concurrency '${
          tasksQueueOptions.concurrency as number
        }'`
      )
    }
  }

  /** @inheritDoc */
  public abstract get type (): PoolType

  /** @inheritDoc */
  public abstract get size (): number

  /**
   * Number of tasks running in the pool.
   */
  private get numberOfRunningTasks (): number {
    return this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + workerNode.tasksUsage.running,
      0
    )
  }

  /**
   * Number of tasks queued in the pool.
   */
  private get numberOfQueuedTasks (): number {
    if (this.opts.enableTasksQueue === false) {
      return 0
    }
    return this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
      0
    )
  }

  /**
   * Gets the given worker its worker node key.
   *
   * @param worker - The worker.
   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /**
   * Builds a zeroed tasks usage with empty run time and wait time histories.
   *
   * A new object is returned on every call so that worker nodes never share statistics.
   *
   * @returns The initial tasks usage.
   */
  private buildInitialTasksUsage (): TasksUsage {
    return {
      run: 0,
      running: 0,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0
    }
  }

  /**
   * @inheritDoc
   *
   * Every worker node tasks usage (including its `running` counter) is reset
   * before the strategy is switched.
   */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    for (const workerNode of this.workerNodes) {
      this.setWorkerNodeTasksUsage(workerNode, this.buildInitialTasksUsage())
    }
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /**
   * @inheritDoc
   *
   * When the queue was enabled and gets disabled, the queued tasks are flushed
   * to their workers first.
   */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /**
   * @inheritDoc
   *
   * The options are only stored while the tasks queue is enabled; otherwise
   * any previously stored options are removed.
   */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the effective tasks queue options.
   *
   * @param tasksQueueOptions - The user supplied tasks queue options, possibly `undefined`.
   * @returns The tasks queue options with `concurrency` defaulting to `1`.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status.
   */
  protected abstract get full (): boolean

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Whether every worker node is currently running at least one task.
   *
   * @returns `true` if no worker node has a `running` counter equal to zero, `false` otherwise.
   */
  protected internalBusy (): boolean {
    return (
      this.workerNodes.findIndex(workerNode => {
        return workerNode.tasksUsage.running === 0
      }) === -1
    )
  }

  /**
   * @inheritDoc
   *
   * The task is queued on the chosen worker node when the tasks queue is enabled
   * and either the pool is busy or that node already runs `concurrency` tasks;
   * otherwise it is sent to the worker immediately.
   */
  public async execute (data?: Data, name?: string): Promise<Response> {
    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      submissionTimestamp,
      id: crypto.randomUUID()
    }
    // Register the resolve/reject callbacks before the task is dispatched so
    // that the worker response can always be matched by its message id.
    const res = new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(submittedTask.id as string, {
        resolve,
        reject,
        worker: this.workerNodes[workerNodeKey].worker
      })
    })
    if (
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].tasksUsage.running >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    ) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
    this.checkAndEmitEvents()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /**
   * @inheritDoc
   *
   * Each worker node has its tasks queue flushed before its worker is destroyed.
   */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Shutdowns the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to execute code before worker node are created in the abstract constructor.
   * Can be overridden
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * Increments the `running` counter of the worker node.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected beforeTaskExecutionHook (workerNodeKey: number): void {
    ++this.workerNodes[workerNodeKey].tasksUsage.running
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * Updates the worker node counters (`running`, `run`, `error`) and its
   * run time and wait time statistics.
   *
   * @param worker - The worker.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerTasksUsage =
      this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
    --workerTasksUsage.running
    ++workerTasksUsage.run
    if (message.error != null) {
      ++workerTasksUsage.error
    }
    this.updateRunTimeTasksUsage(workerTasksUsage, message)
    this.updateWaitTimeTasksUsage(workerTasksUsage, message)
  }

  /**
   * Updates the run time statistics of a worker node, limited to the
   * statistics the current worker choice strategy requires.
   *
   * @param workerTasksUsage - The worker node tasks usage to update.
   * @param message - The received message carrying the optional `runTime`.
   */
  private updateRunTimeTasksUsage (
    workerTasksUsage: TasksUsage,
    message: MessageValue<Response>
  ): void {
    if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
      workerTasksUsage.runTime += message.runTime ?? 0
      if (
        this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
        workerTasksUsage.run !== 0
      ) {
        workerTasksUsage.avgRunTime =
          workerTasksUsage.runTime / workerTasksUsage.run
      }
      if (
        this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
        message.runTime != null
      ) {
        workerTasksUsage.runTimeHistory.push(message.runTime)
        workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
      }
    }
  }

  /**
   * Updates the wait time statistics of a worker node, limited to the
   * statistics the current worker choice strategy requires.
   *
   * @param workerTasksUsage - The worker node tasks usage to update.
   * @param message - The received message carrying the optional `waitTime`.
   */
  private updateWaitTimeTasksUsage (
    workerTasksUsage: TasksUsage,
    message: MessageValue<Response>
  ): void {
    if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
      workerTasksUsage.waitTime += message.waitTime ?? 0
      if (
        this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
        workerTasksUsage.run !== 0
      ) {
        workerTasksUsage.avgWaitTime =
          workerTasksUsage.waitTime / workerTasksUsage.run
      }
      if (
        this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
        message.waitTime != null
      ) {
        workerTasksUsage.waitTimeHistory.push(message.waitTime)
        workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * The default worker choice strategy uses a round robin algorithm to distribute the load.
   *
   * For a dynamic pool that is not full while every worker node is running a task,
   * a new worker is created instead, with a listener that destroys it once it
   * sends a kill message.
   *
   * @returns The worker node key
   */
  protected chooseWorkerNode (): number {
    let workerNodeKey: number
    if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) {
      const workerCreated = this.createAndSetupWorker()
      this.registerWorkerMessageListener(workerCreated, message => {
        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
        if (
          isKillBehavior(KillBehaviors.HARD, message.kill) ||
          (message.kill != null &&
            this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
        ) {
          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
          this.flushTasksQueue(currentWorkerNodeKey)
          void (this.destroyWorker(workerCreated) as Promise<void>)
        }
      })
      workerNodeKey = this.getWorkerNodeKey(workerCreated)
    } else {
      workerNodeKey = this.workerChoiceStrategyContext.execute()
    }
    return workerNodeKey
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
   *
   * @param worker - The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * Registers the user supplied `message`, `error`, `online` and `exit` handlers,
   * forwards worker errors to the pool emitter when there is one, creates a
   * replacement worker on error when `restartWorkerOnError` is enabled and
   * removes the worker node once the worker exits.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      // The emitter is looked up when the event fires, not at registration time.
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.error, error)
      }
    })
    if (this.opts.restartWorkerOnError === true) {
      worker.on('error', () => {
        this.createAndSetupWorker()
      })
    }
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker message.
   *
   * For a task response it settles the matching promise, runs the after task
   * execution hook and, when the tasks queue is enabled, starts the next task
   * queued on the same worker node.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id != null) {
        // Task execution response received
        const promiseResponse = this.promiseResponseMap.get(message.id)
        if (promiseResponse != null) {
          if (message.error != null) {
            promiseResponse.reject(message.error)
          } else {
            promiseResponse.resolve(message.data as Response)
          }
          this.afterTaskExecutionHook(promiseResponse.worker, message)
          this.promiseResponseMap.delete(message.id)
          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
          if (
            this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(workerNodeKey) > 0
          ) {
            this.executeTask(
              workerNodeKey,
              this.dequeueTask(workerNodeKey) as Task<Data>
            )
          }
        }
      }
    }
  }

  /**
   * Emits the `busy` event when the pool is busy and the `full` event when a
   * dynamic pool is full. Does nothing when the pool has no emitter.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter != null) {
      if (this.busy) {
        this.emitter.emit(PoolEvents.busy)
      }
      if (this.type === PoolType.DYNAMIC && this.full) {
        this.emitter.emit(PoolEvents.full)
      }
    }
  }

  /**
   * Sets the given worker node its tasks usage in the pool.
   *
   * @param workerNode - The worker node.
   * @param tasksUsage - The worker node tasks usage.
   */
  private setWorkerNodeTasksUsage (
    workerNode: WorkerNode<Worker, Data>,
    tasksUsage: TasksUsage
  ): void {
    workerNode.tasksUsage = tasksUsage
  }

  /**
   * Pushes the given worker in the pool worker nodes.
   *
   * The node starts with a zeroed tasks usage and an empty tasks queue.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    return this.workerNodes.push({
      worker,
      tasksUsage: this.buildInitialTasksUsage(),
      tasksQueue: new Queue<Task<Data>>()
    })
  }

  /**
   * Sets the given worker in the pool worker nodes.
   *
   * @param workerNodeKey - The worker node key.
   * @param worker - The worker.
   * @param tasksUsage - The worker tasks usage.
   * @param tasksQueue - The worker task queue.
   */
  private setWorkerNode (
    workerNodeKey: number,
    worker: Worker,
    tasksUsage: TasksUsage,
    tasksQueue: Queue<Task<Data>>
  ): void {
    this.workerNodes[workerNodeKey] = {
      worker,
      tasksUsage,
      tasksQueue
    }
  }

  /**
   * Removes the given worker from the pool worker nodes.
   *
   * Does nothing if the worker is not in the pool worker nodes.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Runs the before task execution hook, then sends the task to the worker of the given node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey)
    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
  }

  /**
   * Adds the task to the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the queue `enqueue()` call.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
  }

  /**
   * Takes the next task out of the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, or `undefined` as returned by the queue.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
  }

  /**
   * Gets the size of the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The tasks queue size.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.size
  }

  /**
   * Executes every task queued on the given worker node until its queue is empty.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    // The queue shrinks on every dequeue, so the live size is re-read on each
    // iteration. Comparing it against an increasing counter would stop after
    // roughly half of the tasks and leave the others queued forever.
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }

  /**
   * Flushes the tasks queue of every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }
}