refactor: improve task function operation handling on the worker side
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
15b176e0
JB
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
47352846
JB
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the task dispatching methods, creates the
 * optional event emitter and the worker choice strategy context, runs the
 * setup hook and, when `opts.startWorkers` is `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, checkPoolOptions() also fills in the options defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the task dispatching methods to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
165
/**
 * Checks that the worker file path is a non blank string referencing an existing file.
 *
 * @param filePath - The worker file path to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
8d3782fa
JB
/**
 * Checks that the number of workers is a non negative safe integer, and non zero for a fixed pool.
 *
 * @param numberOfWorkers - The number of workers to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, or lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Checks the pool options and fills in the defaults of the unset ones on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy: validate first, then default.
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  // Worker choice strategy options: validate first, then merge over the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only checked and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
253
/**
 * Checks that the worker choice strategy, when defined, is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  // A nullish strategy is accepted here.
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
266
0d80593b
JB
/**
 * Checks the worker choice strategy options. Nullish options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  const weights = workerChoiceStrategyOptions?.weights
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
313
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  // Both concurrency checks run before any size check.
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
350
/** @inheritDoc */
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Sums a per worker node numeric value over all the worker nodes.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + selector(workerNode),
      0
    )
  // Concatenates a per worker node measurement history over all the worker nodes.
  const gatherHistory = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) => histories.concat(selector(workerNode)),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(
            average(
              gatherHistory(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              gatherHistory(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(
            average(
              gatherHistory(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              gatherHistory(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
08f3f44c 506
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` when the number of ready,
 * non dynamic worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
521
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time divided by the time elapsed since the pool start timestamp multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
542
8881ae32 543 /**
aa9eede8 544 * The pool type.
8881ae32
JB
545 *
546 * If it is `'dynamic'`, it provides the `max` property.
547 */
548 protected abstract get type (): PoolType
549
184855e6 550 /**
aa9eede8 551 * The worker type.
184855e6
JB
552 */
553 protected abstract get worker (): WorkerType
554
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
561
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max } = this
  return max ?? this.numberOfWorkers
}
a35560ba 568
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is
 * defined and matches a worker node of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
587
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const isNodeOfWorker = (node: IWorkerNode<Worker, Data>): boolean =>
    node.worker === worker
  return this.workerNodes.findIndex(isNodeOfWorker)
}
599
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (node: IWorkerNode<Worker, Data>): boolean =>
    node.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
611
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send the statistics message to its worker.
  for (const [nodeKey, node] of this.workerNodes.entries()) {
    node.resetUsage()
    this.sendStatisticsMessageToWorker(nodeKey)
  }
}
630
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
644
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    // Going from enabled to disabled: remove the stealing handlers and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously set tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
680
9b38ab2d
JB
/**
 * Builds the tasks queue options by merging the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  // The default queue size is the square of the pool maximum size.
  const defaultTasksQueueOptions = {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
694
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  for (const node of this.workerNodes) {
    node.tasksQueueBackPressureSize = size
  }
}
700
d6ca1416
JB
/**
 * Sets the `onEmptyQueue` handler of every worker node to the task stealing handler bound to this pool.
 */
