build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
d91689fd 20 once,
bfc75cca 21 round
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
4d159167
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
9f95d5eb 38 WorkerNodeEventDetail,
4d159167
JB
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
bde6b5d7
JB
52import {
53 checkFilePath,
54 checkValidTasksQueueOptions,
bfc75cca
JB
55 checkValidWorkerChoiceStrategy,
56 updateMeasurementStatistics
bde6b5d7 57} from './utils'
23ccf9d7 58
729c563d 59/**
ea7a90d3 60 * Base class that implements some shared logic for all poolifier pools.
729c563d 61 *
38e795c1 62 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 65 */
c97c7edb 66export abstract class AbstractPool<
f06e48d8 67 Worker extends IWorker,
d3c8a1a8
S
68 Data = unknown,
69 Response = unknown
c4855468 70> implements IPool<Worker, Data, Response> {
afc003b2 71 /** @inheritDoc */
4b628b48 72 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 73
afc003b2 74 /** @inheritDoc */
f80125ca 75 public emitter?: EventEmitterAsyncResource
7c0ba920 76
fcd39179
JB
77 /**
78 * Dynamic pool maximum size property placeholder.
79 */
80 protected readonly max?: number
81
be0676b3 82 /**
419e8121 83 * The task execution response promise map:
2740a743 84 * - `key`: The message id of each submitted task.
a3445496 85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 86 *
a3445496 87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 88 */
501aea93
JB
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 91
a35560ba 92 /**
51fe3d3c 93 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
94 */
95 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
96 Worker,
97 Data,
98 Response
a35560ba
S
99 >
100
72ae84a2
JB
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
15b176e0
JB
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
47352846
JB
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
afe0d5bf
JB
116 /**
117 * The start timestamp of the pool.
118 */
119 private readonly startTimestamp
120
729c563d
S
121 /**
122 * Constructs a new poolifier pool.
123 *
38e795c1 124 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 125 * @param filePath - Path to the worker file.
38e795c1 126 * @param opts - Options for the pool.
729c563d 127 */
c97c7edb 128 public constructor (
b4213b7f
JB
129 protected readonly numberOfWorkers: number,
130 protected readonly filePath: string,
131 protected readonly opts: PoolOptions<Worker>
c97c7edb 132 ) {
78cea37e 133 if (!this.isMain()) {
04f45163 134 throw new Error(
8c6d4acf 135 'Cannot start a pool from a worker with the same type as the pool'
04f45163 136 )
c97c7edb 137 }
bde6b5d7 138 checkFilePath(this.filePath)
8003c026 139 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 140 this.checkPoolOptions(this.opts)
1086026a 141
7254e419
JB
142 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
143 this.executeTask = this.executeTask.bind(this)
144 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 145
6bd72cd0 146 if (this.opts.enableEvents === true) {
b5604034 147 this.initializeEventEmitter()
7c0ba920 148 }
d59df138
JB
149 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
150 Worker,
151 Data,
152 Response
da309861
JB
153 >(
154 this,
155 this.opts.workerChoiceStrategy,
156 this.opts.workerChoiceStrategyOptions
157 )
b6b32453
JB
158
159 this.setupHook()
160
72ae84a2
JB
161 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
162
47352846 163 this.started = false
075e51d1 164 this.starting = false
47352846
JB
165 if (this.opts.startWorkers === true) {
166 this.start()
167 }
afe0d5bf
JB
168
169 this.startTimestamp = performance.now()
c97c7edb
S
170 }
171
8d3782fa
JB
172 private checkNumberOfWorkers (numberOfWorkers: number): void {
173 if (numberOfWorkers == null) {
174 throw new Error(
175 'Cannot instantiate a pool without specifying the number of workers'
176 )
78cea37e 177 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 178 throw new TypeError(
0d80593b 179 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
180 )
181 } else if (numberOfWorkers < 0) {
473c717a 182 throw new RangeError(
8d3782fa
JB
183 'Cannot instantiate a pool with a negative number of workers'
184 )
6b27d407 185 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
186 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
187 }
188 }
189
7c0ba920 190 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 191 if (isPlainObject(opts)) {
47352846 192 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 193 checkValidWorkerChoiceStrategy(
2324f8c9
JB
194 opts.workerChoiceStrategy as WorkerChoiceStrategy
195 )
0d80593b
JB
196 this.opts.workerChoiceStrategy =
197 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
198 this.checkValidWorkerChoiceStrategyOptions(
199 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
200 )
8990357d
JB
201 this.opts.workerChoiceStrategyOptions = {
202 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
203 ...opts.workerChoiceStrategyOptions
204 }
1f68cede 205 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
206 this.opts.enableEvents = opts.enableEvents ?? true
207 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
208 if (this.opts.enableTasksQueue) {
bde6b5d7 209 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
210 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
211 opts.tasksQueueOptions as TasksQueueOptions
212 )
213 }
214 } else {
215 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 216 }
aee46736
JB
217 }
218
0d80593b
JB
219 private checkValidWorkerChoiceStrategyOptions (
220 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
221 ): void {
2324f8c9
JB
222 if (
223 workerChoiceStrategyOptions != null &&
224 !isPlainObject(workerChoiceStrategyOptions)
225 ) {
0d80593b
JB
226 throw new TypeError(
227 'Invalid worker choice strategy options: must be a plain object'
228 )
229 }
8990357d 230 if (
2324f8c9 231 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 232 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
233 ) {
234 throw new TypeError(
8c0b113f 235 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
236 )
237 }
238 if (
2324f8c9 239 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 240 workerChoiceStrategyOptions.retries < 0
8990357d
JB
241 ) {
242 throw new RangeError(
8c0b113f 243 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
244 )
245 }
49be33fe 246 if (
2324f8c9 247 workerChoiceStrategyOptions?.weights != null &&
6b27d407 248 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
249 ) {
250 throw new Error(
251 'Invalid worker choice strategy options: must have a weight for each worker node'
252 )
253 }
f0d7f803 254 if (
2324f8c9 255 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
256 !Object.values(Measurements).includes(
257 workerChoiceStrategyOptions.measurement
258 )
259 ) {
260 throw new Error(
261 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
262 )
263 }
0d80593b
JB
264 }
265
b5604034 266 private initializeEventEmitter (): void {
5b7d00b4 267 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
268 name: `poolifier:${this.type}-${this.worker}-pool`
269 })
270 }
271
08f3f44c 272 /** @inheritDoc */
6b27d407
JB
273 public get info (): PoolInfo {
274 return {
23ccf9d7 275 version,
6b27d407 276 type: this.type,
184855e6 277 worker: this.worker,
47352846 278 started: this.started,
2431bdb4
JB
279 ready: this.ready,
280 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
281 minSize: this.minSize,
282 maxSize: this.maxSize,
c05f0d50
JB
283 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284 .runTime.aggregate &&
1305e9a8
JB
285 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
286 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
287 workerNodes: this.workerNodes.length,
288 idleWorkerNodes: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
f59e1027 290 workerNode.usage.tasks.executing === 0
a4e07f72
JB
291 ? accumulator + 1
292 : accumulator,
6b27d407
JB
293 0
294 ),
295 busyWorkerNodes: this.workerNodes.reduce(
296 (accumulator, workerNode) =>
f59e1027 297 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
298 0
299 ),
a4e07f72 300 executedTasks: this.workerNodes.reduce(
6b27d407 301 (accumulator, workerNode) =>
f59e1027 302 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
303 0
304 ),
305 executingTasks: this.workerNodes.reduce(
306 (accumulator, workerNode) =>
f59e1027 307 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
308 0
309 ),
daf86646
JB
310 ...(this.opts.enableTasksQueue === true && {
311 queuedTasks: this.workerNodes.reduce(
312 (accumulator, workerNode) =>
313 accumulator + workerNode.usage.tasks.queued,
314 0
315 )
316 }),
317 ...(this.opts.enableTasksQueue === true && {
318 maxQueuedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
321 0
322 )
323 }),
a1763c54
JB
324 ...(this.opts.enableTasksQueue === true && {
325 backPressure: this.hasBackPressure()
326 }),
68cbdc84
JB
327 ...(this.opts.enableTasksQueue === true && {
328 stolenTasks: this.workerNodes.reduce(
329 (accumulator, workerNode) =>
330 accumulator + workerNode.usage.tasks.stolen,
331 0
332 )
333 }),
a4e07f72
JB
334 failedTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
f59e1027 336 accumulator + workerNode.usage.tasks.failed,
a4e07f72 337 0
1dcf8b7b
JB
338 ),
339 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
340 .runTime.aggregate && {
341 runTime: {
98e72cda 342 minimum: round(
90d6701c 343 min(
98e72cda 344 ...this.workerNodes.map(
041dc05b 345 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 346 )
1dcf8b7b
JB
347 )
348 ),
98e72cda 349 maximum: round(
90d6701c 350 max(
98e72cda 351 ...this.workerNodes.map(
041dc05b 352 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 353 )
1dcf8b7b 354 )
98e72cda 355 ),
3baa0837
JB
356 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
357 .runTime.average && {
358 average: round(
359 average(
360 this.workerNodes.reduce<number[]>(
361 (accumulator, workerNode) =>
362 accumulator.concat(workerNode.usage.runTime.history),
363 []
364 )
98e72cda 365 )
dc021bcc 366 )
3baa0837 367 }),
98e72cda
JB
368 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
369 .runTime.median && {
370 median: round(
371 median(
3baa0837
JB
372 this.workerNodes.reduce<number[]>(
373 (accumulator, workerNode) =>
374 accumulator.concat(workerNode.usage.runTime.history),
375 []
98e72cda
JB
376 )
377 )
378 )
379 })
1dcf8b7b
JB
380 }
381 }),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .waitTime.aggregate && {
384 waitTime: {
98e72cda 385 minimum: round(
90d6701c 386 min(
98e72cda 387 ...this.workerNodes.map(
041dc05b 388 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 389 )
1dcf8b7b
JB
390 )
391 ),
98e72cda 392 maximum: round(
90d6701c 393 max(
98e72cda 394 ...this.workerNodes.map(
041dc05b 395 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 396 )
1dcf8b7b 397 )
98e72cda 398 ),
3baa0837
JB
399 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
400 .waitTime.average && {
401 average: round(
402 average(
403 this.workerNodes.reduce<number[]>(
404 (accumulator, workerNode) =>
405 accumulator.concat(workerNode.usage.waitTime.history),
406 []
407 )
98e72cda 408 )
dc021bcc 409 )
3baa0837 410 }),
98e72cda
JB
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .waitTime.median && {
413 median: round(
414 median(
3baa0837
JB
415 this.workerNodes.reduce<number[]>(
416 (accumulator, workerNode) =>
417 accumulator.concat(workerNode.usage.waitTime.history),
418 []
98e72cda
JB
419 )
420 )
421 )
422 })
1dcf8b7b
JB
423 }
424 })
6b27d407
JB
425 }
426 }
08f3f44c 427
aa9eede8
JB
428 /**
429 * The pool readiness boolean status.
430 */
2431bdb4
JB
431 private get ready (): boolean {
432 return (
b97d82d8
JB
433 this.workerNodes.reduce(
434 (accumulator, workerNode) =>
435 !workerNode.info.dynamic && workerNode.info.ready
436 ? accumulator + 1
437 : accumulator,
438 0
439 ) >= this.minSize
2431bdb4
JB
440 )
441 }
442
afe0d5bf 443 /**
aa9eede8 444 * The approximate pool utilization.
afe0d5bf
JB
445 *
446 * @returns The pool utilization.
447 */
448 private get utilization (): number {
8e5ca040 449 const poolTimeCapacity =
fe7d90db 450 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
451 const totalTasksRunTime = this.workerNodes.reduce(
452 (accumulator, workerNode) =>
71514351 453 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
454 0
455 )
456 const totalTasksWaitTime = this.workerNodes.reduce(
457 (accumulator, workerNode) =>
71514351 458 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
459 0
460 )
8e5ca040 461 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
462 }
463
8881ae32 464 /**
aa9eede8 465 * The pool type.
8881ae32
JB
466 *
467 * If it is `'dynamic'`, it provides the `max` property.
468 */
469 protected abstract get type (): PoolType
470
184855e6 471 /**
aa9eede8 472 * The worker type.
184855e6
JB
473 */
474 protected abstract get worker (): WorkerType
475
c2ade475 476 /**
aa9eede8 477 * The pool minimum size.
c2ade475 478 */
8735b4e5
JB
479 protected get minSize (): number {
480 return this.numberOfWorkers
481 }
ff733df7
JB
482
483 /**
aa9eede8 484 * The pool maximum size.
ff733df7 485 */
8735b4e5
JB
486 protected get maxSize (): number {
487 return this.max ?? this.numberOfWorkers
488 }
a35560ba 489
6b813701
JB
490 /**
491 * Checks if the worker id sent in the received message from a worker is valid.
492 *
493 * @param message - The received message.
494 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
495 */
4d159167 496 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
497 if (message.workerId == null) {
498 throw new Error('Worker message received without worker id')
8e5a7cf9 499 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
500 throw new Error(
501 `Worker message received from unknown worker '${message.workerId}'`
502 )
503 }
504 }
505
ffcbbad8 506 /**
f06e48d8 507 * Gets the given worker its worker node key.
ffcbbad8
JB
508 *
509 * @param worker - The worker.
f59e1027 510 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 511 */
aad6fb64 512 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 513 return this.workerNodes.findIndex(
041dc05b 514 workerNode => workerNode.worker === worker
f06e48d8 515 )
bf9549ae
JB
516 }
517
aa9eede8
JB
518 /**
519 * Gets the worker node key given its worker id.
520 *
521 * @param workerId - The worker id.
aad6fb64 522 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 523 */
72ae84a2 524 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 525 return this.workerNodes.findIndex(
041dc05b 526 workerNode => workerNode.info.id === workerId
aad6fb64 527 )
aa9eede8
JB
528 }
529
afc003b2 530 /** @inheritDoc */
a35560ba 531 public setWorkerChoiceStrategy (
59219cbb
JB
532 workerChoiceStrategy: WorkerChoiceStrategy,
533 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 534 ): void {
bde6b5d7 535 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 536 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
537 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
538 this.opts.workerChoiceStrategy
539 )
540 if (workerChoiceStrategyOptions != null) {
541 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
542 }
aa9eede8 543 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 544 workerNode.resetUsage()
9edb9717 545 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 546 }
a20f0ba5
JB
547 }
548
549 /** @inheritDoc */
550 public setWorkerChoiceStrategyOptions (
551 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
552 ): void {
0d80593b 553 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
554 this.opts.workerChoiceStrategyOptions = {
555 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
556 ...workerChoiceStrategyOptions
557 }
a20f0ba5
JB
558 this.workerChoiceStrategyContext.setOptions(
559 this.opts.workerChoiceStrategyOptions
a35560ba
S
560 )
561 }
562
a20f0ba5 563 /** @inheritDoc */
8f52842f
JB
564 public enableTasksQueue (
565 enable: boolean,
566 tasksQueueOptions?: TasksQueueOptions
567 ): void {
a20f0ba5 568 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
569 this.unsetTaskStealing()
570 this.unsetTasksStealingOnBackPressure()
ef41a6e6 571 this.flushTasksQueues()
a20f0ba5
JB
572 }
573 this.opts.enableTasksQueue = enable
8f52842f 574 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
575 }
576
577 /** @inheritDoc */
8f52842f 578 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 579 if (this.opts.enableTasksQueue === true) {
bde6b5d7 580 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
581 this.opts.tasksQueueOptions =
582 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 583 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
584 if (this.opts.tasksQueueOptions.taskStealing === true) {
585 this.setTaskStealing()
586 } else {
587 this.unsetTaskStealing()
588 }
589 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
590 this.setTasksStealingOnBackPressure()
591 } else {
592 this.unsetTasksStealingOnBackPressure()
593 }
5baee0d7 594 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
595 delete this.opts.tasksQueueOptions
596 }
597 }
598
9b38ab2d
JB
599 private buildTasksQueueOptions (
600 tasksQueueOptions: TasksQueueOptions
601 ): TasksQueueOptions {
602 return {
603 ...{
604 size: Math.pow(this.maxSize, 2),
605 concurrency: 1,
606 taskStealing: true,
607 tasksStealingOnBackPressure: true
608 },
609 ...tasksQueueOptions
610 }
611 }
612
5b49e864 613 private setTasksQueueSize (size: number): void {
20c6f652 614 for (const workerNode of this.workerNodes) {
ff3f866a 615 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
616 }
617 }
618
d6ca1416
JB
619 private setTaskStealing (): void {
620 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
621 this.workerNodes[workerNodeKey].addEventListener(
622 'emptyqueue',
9b85e2d0 623 this.handleEmptyQueueEvent as EventListener
9f95d5eb 624 )
d6ca1416
JB
625 }
626 }
627
628 private unsetTaskStealing (): void {
629 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
630 this.workerNodes[workerNodeKey].removeEventListener(
631 'emptyqueue',
9b85e2d0 632 this.handleEmptyQueueEvent as EventListener
9f95d5eb 633 )
d6ca1416
JB
634 }
635 }
636
637 private setTasksStealingOnBackPressure (): void {
638 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
639 this.workerNodes[workerNodeKey].addEventListener(
640 'backpressure',
9b85e2d0 641 this.handleBackPressureEvent as EventListener
9f95d5eb 642 )
d6ca1416
JB
643 }
644 }
645
646 private unsetTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
648 this.workerNodes[workerNodeKey].removeEventListener(
649 'backpressure',
9b85e2d0 650 this.handleBackPressureEvent as EventListener
9f95d5eb 651 )
d6ca1416
JB
652 }
653 }
654
c319c66b
JB
655 /**
656 * Whether the pool is full or not.
657 *
658 * The pool filling boolean status.
659 */
dea903a8
JB
660 protected get full (): boolean {
661 return this.workerNodes.length >= this.maxSize
662 }
c2ade475 663
c319c66b
JB
664 /**
665 * Whether the pool is busy or not.
666 *
667 * The pool busyness boolean status.
668 */
669 protected abstract get busy (): boolean
7c0ba920 670
6c6afb84 671 /**
3d76750a 672 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
673 *
674 * @returns Worker nodes busyness boolean status.
675 */
c2ade475 676 protected internalBusy (): boolean {
3d76750a
JB
677 if (this.opts.enableTasksQueue === true) {
678 return (
679 this.workerNodes.findIndex(
041dc05b 680 workerNode =>
3d76750a
JB
681 workerNode.info.ready &&
682 workerNode.usage.tasks.executing <
683 (this.opts.tasksQueueOptions?.concurrency as number)
684 ) === -1
685 )
3d76750a 686 }
419e8121
JB
687 return (
688 this.workerNodes.findIndex(
689 workerNode =>
690 workerNode.info.ready && workerNode.usage.tasks.executing === 0
691 ) === -1
692 )
cb70b19d
JB
693 }
694
e81c38f2 695 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
696 workerNodeKey: number,
697 message: MessageValue<Data>
698 ): Promise<boolean> {
72ae84a2 699 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
700 const taskFunctionOperationListener = (
701 message: MessageValue<Response>
702 ): void => {
703 this.checkMessageWorkerId(message)
704 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 705 if (
ae036c3e
JB
706 message.taskFunctionOperationStatus != null &&
707 message.workerId === workerId
72ae84a2 708 ) {
ae036c3e
JB
709 if (message.taskFunctionOperationStatus) {
710 resolve(true)
711 } else if (!message.taskFunctionOperationStatus) {
712 reject(
713 new Error(
714 `Task function operation '${
715 message.taskFunctionOperation as string
716 }' failed on worker ${message.workerId} with error: '${
717 message.workerError?.message as string
718 }'`
719 )
72ae84a2 720 )
ae036c3e
JB
721 }
722 this.deregisterWorkerMessageListener(
723 this.getWorkerNodeKeyByWorkerId(message.workerId),
724 taskFunctionOperationListener
72ae84a2
JB
725 )
726 }
ae036c3e
JB
727 }
728 this.registerWorkerMessageListener(
729 workerNodeKey,
730 taskFunctionOperationListener
731 )
72ae84a2
JB
732 this.sendToWorker(workerNodeKey, message)
733 })
734 }
735
736 private async sendTaskFunctionOperationToWorkers (
adee6053 737 message: MessageValue<Data>
e81c38f2
JB
738 ): Promise<boolean> {
739 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
740 const responsesReceived = new Array<MessageValue<Response>>()
741 const taskFunctionOperationsListener = (
742 message: MessageValue<Response>
743 ): void => {
744 this.checkMessageWorkerId(message)
745 if (message.taskFunctionOperationStatus != null) {
746 responsesReceived.push(message)
747 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 748 if (
e81c38f2
JB
749 responsesReceived.every(
750 message => message.taskFunctionOperationStatus === true
751 )
752 ) {
753 resolve(true)
754 } else if (
e81c38f2
JB
755 responsesReceived.some(
756 message => message.taskFunctionOperationStatus === false
757 )
758 ) {
b0b55f57
JB
759 const errorResponse = responsesReceived.find(
760 response => response.taskFunctionOperationStatus === false
761 )
e81c38f2
JB
762 reject(
763 new Error(
b0b55f57 764 `Task function operation '${
e81c38f2 765 message.taskFunctionOperation as string
b0b55f57
JB
766 }' failed on worker ${
767 errorResponse?.workerId as number
768 } with error: '${
769 errorResponse?.workerError?.message as string
770 }'`
e81c38f2
JB
771 )
772 )
773 }
ae036c3e
JB
774 this.deregisterWorkerMessageListener(
775 this.getWorkerNodeKeyByWorkerId(message.workerId),
776 taskFunctionOperationsListener
777 )
e81c38f2 778 }
ae036c3e
JB
779 }
780 }
781 for (const [workerNodeKey] of this.workerNodes.entries()) {
782 this.registerWorkerMessageListener(
783 workerNodeKey,
784 taskFunctionOperationsListener
785 )
72ae84a2 786 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
787 }
788 })
6703b9f4
JB
789 }
790
791 /** @inheritDoc */
792 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
793 for (const workerNode of this.workerNodes) {
794 if (
795 Array.isArray(workerNode.info.taskFunctionNames) &&
796 workerNode.info.taskFunctionNames.includes(name)
797 ) {
798 return true
799 }
800 }
801 return false
6703b9f4
JB
802 }
803
804 /** @inheritDoc */
e81c38f2
JB
805 public async addTaskFunction (
806 name: string,
3feeab69 807 fn: TaskFunction<Data, Response>
e81c38f2 808 ): Promise<boolean> {
3feeab69
JB
809 if (typeof name !== 'string') {
810 throw new TypeError('name argument must be a string')
811 }
812 if (typeof name === 'string' && name.trim().length === 0) {
813 throw new TypeError('name argument must not be an empty string')
814 }
815 if (typeof fn !== 'function') {
816 throw new TypeError('fn argument must be a function')
817 }
adee6053 818 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
819 taskFunctionOperation: 'add',
820 taskFunctionName: name,
3feeab69 821 taskFunction: fn.toString()
6703b9f4 822 })
adee6053
JB
823 this.taskFunctions.set(name, fn)
824 return opResult
6703b9f4
JB
825 }
826
827 /** @inheritDoc */
e81c38f2 828 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
829 if (!this.taskFunctions.has(name)) {
830 throw new Error(
16248b23 831 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
832 )
833 }
adee6053 834 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
835 taskFunctionOperation: 'remove',
836 taskFunctionName: name
837 })
adee6053
JB
838 this.deleteTaskFunctionWorkerUsages(name)
839 this.taskFunctions.delete(name)
840 return opResult
6703b9f4
JB
841 }
842
90d7d101 843 /** @inheritDoc */
6703b9f4 844 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
845 for (const workerNode of this.workerNodes) {
846 if (
6703b9f4
JB
847 Array.isArray(workerNode.info.taskFunctionNames) &&
848 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 849 ) {
6703b9f4 850 return workerNode.info.taskFunctionNames
f2dbbf95 851 }
90d7d101 852 }
f2dbbf95 853 return []
90d7d101
JB
854 }
855
6703b9f4 856 /** @inheritDoc */
e81c38f2 857 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 858 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
859 taskFunctionOperation: 'default',
860 taskFunctionName: name
861 })
6703b9f4
JB
862 }
863
adee6053
JB
864 private deleteTaskFunctionWorkerUsages (name: string): void {
865 for (const workerNode of this.workerNodes) {
866 workerNode.deleteTaskFunctionWorkerUsage(name)
867 }
868 }
869
375f7504
JB
870 private shallExecuteTask (workerNodeKey: number): boolean {
871 return (
872 this.tasksQueueSize(workerNodeKey) === 0 &&
873 this.workerNodes[workerNodeKey].usage.tasks.executing <
874 (this.opts.tasksQueueOptions?.concurrency as number)
875 )
876 }
877
afc003b2 878 /** @inheritDoc */
7d91a8cd
JB
879 public async execute (
880 data?: Data,
881 name?: string,
882 transferList?: TransferListItem[]
883 ): Promise<Response> {
52b71763 884 return await new Promise<Response>((resolve, reject) => {
15b176e0 885 if (!this.started) {
47352846 886 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 887 return
15b176e0 888 }
7d91a8cd
JB
889 if (name != null && typeof name !== 'string') {
890 reject(new TypeError('name argument must be a string'))
9d2d0da1 891 return
7d91a8cd 892 }
90d7d101
JB
893 if (
894 name != null &&
895 typeof name === 'string' &&
896 name.trim().length === 0
897 ) {
f58b60b9 898 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 899 return
90d7d101 900 }
b558f6b5
JB
901 if (transferList != null && !Array.isArray(transferList)) {
902 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 903 return
b558f6b5
JB
904 }
905 const timestamp = performance.now()
906 const workerNodeKey = this.chooseWorkerNode()
501aea93 907 const task: Task<Data> = {
52b71763
JB
908 name: name ?? DEFAULT_TASK_NAME,
909 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
910 data: data ?? ({} as Data),
7d91a8cd 911 transferList,
52b71763 912 timestamp,
7629bdf1 913 taskId: randomUUID()
52b71763 914 }
7629bdf1 915 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
916 resolve,
917 reject,
501aea93 918 workerNodeKey
2e81254d 919 })
52b71763 920 if (
4e377863
JB
921 this.opts.enableTasksQueue === false ||
922 (this.opts.enableTasksQueue === true &&
375f7504 923 this.shallExecuteTask(workerNodeKey))
52b71763 924 ) {
501aea93 925 this.executeTask(workerNodeKey, task)
4e377863
JB
926 } else {
927 this.enqueueTask(workerNodeKey, task)
52b71763 928 }
2e81254d 929 })
280c2a77 930 }
c97c7edb 931
47352846
JB
932 /** @inheritdoc */
933 public start (): void {
934 this.starting = true
935 while (
936 this.workerNodes.reduce(
937 (accumulator, workerNode) =>
938 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
939 0
940 ) < this.numberOfWorkers
941 ) {
942 this.createAndSetupWorkerNode()
943 }
944 this.starting = false
945 this.started = true
946 }
947
afc003b2 948 /** @inheritDoc */
c97c7edb 949 public async destroy (): Promise<void> {
1fbcaa7c 950 await Promise.all(
81c02522 951 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 952 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
953 })
954 )
33e6bb4c 955 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 956 this.emitter?.emitDestroy()
15b176e0 957 this.started = false
c97c7edb
S
958 }
959
1e3214b6 960 protected async sendKillMessageToWorker (
72ae84a2 961 workerNodeKey: number
1e3214b6 962 ): Promise<void> {
9edb9717 963 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
964 const killMessageListener = (message: MessageValue<Response>): void => {
965 this.checkMessageWorkerId(message)
1e3214b6
JB
966 if (message.kill === 'success') {
967 resolve()
968 } else if (message.kill === 'failure') {
72ae84a2
JB
969 reject(
970 new Error(
ae036c3e 971 `Kill message handling failed on worker ${
72ae84a2 972 message.workerId as number
ae036c3e 973 }`
72ae84a2
JB
974 )
975 )
1e3214b6 976 }
ae036c3e 977 }
51d9cfbd 978 // FIXME: should be registered only once
ae036c3e 979 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 980 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 981 })
1e3214b6
JB
982 }
983
4a6952ff 984 /**
aa9eede8 985 * Terminates the worker node given its worker node key.
4a6952ff 986 *
aa9eede8 987 * @param workerNodeKey - The worker node key.
4a6952ff 988 */
81c02522 989 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 990
729c563d 991 /**
6677a3d3
JB
992 * Setup hook to execute code before worker nodes are created in the abstract constructor.
993 * Can be overridden.
afc003b2
JB
994 *
995 * @virtual
729c563d 996 */
280c2a77 997 protected setupHook (): void {
965df41c 998 /* Intentionally empty */
280c2a77 999 }
c97c7edb 1000
729c563d 1001 /**
280c2a77
S
1002 * Should return whether the worker is the main worker or not.
1003 */
1004 protected abstract isMain (): boolean
1005
1006 /**
2e81254d 1007 * Hook executed before the worker task execution.
bf9549ae 1008 * Can be overridden.
729c563d 1009 *
f06e48d8 1010 * @param workerNodeKey - The worker node key.
1c6fe997 1011 * @param task - The task to execute.
729c563d 1012 */
1c6fe997
JB
1013 protected beforeTaskExecutionHook (
1014 workerNodeKey: number,
1015 task: Task<Data>
1016 ): void {
94407def
JB
1017 if (this.workerNodes[workerNodeKey]?.usage != null) {
1018 const workerUsage = this.workerNodes[workerNodeKey].usage
1019 ++workerUsage.tasks.executing
1020 this.updateWaitTimeWorkerUsage(workerUsage, task)
1021 }
1022 if (
1023 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1024 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1025 task.name as string
1026 ) != null
1027 ) {
db0e38ee 1028 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1029 workerNodeKey
db0e38ee 1030 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1031 ++taskFunctionWorkerUsage.tasks.executing
1032 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1033 }
c97c7edb
S
1034 }
1035
c01733f1 1036 /**
2e81254d 1037 * Hook executed after the worker task execution.
bf9549ae 1038 * Can be overridden.
c01733f1 1039 *
501aea93 1040 * @param workerNodeKey - The worker node key.
38e795c1 1041 * @param message - The received message.
c01733f1 1042 */
2e81254d 1043 protected afterTaskExecutionHook (
501aea93 1044 workerNodeKey: number,
2740a743 1045 message: MessageValue<Response>
bf9549ae 1046 ): void {
94407def
JB
1047 if (this.workerNodes[workerNodeKey]?.usage != null) {
1048 const workerUsage = this.workerNodes[workerNodeKey].usage
1049 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1050 this.updateRunTimeWorkerUsage(workerUsage, message)
1051 this.updateEluWorkerUsage(workerUsage, message)
1052 }
1053 if (
1054 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1055 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1056 message.taskPerformance?.name as string
94407def
JB
1057 ) != null
1058 ) {
db0e38ee 1059 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1060 workerNodeKey
db0e38ee 1061 ].getTaskFunctionWorkerUsage(
0628755c 1062 message.taskPerformance?.name as string
b558f6b5 1063 ) as WorkerUsage
db0e38ee
JB
1064 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1065 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1066 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1067 }
1068 }
1069
db0e38ee
JB
1070 /**
1071 * Whether the worker node shall update its task function worker usage or not.
1072 *
1073 * @param workerNodeKey - The worker node key.
1074 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1075 */
1076 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1077 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1078 return (
94407def 1079 workerInfo != null &&
6703b9f4
JB
1080 Array.isArray(workerInfo.taskFunctionNames) &&
1081 workerInfo.taskFunctionNames.length > 2
b558f6b5 1082 )
f1c06930
JB
1083 }
1084
1085 private updateTaskStatisticsWorkerUsage (
1086 workerUsage: WorkerUsage,
1087 message: MessageValue<Response>
1088 ): void {
a4e07f72 1089 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1090 if (
1091 workerTaskStatistics.executing != null &&
1092 workerTaskStatistics.executing > 0
1093 ) {
1094 --workerTaskStatistics.executing
5bb5be17 1095 }
6703b9f4 1096 if (message.workerError == null) {
98e72cda
JB
1097 ++workerTaskStatistics.executed
1098 } else {
a4e07f72 1099 ++workerTaskStatistics.failed
2740a743 1100 }
f8eb0a2a
JB
1101 }
1102
a4e07f72
JB
1103 private updateRunTimeWorkerUsage (
1104 workerUsage: WorkerUsage,
f8eb0a2a
JB
1105 message: MessageValue<Response>
1106 ): void {
6703b9f4 1107 if (message.workerError != null) {
dc021bcc
JB
1108 return
1109 }
e4f20deb
JB
1110 updateMeasurementStatistics(
1111 workerUsage.runTime,
1112 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1113 message.taskPerformance?.runTime ?? 0
e4f20deb 1114 )
f8eb0a2a
JB
1115 }
1116
a4e07f72
JB
1117 private updateWaitTimeWorkerUsage (
1118 workerUsage: WorkerUsage,
1c6fe997 1119 task: Task<Data>
f8eb0a2a 1120 ): void {
1c6fe997
JB
1121 const timestamp = performance.now()
1122 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1123 updateMeasurementStatistics(
1124 workerUsage.waitTime,
1125 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1126 taskWaitTime
e4f20deb 1127 )
c01733f1 1128 }
1129
a4e07f72 1130 private updateEluWorkerUsage (
5df69fab 1131 workerUsage: WorkerUsage,
62c15a68
JB
1132 message: MessageValue<Response>
1133 ): void {
6703b9f4 1134 if (message.workerError != null) {
dc021bcc
JB
1135 return
1136 }
008512c7
JB
1137 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1138 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1139 updateMeasurementStatistics(
1140 workerUsage.elu.active,
008512c7 1141 eluTaskStatisticsRequirements,
dc021bcc 1142 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1143 )
1144 updateMeasurementStatistics(
1145 workerUsage.elu.idle,
008512c7 1146 eluTaskStatisticsRequirements,
dc021bcc 1147 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1148 )
008512c7 1149 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1150 if (message.taskPerformance?.elu != null) {
f7510105
JB
1151 if (workerUsage.elu.utilization != null) {
1152 workerUsage.elu.utilization =
1153 (workerUsage.elu.utilization +
1154 message.taskPerformance.elu.utilization) /
1155 2
1156 } else {
1157 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1158 }
62c15a68
JB
1159 }
1160 }
1161 }
1162
280c2a77 1163 /**
f06e48d8 1164 * Chooses a worker node for the next task.
280c2a77 1165 *
6c6afb84 1166 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1167 *
aa9eede8 1168 * @returns The chosen worker node key
280c2a77 1169 */
6c6afb84 1170 private chooseWorkerNode (): number {
930dcf12 1171 if (this.shallCreateDynamicWorker()) {
aa9eede8 1172 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1173 if (
b1aae695 1174 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1175 ) {
aa9eede8 1176 return workerNodeKey
6c6afb84 1177 }
17393ac8 1178 }
930dcf12
JB
1179 return this.workerChoiceStrategyContext.execute()
1180 }
1181
6c6afb84
JB
1182 /**
1183 * Conditions for dynamic worker creation.
1184 *
1185 * @returns Whether to create a dynamic worker or not.
1186 */
1187 private shallCreateDynamicWorker (): boolean {
930dcf12 1188 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1189 }
1190
280c2a77 1191 /**
aa9eede8 1192 * Sends a message to worker given its worker node key.
280c2a77 1193 *
aa9eede8 1194 * @param workerNodeKey - The worker node key.
38e795c1 1195 * @param message - The message.
7d91a8cd 1196 * @param transferList - The optional array of transferable objects.
280c2a77
S
1197 */
1198 protected abstract sendToWorker (
aa9eede8 1199 workerNodeKey: number,
7d91a8cd
JB
1200 message: MessageValue<Data>,
1201 transferList?: TransferListItem[]
280c2a77
S
1202 ): void
1203
729c563d 1204 /**
41344292 1205 * Creates a new worker.
6c6afb84
JB
1206 *
1207 * @returns Newly created worker.
729c563d 1208 */
280c2a77 1209 protected abstract createWorker (): Worker
c97c7edb 1210
4a6952ff 1211 /**
aa9eede8 1212 * Creates a new, completely set up worker node.
4a6952ff 1213 *
aa9eede8 1214 * @returns New, completely set up worker node key.
4a6952ff 1215 */
aa9eede8 1216 protected createAndSetupWorkerNode (): number {
bdacc2d2 1217 const worker = this.createWorker()
280c2a77 1218
fd04474e 1219 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1220 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1221 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1222 worker.on('error', error => {
aad6fb64 1223 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
23b11f60 1224 this.flagWorkerNodeAsNotReady(workerNodeKey)
46b0bb09 1225 const workerInfo = this.getWorkerInfo(workerNodeKey)
2a69b8c5 1226 this.emitter?.emit(PoolEvents.error, error)
cce7d657 1227 this.workerNodes[workerNodeKey].closeChannel()
15b176e0 1228 if (
b6bfca01 1229 this.started &&
9b38ab2d
JB
1230 !this.starting &&
1231 this.opts.restartWorkerOnError === true
15b176e0 1232 ) {
9b106837 1233 if (workerInfo.dynamic) {
aa9eede8 1234 this.createAndSetupDynamicWorkerNode()
8a1260a3 1235 } else {
aa9eede8 1236 this.createAndSetupWorkerNode()
8a1260a3 1237 }
5baee0d7 1238 }
9b38ab2d 1239 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1240 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1241 }
5baee0d7 1242 })
a35560ba 1243 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1244 worker.once('exit', () => {
f06e48d8 1245 this.removeWorkerNode(worker)
a974afa6 1246 })
280c2a77 1247
aa9eede8 1248 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1249
aa9eede8 1250 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1251
aa9eede8 1252 return workerNodeKey
c97c7edb 1253 }
be0676b3 1254
930dcf12 1255 /**
aa9eede8 1256 * Creates a new, completely set up dynamic worker node.
930dcf12 1257 *
aa9eede8 1258 * @returns New, completely set up dynamic worker node key.
930dcf12 1259 */
aa9eede8
JB
1260 protected createAndSetupDynamicWorkerNode (): number {
1261 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1262 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1263 this.checkMessageWorkerId(message)
aa9eede8
JB
1264 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1265 message.workerId
aad6fb64 1266 )
aa9eede8 1267 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1268 // Kill message received from worker
930dcf12
JB
1269 if (
1270 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1271 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1272 ((this.opts.enableTasksQueue === false &&
aa9eede8 1273 workerUsage.tasks.executing === 0) ||
7b56f532 1274 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1275 workerUsage.tasks.executing === 0 &&
1276 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1277 ) {
f3827d5d 1278 // Flag the worker node as not ready immediately
ae3ab61d 1279 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1280 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1281 this.emitter?.emit(PoolEvents.error, error)
1282 })
930dcf12
JB
1283 }
1284 })
46b0bb09 1285 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1286 this.sendToWorker(workerNodeKey, {
e9dd5b66 1287 checkActive: true
21f710aa 1288 })
72ae84a2
JB
1289 if (this.taskFunctions.size > 0) {
1290 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1291 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1292 taskFunctionOperation: 'add',
1293 taskFunctionName,
1294 taskFunction: taskFunction.toString()
1295 }).catch(error => {
1296 this.emitter?.emit(PoolEvents.error, error)
1297 })
1298 }
1299 }
b5e113f6 1300 workerInfo.dynamic = true
b1aae695
JB
1301 if (
1302 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1303 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1304 ) {
b5e113f6
JB
1305 workerInfo.ready = true
1306 }
33e6bb4c 1307 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1308 return workerNodeKey
930dcf12
JB
1309 }
1310
a2ed5053 1311 /**
aa9eede8 1312 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1313 *
aa9eede8 1314 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1315 * @param listener - The message listener callback.
1316 */
85aeb3f3
JB
1317 protected abstract registerWorkerMessageListener<
1318 Message extends Data | Response
aa9eede8
JB
1319 >(
1320 workerNodeKey: number,
1321 listener: (message: MessageValue<Message>) => void
1322 ): void
a2ed5053 1323
ae036c3e
JB
1324 /**
1325 * Registers once a listener callback on the worker given its worker node key.
1326 *
1327 * @param workerNodeKey - The worker node key.
1328 * @param listener - The message listener callback.
1329 */
1330 protected abstract registerOnceWorkerMessageListener<
1331 Message extends Data | Response
1332 >(
1333 workerNodeKey: number,
1334 listener: (message: MessageValue<Message>) => void
1335 ): void
1336
1337 /**
1338 * Deregisters a listener callback on the worker given its worker node key.
1339 *
1340 * @param workerNodeKey - The worker node key.
1341 * @param listener - The message listener callback.
1342 */
1343 protected abstract deregisterWorkerMessageListener<
1344 Message extends Data | Response
1345 >(
1346 workerNodeKey: number,
1347 listener: (message: MessageValue<Message>) => void
1348 ): void
1349
a2ed5053 1350 /**
aa9eede8 1351 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1352 * Can be overridden.
1353 *
aa9eede8 1354 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1355 */
aa9eede8 1356 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1357 // Listen to worker messages.
fcd39179
JB
1358 this.registerWorkerMessageListener(
1359 workerNodeKey,
1360 this.workerMessageListener.bind(this)
1361 )
85aeb3f3 1362 // Send the startup message to worker.
aa9eede8 1363 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1364 // Send the statistics message to worker.
1365 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1366 if (this.opts.enableTasksQueue === true) {
dbd73092 1367 if (this.opts.tasksQueueOptions?.taskStealing === true) {
9f95d5eb
JB
1368 this.workerNodes[workerNodeKey].addEventListener(
1369 'emptyqueue',
9b85e2d0 1370 this.handleEmptyQueueEvent as EventListener
9f95d5eb 1371 )
47352846
JB
1372 }
1373 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
9f95d5eb
JB
1374 this.workerNodes[workerNodeKey].addEventListener(
1375 'backpressure',
9b85e2d0 1376 this.handleBackPressureEvent as EventListener
9f95d5eb 1377 )
47352846 1378 }
72695f86 1379 }
d2c73f82
JB
1380 }
1381
85aeb3f3 1382 /**
aa9eede8
JB
1383 * Sends the startup message to worker given its worker node key.
1384 *
1385 * @param workerNodeKey - The worker node key.
1386 */
1387 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1388
1389 /**
9edb9717 1390 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1391 *
aa9eede8 1392 * @param workerNodeKey - The worker node key.
85aeb3f3 1393 */
9edb9717 1394 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1395 this.sendToWorker(workerNodeKey, {
1396 statistics: {
1397 runTime:
1398 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1399 .runTime.aggregate,
1400 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1401 .elu.aggregate
72ae84a2 1402 }
aa9eede8
JB
1403 })
1404 }
a2ed5053
JB
1405
1406 private redistributeQueuedTasks (workerNodeKey: number): void {
1407 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1408 const destinationWorkerNodeKey = this.workerNodes.reduce(
1409 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1410 return workerNode.info.ready &&
1411 workerNode.usage.tasks.queued <
1412 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1413 ? workerNodeKey
1414 : minWorkerNodeKey
1415 },
1416 0
1417 )
72ae84a2 1418 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1419 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1420 this.executeTask(destinationWorkerNodeKey, task)
1421 } else {
1422 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1423 }
1424 }
1425 }
1426
b1838604
JB
1427 private updateTaskStolenStatisticsWorkerUsage (
1428 workerNodeKey: number,
b1838604
JB
1429 taskName: string
1430 ): void {
1a880eca 1431 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1432 if (workerNode?.usage != null) {
1433 ++workerNode.usage.tasks.stolen
1434 }
1435 if (
1436 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1437 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1438 ) {
1439 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1440 taskName
1441 ) as WorkerUsage
1442 ++taskFunctionWorkerUsage.tasks.stolen
1443 }
1444 }
1445
9f95d5eb
JB
1446 private readonly handleEmptyQueueEvent = (
1447 event: CustomEvent<WorkerNodeEventDetail>
1448 ): void => {
1449 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1450 event.detail.workerId
1451 )
dd951876 1452 const workerNodes = this.workerNodes
a6b3272b 1453 .slice()
dd951876
JB
1454 .sort(
1455 (workerNodeA, workerNodeB) =>
1456 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1457 )
f201a0cd 1458 const sourceWorkerNode = workerNodes.find(
041dc05b 1459 workerNode =>
f201a0cd 1460 workerNode.info.ready &&
9f95d5eb 1461 workerNode.info.id !== event.detail.workerId &&
f201a0cd
JB
1462 workerNode.usage.tasks.queued > 0
1463 )
1464 if (sourceWorkerNode != null) {
72ae84a2 1465 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1466 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1467 this.executeTask(destinationWorkerNodeKey, task)
1468 } else {
1469 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1470 }
f201a0cd
JB
1471 this.updateTaskStolenStatisticsWorkerUsage(
1472 destinationWorkerNodeKey,
1473 task.name as string
1474 )
72695f86
JB
1475 }
1476 }
1477
9f95d5eb
JB
1478 private readonly handleBackPressureEvent = (
1479 event: CustomEvent<WorkerNodeEventDetail>
1480 ): void => {
f778c355
JB
1481 const sizeOffset = 1
1482 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1483 return
1484 }
72695f86 1485 const sourceWorkerNode =
9f95d5eb 1486 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
72695f86 1487 const workerNodes = this.workerNodes
a6b3272b 1488 .slice()
72695f86
JB
1489 .sort(
1490 (workerNodeA, workerNodeB) =>
1491 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1492 )
1493 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1494 if (
0bc68267 1495 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1496 workerNode.info.ready &&
9f95d5eb 1497 workerNode.info.id !== event.detail.workerId &&
0bc68267 1498 workerNode.usage.tasks.queued <
f778c355 1499 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1500 ) {
72ae84a2 1501 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1502 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1503 this.executeTask(workerNodeKey, task)
4de3d785 1504 } else {
dd951876 1505 this.enqueueTask(workerNodeKey, task)
4de3d785 1506 }
b1838604
JB
1507 this.updateTaskStolenStatisticsWorkerUsage(
1508 workerNodeKey,
b1838604
JB
1509 task.name as string
1510 )
10ecf8fd 1511 }
a2ed5053
JB
1512 }
1513 }
1514
be0676b3 1515 /**
fcd39179 1516 * This method is the message listener registered on each worker.
be0676b3 1517 */
fcd39179
JB
1518 protected workerMessageListener (message: MessageValue<Response>): void {
1519 this.checkMessageWorkerId(message)
1520 if (message.ready != null && message.taskFunctionNames != null) {
1521 // Worker ready response received from worker
1522 this.handleWorkerReadyResponse(message)
1523 } else if (message.taskId != null) {
1524 // Task execution response received from worker
1525 this.handleTaskExecutionResponse(message)
1526 } else if (message.taskFunctionNames != null) {
1527 // Task function names message received from worker
1528 this.getWorkerInfo(
1529 this.getWorkerNodeKeyByWorkerId(message.workerId)
1530 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1531 }
1532 }
1533
10e2aa7e 1534 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1535 if (message.ready === false) {
72ae84a2
JB
1536 throw new Error(
1537 `Worker ${message.workerId as number} failed to initialize`
1538 )
f05ed93c 1539 }
a5d15204 1540 const workerInfo = this.getWorkerInfo(
aad6fb64 1541 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1542 )
a5d15204 1543 workerInfo.ready = message.ready as boolean
6703b9f4 1544 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d 1545 if (this.ready) {
d91689fd
JB
1546 const emitPoolReadyEventOnce = once(
1547 () => this.emitter?.emit(PoolEvents.ready, this.info),
1548 this
1549 )
1550 emitPoolReadyEventOnce()
2431bdb4 1551 }
6b272951
JB
1552 }
1553
1554 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1555 const { taskId, workerError, data } = message
5441aea6 1556 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1557 if (promiseResponse != null) {
6703b9f4
JB
1558 if (workerError != null) {
1559 this.emitter?.emit(PoolEvents.taskError, workerError)
1560 promiseResponse.reject(workerError.message)
6b272951 1561 } else {
5441aea6 1562 promiseResponse.resolve(data as Response)
6b272951 1563 }
501aea93
JB
1564 const workerNodeKey = promiseResponse.workerNodeKey
1565 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1566 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1567 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1568 if (
1569 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1570 this.tasksQueueSize(workerNodeKey) > 0 &&
1571 this.workerNodes[workerNodeKey].usage.tasks.executing <
1572 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1573 ) {
1574 this.executeTask(
1575 workerNodeKey,
1576 this.dequeueTask(workerNodeKey) as Task<Data>
1577 )
be0676b3
APA
1578 }
1579 }
be0676b3 1580 }
7c0ba920 1581
a1763c54 1582 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1583 if (this.busy) {
1584 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1585 }
1586 }
1587
1588 private checkAndEmitTaskQueuingEvents (): void {
1589 if (this.hasBackPressure()) {
1590 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1591 }
1592 }
1593
33e6bb4c
JB
1594 private checkAndEmitDynamicWorkerCreationEvents (): void {
1595 if (this.type === PoolTypes.dynamic) {
1596 if (this.full) {
1597 this.emitter?.emit(PoolEvents.full, this.info)
1598 }
1599 }
1600 }
1601
8a1260a3 1602 /**
aa9eede8 1603 * Gets the worker information given its worker node key.
8a1260a3
JB
1604 *
1605 * @param workerNodeKey - The worker node key.
3f09ed9f 1606 * @returns The worker information.
8a1260a3 1607 */
46b0bb09 1608 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1609 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1610 }
1611
a05c10de 1612 /**
b0a4db63 1613 * Adds the given worker in the pool worker nodes.
ea7a90d3 1614 *
38e795c1 1615 * @param worker - The worker.
aa9eede8
JB
1616 * @returns The added worker node key.
1617 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1618 */
b0a4db63 1619 private addWorkerNode (worker: Worker): number {
671d5154
JB
1620 const workerNode = new WorkerNode<Worker, Data>(
1621 worker,
ff3f866a 1622 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1623 )
b97d82d8 1624 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1625 if (this.starting) {
1626 workerNode.info.ready = true
1627 }
aa9eede8 1628 this.workerNodes.push(workerNode)
aad6fb64 1629 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1630 if (workerNodeKey === -1) {
86ed0598 1631 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1632 }
1633 return workerNodeKey
ea7a90d3 1634 }
c923ce56 1635
51fe3d3c 1636 /**
f06e48d8 1637 * Removes the given worker from the pool worker nodes.
51fe3d3c 1638 *
f06e48d8 1639 * @param worker - The worker.
51fe3d3c 1640 */
416fd65c 1641 private removeWorkerNode (worker: Worker): void {
aad6fb64 1642 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1643 if (workerNodeKey !== -1) {
1644 this.workerNodes.splice(workerNodeKey, 1)
1645 this.workerChoiceStrategyContext.remove(workerNodeKey)
1646 }
51fe3d3c 1647 }
adc3c320 1648
ae3ab61d
JB
1649 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1650 this.getWorkerInfo(workerNodeKey).ready = false
1651 }
1652
e2b31e32
JB
1653 /** @inheritDoc */
1654 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1655 return (
e2b31e32
JB
1656 this.opts.enableTasksQueue === true &&
1657 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1658 )
1659 }
1660
1661 private hasBackPressure (): boolean {
1662 return (
1663 this.opts.enableTasksQueue === true &&
1664 this.workerNodes.findIndex(
041dc05b 1665 workerNode => !workerNode.hasBackPressure()
a1763c54 1666 ) === -1
9e844245 1667 )
e2b31e32
JB
1668 }
1669
b0a4db63 1670 /**
aa9eede8 1671 * Executes the given task on the worker given its worker node key.
b0a4db63 1672 *
aa9eede8 1673 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1674 * @param task - The task to execute.
1675 */
2e81254d 1676 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1677 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1678 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1679 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1680 }
1681
f9f00b5f 1682 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1683 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1684 this.checkAndEmitTaskQueuingEvents()
1685 return tasksQueueSize
adc3c320
JB
1686 }
1687
416fd65c 1688 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1689 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1690 }
1691
416fd65c 1692 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1693 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1694 }
1695
81c02522 1696 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1697 while (this.tasksQueueSize(workerNodeKey) > 0) {
1698 this.executeTask(
1699 workerNodeKey,
1700 this.dequeueTask(workerNodeKey) as Task<Data>
1701 )
ff733df7 1702 }
4b628b48 1703 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1704 }
1705
ef41a6e6
JB
1706 private flushTasksQueues (): void {
1707 for (const [workerNodeKey] of this.workerNodes.entries()) {
1708 this.flushTasksQueue(workerNodeKey)
1709 }
1710 }
c97c7edb 1711}