test: add pool creation and destroy test
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
bfc75cca 20 round
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
6703b9f4 23import type { TaskFunction } from '../worker/task-functions'
c4855468 24import {
65d7a1c9 25 type IPool,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
4d159167
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
9f95d5eb 37 WorkerNodeEventDetail,
4d159167
JB
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
bde6b5d7
JB
51import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
bfc75cca
JB
54 checkValidWorkerChoiceStrategy,
55 updateMeasurementStatistics
bde6b5d7 56} from './utils'
23ccf9d7 57
729c563d 58/**
ea7a90d3 59 * Base class that implements some shared logic for all poolifier pools.
729c563d 60 *
38e795c1 61 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 64 */
c97c7edb 65export abstract class AbstractPool<
f06e48d8 66 Worker extends IWorker,
d3c8a1a8
S
67 Data = unknown,
68 Response = unknown
c4855468 69> implements IPool<Worker, Data, Response> {
afc003b2 70 /** @inheritDoc */
4b628b48 71 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 72
afc003b2 73 /** @inheritDoc */
f80125ca 74 public emitter?: EventEmitterAsyncResource
7c0ba920 75
fcd39179
JB
76 /**
77 * Dynamic pool maximum size property placeholder.
78 */
79 protected readonly max?: number
80
be0676b3 81 /**
419e8121 82 * The task execution response promise map:
2740a743 83 * - `key`: The message id of each submitted task.
a3445496 84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 85 *
a3445496 86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 87 */
501aea93
JB
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 90
a35560ba 91 /**
51fe3d3c 92 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
95 Worker,
96 Data,
97 Response
a35560ba
S
98 >
99
72ae84a2
JB
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
15b176e0
JB
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
47352846
JB
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
afe0d5bf
JB
115 /**
116 * The start timestamp of the pool.
117 */
// Assigned from `performance.now()` in the constructor; typed explicitly like the other fields.
private readonly startTimestamp: number
119
729c563d
S
120 /**
121 * Constructs a new poolifier pool.
122 *
38e795c1 123 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 124 * @param filePath - Path to the worker file.
38e795c1 125 * @param opts - Options for the pool.
729c563d 126 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // Refuse to build the pool when isMain() reports false.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Validate the constructor arguments; checkPoolOptions() also fills in option defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Overridable hook, executed before any worker node is created.
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.starting = false
  this.started = false
  // Worker nodes are only created here when the `startWorkers` option is true.
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
170
8d3782fa
JB
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws Error if the number of workers is `null` or `undefined`.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
188
/**
 * Validates the pool options and stores the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws TypeError if the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validated first, then defaulted to round robin.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: validated, then merged over the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
217
0d80593b
JB
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `retries` is not a safe integer.
 * @throws RangeError if `retries` is negative.
 * @throws Error if `weights` does not have one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Nothing to validate when no options are given.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // The number of weights must match the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
264
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
270
08f3f44c 271 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  // Fetch the statistics requirements once instead of on every conditional spread.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )

  // Sums a numeric value selected on each worker node.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )

  // Builds the pool wide statistics of one measurement: minimum and maximum are
  // always computed, average and median only when the strategy requires them.
  const buildMeasurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    requirements: MeasurementStatisticsRequirements
  ): { minimum: number, maximum: number, average?: number, median?: number } => {
    // Concatenates the measurement history of all the worker nodes.
    const mergedHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (accumulator, workerNode) =>
          accumulator.concat(workerNode.usage[measurement].history),
        []
      )
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(mergedHistory()))
      }),
      ...(requirements.median && {
        median: round(median(mergedHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both aggregates are required.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related properties are only reported when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementStatistics(
        'runTime',
        taskStatisticsRequirements.runTime
      )
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementStatistics(
        'waitTime',
        taskStatisticsRequirements.waitTime
      )
    })
  }
}
08f3f44c 426
aa9eede8
JB
427 /**
428 * The pool readiness boolean status.
429 */
2431bdb4
JB
private get ready (): boolean {
  // Only non dynamic worker nodes flagged as ready are counted.
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
441
afe0d5bf 442 /**
aa9eede8 443 * The approximate pool utilization.
afe0d5bf
JB
444 *
445 * @returns The pool utilization.
446 */
private get utilization (): number {
  // Time elapsed since the pool start timestamp, multiplied by the pool maximum size.
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // Missing aggregates count as zero.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
8881ae32 463 /**
aa9eede8 464 * The pool type.
8881ae32
JB
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
184855e6 470 /**
aa9eede8 471 * The worker type.
184855e6
JB
472 */
473 protected abstract get worker (): WorkerType
474
c2ade475 475 /**
aa9eede8 476 * The pool minimum size.
c2ade475 477 */
8735b4e5
JB
protected get minSize (): number {
  // The minimum size is the number of workers given at construction.
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
481
482 /**
aa9eede8 483 * The pool maximum size.
ff733df7 484 */
8735b4e5
JB
protected get maxSize (): number {
  // Falls back to the number of workers when `max` is not defined.
  const { max: maximumSize, numberOfWorkers } = this
  return maximumSize ?? numberOfWorkers
}
a35560ba 488
6b813701
JB
489 /**
490 * Checks if the worker id sent in the received message from a worker is valid.
491 *
492 * @param message - The received message.
493 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
494 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // The worker id must belong to one of the pool worker nodes.
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
504
ffcbbad8 505 /**
f06e48d8 506 * Gets the worker node key of the given worker.
ffcbbad8
JB
507 *
508 * @param worker - The worker.
f59e1027 509 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 510 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // Linear scan; the first worker node holding this exact worker instance wins.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
516
aa9eede8
JB
517 /**
518 * Gets the worker node key given its worker id.
519 *
520 * @param workerId - The worker id.
aad6fb64 521 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 522 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  // Linear scan; the first worker node whose info id strictly equals the given id wins.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
528
afc003b2 529 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it the statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
547
548 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are merged over the defaults before being stored.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
561
a20f0ba5 562 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // When an enabled queue gets disabled: remove the stealing listeners, then flush the queues.
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
575
576 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  // With the tasks queue disabled, any stored tasks queue options are dropped.
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  const { size, taskStealing, tasksStealingOnBackPressure } =
    this.opts.tasksQueueOptions
  this.setTasksQueueSize(size as number)
  // Add or remove the stealing listeners according to the built options.
  if (taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
597
9b38ab2d
JB
/**
 * Builds the tasks queue options by merging the given options over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  // The default queue size is the square of the pool maximum size.
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
611
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The size assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
617
d6ca1416
JB
/**
 * Registers `handleEmptyQueueEvent` as `'emptyqueue'` listener on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener(
      'emptyqueue',
      this.handleEmptyQueueEvent as EventListener
    )
  }
}
626
/**
 * Removes the `handleEmptyQueueEvent` `'emptyqueue'` listener from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener(
      'emptyqueue',
      this.handleEmptyQueueEvent as EventListener
    )
  }
}
635
/**
 * Registers `handleBackPressureEvent` as `'backpressure'` listener on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener(
      'backpressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
644
/**
 * Removes the `handleBackPressureEvent` `'backpressure'` listener from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener(
      'backpressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
653
c319c66b
JB
654 /**
655 * Whether the pool is full or not.
656 *
657 * The pool filling boolean status.
658 */
dea903a8
JB
protected get full (): boolean {
  // Full once the number of worker nodes reaches the pool maximum size.
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 662
c319c66b
JB
663 /**
664 * Whether the pool is busy or not.
665 *
666 * The pool busyness boolean status.
667 */
668 protected abstract get busy (): boolean
7c0ba920 669
6c6afb84 670 /**
3d76750a 671 * Whether worker nodes are concurrently executing their tasks quota or not.
6c6afb84
JB
672 *
673 * @returns Worker nodes busyness boolean status.
674 */
protected internalBusy (): boolean {
  // Busy means that no ready worker node has execution capacity left:
  // below the configured concurrency with the tasks queue enabled,
  // no executing task at all otherwise.
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
693
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message to send.
 * @returns A promise resolved with `true` on success, rejected with an `Error` on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages without an operation status or coming from another worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The operation is settled: stop listening on that worker node.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
734
/**
 * Sends a task function operation message to every worker node and waits for all the status responses.
 *
 * The same listener is registered on each worker node. Once as many status responses as worker nodes
 * have been received, the promise is settled and the listener is removed from every worker node.
 *
 * NOTE(review): with no worker node in the pool nothing is sent and the returned promise never settles — confirm callers cannot hit this.
 *
 * @param message - The task function operation message to send.
 * @returns A promise resolved with `true` if every worker succeeded, rejected with an `Error` describing the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only messages carrying an operation status are responses to this operation.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The listener was registered on every worker node, so remove it from every
      // worker node: removing it only from the last responder leaks it on the others.
      // NOTE(review): assumes deregistering a listener from a node where it is not registered is a no-op — confirm against the implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
789
790 /** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
802
803 /** @inheritDoc */
e81c38f2
JB
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // From here on `name` is known to be a string.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Recorded on the pool side only after the workers operation has completed.
  this.taskFunctions.set(name, fn)
  return operationStatus
}
825
826 /** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Drop the per task function usages, then the pool side record.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
841
90d7d101 842 /** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node reporting a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
854
6703b9f4 855 /** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // The 'default' operation is sent to all the worker nodes.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationStatus
}
862
adee6053
JB
/**
 * Deletes the given task function worker usage on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
868
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the configured concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
876
afc003b2 877 /** @inheritDoc */
7d91a8cd
JB
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Argument and state validation: each failure rejects and stops here.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks keyed by task id alongside the chosen worker node key.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the queue is disabled, or enabled and the
    // worker node can take the task; enqueue in every other case.
    const executeImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 930
47352846
JB
931 /** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes are not counted towards the number of workers to create.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
946
afc003b2 947 /** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently and wait for all of them.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted while `started` is still true.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.started = false
}
958
1e3214b6 959 protected async sendKillMessageToWorker (
72ae84a2 960 workerNodeKey: number
1e3214b6 961 ): Promise<void> {
9edb9717 962 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
963 const killMessageListener = (message: MessageValue<Response>): void => {
964 this.checkMessageWorkerId(message)
1e3214b6
JB
965 if (message.kill === 'success') {
966 resolve()
967 } else if (message.kill === 'failure') {
72ae84a2
JB
968 reject(
969 new Error(
ae036c3e 970 `Kill message handling failed on worker ${
72ae84a2 971 message.workerId as number
ae036c3e 972 }`
72ae84a2
JB
973 )
974 )
1e3214b6 975 }
ae036c3e 976 }
51d9cfbd 977 // FIXME: should be registered only once
ae036c3e 978 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 979 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 980 })
1e3214b6
JB
981 }
982
  /**
   * Terminates the worker node given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @param workerNodeKey - The worker node key.
   * @returns A promise that settles once the implementation has finished handling the termination.
   */
  protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 989
  /**
   * Setup hook to execute code before worker nodes are created in the abstract constructor.
   * Can be overridden. The default implementation does nothing.
   *
   * @virtual
   */
  protected setupHook (): void {
    /* Intentionally empty */
  }
c97c7edb 999
  /**
   * Should return whether the worker is the main worker or not.
   * Must be implemented by concrete pools.
   *
   * @returns `true` if the worker is the main worker, `false` otherwise.
   */
  protected abstract isMain (): boolean
1004
1005 /**
2e81254d 1006 * Hook executed before the worker task execution.
bf9549ae 1007 * Can be overridden.
729c563d 1008 *
f06e48d8 1009 * @param workerNodeKey - The worker node key.
1c6fe997 1010 * @param task - The task to execute.
729c563d 1011 */
1c6fe997
JB
1012 protected beforeTaskExecutionHook (
1013 workerNodeKey: number,
1014 task: Task<Data>
1015 ): void {
94407def
JB
1016 if (this.workerNodes[workerNodeKey]?.usage != null) {
1017 const workerUsage = this.workerNodes[workerNodeKey].usage
1018 ++workerUsage.tasks.executing
1019 this.updateWaitTimeWorkerUsage(workerUsage, task)
1020 }
1021 if (
1022 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1023 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1024 task.name as string
1025 ) != null
1026 ) {
db0e38ee 1027 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1028 workerNodeKey
db0e38ee 1029 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1030 ++taskFunctionWorkerUsage.tasks.executing
1031 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1032 }
c97c7edb
S
1033 }
1034
c01733f1 1035 /**
2e81254d 1036 * Hook executed after the worker task execution.
bf9549ae 1037 * Can be overridden.
c01733f1 1038 *
501aea93 1039 * @param workerNodeKey - The worker node key.
38e795c1 1040 * @param message - The received message.
c01733f1 1041 */
2e81254d 1042 protected afterTaskExecutionHook (
501aea93 1043 workerNodeKey: number,
2740a743 1044 message: MessageValue<Response>
bf9549ae 1045 ): void {
94407def
JB
1046 if (this.workerNodes[workerNodeKey]?.usage != null) {
1047 const workerUsage = this.workerNodes[workerNodeKey].usage
1048 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1049 this.updateRunTimeWorkerUsage(workerUsage, message)
1050 this.updateEluWorkerUsage(workerUsage, message)
1051 }
1052 if (
1053 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1054 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1055 message.taskPerformance?.name as string
94407def
JB
1056 ) != null
1057 ) {
db0e38ee 1058 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1059 workerNodeKey
db0e38ee 1060 ].getTaskFunctionWorkerUsage(
0628755c 1061 message.taskPerformance?.name as string
b558f6b5 1062 ) as WorkerUsage
db0e38ee
JB
1063 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1064 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1065 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1066 }
1067 }
1068
db0e38ee
JB
1069 /**
1070 * Whether the worker node shall update its task function worker usage or not.
1071 *
1072 * @param workerNodeKey - The worker node key.
1073 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1074 */
1075 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1076 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1077 return (
94407def 1078 workerInfo != null &&
6703b9f4
JB
1079 Array.isArray(workerInfo.taskFunctionNames) &&
1080 workerInfo.taskFunctionNames.length > 2
b558f6b5 1081 )
f1c06930
JB
1082 }
1083
1084 private updateTaskStatisticsWorkerUsage (
1085 workerUsage: WorkerUsage,
1086 message: MessageValue<Response>
1087 ): void {
a4e07f72 1088 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1089 if (
1090 workerTaskStatistics.executing != null &&
1091 workerTaskStatistics.executing > 0
1092 ) {
1093 --workerTaskStatistics.executing
5bb5be17 1094 }
6703b9f4 1095 if (message.workerError == null) {
98e72cda
JB
1096 ++workerTaskStatistics.executed
1097 } else {
a4e07f72 1098 ++workerTaskStatistics.failed
2740a743 1099 }
f8eb0a2a
JB
1100 }
1101
a4e07f72
JB
1102 private updateRunTimeWorkerUsage (
1103 workerUsage: WorkerUsage,
f8eb0a2a
JB
1104 message: MessageValue<Response>
1105 ): void {
6703b9f4 1106 if (message.workerError != null) {
dc021bcc
JB
1107 return
1108 }
e4f20deb
JB
1109 updateMeasurementStatistics(
1110 workerUsage.runTime,
1111 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1112 message.taskPerformance?.runTime ?? 0
e4f20deb 1113 )
f8eb0a2a
JB
1114 }
1115
a4e07f72
JB
1116 private updateWaitTimeWorkerUsage (
1117 workerUsage: WorkerUsage,
1c6fe997 1118 task: Task<Data>
f8eb0a2a 1119 ): void {
1c6fe997
JB
1120 const timestamp = performance.now()
1121 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1122 updateMeasurementStatistics(
1123 workerUsage.waitTime,
1124 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1125 taskWaitTime
e4f20deb 1126 )
c01733f1 1127 }
1128
a4e07f72 1129 private updateEluWorkerUsage (
5df69fab 1130 workerUsage: WorkerUsage,
62c15a68
JB
1131 message: MessageValue<Response>
1132 ): void {
6703b9f4 1133 if (message.workerError != null) {
dc021bcc
JB
1134 return
1135 }
008512c7
JB
1136 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1137 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1138 updateMeasurementStatistics(
1139 workerUsage.elu.active,
008512c7 1140 eluTaskStatisticsRequirements,
dc021bcc 1141 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1142 )
1143 updateMeasurementStatistics(
1144 workerUsage.elu.idle,
008512c7 1145 eluTaskStatisticsRequirements,
dc021bcc 1146 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1147 )
008512c7 1148 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1149 if (message.taskPerformance?.elu != null) {
f7510105
JB
1150 if (workerUsage.elu.utilization != null) {
1151 workerUsage.elu.utilization =
1152 (workerUsage.elu.utilization +
1153 message.taskPerformance.elu.utilization) /
1154 2
1155 } else {
1156 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1157 }
62c15a68
JB
1158 }
1159 }
1160 }
1161
280c2a77 1162 /**
f06e48d8 1163 * Chooses a worker node for the next task.
280c2a77 1164 *
6c6afb84 1165 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1166 *
aa9eede8 1167 * @returns The chosen worker node key
280c2a77 1168 */
6c6afb84 1169 private chooseWorkerNode (): number {
930dcf12 1170 if (this.shallCreateDynamicWorker()) {
aa9eede8 1171 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1172 if (
b1aae695 1173 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1174 ) {
aa9eede8 1175 return workerNodeKey
6c6afb84 1176 }
17393ac8 1177 }
930dcf12
JB
1178 return this.workerChoiceStrategyContext.execute()
1179 }
1180
6c6afb84
JB
1181 /**
1182 * Conditions for dynamic worker creation.
1183 *
1184 * @returns Whether to create a dynamic worker or not.
1185 */
1186 private shallCreateDynamicWorker (): boolean {
930dcf12 1187 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1188 }
1189
  /**
   * Sends a message to worker given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @param workerNodeKey - The worker node key.
   * @param message - The message.
   * @param transferList - The optional array of transferable objects.
   */
  protected abstract sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>,
    transferList?: TransferListItem[]
  ): void
