perf: drastically reduce worker nodes array lookups
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
52b71763 67 * The task execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
501aea93
JB
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 76
a35560ba 77 /**
51fe3d3c 78 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
81 Worker,
82 Data,
83 Response
a35560ba
S
84 >
85
075e51d1 86 /**
adc9cc64 87 * Whether the pool is starting or not.
075e51d1
JB
88 */
89 private readonly starting: boolean
afe0d5bf
JB
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker
 * choice strategy context, runs the setup hook and starts the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If not called from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments are validated before anything else is set up.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods that are handed over as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  // Read after checkPoolOptions() so that the defaults it assigns are seen.
  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is raised only for the duration of startPool().
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
140
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // The typeof check also rejects null and undefined.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length !== 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
153
8d3782fa
JB
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
171
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `min > max`, `max === 0` or `min === max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
189
/**
 * Checks the pool options and fills `this.opts` with the defaults of the unset ones.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  // Each option is defaulted first, then the effective value is validated.
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
216
/**
 * Checks that the worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownWorkerChoiceStrategies = Object.values(WorkerChoiceStrategies)
  if (knownWorkerChoiceStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
226
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
254
a20f0ba5
JB
/**
 * Checks the tasks queue options. Unset options and an unset concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
278
e761c033
JB
/**
 * Creates and sets up worker nodes until the number of non dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recomputed at each iteration since creating a worker node changes the array.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
290
/** @inheritDoc */
public get info (): PoolInfo {
  // Query the task statistics requirements once: they gate every optional section below.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked on each worker node.
  const sum = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const count = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => sum(workerNode => (predicate(workerNode) ? 1 : 0))
  const executedTasks = sum(workerNode => workerNode.usage.tasks.executed)
  // Builds the pool wide statistics of a time measurement from the worker nodes usage.
  // minimum and maximum fall back to Infinity and -Infinity when no worker node has a value.
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime'
  ): {
    minimum: number
    maximum: number
    average: number
    median?: number
  } => ({
    minimum: round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    // Guard the division: with no executed task, 0 / 0 would report NaN.
    average: round(
      executedTasks > 0
        ? sum(workerNode => workerNode.usage[measurement]?.aggregate ?? 0) /
            executedTasks
        : 0
    ),
    ...(taskStatisticsRequirements[measurement].median && {
      median: round(
        median(
          this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.median ?? 0
          )
        )
      )
    })
  })
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: count(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: count(workerNode => workerNode.usage.tasks.executing > 0),
    executedTasks,
    executingTasks: sum(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sum(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sum(workerNode => workerNode.usage.tasks?.maxQueued ?? 0),
    failedTasks: sum(workerNode => workerNode.usage.tasks.failed),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
08f3f44c 427
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready, non dynamic
 * worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
442
/**
 * The approximate pool utilization: total tasks run time plus wait time divided by
 * the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // Missing aggregates count as zero.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
463
8881ae32 464 /**
aa9eede8 465 * The pool type.
8881ae32
JB
466 *
467 * If it is `'dynamic'`, it provides the `max` property.
468 */
469 protected abstract get type (): PoolType
470
184855e6 471 /**
aa9eede8 472 * The worker type.
184855e6
JB
473 */
474 protected abstract get worker (): WorkerType
475
c2ade475 476 /**
aa9eede8 477 * The pool minimum size.
c2ade475 478 */
6b27d407 479 protected abstract get minSize (): number
ff733df7
JB
480
481 /**
aa9eede8 482 * The pool maximum size.
ff733df7 483 */
6b27d407 484 protected abstract get maxSize (): number
a35560ba 485
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker belongs
 * to a worker node of the pool. Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
502
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  const workerNodes = this.workerNodes
  for (let key = 0; key < workerNodes.length; key++) {
    if (workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
514
aa9eede8
JB
/**
 * Looks up the worker node key of the worker node whose worker info id is the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
  const foundKey = this.workerNodes.findIndex(
    workerNode => workerNode.info.id === workerId
  )
  // findIndex() reports a miss with -1, callers expect undefined.
  return foundKey === -1 ? undefined : foundKey
}
528
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node restarts from a clean usage and is sent the statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
547
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored nor forwarded.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
558
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the tasks queue off: flush the queues beforehand.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
570
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop the previously stored options, if any.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
581
/**
 * Builds the effective tasks queue options.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly unset.
 * @returns The tasks queue options with a concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
589
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 598
c319c66b
JB
599 /**
600 * Whether the pool is busy or not.
601 *
602 * The pool busyness boolean status.
603 */
604 protected abstract get busy (): boolean
7c0ba920 605
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns Worker nodes busyness boolean status: `true` when no worker node has zero executing tasks.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
618
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // Register the promise callbacks under the task id before the task is submitted.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // With the tasks queue enabled, the task is queued when the pool is busy or
    // when the chosen worker node already runs `concurrency` tasks.
    const shallEnqueueTask =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueueTask) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 651
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the tasks queue of a worker node, destroys it and waits for its worker exit.
  const destroyAndWaitForExit = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is attached before the destruction is requested.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorkerNode(workerNodeKey)
    await workerExited
  }
  await Promise.all(
    this.workerNodes.map(
      async (workerNode, workerNodeKey) =>
        await destroyAndWaitForExit(workerNode, workerNodeKey)
    )
  )
}
668
4a6952ff 669 /**
aa9eede8 670 * Terminates the worker node given its worker node key.
4a6952ff 671 *
aa9eede8 672 * @param workerNodeKey - The worker node key.
4a6952ff 673 */
aa9eede8
JB
674 protected abstract destroyWorkerNode (
675 workerNodeKey: number
676 ): void | Promise<void>
c97c7edb 677
729c563d 678 /**
6677a3d3
JB
679 * Setup hook to execute code before worker nodes are created in the abstract constructor.
680 * Can be overridden.
afc003b2
JB
681 *
682 * @virtual
729c563d 683 */
280c2a77 684 protected setupHook (): void {
d99ba5a8 685 // Intentionally empty
280c2a77 686 }
c97c7edb 687
729c563d 688 /**
280c2a77
S
689 * Should return whether the worker is the main worker or not.
690 */
691 protected abstract isMain (): boolean
692
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, first on the
 * worker node usage, then on its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(workerNode.usage)
  recordTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
713
/**
 * Hook executed after the worker task execution.
 * Updates the task statistics, run time and ELU, first on the worker node usage,
 * then on its usage for the task name reported in the message (default task name if none).
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(workerNode.usage)
  recordTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
736
/**
 * Updates the tasks counters of a worker usage once a task is done.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  // A message carrying a task error counts as a failed task, any other as an executed one.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
749
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the task run time
 * reported in the message (`0` if none).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeStatistics, tasks } = workerUsage
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    tasks.executed
  )
}
761
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp has a zero wait time.
  const taskWaitTime = now - (task.timestamp ?? now)
  const { waitTime: waitTimeStatistics, tasks } = workerUsage
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    taskWaitTime,
    tasks.executed
  )
}
775
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU reported in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const workerElu = workerUsage.elu
  updateMeasurementStatistics(
    workerElu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerElu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The first value is stored as is, later ones are averaged with the stored one.
  workerElu.utilization =
    workerElu.utilization != null
      ? (workerElu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
807
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is
 * chosen directly if the strategy policy uses dynamic workers. Otherwise the
 * choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
826
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
835
280c2a77 836 /**
aa9eede8 837 * Sends a message to worker given its worker node key.
280c2a77 838 *
aa9eede8 839 * @param workerNodeKey - The worker node key.
38e795c1 840 * @param message - The message.
280c2a77
S
841 */
842 protected abstract sendToWorker (
aa9eede8 843 workerNodeKey: number,
280c2a77
S
844 message: MessageValue<Data>
845 ): void
846
729c563d 847 /**
41344292 848 * Creates a new worker.
6c6afb84
JB
849 *
850 * @returns Newly created worker.
729c563d 851 */
280c2a77 852 protected abstract createWorker (): Worker
c97c7edb 853
/**
 * Creates a new worker, attaches the pool and user supplied event handlers to it,
 * adds it to the pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKey(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is starting.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestartWorker && erroredWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool on the first exit event.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
be0676b3 893
/**
 * Creates a new, completely set up dynamic worker node: a worker node flagged as
 * dynamic, listening to kill messages and sent a `checkActive` message.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is resolved again from the worker id carried by each message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    ) as number
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    const hasNoExecutingTask = workerUsage.tasks.executing === 0
    // A hard kill always destroys the worker node. Any other kill message does so
    // only when no task is executing and, with the tasks queue enabled, none is queued.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        hasNoExecutingTask &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorkerNode(localWorkerNodeKey) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  dynamicWorkerInfo.dynamic = true
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return workerNodeKey
}
930
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 *
 * @typeParam Message - Type of the message payload handed to the listener.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
943
/**
 * Hook run once a worker node has been newly created: it registers the pool
 * message listener, then sends the startup and the statistics messages.
 * Can be overridden.
 *
 * @param workerNodeKey - The key of the newly created worker node.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the worker statistics message to worker.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
958
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
965
/**
 * Sends to the worker identified by its worker node key a message carrying
 * the `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
