Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
15b176e0
JB
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
47352846
JB
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
729c563d
S
115 /**
116 * Constructs a new poolifier pool.
117 *
38e795c1 118 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 119 * @param filePath - Path to the worker file.
38e795c1 120 * @param opts - Options for the pool.
729c563d 121 */
c97c7edb 122 public constructor (
b4213b7f
JB
123 protected readonly numberOfWorkers: number,
124 protected readonly filePath: string,
125 protected readonly opts: PoolOptions<Worker>
c97c7edb 126 ) {
78cea37e 127 if (!this.isMain()) {
04f45163 128 throw new Error(
8c6d4acf 129 'Cannot start a pool from a worker with the same type as the pool'
04f45163 130 )
c97c7edb 131 }
8d3782fa 132 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 133 this.checkFilePath(this.filePath)
7c0ba920 134 this.checkPoolOptions(this.opts)
1086026a 135
7254e419
JB
136 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
137 this.executeTask = this.executeTask.bind(this)
138 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 139
6bd72cd0 140 if (this.opts.enableEvents === true) {
7c0ba920
JB
141 this.emitter = new PoolEmitter()
142 }
d59df138
JB
143 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
144 Worker,
145 Data,
146 Response
da309861
JB
147 >(
148 this,
149 this.opts.workerChoiceStrategy,
150 this.opts.workerChoiceStrategyOptions
151 )
b6b32453
JB
152
153 this.setupHook()
154
72ae84a2
JB
155 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
156
47352846 157 this.started = false
075e51d1 158 this.starting = false
47352846
JB
159 if (this.opts.startWorkers === true) {
160 this.start()
161 }
afe0d5bf
JB
162
163 this.startTimestamp = performance.now()
c97c7edb
S
164 }
165
a35560ba 166 private checkFilePath (filePath: string): void {
ffcbbad8
JB
167 if (
168 filePath == null ||
3d6dd312 169 typeof filePath !== 'string' ||
ffcbbad8
JB
170 (typeof filePath === 'string' && filePath.trim().length === 0)
171 ) {
c510fea7
APA
172 throw new Error('Please specify a file with a worker implementation')
173 }
3d6dd312
JB
174 if (!existsSync(filePath)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
176 }
c510fea7
APA
177 }
178
8d3782fa
JB
179 private checkNumberOfWorkers (numberOfWorkers: number): void {
180 if (numberOfWorkers == null) {
181 throw new Error(
182 'Cannot instantiate a pool without specifying the number of workers'
183 )
78cea37e 184 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 185 throw new TypeError(
0d80593b 186 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
187 )
188 } else if (numberOfWorkers < 0) {
473c717a 189 throw new RangeError(
8d3782fa
JB
190 'Cannot instantiate a pool with a negative number of workers'
191 )
6b27d407 192 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
194 }
195 }
196
197 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 198 if (this.type === PoolTypes.dynamic) {
a5ed75b7 199 if (max == null) {
e695d66f 200 throw new TypeError(
a5ed75b7
JB
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 )
203 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
204 throw new TypeError(
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 )
207 } else if (min > max) {
079de991
JB
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 )
b97d82d8 211 } else if (max === 0) {
079de991 212 throw new RangeError(
d640b48b 213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
214 )
215 } else if (min === max) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 )
219 }
8d3782fa
JB
220 }
221 }
222
7c0ba920 223 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 224 if (isPlainObject(opts)) {
47352846 225 this.opts.startWorkers = opts.startWorkers ?? true
2324f8c9
JB
226 this.checkValidWorkerChoiceStrategy(
227 opts.workerChoiceStrategy as WorkerChoiceStrategy
228 )
0d80593b
JB
229 this.opts.workerChoiceStrategy =
230 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
231 this.checkValidWorkerChoiceStrategyOptions(
232 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
233 )
8990357d
JB
234 this.opts.workerChoiceStrategyOptions = {
235 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
236 ...opts.workerChoiceStrategyOptions
237 }
1f68cede 238 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
239 this.opts.enableEvents = opts.enableEvents ?? true
240 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
241 if (this.opts.enableTasksQueue) {
242 this.checkValidTasksQueueOptions(
243 opts.tasksQueueOptions as TasksQueueOptions
244 )
245 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
246 opts.tasksQueueOptions as TasksQueueOptions
247 )
248 }
249 } else {
250 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 251 }
aee46736
JB
252 }
253
254 private checkValidWorkerChoiceStrategy (
255 workerChoiceStrategy: WorkerChoiceStrategy
256 ): void {
2324f8c9
JB
257 if (
258 workerChoiceStrategy != null &&
259 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
260 ) {
b529c323 261 throw new Error(
aee46736 262 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
263 )
264 }
7c0ba920
JB
265 }
266
0d80593b
JB
267 private checkValidWorkerChoiceStrategyOptions (
268 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
269 ): void {
2324f8c9
JB
270 if (
271 workerChoiceStrategyOptions != null &&
272 !isPlainObject(workerChoiceStrategyOptions)
273 ) {
0d80593b
JB
274 throw new TypeError(
275 'Invalid worker choice strategy options: must be a plain object'
276 )
277 }
8990357d 278 if (
2324f8c9 279 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 280 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
281 ) {
282 throw new TypeError(
8c0b113f 283 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
284 )
285 }
286 if (
2324f8c9 287 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 288 workerChoiceStrategyOptions.retries < 0
8990357d
JB
289 ) {
290 throw new RangeError(
8c0b113f 291 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
292 )
293 }
49be33fe 294 if (
2324f8c9 295 workerChoiceStrategyOptions?.weights != null &&
6b27d407 296 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
297 ) {
298 throw new Error(
299 'Invalid worker choice strategy options: must have a weight for each worker node'
300 )
301 }
f0d7f803 302 if (
2324f8c9 303 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
304 !Object.values(Measurements).includes(
305 workerChoiceStrategyOptions.measurement
306 )
307 ) {
308 throw new Error(
309 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
310 )
311 }
0d80593b
JB
312 }
313
a20f0ba5 314 private checkValidTasksQueueOptions (
76d91ea0 315 tasksQueueOptions: TasksQueueOptions
a20f0ba5 316 ): void {
0d80593b
JB
317 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
318 throw new TypeError('Invalid tasks queue options: must be a plain object')
319 }
f0d7f803 320 if (
b7d085c4 321 tasksQueueOptions?.concurrency != null &&
2324f8c9 322 !Number.isSafeInteger(tasksQueueOptions.concurrency)
f0d7f803
JB
323 ) {
324 throw new TypeError(
20c6f652 325 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
326 )
327 }
328 if (
b7d085c4 329 tasksQueueOptions?.concurrency != null &&
2324f8c9 330 tasksQueueOptions.concurrency <= 0
f0d7f803 331 ) {
e695d66f 332 throw new RangeError(
2324f8c9 333 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
20c6f652
JB
334 )
335 }
20c6f652 336 if (
b7d085c4 337 tasksQueueOptions?.size != null &&
2324f8c9 338 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 339 ) {
ff3f866a 340 throw new TypeError(
68dbcdc0 341 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
342 )
343 }
2324f8c9 344 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 345 throw new RangeError(
2324f8c9 346 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
347 )
348 }
349 }
350
08f3f44c 351 /** @inheritDoc */
6b27d407
JB
352 public get info (): PoolInfo {
353 return {
23ccf9d7 354 version,
6b27d407 355 type: this.type,
184855e6 356 worker: this.worker,
47352846 357 started: this.started,
2431bdb4
JB
358 ready: this.ready,
359 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
360 minSize: this.minSize,
361 maxSize: this.maxSize,
c05f0d50
JB
362 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .runTime.aggregate &&
1305e9a8
JB
364 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
366 workerNodes: this.workerNodes.length,
367 idleWorkerNodes: this.workerNodes.reduce(
368 (accumulator, workerNode) =>
f59e1027 369 workerNode.usage.tasks.executing === 0
a4e07f72
JB
370 ? accumulator + 1
371 : accumulator,
6b27d407
JB
372 0
373 ),
374 busyWorkerNodes: this.workerNodes.reduce(
375 (accumulator, workerNode) =>
f59e1027 376 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
377 0
378 ),
a4e07f72 379 executedTasks: this.workerNodes.reduce(
6b27d407 380 (accumulator, workerNode) =>
f59e1027 381 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
382 0
383 ),
384 executingTasks: this.workerNodes.reduce(
385 (accumulator, workerNode) =>
f59e1027 386 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
387 0
388 ),
daf86646
JB
389 ...(this.opts.enableTasksQueue === true && {
390 queuedTasks: this.workerNodes.reduce(
391 (accumulator, workerNode) =>
392 accumulator + workerNode.usage.tasks.queued,
393 0
394 )
395 }),
396 ...(this.opts.enableTasksQueue === true && {
397 maxQueuedTasks: this.workerNodes.reduce(
398 (accumulator, workerNode) =>
399 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
400 0
401 )
402 }),
a1763c54
JB
403 ...(this.opts.enableTasksQueue === true && {
404 backPressure: this.hasBackPressure()
405 }),
68cbdc84
JB
406 ...(this.opts.enableTasksQueue === true && {
407 stolenTasks: this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + workerNode.usage.tasks.stolen,
410 0
411 )
412 }),
a4e07f72
JB
413 failedTasks: this.workerNodes.reduce(
414 (accumulator, workerNode) =>
f59e1027 415 accumulator + workerNode.usage.tasks.failed,
a4e07f72 416 0
1dcf8b7b
JB
417 ),
418 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
419 .runTime.aggregate && {
420 runTime: {
98e72cda 421 minimum: round(
90d6701c 422 min(
98e72cda 423 ...this.workerNodes.map(
041dc05b 424 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 425 )
1dcf8b7b
JB
426 )
427 ),
98e72cda 428 maximum: round(
90d6701c 429 max(
98e72cda 430 ...this.workerNodes.map(
041dc05b 431 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 432 )
1dcf8b7b 433 )
98e72cda 434 ),
3baa0837
JB
435 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
436 .runTime.average && {
437 average: round(
438 average(
439 this.workerNodes.reduce<number[]>(
440 (accumulator, workerNode) =>
441 accumulator.concat(workerNode.usage.runTime.history),
442 []
443 )
98e72cda 444 )
dc021bcc 445 )
3baa0837 446 }),
98e72cda
JB
447 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
448 .runTime.median && {
449 median: round(
450 median(
3baa0837
JB
451 this.workerNodes.reduce<number[]>(
452 (accumulator, workerNode) =>
453 accumulator.concat(workerNode.usage.runTime.history),
454 []
98e72cda
JB
455 )
456 )
457 )
458 })
1dcf8b7b
JB
459 }
460 }),
461 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
462 .waitTime.aggregate && {
463 waitTime: {
98e72cda 464 minimum: round(
90d6701c 465 min(
98e72cda 466 ...this.workerNodes.map(
041dc05b 467 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 468 )
1dcf8b7b
JB
469 )
470 ),
98e72cda 471 maximum: round(
90d6701c 472 max(
98e72cda 473 ...this.workerNodes.map(
041dc05b 474 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 475 )
1dcf8b7b 476 )
98e72cda 477 ),
3baa0837
JB
478 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
479 .waitTime.average && {
480 average: round(
481 average(
482 this.workerNodes.reduce<number[]>(
483 (accumulator, workerNode) =>
484 accumulator.concat(workerNode.usage.waitTime.history),
485 []
486 )
98e72cda 487 )
dc021bcc 488 )
3baa0837 489 }),
98e72cda
JB
490 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 .waitTime.median && {
492 median: round(
493 median(
3baa0837
JB
494 this.workerNodes.reduce<number[]>(
495 (accumulator, workerNode) =>
496 accumulator.concat(workerNode.usage.waitTime.history),
497 []
98e72cda
JB
498 )
499 )
500 )
501 })
1dcf8b7b
JB
502 }
503 })
6b27d407
JB
504 }
505 }
08f3f44c 506
aa9eede8
JB
507 /**
508 * The pool readiness boolean status.
509 */
2431bdb4
JB
510 private get ready (): boolean {
511 return (
b97d82d8
JB
512 this.workerNodes.reduce(
513 (accumulator, workerNode) =>
514 !workerNode.info.dynamic && workerNode.info.ready
515 ? accumulator + 1
516 : accumulator,
517 0
518 ) >= this.minSize
2431bdb4
JB
519 )
520 }
521
afe0d5bf 522 /**
aa9eede8 523 * The approximate pool utilization.
afe0d5bf
JB
524 *
525 * @returns The pool utilization.
526 */
527 private get utilization (): number {
8e5ca040 528 const poolTimeCapacity =
fe7d90db 529 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
530 const totalTasksRunTime = this.workerNodes.reduce(
531 (accumulator, workerNode) =>
71514351 532 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
533 0
534 )
535 const totalTasksWaitTime = this.workerNodes.reduce(
536 (accumulator, workerNode) =>
71514351 537 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
538 0
539 )
8e5ca040 540 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
541 }
542
8881ae32 543 /**
aa9eede8 544 * The pool type.
8881ae32
JB
545 *
546 * If it is `'dynamic'`, it provides the `max` property.
547 */
548 protected abstract get type (): PoolType
549
184855e6 550 /**
aa9eede8 551 * The worker type.
184855e6
JB
552 */
553 protected abstract get worker (): WorkerType
554
c2ade475 555 /**
aa9eede8 556 * The pool minimum size.
c2ade475 557 */
8735b4e5
JB
558 protected get minSize (): number {
559 return this.numberOfWorkers
560 }
ff733df7
JB
561
562 /**
aa9eede8 563 * The pool maximum size.
ff733df7 564 */
8735b4e5
JB
565 protected get maxSize (): number {
566 return this.max ?? this.numberOfWorkers
567 }
a35560ba 568
6b813701
JB
569 /**
570 * Checks if the worker id sent in the received message from a worker is valid.
571 *
572 * @param message - The received message.
573 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
574 */
21f710aa 575 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
576 if (message.workerId == null) {
577 throw new Error('Worker message received without worker id')
578 } else if (
21f710aa 579 message.workerId != null &&
aad6fb64 580 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
581 ) {
582 throw new Error(
583 `Worker message received from unknown worker '${message.workerId}'`
584 )
585 }
586 }
587
ffcbbad8 588 /**
f06e48d8 589 * Gets the given worker its worker node key.
ffcbbad8
JB
590 *
591 * @param worker - The worker.
f59e1027 592 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 593 */
aad6fb64 594 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 595 return this.workerNodes.findIndex(
041dc05b 596 workerNode => workerNode.worker === worker
f06e48d8 597 )
bf9549ae
JB
598 }
599
aa9eede8
JB
600 /**
601 * Gets the worker node key given its worker id.
602 *
603 * @param workerId - The worker id.
aad6fb64 604 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 605 */
72ae84a2 606 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 607 return this.workerNodes.findIndex(
041dc05b 608 workerNode => workerNode.info.id === workerId
aad6fb64 609 )
aa9eede8
JB
610 }
611
afc003b2 612 /** @inheritDoc */
a35560ba 613 public setWorkerChoiceStrategy (
59219cbb
JB
614 workerChoiceStrategy: WorkerChoiceStrategy,
615 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 616 ): void {
aee46736 617 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 618 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
619 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
620 this.opts.workerChoiceStrategy
621 )
622 if (workerChoiceStrategyOptions != null) {
623 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
624 }
aa9eede8 625 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 626 workerNode.resetUsage()
9edb9717 627 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 628 }
a20f0ba5
JB
629 }
630
631 /** @inheritDoc */
632 public setWorkerChoiceStrategyOptions (
633 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
634 ): void {
0d80593b 635 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
636 this.opts.workerChoiceStrategyOptions = {
637 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
638 ...workerChoiceStrategyOptions
639 }
a20f0ba5
JB
640 this.workerChoiceStrategyContext.setOptions(
641 this.opts.workerChoiceStrategyOptions
a35560ba
S
642 )
643 }
644
a20f0ba5 645 /** @inheritDoc */
8f52842f
JB
646 public enableTasksQueue (
647 enable: boolean,
648 tasksQueueOptions?: TasksQueueOptions
649 ): void {
a20f0ba5 650 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
651 this.unsetTaskStealing()
652 this.unsetTasksStealingOnBackPressure()
ef41a6e6 653 this.flushTasksQueues()
a20f0ba5
JB
654 }
655 this.opts.enableTasksQueue = enable
8f52842f 656 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
657 }
658
659 /** @inheritDoc */
8f52842f 660 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 661 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
662 this.checkValidTasksQueueOptions(tasksQueueOptions)
663 this.opts.tasksQueueOptions =
664 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 665 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
666 if (this.opts.tasksQueueOptions.taskStealing === true) {
667 this.setTaskStealing()
668 } else {
669 this.unsetTaskStealing()
670 }
671 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
672 this.setTasksStealingOnBackPressure()
673 } else {
674 this.unsetTasksStealingOnBackPressure()
675 }
5baee0d7 676 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
677 delete this.opts.tasksQueueOptions
678 }
679 }
680
9b38ab2d
JB
681 private buildTasksQueueOptions (
682 tasksQueueOptions: TasksQueueOptions
683 ): TasksQueueOptions {
684 return {
685 ...{
686 size: Math.pow(this.maxSize, 2),
687 concurrency: 1,
688 taskStealing: true,
689 tasksStealingOnBackPressure: true
690 },
691 ...tasksQueueOptions
692 }
693 }
694
5b49e864 695 private setTasksQueueSize (size: number): void {
20c6f652 696 for (const workerNode of this.workerNodes) {
ff3f866a 697 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
698 }
699 }
700
d6ca1416
JB
701 private setTaskStealing (): void {
702 for (const [workerNodeKey] of this.workerNodes.entries()) {
703 this.workerNodes[workerNodeKey].onEmptyQueue =
704 this.taskStealingOnEmptyQueue.bind(this)
705 }
706 }
707
708 private unsetTaskStealing (): void {
709 for (const [workerNodeKey] of this.workerNodes.entries()) {
710 delete this.workerNodes[workerNodeKey].onEmptyQueue
711 }
712 }
713
714 private setTasksStealingOnBackPressure (): void {
715 for (const [workerNodeKey] of this.workerNodes.entries()) {
716 this.workerNodes[workerNodeKey].onBackPressure =
717 this.tasksStealingOnBackPressure.bind(this)
718 }
719 }
720
721 private unsetTasksStealingOnBackPressure (): void {
722 for (const [workerNodeKey] of this.workerNodes.entries()) {
723 delete this.workerNodes[workerNodeKey].onBackPressure
724 }
725 }
726
c319c66b
JB
727 /**
728 * Whether the pool is full or not.
729 *
730 * The pool filling boolean status.
731 */
dea903a8
JB
732 protected get full (): boolean {
733 return this.workerNodes.length >= this.maxSize
734 }
c2ade475 735
c319c66b
JB
736 /**
737 * Whether the pool is busy or not.
738 *
739 * The pool busyness boolean status.
740 */
741 protected abstract get busy (): boolean
7c0ba920 742
6c6afb84 743 /**
3d76750a 744 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
745 *
746 * @returns Worker nodes busyness boolean status.
747 */
c2ade475 748 protected internalBusy (): boolean {
3d76750a
JB
749 if (this.opts.enableTasksQueue === true) {
750 return (
751 this.workerNodes.findIndex(
041dc05b 752 workerNode =>
3d76750a
JB
753 workerNode.info.ready &&
754 workerNode.usage.tasks.executing <
755 (this.opts.tasksQueueOptions?.concurrency as number)
756 ) === -1
757 )
3d76750a 758 }
419e8121
JB
759 return (
760 this.workerNodes.findIndex(
761 workerNode =>
762 workerNode.info.ready && workerNode.usage.tasks.executing === 0
763 ) === -1
764 )
cb70b19d
JB
765 }
766
e81c38f2 767 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
768 workerNodeKey: number,
769 message: MessageValue<Data>
770 ): Promise<boolean> {
72ae84a2 771 return await new Promise<boolean>((resolve, reject) => {
adee6053 772 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2
JB
773 this.registerWorkerMessageListener(workerNodeKey, message => {
774 if (
775 message.workerId === workerId &&
776 message.taskFunctionOperationStatus === true
777 ) {
778 resolve(true)
779 } else if (
780 message.workerId === workerId &&
781 message.taskFunctionOperationStatus === false
782 ) {
783 reject(
784 new Error(
785 `Task function operation ${
786 message.taskFunctionOperation as string
787 } failed on worker ${message.workerId}`
788 )
789 )
790 }
791 })
792 this.sendToWorker(workerNodeKey, message)
793 })
794 }
795
796 private async sendTaskFunctionOperationToWorkers (
adee6053 797 message: MessageValue<Data>
e81c38f2
JB
798 ): Promise<boolean> {
799 return await new Promise<boolean>((resolve, reject) => {
800 const responsesReceived = new Array<MessageValue<Data | Response>>()
801 for (const [workerNodeKey] of this.workerNodes.entries()) {
802 this.registerWorkerMessageListener(workerNodeKey, message => {
803 if (message.taskFunctionOperationStatus != null) {
804 responsesReceived.push(message)
805 if (
806 responsesReceived.length === this.workerNodes.length &&
807 responsesReceived.every(
808 message => message.taskFunctionOperationStatus === true
809 )
810 ) {
811 resolve(true)
812 } else if (
813 responsesReceived.length === this.workerNodes.length &&
814 responsesReceived.some(
815 message => message.taskFunctionOperationStatus === false
816 )
817 ) {
818 reject(
819 new Error(
820 `Task function operation ${
821 message.taskFunctionOperation as string
72ae84a2 822 } failed on worker ${message.workerId as number}`
e81c38f2
JB
823 )
824 )
825 }
826 }
827 })
72ae84a2 828 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
829 }
830 })
6703b9f4
JB
831 }
832
833 /** @inheritDoc */
834 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
835 for (const workerNode of this.workerNodes) {
836 if (
837 Array.isArray(workerNode.info.taskFunctionNames) &&
838 workerNode.info.taskFunctionNames.includes(name)
839 ) {
840 return true
841 }
842 }
843 return false
6703b9f4
JB
844 }
845
846 /** @inheritDoc */
e81c38f2
JB
847 public async addTaskFunction (
848 name: string,
3feeab69 849 fn: TaskFunction<Data, Response>
e81c38f2 850 ): Promise<boolean> {
3feeab69
JB
851 if (typeof name !== 'string') {
852 throw new TypeError('name argument must be a string')
853 }
854 if (typeof name === 'string' && name.trim().length === 0) {
855 throw new TypeError('name argument must not be an empty string')
856 }
857 if (typeof fn !== 'function') {
858 throw new TypeError('fn argument must be a function')
859 }
adee6053 860 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
861 taskFunctionOperation: 'add',
862 taskFunctionName: name,
3feeab69 863 taskFunction: fn.toString()
6703b9f4 864 })
adee6053
JB
865 this.taskFunctions.set(name, fn)
866 return opResult
6703b9f4
JB
867 }
868
869 /** @inheritDoc */
e81c38f2 870 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
871 if (!this.taskFunctions.has(name)) {
872 throw new Error(
16248b23 873 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
874 )
875 }
adee6053 876 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
877 taskFunctionOperation: 'remove',
878 taskFunctionName: name
879 })
adee6053
JB
880 this.deleteTaskFunctionWorkerUsages(name)
881 this.taskFunctions.delete(name)
882 return opResult
6703b9f4
JB
883 }
884
90d7d101 885 /** @inheritDoc */
6703b9f4 886 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
887 for (const workerNode of this.workerNodes) {
888 if (
6703b9f4
JB
889 Array.isArray(workerNode.info.taskFunctionNames) &&
890 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 891 ) {
6703b9f4 892 return workerNode.info.taskFunctionNames
f2dbbf95 893 }
90d7d101 894 }
f2dbbf95 895 return []
90d7d101
JB
896 }
897
6703b9f4 898 /** @inheritDoc */
e81c38f2 899 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 900 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
901 taskFunctionOperation: 'default',
902 taskFunctionName: name
903 })
6703b9f4
JB
904 }
905
adee6053
JB
906 private deleteTaskFunctionWorkerUsages (name: string): void {
907 for (const workerNode of this.workerNodes) {
908 workerNode.deleteTaskFunctionWorkerUsage(name)
909 }
910 }
911
375f7504
JB
912 private shallExecuteTask (workerNodeKey: number): boolean {
913 return (
914 this.tasksQueueSize(workerNodeKey) === 0 &&
915 this.workerNodes[workerNodeKey].usage.tasks.executing <
916 (this.opts.tasksQueueOptions?.concurrency as number)
917 )
918 }
919
afc003b2 920 /** @inheritDoc */
7d91a8cd
JB
921 public async execute (
922 data?: Data,
923 name?: string,
924 transferList?: TransferListItem[]
925 ): Promise<Response> {
52b71763 926 return await new Promise<Response>((resolve, reject) => {
15b176e0 927 if (!this.started) {
47352846 928 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 929 return
15b176e0 930 }
7d91a8cd
JB
931 if (name != null && typeof name !== 'string') {
932 reject(new TypeError('name argument must be a string'))
9d2d0da1 933 return
7d91a8cd 934 }
90d7d101
JB
935 if (
936 name != null &&
937 typeof name === 'string' &&
938 name.trim().length === 0
939 ) {
f58b60b9 940 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 941 return
90d7d101 942 }
b558f6b5
JB
943 if (transferList != null && !Array.isArray(transferList)) {
944 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 945 return
b558f6b5
JB
946 }
947 const timestamp = performance.now()
948 const workerNodeKey = this.chooseWorkerNode()
501aea93 949 const task: Task<Data> = {
52b71763
JB
950 name: name ?? DEFAULT_TASK_NAME,
951 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
952 data: data ?? ({} as Data),
7d91a8cd 953 transferList,
52b71763 954 timestamp,
7629bdf1 955 taskId: randomUUID()
52b71763 956 }
7629bdf1 957 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
958 resolve,
959 reject,
501aea93 960 workerNodeKey
2e81254d 961 })
52b71763 962 if (
4e377863
JB
963 this.opts.enableTasksQueue === false ||
964 (this.opts.enableTasksQueue === true &&
375f7504 965 this.shallExecuteTask(workerNodeKey))
52b71763 966 ) {
501aea93 967 this.executeTask(workerNodeKey, task)
4e377863
JB
968 } else {
969 this.enqueueTask(workerNodeKey, task)
52b71763 970 }
2e81254d 971 })
280c2a77 972 }
c97c7edb 973
47352846
JB
974 /** @inheritdoc */
975 public start (): void {
976 this.starting = true
977 while (
978 this.workerNodes.reduce(
979 (accumulator, workerNode) =>
980 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
981 0
982 ) < this.numberOfWorkers
983 ) {
984 this.createAndSetupWorkerNode()
985 }
986 this.starting = false
987 this.started = true
988 }
989
afc003b2 990 /** @inheritDoc */
c97c7edb 991 public async destroy (): Promise<void> {
1fbcaa7c 992 await Promise.all(
81c02522 993 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 994 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
995 })
996 )
33e6bb4c 997 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 998 this.started = false
c97c7edb
S
999 }
1000
1e3214b6 1001 protected async sendKillMessageToWorker (
72ae84a2 1002 workerNodeKey: number
1e3214b6 1003 ): Promise<void> {
9edb9717 1004 await new Promise<void>((resolve, reject) => {
041dc05b 1005 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
1006 if (message.kill === 'success') {
1007 resolve()
1008 } else if (message.kill === 'failure') {
72ae84a2
JB
1009 reject(
1010 new Error(
1011 `Worker ${
1012 message.workerId as number
1013 } kill message handling failed`
1014 )
1015 )
1e3214b6
JB
1016 }
1017 })
72ae84a2 1018 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1019 })
1e3214b6
JB
1020 }
1021
4a6952ff 1022 /**
aa9eede8 1023 * Terminates the worker node given its worker node key.
4a6952ff 1024 *
aa9eede8 1025 * @param workerNodeKey - The worker node key.
4a6952ff 1026 */
81c02522 1027 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1028
729c563d 1029 /**
6677a3d3
JB
1030 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1031 * Can be overridden.
afc003b2
JB
1032 *
1033 * @virtual
729c563d 1034 */
280c2a77 1035 protected setupHook (): void {
965df41c 1036 /* Intentionally empty */
280c2a77 1037 }
c97c7edb 1038
729c563d 1039 /**
280c2a77
S
1040 * Should return whether the worker is the main worker or not.
1041 */
1042 protected abstract isMain (): boolean
1043
1044 /**
2e81254d 1045 * Hook executed before the worker task execution.
bf9549ae 1046 * Can be overridden.
729c563d 1047 *
f06e48d8 1048 * @param workerNodeKey - The worker node key.
1c6fe997 1049 * @param task - The task to execute.
729c563d 1050 */
1c6fe997
JB
1051 protected beforeTaskExecutionHook (
1052 workerNodeKey: number,
1053 task: Task<Data>
1054 ): void {
94407def
JB
1055 if (this.workerNodes[workerNodeKey]?.usage != null) {
1056 const workerUsage = this.workerNodes[workerNodeKey].usage
1057 ++workerUsage.tasks.executing
1058 this.updateWaitTimeWorkerUsage(workerUsage, task)
1059 }
1060 if (
1061 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1062 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1063 task.name as string
1064 ) != null
1065 ) {
db0e38ee 1066 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1067 workerNodeKey
db0e38ee 1068 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1069 ++taskFunctionWorkerUsage.tasks.executing
1070 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1071 }
c97c7edb
S
1072 }
1073
c01733f1 1074 /**
2e81254d 1075 * Hook executed after the worker task execution.
bf9549ae 1076 * Can be overridden.
c01733f1 1077 *
501aea93 1078 * @param workerNodeKey - The worker node key.
38e795c1 1079 * @param message - The received message.
c01733f1 1080 */
2e81254d 1081 protected afterTaskExecutionHook (
501aea93 1082 workerNodeKey: number,
2740a743 1083 message: MessageValue<Response>
bf9549ae 1084 ): void {
94407def
JB
1085 if (this.workerNodes[workerNodeKey]?.usage != null) {
1086 const workerUsage = this.workerNodes[workerNodeKey].usage
1087 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1088 this.updateRunTimeWorkerUsage(workerUsage, message)
1089 this.updateEluWorkerUsage(workerUsage, message)
1090 }
1091 if (
1092 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1093 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1094 message.taskPerformance?.name as string
94407def
JB
1095 ) != null
1096 ) {
db0e38ee 1097 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1098 workerNodeKey
db0e38ee 1099 ].getTaskFunctionWorkerUsage(
0628755c 1100 message.taskPerformance?.name as string
b558f6b5 1101 ) as WorkerUsage
db0e38ee
JB
1102 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1103 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1104 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1105 }
1106 }
1107
db0e38ee
JB
1108 /**
1109 * Whether the worker node shall update its task function worker usage or not.
1110 *
1111 * @param workerNodeKey - The worker node key.
1112 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1113 */
1114 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1115 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1116 return (
94407def 1117 workerInfo != null &&
6703b9f4
JB
1118 Array.isArray(workerInfo.taskFunctionNames) &&
1119 workerInfo.taskFunctionNames.length > 2
b558f6b5 1120 )
f1c06930
JB
1121 }
1122
1123 private updateTaskStatisticsWorkerUsage (
1124 workerUsage: WorkerUsage,
1125 message: MessageValue<Response>
1126 ): void {
a4e07f72 1127 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1128 if (
1129 workerTaskStatistics.executing != null &&
1130 workerTaskStatistics.executing > 0
1131 ) {
1132 --workerTaskStatistics.executing
5bb5be17 1133 }
6703b9f4 1134 if (message.workerError == null) {
98e72cda
JB
1135 ++workerTaskStatistics.executed
1136 } else {
a4e07f72 1137 ++workerTaskStatistics.failed
2740a743 1138 }
f8eb0a2a
JB
1139 }
1140
a4e07f72
JB
1141 private updateRunTimeWorkerUsage (
1142 workerUsage: WorkerUsage,
f8eb0a2a
JB
1143 message: MessageValue<Response>
1144 ): void {
6703b9f4 1145 if (message.workerError != null) {
dc021bcc
JB
1146 return
1147 }
e4f20deb
JB
1148 updateMeasurementStatistics(
1149 workerUsage.runTime,
1150 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1151 message.taskPerformance?.runTime ?? 0
e4f20deb 1152 )
f8eb0a2a
JB
1153 }
1154
a4e07f72
JB
1155 private updateWaitTimeWorkerUsage (
1156 workerUsage: WorkerUsage,
1c6fe997 1157 task: Task<Data>
f8eb0a2a 1158 ): void {
1c6fe997
JB
1159 const timestamp = performance.now()
1160 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1161 updateMeasurementStatistics(
1162 workerUsage.waitTime,
1163 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1164 taskWaitTime
e4f20deb 1165 )
c01733f1 1166 }
1167
a4e07f72 1168 private updateEluWorkerUsage (
5df69fab 1169 workerUsage: WorkerUsage,
62c15a68
JB
1170 message: MessageValue<Response>
1171 ): void {
6703b9f4 1172 if (message.workerError != null) {
dc021bcc
JB
1173 return
1174 }
008512c7
JB
1175 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1176 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1177 updateMeasurementStatistics(
1178 workerUsage.elu.active,
008512c7 1179 eluTaskStatisticsRequirements,
dc021bcc 1180 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1181 )
1182 updateMeasurementStatistics(
1183 workerUsage.elu.idle,
008512c7 1184 eluTaskStatisticsRequirements,
dc021bcc 1185 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1186 )
008512c7 1187 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1188 if (message.taskPerformance?.elu != null) {
f7510105
JB
1189 if (workerUsage.elu.utilization != null) {
1190 workerUsage.elu.utilization =
1191 (workerUsage.elu.utilization +
1192 message.taskPerformance.elu.utilization) /
1193 2
1194 } else {
1195 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1196 }
62c15a68
JB
1197 }
1198 }
1199 }
1200
280c2a77 1201 /**
f06e48d8 1202 * Chooses a worker node for the next task.
280c2a77 1203 *
6c6afb84 1204 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1205 *
aa9eede8 1206 * @returns The chosen worker node key
280c2a77 1207 */
6c6afb84 1208 private chooseWorkerNode (): number {
930dcf12 1209 if (this.shallCreateDynamicWorker()) {
aa9eede8 1210 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1211 if (
b1aae695 1212 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1213 ) {
aa9eede8 1214 return workerNodeKey
6c6afb84 1215 }
17393ac8 1216 }
930dcf12
JB
1217 return this.workerChoiceStrategyContext.execute()
1218 }
1219
6c6afb84
JB
1220 /**
1221 * Conditions for dynamic worker creation.
1222 *
1223 * @returns Whether to create a dynamic worker or not.
1224 */
1225 private shallCreateDynamicWorker (): boolean {
930dcf12 1226 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1227 }
1228
280c2a77 1229 /**
aa9eede8 1230 * Sends a message to worker given its worker node key.
280c2a77 1231 *
aa9eede8 1232 * @param workerNodeKey - The worker node key.
38e795c1 1233 * @param message - The message.
7d91a8cd 1234 * @param transferList - The optional array of transferable objects.
280c2a77
S
1235 */
1236 protected abstract sendToWorker (
aa9eede8 1237 workerNodeKey: number,
7d91a8cd
JB
1238 message: MessageValue<Data>,
1239 transferList?: TransferListItem[]
280c2a77
S
1240 ): void
1241
729c563d 1242 /**
41344292 1243 * Creates a new worker.
6c6afb84
JB
1244 *
1245 * @returns Newly created worker.
729c563d 1246 */
280c2a77 1247 protected abstract createWorker (): Worker
c97c7edb 1248
4a6952ff 1249 /**
aa9eede8 1250 * Creates a new, completely set up worker node.
4a6952ff 1251 *
aa9eede8 1252 * @returns New, completely set up worker node key.
4a6952ff 1253 */
aa9eede8 1254 protected createAndSetupWorkerNode (): number {
bdacc2d2 1255 const worker = this.createWorker()
280c2a77 1256
fd04474e 1257 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1258 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1259 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1260 worker.on('error', error => {
aad6fb64 1261 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1262 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1263 workerInfo.ready = false
0dc838e3 1264 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1265 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1266 if (
b6bfca01 1267 this.started &&
9b38ab2d
JB
1268 !this.starting &&
1269 this.opts.restartWorkerOnError === true
15b176e0 1270 ) {
9b106837 1271 if (workerInfo.dynamic) {
aa9eede8 1272 this.createAndSetupDynamicWorkerNode()
8a1260a3 1273 } else {
aa9eede8 1274 this.createAndSetupWorkerNode()
8a1260a3 1275 }
5baee0d7 1276 }
9b38ab2d 1277 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1278 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1279 }
5baee0d7 1280 })
a35560ba 1281 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1282 worker.once('exit', () => {
f06e48d8 1283 this.removeWorkerNode(worker)
a974afa6 1284 })
280c2a77 1285
aa9eede8 1286 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1287
aa9eede8 1288 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1289
aa9eede8 1290 return workerNodeKey
c97c7edb 1291 }
be0676b3 1292
930dcf12 1293 /**
aa9eede8 1294 * Creates a new, completely set up dynamic worker node.
930dcf12 1295 *
aa9eede8 1296 * @returns New, completely set up dynamic worker node key.
930dcf12 1297 */
aa9eede8
JB
1298 protected createAndSetupDynamicWorkerNode (): number {
1299 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1300 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1301 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1302 message.workerId
aad6fb64 1303 )
aa9eede8 1304 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1305 // Kill message received from worker
930dcf12
JB
1306 if (
1307 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1308 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1309 ((this.opts.enableTasksQueue === false &&
aa9eede8 1310 workerUsage.tasks.executing === 0) ||
7b56f532 1311 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1312 workerUsage.tasks.executing === 0 &&
1313 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1314 ) {
041dc05b 1315 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1316 this.emitter?.emit(PoolEvents.error, error)
1317 })
930dcf12
JB
1318 }
1319 })
46b0bb09 1320 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1321 this.sendToWorker(workerNodeKey, {
e9dd5b66 1322 checkActive: true
21f710aa 1323 })
72ae84a2
JB
1324 if (this.taskFunctions.size > 0) {
1325 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1326 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1327 taskFunctionOperation: 'add',
1328 taskFunctionName,
1329 taskFunction: taskFunction.toString()
1330 }).catch(error => {
1331 this.emitter?.emit(PoolEvents.error, error)
1332 })
1333 }
1334 }
b5e113f6 1335 workerInfo.dynamic = true
b1aae695
JB
1336 if (
1337 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1338 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1339 ) {
b5e113f6
JB
1340 workerInfo.ready = true
1341 }
33e6bb4c 1342 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1343 return workerNodeKey
930dcf12
JB
1344 }
1345
a2ed5053 1346 /**
aa9eede8 1347 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1348 *
aa9eede8 1349 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1350 * @param listener - The message listener callback.
1351 */
85aeb3f3
JB
1352 protected abstract registerWorkerMessageListener<
1353 Message extends Data | Response
aa9eede8
JB
1354 >(
1355 workerNodeKey: number,
1356 listener: (message: MessageValue<Message>) => void
1357 ): void
a2ed5053
JB
1358
1359 /**
aa9eede8 1360 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1361 * Can be overridden.
1362 *
aa9eede8 1363 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1364 */
aa9eede8 1365 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1366 // Listen to worker messages.
aa9eede8 1367 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1368 // Send the startup message to worker.
aa9eede8 1369 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1370 // Send the statistics message to worker.
1371 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1372 if (this.opts.enableTasksQueue === true) {
dbd73092 1373 if (this.opts.tasksQueueOptions?.taskStealing === true) {
47352846
JB
1374 this.workerNodes[workerNodeKey].onEmptyQueue =
1375 this.taskStealingOnEmptyQueue.bind(this)
1376 }
1377 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1378 this.workerNodes[workerNodeKey].onBackPressure =
1379 this.tasksStealingOnBackPressure.bind(this)
1380 }
72695f86 1381 }
d2c73f82
JB
1382 }
1383
85aeb3f3 1384 /**
aa9eede8
JB
1385 * Sends the startup message to worker given its worker node key.
1386 *
1387 * @param workerNodeKey - The worker node key.
1388 */
1389 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1390
1391 /**
9edb9717 1392 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1393 *
aa9eede8 1394 * @param workerNodeKey - The worker node key.
85aeb3f3 1395 */
9edb9717 1396 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1397 this.sendToWorker(workerNodeKey, {
1398 statistics: {
1399 runTime:
1400 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1401 .runTime.aggregate,
1402 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1403 .elu.aggregate
72ae84a2 1404 }
aa9eede8
JB
1405 })
1406 }
a2ed5053
JB
1407
1408 private redistributeQueuedTasks (workerNodeKey: number): void {
1409 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1410 const destinationWorkerNodeKey = this.workerNodes.reduce(
1411 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1412 return workerNode.info.ready &&
1413 workerNode.usage.tasks.queued <
1414 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1415 ? workerNodeKey
1416 : minWorkerNodeKey
1417 },
1418 0
1419 )
72ae84a2 1420 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1421 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1422 this.executeTask(destinationWorkerNodeKey, task)
1423 } else {
1424 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1425 }
1426 }
1427 }
1428
b1838604
JB
1429 private updateTaskStolenStatisticsWorkerUsage (
1430 workerNodeKey: number,
b1838604
JB
1431 taskName: string
1432 ): void {
1a880eca 1433 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1434 if (workerNode?.usage != null) {
1435 ++workerNode.usage.tasks.stolen
1436 }
1437 if (
1438 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1439 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1440 ) {
1441 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1442 taskName
1443 ) as WorkerUsage
1444 ++taskFunctionWorkerUsage.tasks.stolen
1445 }
1446 }
1447
dd951876 1448 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b 1449 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1450 const workerNodes = this.workerNodes
a6b3272b 1451 .slice()
dd951876
JB
1452 .sort(
1453 (workerNodeA, workerNodeB) =>
1454 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1455 )
f201a0cd 1456 const sourceWorkerNode = workerNodes.find(
041dc05b 1457 workerNode =>
f201a0cd
JB
1458 workerNode.info.ready &&
1459 workerNode.info.id !== workerId &&
1460 workerNode.usage.tasks.queued > 0
1461 )
1462 if (sourceWorkerNode != null) {
72ae84a2 1463 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1464 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1465 this.executeTask(destinationWorkerNodeKey, task)
1466 } else {
1467 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1468 }
f201a0cd
JB
1469 this.updateTaskStolenStatisticsWorkerUsage(
1470 destinationWorkerNodeKey,
1471 task.name as string
1472 )
72695f86
JB
1473 }
1474 }
1475
1476 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1477 const sizeOffset = 1
1478 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1479 return
1480 }
72695f86
JB
1481 const sourceWorkerNode =
1482 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1483 const workerNodes = this.workerNodes
a6b3272b 1484 .slice()
72695f86
JB
1485 .sort(
1486 (workerNodeA, workerNodeB) =>
1487 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1488 )
1489 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1490 if (
0bc68267 1491 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1492 workerNode.info.ready &&
1493 workerNode.info.id !== workerId &&
0bc68267 1494 workerNode.usage.tasks.queued <
f778c355 1495 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1496 ) {
72ae84a2 1497 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1498 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1499 this.executeTask(workerNodeKey, task)
4de3d785 1500 } else {
dd951876 1501 this.enqueueTask(workerNodeKey, task)
4de3d785 1502 }
b1838604
JB
1503 this.updateTaskStolenStatisticsWorkerUsage(
1504 workerNodeKey,
b1838604
JB
1505 task.name as string
1506 )
10ecf8fd 1507 }
a2ed5053
JB
1508 }
1509 }
1510
be0676b3 1511 /**
aa9eede8 1512 * This method is the listener registered for each worker message.
be0676b3 1513 *
bdacc2d2 1514 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1515 */
1516 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1517 return message => {
21f710aa 1518 this.checkMessageWorkerId(message)
6703b9f4 1519 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1520 // Worker ready response received from worker
10e2aa7e 1521 this.handleWorkerReadyResponse(message)
7629bdf1 1522 } else if (message.taskId != null) {
81c02522 1523 // Task execution response received from worker
6b272951 1524 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1525 } else if (message.taskFunctionNames != null) {
1526 // Task function names message received from worker
46b0bb09
JB
1527 this.getWorkerInfo(
1528 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4 1529 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1530 }
1531 }
1532 }
1533
10e2aa7e 1534 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1535 if (message.ready === false) {
72ae84a2
JB
1536 throw new Error(
1537 `Worker ${message.workerId as number} failed to initialize`
1538 )
f05ed93c 1539 }
a5d15204 1540 const workerInfo = this.getWorkerInfo(
aad6fb64 1541 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1542 )
a5d15204 1543 workerInfo.ready = message.ready as boolean
6703b9f4 1544 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d
JB
1545 if (this.ready) {
1546 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1547 }
6b272951
JB
1548 }
1549
1550 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1551 const { taskId, workerError, data } = message
5441aea6 1552 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1553 if (promiseResponse != null) {
6703b9f4
JB
1554 if (workerError != null) {
1555 this.emitter?.emit(PoolEvents.taskError, workerError)
1556 promiseResponse.reject(workerError.message)
6b272951 1557 } else {
5441aea6 1558 promiseResponse.resolve(data as Response)
6b272951 1559 }
501aea93
JB
1560 const workerNodeKey = promiseResponse.workerNodeKey
1561 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1562 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1563 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1564 if (
1565 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1566 this.tasksQueueSize(workerNodeKey) > 0 &&
1567 this.workerNodes[workerNodeKey].usage.tasks.executing <
1568 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1569 ) {
1570 this.executeTask(
1571 workerNodeKey,
1572 this.dequeueTask(workerNodeKey) as Task<Data>
1573 )
be0676b3
APA
1574 }
1575 }
be0676b3 1576 }
7c0ba920 1577
a1763c54 1578 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1579 if (this.busy) {
1580 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1581 }
1582 }
1583
1584 private checkAndEmitTaskQueuingEvents (): void {
1585 if (this.hasBackPressure()) {
1586 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1587 }
1588 }
1589
33e6bb4c
JB
1590 private checkAndEmitDynamicWorkerCreationEvents (): void {
1591 if (this.type === PoolTypes.dynamic) {
1592 if (this.full) {
1593 this.emitter?.emit(PoolEvents.full, this.info)
1594 }
1595 }
1596 }
1597
8a1260a3 1598 /**
aa9eede8 1599 * Gets the worker information given its worker node key.
8a1260a3
JB
1600 *
1601 * @param workerNodeKey - The worker node key.
3f09ed9f 1602 * @returns The worker information.
8a1260a3 1603 */
46b0bb09
JB
1604 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1605 return this.workerNodes[workerNodeKey].info
e221309a
JB
1606 }
1607
a05c10de 1608 /**
b0a4db63 1609 * Adds the given worker in the pool worker nodes.
ea7a90d3 1610 *
38e795c1 1611 * @param worker - The worker.
aa9eede8
JB
1612 * @returns The added worker node key.
1613 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1614 */
b0a4db63 1615 private addWorkerNode (worker: Worker): number {
671d5154
JB
1616 const workerNode = new WorkerNode<Worker, Data>(
1617 worker,
ff3f866a 1618 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1619 )
b97d82d8 1620 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1621 if (this.starting) {
1622 workerNode.info.ready = true
1623 }
aa9eede8 1624 this.workerNodes.push(workerNode)
aad6fb64 1625 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1626 if (workerNodeKey === -1) {
86ed0598 1627 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1628 }
1629 return workerNodeKey
ea7a90d3 1630 }
c923ce56 1631
51fe3d3c 1632 /**
f06e48d8 1633 * Removes the given worker from the pool worker nodes.
51fe3d3c 1634 *
f06e48d8 1635 * @param worker - The worker.
51fe3d3c 1636 */
416fd65c 1637 private removeWorkerNode (worker: Worker): void {
aad6fb64 1638 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1639 if (workerNodeKey !== -1) {
1640 this.workerNodes.splice(workerNodeKey, 1)
1641 this.workerChoiceStrategyContext.remove(workerNodeKey)
1642 }
51fe3d3c 1643 }
adc3c320 1644
e2b31e32
JB
1645 /** @inheritDoc */
1646 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1647 return (
e2b31e32
JB
1648 this.opts.enableTasksQueue === true &&
1649 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1650 )
1651 }
1652
1653 private hasBackPressure (): boolean {
1654 return (
1655 this.opts.enableTasksQueue === true &&
1656 this.workerNodes.findIndex(
041dc05b 1657 workerNode => !workerNode.hasBackPressure()
a1763c54 1658 ) === -1
9e844245 1659 )
e2b31e32
JB
1660 }
1661
b0a4db63 1662 /**
aa9eede8 1663 * Executes the given task on the worker given its worker node key.
b0a4db63 1664 *
aa9eede8 1665 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1666 * @param task - The task to execute.
1667 */
2e81254d 1668 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1669 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1670 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1671 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1672 }
1673
f9f00b5f 1674 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1675 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1676 this.checkAndEmitTaskQueuingEvents()
1677 return tasksQueueSize
adc3c320
JB
1678 }
1679
416fd65c 1680 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1681 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1682 }
1683
416fd65c 1684 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1685 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1686 }
1687
81c02522 1688 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1689 while (this.tasksQueueSize(workerNodeKey) > 0) {
1690 this.executeTask(
1691 workerNodeKey,
1692 this.dequeueTask(workerNodeKey) as Task<Data>
1693 )
ff733df7 1694 }
4b628b48 1695 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1696 }
1697
ef41a6e6
JB
1698 private flushTasksQueues (): void {
1699 for (const [workerNodeKey] of this.workerNodes.entries()) {
1700 this.flushTasksQueue(workerNodeKey)
1701 }
1702 }
c97c7edb 1703}