1202
  /**
   * Creates a new worker.
   * Must be implemented by concrete pools.
   *
   * @returns Newly created worker.
   */
  protected abstract createWorker (): Worker
c97c7edb 1209
  /**
   * Creates a new, completely set up worker node.
   *
   * Registers the user supplied handlers (or no-op defaults) on the worker,
   * installs the pool own `error` and `exit` handlers, adds the worker to the
   * pool worker nodes and runs the after setup hook.
   *
   * @returns New, completely set up worker node key.
   */
  protected createAndSetupWorkerNode (): number {
    const worker = this.createWorker()

    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    // Pool own error handling: the statements order below matters.
    worker.on('error', error => {
      const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
      // NOTE(review): no guard here for a worker that is no longer in the
      // worker nodes - confirm getWorkerNodeKeyByWorker() cannot return -1 at this point.
      this.flagWorkerNodeAsNotReady(workerNodeKey)
      // Worker information is read before any replacement worker node is created.
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      this.emitter?.emit(PoolEvents.error, error)
      this.workerNodes[workerNodeKey].closeChannel()
      // Replace the failed worker only on a started pool (not while starting)
      // and only if restart on error is enabled.
      if (
        this.started &&
        !this.starting &&
        this.opts.restartWorkerOnError === true
      ) {
        // The replacement keeps the dynamic or non dynamic nature of the failed worker.
        if (workerInfo.dynamic) {
          this.createAndSetupDynamicWorkerNode()
        } else {
          this.createAndSetupWorkerNode()
        }
      }
      // Move the tasks still queued on the failed worker node to other worker nodes.
      if (this.started && this.opts.enableTasksQueue === true) {
        this.redistributeQueuedTasks(workerNodeKey)
      }
    })
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Remove the worker node from the pool once its worker has exited.
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    const workerNodeKey = this.addWorkerNode(worker)

    this.afterWorkerNodeSetup(workerNodeKey)

    return workerNodeKey
  }