a2ed5053
JB
983
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes, one task at a time.
 *
 * For each task, the first other ready worker node with an empty queue is
 * chosen; failing that, the other ready worker node with the fewest queued
 * tasks is chosen.
 *
 * If no other ready worker node exists, redistribution stops and the
 * remaining tasks are left in the queue of the given worker node. Moving a
 * task back onto the same queue would never shrink it and would spin this
 * loop forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No eligible target: re-enqueueing on the source would loop endlessly.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1013
/**
 * Builds the listener registered for each worker message. The listener
 * checks the message worker id, then dispatches worker ready responses and
 * task execution responses to their respective handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1031
/**
 * Handles a worker ready response: stores the received ready flag in the
 * sender worker information, then emits the pool `ready` event if an emitter
 * is set and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
    message.workerId
  ) as number
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1040
/**
 * Handles a task execution response: settles the promise registered under
 * the message id, runs the after task execution hook, forgets the promise,
 * executes the next queued task of the worker node if the tasks queue is
 * enabled and not empty, then updates the worker choice strategy context.
 * Does nothing if no promise is registered under the message id.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1065
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full`
 * event when the pool is dynamic and full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1076
/**
 * Reads the `info` of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1086
/**
 * Wraps the given worker into a worker node and appends it to the pool
 * worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1107
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1120
/**
 * Runs the before task execution hook, then sends the given task to the
 * worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1131
/**
 * Enqueues the given task on the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1135
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1139
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1143
/**
 * Executes every task queued on the worker node stored at the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1153
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1159}