private setTaskStealing (): void {
  for (const node of this.workerNodes) {
    node.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
707
/**
 * Removes the `onEmptyQueue` handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const node of this.workerNodes) {
    delete node.onEmptyQueue
  }
}
713
/**
 * Sets the `onBackPressure` handler of every worker node to the tasks stealing handler bound to this pool.
 */
private setTasksStealingOnBackPressure (): void {
  for (const node of this.workerNodes) {
    node.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
720
/**
 * Removes the `onBackPressure` handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const node of this.workerNodes) {
    delete node.onBackPressure
  }
}
726
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 735
c319c66b
JB
736 /**
737 * Whether the pool is busy or not.
738 *
739 * The pool busyness boolean status.
740 */
741 protected abstract get busy (): boolean
7c0ba920 742
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the configured concurrency. Otherwise it is
 * available only while it executes no task.
 *
 * @returns `true` if no ready worker node is available, `false` otherwise.
 */
protected internalBusy (): boolean {
  const isAvailable =
    this.opts.enableTasksQueue === true
      ? (node: IWorkerNode<Worker, Data>): boolean =>
          node.info.ready &&
          node.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (node: IWorkerNode<Worker, Data>): boolean =>
          node.info.ready && node.usage.tasks.executing === 0
  return this.workerNodes.findIndex(isAvailable) === -1
}
766
/**
 * Sends a task function operation message to one worker node and waits for
 * the operation status reported by its worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message to send.
 * @returns A promise resolving to `true` once the worker reports a successful operation. It rejects with an error when the worker reports a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    // The listener parameter is named `response` so that it does not shadow
    // the request `message`: the operation name reported on failure is read
    // from the request, which always carries it.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Ignore messages coming from another worker.
      if (response.workerId !== workerId) {
        return
      }
      if (response.taskFunctionOperationStatus === true) {
        resolve(true)
      } else if (response.taskFunctionOperationStatus === false) {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
    })
    // NOTE(review): the listener is never deregistered here, confirm whether a deregistration API exists.
    this.sendToWorker(workerNodeKey, message)
  })
}
797
/**
 * Sends a task function operation message to every worker node and waits for
 * the operation status reported by each of them.
 *
 * @param message - The task function operation message to send.
 * @returns A promise resolving to `true` once as many status responses as worker nodes are received and all of them are successful. It rejects with an error naming the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    // NOTE(review): with no worker node, no listener is registered and the
    // promise never settles, confirm whether callers can hit that case.
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      // The listener parameter is named `response` so that it does not shadow
      // the request `message`: the operation name reported on failure is read
      // from the request, which always carries it.
      this.registerWorkerMessageListener(workerNodeKey, response => {
        // Only messages carrying an operation status are responses.
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        // Settle only once as many responses as worker nodes are collected.
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        if (
          responsesReceived.every(
            received => received.taskFunctionOperationStatus === true
          )
        ) {
          resolve(true)
          return
        }
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                message.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
841
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name.
  for (const { info } of this.workerNodes) {
    const knownNames = info.taskFunctionNames
    if (Array.isArray(knownNames) && knownNames.includes(name)) {
      return true
    }
  }
  return false
}
854
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const addOperation: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(addOperation)
  // Recorded on the pool side only after the operation promise has resolved.
  this.taskFunctions.set(name, fn)
  return opResult
}
877
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const isHandledByPool = this.taskFunctions.has(name)
  if (!isHandledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperation: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperation
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
893
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // Return the names of the first worker node holding a non empty list.
  for (const { info } of this.workerNodes) {
    if (
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
    ) {
      return info.taskFunctionNames
    }
  }
  return []
}
906
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
914
adee6053
JB
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  for (const node of this.workerNodes) {
    node.deleteTaskFunctionWorkerUsage(name)
  }
}
920
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node: its
 * tasks queue is empty and it executes fewer tasks than the configured concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
928
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Arguments and pool state validation: reject instead of throwing.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Register the promise callbacks under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 982
/** @inheritdoc */
public start (): void {
  this.starting = true
  // Recomputed on every iteration since each creation adds a worker node.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
998
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
1009
/**
 * Sends the kill message to the worker given its worker node key and waits for its answer.
 *
 * The returned promise resolves on a `'success'` kill answer and rejects on a `'failure'` one.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Worker ${
                message.workerId as number
              } kill message handling failed`
            )
          )
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1030
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1037
/**
 * Hook running setup code before the worker nodes are created in the abstract constructor.
 * Does nothing by default, can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to set up by default.
}
c97c7edb 1047
/**
 * Tells whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1052
/**
 * Hook run before a task is executed on a worker.
 * Increments the executing tasks counters and records the task wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Per task function usage.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1082
/**
 * Hook run after a task has been executed on a worker.
 * Updates the tasks, runtime and ELU statistics from the received message.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Worker node wide usage.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Per task function usage, keyed by the name found in the task performance.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
1116
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * NOTE(review): the threshold is more than two task function names; presumably
 * the list also holds the default task name entry - confirm against the worker side.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1131
/**
 * Updates the tasks counters of the given worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1149
/**
 * Updates the runtime statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      // A missing runtime counts as zero.
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1163
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the time elapsed since the task timestamp, zero if the task has none.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1176
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing ELU measurements count as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // The first measurement is taken as is, the next ones are averaged with the current value.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1209
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met and is
 * returned when the strategy policy allows dynamic worker usage. Otherwise the
 * choice is delegated to the worker choice strategy; the default one uses a
 * round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1228
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1237
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the worker node receiving the message.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1250
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1257
/**
 * Creates a new worker, wires its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to no-ops.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool side error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // The replacement worker node is of the same kind as the failed one.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1301
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, the worker node gets a listener handling the
 * kill messages sent by its worker, is asked to check its activity and
 * receives the task functions known on the pool side.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is resolved again from the worker id each time a message is received.
    const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[currentWorkerNodeKey].usage
    // A soft kill is only honored when the worker node has nothing left to do.
    const hasNoPendingTask = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        workerUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        workerUsage.tasks.executing === 0 &&
        this.tasksQueueSize(currentWorkerNodeKey) === 0)
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingTask())
    ) {
      this.destroyWorkerNode(currentWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Errors from the 'add' operations are reported through the pool error event.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1354
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1367
/**
 * Method run right after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks stealing callbacks are only installed when enabled in the tasks queue options.
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1392
/**
 * Sends the startup message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1399
/**
 * Sends the statistics message to the worker identified by the given worker node key.
 * The message carries the runtime and ELU `aggregate` flags of the task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
a2ed5053
JB
1416
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task is handed to the ready worker node, other than the source one,
 * having the fewest queued tasks (the lowest key wins on ties). The task is
 * executed right away when the destination can take it, enqueued otherwise.
 * Redistribution stops when no other ready worker node is available, leaving
 * the remaining tasks in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      // Never hand a task back to the source worker node, nor to a not ready one:
      // re-enqueuing on the source queue would make this loop endless.
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node can take over the queued tasks.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1437
/**
 * Increments the stolen tasks counters of the given worker node.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  // Per task function usage.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1456
/**
 * Steals one task for the worker identified by the given id.
 * The task is popped from the ready worker node, other than that worker, having the most queued tasks.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy by descending number of queued tasks.
  const workerNodesByQueuedDesc = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = workerNodesByQueuedDesc.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1484
/**
 * Relieves the worker identified by the given id by handing its queued tasks to other worker nodes.
 *
 * The other ready worker nodes are visited by ascending number of queued
 * tasks. Each one whose queue holds fewer tasks than the configured tasks
 * queue size minus one gets a task popped from the source worker node.
 * Nothing is done when the configured tasks queue size is one or less.
 *
 * @param workerId - The id of the worker whose tasks are handed over.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy by ascending number of queued tasks.
  const workerNodesByQueuedAsc = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodesByQueuedAsc) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to hand over.
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not the worker node key:
      // resolve the key from the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1519
/**
 * Builds the listener registered for each worker message.
 * The listener dispatches on the message content: worker ready response,
 * task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1542
/**
 * Handles the ready response sent by a worker: stores its ready flag and task
 * function names, then emits the pool ready event if the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1558
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task, updates the worker node
 * statistics and then executes the next queued task of that worker node if
 * the tasks queue is enabled and its concurrency allows it.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    // The promise is rejected with the worker error message.
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
7c0ba920 1586
/**
 * Emits the pool busy event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1592
/**
 * Emits the pool back pressure event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1598
/**
 * Emits the pool full event if the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1606
/**
 * Gets the information of the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1616
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // The tasks queue size defaults to the square of the pool maximum size.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueMaxSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1640
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1653
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1661
/**
 * Whether the pool has back pressure or not.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1670
/**
 * Executes the given task on the worker identified by the given worker node key.
 * Runs the before execution hook, sends the task to the worker with its
 * transfer list, then emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1682
/**
 * Enqueues the given task on the worker node identified by the given key, then emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1688
/**
 * Dequeues a task from the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1692
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1696
/**
 * Executes every task queued on the worker node identified by the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  const workerNode = this.workerNodes[workerNodeKey]
  while (workerNode.tasksQueueSize() > 0) {
    const queuedTask = workerNode.dequeueTask() as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  workerNode.clearTasksQueue()
}
1706
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1712}