be0676b3 1253
  /**
   * Creates a new, completely set up dynamic worker node.
   *
   * On top of the regular worker node setup, it registers a listener handling
   * the kill messages sent by the worker, asks the worker to check its
   * activity, sends it the pool task functions and flags it as dynamic.
   *
   * @returns New, completely set up dynamic worker node key.
   */
  protected createAndSetupDynamicWorkerNode (): number {
    const workerNodeKey = this.createAndSetupWorkerNode()
    this.registerWorkerMessageListener(workerNodeKey, message => {
      this.checkMessageWorkerId(message)
      // The worker node key is resolved again from the message worker id at
      // message time instead of reusing the key captured at creation time.
      const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      const workerUsage = this.workerNodes[localWorkerNodeKey].usage
      // Kill message received from worker
      // HARD kill behavior: always destroy the worker node.
      // SOFT kill behavior: destroy it only if it has no executing task and,
      // with the tasks queue enabled, no queued task either.
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
          ((this.opts.enableTasksQueue === false &&
            workerUsage.tasks.executing === 0) ||
            (this.opts.enableTasksQueue === true &&
              workerUsage.tasks.executing === 0 &&
              this.tasksQueueSize(localWorkerNodeKey) === 0)))
      ) {
        // Flag the worker node as not ready immediately
        this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
        // Destruction failures are reported through the pool error event.
        this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
          this.emitter?.emit(PoolEvents.error, error)
        })
      }
    })
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    this.sendToWorker(workerNodeKey, {
      checkActive: true
    })
    // Send the task functions known by the pool to the new worker.
    // Failures are reported through the pool error event.
    if (this.taskFunctions.size > 0) {
      for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
        this.sendTaskFunctionOperationToWorker(workerNodeKey, {
          taskFunctionOperation: 'add',
          taskFunctionName,
          taskFunction: taskFunction.toString()
        }).catch(error => {
          this.emitter?.emit(PoolEvents.error, error)
        })
      }
    }
    workerInfo.dynamic = true
    // Flag the worker node as ready right away when the strategy policy asks for it.
    if (
      this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
      this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ) {
      workerInfo.ready = true
    }
    this.checkAndEmitDynamicWorkerCreationEvents()
    return workerNodeKey
  }
