fix: ensure worker node destroy semantic is always the same
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, keyed by task message id.
 *
 * - `key`: The id of a submitted task.
 * - `value`: The `resolve` and `reject` callbacks of the promise handed out by `execute()`,
 *   along with the key of the worker node the task was assigned to.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map()

/**
 * Context delegating the worker node selection to the configured worker choice strategy.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean

/**
 * Value of `performance.now()` taken once the initial worker nodes have been created.
 */
private readonly startTimestamp: number
94
/**
 * Builds the pool: validates its arguments, wires the worker choice strategy context,
 * runs the setup hook and finally creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of worker nodes created at pool startup.
 * @param filePath - Path to the file holding the worker implementation.
 * @param opts - Pool options. Missing values are defaulted in place on this object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if an argument fails validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation, in this order: size, worker file, options.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the following methods.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // `starting` is raised for the duration of the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
141
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  // A non-string value (`null` and `undefined` included) or a blank string is rejected.
  const missingPath =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (missingPath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
/**
 * Validates the number of workers the pool is instantiated with.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Validates the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * Each option is written to `this.opts` before being validated.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
225
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly `maxSize` of them.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/**
 * Validates the tasks queue options. `null` and `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  // Recounted before each iteration, as in a plain `while` condition.
  const staticWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (staticWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
299
/** @inheritDoc */
public get info (): PoolInfo {
  type Node = IWorkerNode<Worker, Data>
  // Sums a numeric value over all worker nodes.
  const total = (pick: (workerNode: Node) => number): number =>
    this.workerNodes.reduce((sum, workerNode) => sum + pick(workerNode), 0)
  // Counts the worker nodes matching a predicate.
  const count = (matches: (workerNode: Node) => boolean): number =>
    this.workerNodes.filter(matches).length
  // Collects one numeric value per worker node.
  const each = (pick: (workerNode: Node) => number): number[] =>
    this.workerNodes.map(pick)

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: count(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: count(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: total(workerNode => workerNode.usage.tasks.executed),
    executingTasks: total(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: total(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: total(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: total(workerNode => workerNode.usage.tasks.failed),
    // Run time statistics, only when the strategy aggregates run time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...each(workerNode => workerNode.usage.runTime?.minimum ?? Infinity)
          )
        ),
        maximum: round(
          Math.max(
            ...each(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          total(workerNode => workerNode.usage.runTime?.aggregate ?? 0) /
            total(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(each(workerNode => workerNode.usage.runTime?.median ?? 0))
          )
        })
      }
    }),
    // Wait time statistics, only when the strategy aggregates wait time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...each(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...each(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          total(workerNode => workerNode.usage.waitTime?.aggregate ?? 0) /
            total(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(each(workerNode => workerNode.usage.waitTime?.median ?? 0))
          )
        })
      }
    })
  }
}
08f3f44c 436
/**
 * Whether the pool is ready: `true` once at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
451
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated run time plus the aggregated wait time of all worker nodes,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
472
/**
 * The pool type, as one of the `PoolTypes` values.
 *
 * Size validation depends on it: a fixed pool cannot be created with zero worker
 * and the dynamic pool size bounds are only checked for a dynamic pool.
 */
protected abstract get type (): PoolType

/**
 * The worker type, reported as is in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size: the number of ready, non dynamic worker nodes
 * required for the pool to be reported as ready.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size: the number of worker nodes at which the pool is reported as full.
 */
protected abstract get maxSize (): number
a35560ba 494
/**
 * Checks that the worker id carried by a received message, if any, belongs to a pool worker node.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is set and matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without a worker id are not checked.
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
511
/**
 * Looks up the key (index in `workerNodes`) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key, or `-1` when no worker node holds this worker.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: nodeWorker }) => nodeWorker === worker
  )
}
523
/**
 * Looks up the key (index in `workerNodes`) of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key, or `-1` when no worker node has this worker id.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
535
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent the worker statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
554
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Keep the pool options and the strategy context in sync.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
565
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
577
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
588
/**
 * Builds the tasks queue options actually used by the pool.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly `null` or `undefined`.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
596
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes has reached `maxSize`.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 605
/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, a task submitted while the pool is busy is enqueued instead of executed.
 */
protected abstract get busy (): boolean
7c0ba920 612
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `false` as soon as one worker node has no executing task, `true` otherwise (including when there is no worker node).
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
625
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // Stored under the task id, together with the chosen worker node key.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // With the tasks queue enabled, the task is queued when the pool is busy
    // or when the chosen worker node already runs `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          (this.opts.tasksQueueOptions?.concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 657