1309
  /**
   * Registers a listener callback on the worker given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @typeParam Message - Type of the message payload, either the pool data or response type.
   * @param workerNodeKey - The worker node key.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void
a2ed5053 1322
  /**
   * Registers once a listener callback on the worker given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @typeParam Message - Type of the message payload, either the pool data or response type.
   * @param workerNodeKey - The worker node key.
   * @param listener - The message listener callback.
   */
  protected abstract registerOnceWorkerMessageListener<
    Message extends Data | Response
  >(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void
1335
  /**
   * Deregisters a listener callback on the worker given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @typeParam Message - Type of the message payload, either the pool data or response type.
   * @param workerNodeKey - The worker node key.
   * @param listener - The message listener callback.
   */
  protected abstract deregisterWorkerMessageListener<
    Message extends Data | Response
  >(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void
1348
a2ed5053 1349 /**
aa9eede8 1350 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1351 * Can be overridden.
1352 *
aa9eede8 1353 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1354 */
aa9eede8 1355 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1356 // Listen to worker messages.
fcd39179
JB
1357 this.registerWorkerMessageListener(
1358 workerNodeKey,
1359 this.workerMessageListener.bind(this)
1360 )
85aeb3f3 1361 // Send the startup message to worker.
aa9eede8 1362 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1363 // Send the statistics message to worker.
1364 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1365 if (this.opts.enableTasksQueue === true) {
dbd73092 1366 if (this.opts.tasksQueueOptions?.taskStealing === true) {
9f95d5eb
JB
1367 this.workerNodes[workerNodeKey].addEventListener(
1368 'emptyqueue',
9b85e2d0 1369 this.handleEmptyQueueEvent as EventListener
9f95d5eb 1370 )
47352846
JB
1371 }
1372 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
9f95d5eb
JB
1373 this.workerNodes[workerNodeKey].addEventListener(
1374 'backpressure',
9b85e2d0 1375 this.handleBackPressureEvent as EventListener
9f95d5eb 1376 )
47352846 1377 }
72695f86 1378 }
d2c73f82
JB
1379 }
1380
  /**
   * Sends the startup message to worker given its worker node key.
   * Must be implemented by concrete pools.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1387
1388 /**
9edb9717 1389 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1390 *
aa9eede8 1391 * @param workerNodeKey - The worker node key.
85aeb3f3 1392 */
9edb9717 1393 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1394 this.sendToWorker(workerNodeKey, {
1395 statistics: {
1396 runTime:
1397 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1398 .runTime.aggregate,
1399 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1400 .elu.aggregate
72ae84a2 1401 }
aa9eede8
JB
1402 })
1403 }
a2ed5053
JB
1404
1405 private redistributeQueuedTasks (workerNodeKey: number): void {
1406 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1407 const destinationWorkerNodeKey = this.workerNodes.reduce(
1408 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1409 return workerNode.info.ready &&
1410 workerNode.usage.tasks.queued <
1411 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1412 ? workerNodeKey
1413 : minWorkerNodeKey
1414 },
1415 0
1416 )
72ae84a2 1417 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1418 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1419 this.executeTask(destinationWorkerNodeKey, task)
1420 } else {
1421 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1422 }
1423 }
1424 }
1425
b1838604
JB
1426 private updateTaskStolenStatisticsWorkerUsage (
1427 workerNodeKey: number,
b1838604
JB
1428 taskName: string
1429 ): void {
1a880eca 1430 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1431 if (workerNode?.usage != null) {
1432 ++workerNode.usage.tasks.stolen
1433 }
1434 if (
1435 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1436 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1437 ) {
1438 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1439 taskName
1440 ) as WorkerUsage
1441 ++taskFunctionWorkerUsage.tasks.stolen
1442 }
1443 }
1444
9f95d5eb
JB
1445 private readonly handleEmptyQueueEvent = (
1446 event: CustomEvent<WorkerNodeEventDetail>
1447 ): void => {
1448 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1449 event.detail.workerId
1450 )
dd951876 1451 const workerNodes = this.workerNodes
a6b3272b 1452 .slice()
dd951876
JB
1453 .sort(
1454 (workerNodeA, workerNodeB) =>
1455 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1456 )
f201a0cd 1457 const sourceWorkerNode = workerNodes.find(
041dc05b 1458 workerNode =>
f201a0cd 1459 workerNode.info.ready &&
9f95d5eb 1460 workerNode.info.id !== event.detail.workerId &&
f201a0cd
JB
1461 workerNode.usage.tasks.queued > 0
1462 )
1463 if (sourceWorkerNode != null) {
72ae84a2 1464 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1465 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1466 this.executeTask(destinationWorkerNodeKey, task)
1467 } else {
1468 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1469 }
f201a0cd
JB
1470 this.updateTaskStolenStatisticsWorkerUsage(
1471 destinationWorkerNodeKey,
1472 task.name as string
1473 )
72695f86
JB
1474 }
1475 }
1476
9f95d5eb
JB
1477 private readonly handleBackPressureEvent = (
1478 event: CustomEvent<WorkerNodeEventDetail>
1479 ): void => {
f778c355
JB
1480 const sizeOffset = 1
1481 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1482 return
1483 }
72695f86 1484 const sourceWorkerNode =
9f95d5eb 1485 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
72695f86 1486 const workerNodes = this.workerNodes
a6b3272b 1487 .slice()
72695f86
JB
1488 .sort(
1489 (workerNodeA, workerNodeB) =>
1490 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1491 )
1492 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1493 if (
0bc68267 1494 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1495 workerNode.info.ready &&
9f95d5eb 1496 workerNode.info.id !== event.detail.workerId &&
0bc68267 1497 workerNode.usage.tasks.queued <
f778c355 1498 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1499 ) {
72ae84a2 1500 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1501 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1502 this.executeTask(workerNodeKey, task)
4de3d785 1503 } else {
dd951876 1504 this.enqueueTask(workerNodeKey, task)
4de3d785 1505 }
b1838604
JB
1506 this.updateTaskStolenStatisticsWorkerUsage(
1507 workerNodeKey,
b1838604
JB
1508 task.name as string
1509 )
10ecf8fd 1510 }
a2ed5053
JB
1511 }
1512 }
1513
be0676b3 1514 /**
fcd39179 1515 * This method is the message listener registered on each worker.
be0676b3 1516 */
fcd39179
JB
1517 protected workerMessageListener (message: MessageValue<Response>): void {
1518 this.checkMessageWorkerId(message)
1519 if (message.ready != null && message.taskFunctionNames != null) {
1520 // Worker ready response received from worker
1521 this.handleWorkerReadyResponse(message)
1522 } else if (message.taskId != null) {
1523 // Task execution response received from worker
1524 this.handleTaskExecutionResponse(message)
1525 } else if (message.taskFunctionNames != null) {
1526 // Task function names message received from worker
1527 this.getWorkerInfo(
1528 this.getWorkerNodeKeyByWorkerId(message.workerId)
1529 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1530 }
1531 }
1532
10e2aa7e 1533 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1534 if (message.ready === false) {
72ae84a2
JB
1535 throw new Error(
1536 `Worker ${message.workerId as number} failed to initialize`
1537 )
f05ed93c 1538 }
a5d15204 1539 const workerInfo = this.getWorkerInfo(
aad6fb64 1540 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1541 )
a5d15204 1542 workerInfo.ready = message.ready as boolean
6703b9f4 1543 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d
JB
1544 if (this.ready) {
1545 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1546 }
6b272951
JB
1547 }
1548
1549 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1550 const { taskId, workerError, data } = message
5441aea6 1551 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1552 if (promiseResponse != null) {
6703b9f4
JB
1553 if (workerError != null) {
1554 this.emitter?.emit(PoolEvents.taskError, workerError)
1555 promiseResponse.reject(workerError.message)
6b272951 1556 } else {
5441aea6 1557 promiseResponse.resolve(data as Response)
6b272951 1558 }
501aea93
JB
1559 const workerNodeKey = promiseResponse.workerNodeKey
1560 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1561 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1562 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1563 if (
1564 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1565 this.tasksQueueSize(workerNodeKey) > 0 &&
1566 this.workerNodes[workerNodeKey].usage.tasks.executing <
1567 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1568 ) {
1569 this.executeTask(
1570 workerNodeKey,
1571 this.dequeueTask(workerNodeKey) as Task<Data>
1572 )
be0676b3
APA
1573 }
1574 }
be0676b3 1575 }
7c0ba920 1576
a1763c54 1577 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1578 if (this.busy) {
1579 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1580 }
1581 }
1582
1583 private checkAndEmitTaskQueuingEvents (): void {
1584 if (this.hasBackPressure()) {
1585 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1586 }
1587 }
1588
33e6bb4c
JB
1589 private checkAndEmitDynamicWorkerCreationEvents (): void {
1590 if (this.type === PoolTypes.dynamic) {
1591 if (this.full) {
1592 this.emitter?.emit(PoolEvents.full, this.info)
1593 }
1594 }
1595 }
1596
8a1260a3 1597 /**
aa9eede8 1598 * Gets the worker information given its worker node key.
8a1260a3
JB
1599 *
1600 * @param workerNodeKey - The worker node key.
3f09ed9f 1601 * @returns The worker information.
8a1260a3 1602 */
46b0bb09 1603 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1604 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1605 }
1606
a05c10de 1607 /**
b0a4db63 1608 * Adds the given worker in the pool worker nodes.
ea7a90d3 1609 *
38e795c1 1610 * @param worker - The worker.
aa9eede8
JB
1611 * @returns The added worker node key.
1612 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1613 */
b0a4db63 1614 private addWorkerNode (worker: Worker): number {
671d5154
JB
1615 const workerNode = new WorkerNode<Worker, Data>(
1616 worker,
ff3f866a 1617 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1618 )
b97d82d8 1619 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1620 if (this.starting) {
1621 workerNode.info.ready = true
1622 }
aa9eede8 1623 this.workerNodes.push(workerNode)
aad6fb64 1624 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1625 if (workerNodeKey === -1) {
86ed0598 1626 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1627 }
1628 return workerNodeKey
ea7a90d3 1629 }
c923ce56 1630
51fe3d3c 1631 /**
f06e48d8 1632 * Removes the given worker from the pool worker nodes.
51fe3d3c 1633 *
f06e48d8 1634 * @param worker - The worker.
51fe3d3c 1635 */
416fd65c 1636 private removeWorkerNode (worker: Worker): void {
aad6fb64 1637 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1638 if (workerNodeKey !== -1) {
1639 this.workerNodes.splice(workerNodeKey, 1)
1640 this.workerChoiceStrategyContext.remove(workerNodeKey)
1641 }
51fe3d3c 1642 }
adc3c320 1643
ae3ab61d
JB
1644 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1645 this.getWorkerInfo(workerNodeKey).ready = false
1646 }
1647
e2b31e32
JB
1648 /** @inheritDoc */
1649 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1650 return (
e2b31e32
JB
1651 this.opts.enableTasksQueue === true &&
1652 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1653 )
1654 }
1655
1656 private hasBackPressure (): boolean {
1657 return (
1658 this.opts.enableTasksQueue === true &&
1659 this.workerNodes.findIndex(
041dc05b 1660 workerNode => !workerNode.hasBackPressure()
a1763c54 1661 ) === -1
9e844245 1662 )
e2b31e32
JB
1663 }
1664
b0a4db63 1665 /**
aa9eede8 1666 * Executes the given task on the worker given its worker node key.
b0a4db63 1667 *
aa9eede8 1668 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1669 * @param task - The task to execute.
1670 */
2e81254d 1671 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1672 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1673 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1674 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1675 }
1676
f9f00b5f 1677 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1678 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1679 this.checkAndEmitTaskQueuingEvents()
1680 return tasksQueueSize
adc3c320
JB
1681 }
1682
416fd65c 1683 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1684 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1685 }
1686
416fd65c 1687 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1688 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1689 }
1690
81c02522 1691 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1692 while (this.tasksQueueSize(workerNodeKey) > 0) {
1693 this.executeTask(
1694 workerNodeKey,
1695 this.dequeueTask(workerNodeKey) as Task<Data>
1696 )
ff733df7 1697 }
4b628b48 1698 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1699 }
1700
ef41a6e6
JB
1701 private flushTasksQueues (): void {
1702 for (const [workerNodeKey] of this.workerNodes.entries()) {
1703 this.flushTasksQueue(workerNodeKey)
1704 }
1705 }
c97c7edb 1706}