/** @inheritDoc */
public async destroy (): Promise<void> {
  // One destruction per worker node, all awaited together.
  const destructions = this.workerNodes.map(async (_, workerNodeKey) => {
    await this.destroyWorkerNode(workerNodeKey)
  })
  await Promise.all(destructions)
}
666
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise awaited by `destroy()` for each worker node.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 673
/**
 * Hook called by the abstract constructor right before the initial worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup
}
c97c7edb 683
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
688
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]

  // Worker node wide usage.
  const workerUsage = workerNode.usage
  ++workerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(workerUsage, task)

  // Usage restricted to the task name.
  const taskWorkerUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
709
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks counters, the run time and the ELU statistics,
 * first on the worker node usage, then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]

  // Worker node wide usage.
  const workerUsage = workerNode.usage
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)

  // Usage restricted to the task name, the default task name when the message carries none.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const taskWorkerUsage = workerNode.getTaskWorkerUsage(
    taskName
  ) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
  this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
  this.updateEluWorkerUsage(taskWorkerUsage, message)
}
732
/**
 * Updates the tasks counters of the given usage once a task has completed.
 *
 * The executing tasks counter is decremented, never below zero, then either the
 * executed or the failed tasks counter is incremented.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker for the completed task.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // `setWorkerChoiceStrategy()` calls `resetUsage()` on every worker node without
  // waiting for in-flight tasks. Presumably that zeroes the counters (TODO confirm
  // against `WorkerNode.resetUsage()`), so a task completing afterwards must not
  // drive the counter negative.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
745
/**
 * Updates the run time statistics of the given usage with the run time carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. A missing run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
757
/**
 * Updates the wait time statistics of the given usage.
 *
 * The wait time is the time elapsed between the task timestamp and now,
 * `0` when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
771
/**
 * Updates the event loop utilization (ELU) statistics of the given usage.
 *
 * Active and idle statistics are always updated (a missing value counts as `0`).
 * The utilization is only updated when ELU aggregation is required and the message
 * carries ELU data: the first value is stored as is, later ones are averaged with the stored one.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
803
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and chosen directly if the
 * strategy policy has `useDynamicWorker` set. In every other case the choice is delegated
 * to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
822
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
831
/**
 * Sends a message to the worker of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): void
842
/**
 * Creates a new worker. Event handlers are attached afterwards by the pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 849
/**
 * Creates a new worker, attaches its event handlers and adds it to the pool as a worker node.
 *
 * @returns The key of the newly created and set up worker node.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are registered ahead of the pool internal ones.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement is created while the pool is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // The replacement has the same kind (dynamic or not) as the failed worker node.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 889
/**
 * Creates a new, completely set up worker node and turns it into a dynamic one.
 *
 * A message listener destroys the worker node on a kill message: always for a hard kill,
 * otherwise only when the worker node executes no task and, with the tasks queue enabled,
 * has no queued task.
 *
 * @returns The key of the newly created dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is looked up again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    const hasNoPendingTask = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Flagged ready right away when the strategy policy uses the dynamic worker.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
926
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * Abstract: the registration mechanism is provided by the concrete pool implementation.
 *
 * @typeParam Message - Type of the message payload, constrained to the pool `Data` or `Response` type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
939
/**
 * Hook invoked once a worker node has been newly created.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker, then sends it the startup
 * message followed by the worker statistics message.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the worker statistics message to worker.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
954
/**
 * Sends the startup message to worker given its worker node key.
 *
 * Abstract: the message content and transport are defined by the concrete pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
961
/**
 * Sends the worker statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    },
    workerId
  })
}
a2ed5053
JB
979
/**
 * Redistributes the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with an empty queue is
 * preferred: the task is executed on it right away if it runs fewer tasks than the
 * tasks queue concurrency, otherwise it is enqueued on it. Failing that, the task is
 * enqueued on the other ready worker node with the fewest queued tasks.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks are left in the queue of the given worker node. Without that stop, each task
 * would be dequeued from and enqueued back to the same worker node forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const { tasks } = workerNode.usage
      if (tasks.queued === 0) {
        // Empty queue: best possible target, execute right away if concurrency allows it.
        executeTask =
          tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (tasks.queued < minQueuedTasks) {
        minQueuedTasks = tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No other ready worker node: moving the task back to its own queue would never
      // shrink it, so stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1023
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the message:
 * a message with a `ready` field is handled as a worker ready response, otherwise a
 * message with an `id` field is handled as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1041
/**
 * Handles a worker ready response: stores the ready flag in the worker information
 * and emits the pool ready event if an emitter is set and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1050
/**
 * Handles a task execution response: settles the promise registered for the task,
 * runs the after task execution hook, executes the next queued task on the same
 * worker node if the tasks queue is enabled and concurrency allows it, and updates
 * the worker choice strategy context.
 *
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1077
/**
 * Emits the pool busy event when the pool is busy and the pool full event when the
 * pool is dynamic and full. Does nothing if no emitter is set.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicAndFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1088
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1098
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1119
/**
 * Removes the given worker from the pool worker nodes and from the worker choice
 * strategy context. Does nothing if the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1132
/**
 * Executes the given task on the worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The before task execution hook runs first, then the task is sent to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1143
/**
 * Enqueues the given task on the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1147
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1151
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1155
/**
 * Flushes the tasks queue of the worker node given its key: every queued task is
 * executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1165
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1171}