chore: generate documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
52b71763 67 * The task execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
501aea93
JB
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 76
a35560ba 77 /**
51fe3d3c 78 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
81 Worker,
82 Data,
83 Response
a35560ba
S
84 >
85
075e51d1 86 /**
adc9cc64 87 * Whether the pool is starting or not.
075e51d1
JB
88 */
89 private readonly starting: boolean
afe0d5bf
JB
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs
 * the setup hook, creates the initial worker nodes and finally records the
 * pool start timestamp.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the arguments fails validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation. checkPoolOptions() also writes the option defaults
  // back into the options object.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Pin `this` on the methods below so they can be handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is only raised while the initial worker nodes are
  // being created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
141
/**
 * Checks that the worker file path is a non-blank string and that it exists
 * on the file system.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, is not a string, is blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
8d3782fa
JB
/**
 * Checks that the number of workers is a non negative safe integer, and is
 * not zero when the pool type is fixed.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, or is inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Validates the pool options and writes the defaults of the unset ones into
 * `this.opts`: round robin worker choice strategy, default worker choice
 * strategy options, worker restart on error enabled, events enabled and
 * tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const strategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only validated and normalized when the
  // tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
225
/**
 * Checks that the given worker choice strategy is one of the values of
 * `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
0d80593b
JB
/**
 * Checks the worker choice strategy options: they must be a plain object,
 * the weights (if any) must have as many entries as the pool maximum size
 * and the measurement (if any) must be one of the values of `Measurements`.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights or the measurement are invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const { weights } = workerChoiceStrategyOptions
  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const { measurement } = workerChoiceStrategyOptions
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
263
a20f0ba5
JB
/**
 * Checks the tasks queue options. Unset options are accepted; when set they
 * must be a plain object, and the concurrency (if any) must be a strictly
 * positive safe integer.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
e761c033
JB
/**
 * Creates and sets up worker nodes until the pool holds `numberOfWorkers`
 * worker nodes not flagged as dynamic.
 */
private startPool (): void {
  // Re-evaluated on every iteration since each creation changes the count.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
299
/**
 * Pool information snapshot computed from the current worker nodes usage.
 *
 * The `utilization`, `runTime` and `waitTime` properties are only included
 * when the task statistics requirements of the worker choice strategy
 * context ask for the matching aggregates.
 * Run time and wait time averages are computed per executed task and are
 * `0` while no task has been executed yet.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Read the requirements once so the whole snapshot uses the same values.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a numeric value picked on each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )

  // Guard the division: without executed tasks the aggregate is 0 as well
  // and 0 / 0 would otherwise surface as NaN in the pool information.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? round(aggregate / executedTasks) : 0

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        // Worker nodes without a recorded minimum/maximum are neutralized
        // with +/-Infinity so they never win the comparison.
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 436
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the count of worker nodes that are ready and not
 * flagged as dynamic reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
451
/**
 * The approximate pool utilization: the tasks run time plus wait time,
 * summed over all worker nodes, divided by the time elapsed since the pool
 * start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
472
8881ae32 473 /**
aa9eede8 474 * The pool type.
8881ae32
JB
475 *
476 * If it is `'dynamic'`, it provides the `max` property.
477 */
478 protected abstract get type (): PoolType
479
184855e6 480 /**
aa9eede8 481 * The worker type.
184855e6
JB
482 */
483 protected abstract get worker (): WorkerType
484
c2ade475 485 /**
aa9eede8 486 * The pool minimum size.
c2ade475 487 */
6b27d407 488 protected abstract get minSize (): number
ff733df7
JB
489
490 /**
aa9eede8 491 * The pool maximum size.
ff733df7 492 */
6b27d407 493 protected abstract get maxSize (): number
a35560ba 494
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker, if
 * any, matches a worker node of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    return
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
511
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
523
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker info id is the
 * given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
535
/**
 * Validates and applies the worker choice strategy (and its options when
 * given), then resets the usage of every worker node and sends it the
 * worker statistics message again.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }
}
554
/**
 * Validates the worker choice strategy options, stores them in the pool
 * options and forwards them to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
565
/**
 * Enables or disables the tasks queue. The tasks queues are flushed first
 * when the queue goes from enabled to disabled.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
577
/**
 * Validates and stores the normalized tasks queue options when the tasks
 * queue is enabled; otherwise removes any stored tasks queue options.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
588
/**
 * Builds the tasks queue options, defaulting the concurrency to `1` when it
 * is not set.
 *
 * @param tasksQueueOptions - The tasks queue options to normalize.
 * @returns The normalized tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
596
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its
 * maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 605
c319c66b
JB
606 /**
607 * Whether the pool is busy or not.
608 *
609 * The pool busyness boolean status.
610 */
611 protected abstract get busy (): boolean
7c0ba920 612
6c6afb84
JB
/**
 * Whether no ready worker node is free, i.e. whether every ready worker
 * node is executing at least one task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasFreeReadyWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasFreeReadyWorkerNode
}
626
/**
 * Submits a task: chooses a worker node, registers the promise callbacks
 * under a fresh task id, then either executes the task right away or
 * enqueues it on the chosen worker node.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // The entry is looked up by message id once the worker answers.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue, tasksQueueOptions } = this.opts
    // Execute immediately when queueing is off, or when the chosen worker
    // node is still below the configured concurrency.
    const shallExecuteNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (tasksQueueOptions?.concurrency as number))
    if (shallExecuteNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 658
/**
 * Destroys every worker node of the pool and resolves once all of them are
 * destroyed.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const destructions = this.workerNodes.map(
    async (_, workerNodeKey): Promise<void> => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(destructions)
}
667
4a6952ff 668 /**
aa9eede8 669 * Terminates the worker node given its worker node key.
4a6952ff 670 *
aa9eede8 671 * @param workerNodeKey - The worker node key.
4a6952ff 672 */
81c02522 673 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 674
/**
 * Setup hook invoked by the abstract constructor before the worker nodes
 * are created. The default implementation does nothing.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default
}
c97c7edb 684
729c563d 685 /**
280c2a77
S
686 * Should return whether the worker is the main worker or not.
687 */
688 protected abstract isMain (): boolean
689
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and updates the wait time statistics, first on the worker
 * node usage, then on the worker node usage of the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]

  const nodeUsage = workerNode.usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)

  const taskNameUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskNameUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskNameUsage, task)
}
710
/**
 * Hook executed after the worker task execution: updates the tasks, run
 * time and ELU statistics, first on the worker node usage, then on the
 * worker node usage of the task name found in the message task performance
 * (or of the default task name).
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]

  const nodeUsage = workerNode.usage
  this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
  this.updateRunTimeWorkerUsage(nodeUsage, message)
  this.updateEluWorkerUsage(nodeUsage, message)

  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  const taskNameUsage = workerNode.getTaskWorkerUsage(taskName) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(taskNameUsage, message)
  this.updateRunTimeWorkerUsage(taskNameUsage, message)
  this.updateEluWorkerUsage(taskNameUsage, message)
}
733
/**
 * Updates the tasks counters of a worker usage once a task has completed:
 * one executing task less and, depending on whether the message carries a
 * task error, one failed or one executed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
746
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time found
 * in the message task performance (`0` when absent).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
758
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp (`0` when the task has no timestamp).
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const taskWaitTime = now - submittedAt
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
772
/**
 * Updates the ELU statistics of a worker usage with the ELU found in the
 * message task performance: the active and idle measurements first, then,
 * when the ELU aggregate is required and the message carries an ELU, the
 * utilization (mean of the previous value and the task one, or the task one
 * when there is no previous value).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    message.taskPerformance?.elu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    message.taskPerformance?.elu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || message.taskPerformance?.elu == null) {
    return
  }
  const taskEluUtilization = message.taskPerformance.elu.utilization
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskEluUtilization) / 2
      : taskEluUtilization
}
804
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and chosen
 * if the strategy policy says to use dynamic workers; in every other case
 * the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
823
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and has no free ready worker node.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
832
280c2a77 833 /**
aa9eede8 834 * Sends a message to worker given its worker node key.
280c2a77 835 *
aa9eede8 836 * @param workerNodeKey - The worker node key.
38e795c1 837 * @param message - The message.
280c2a77
S
838 */
839 protected abstract sendToWorker (
aa9eede8 840 workerNodeKey: number,
280c2a77
S
841 message: MessageValue<Data>
842 ): void
843
729c563d 844 /**
41344292 845 * Creates a new worker.
6c6afb84
JB
846 *
847 * @returns Newly created worker.
729c563d 848 */
280c2a77 849 protected abstract createWorker (): Worker
c97c7edb 850
/**
 * Creates a new, completely set up worker node: creates the worker, wires
 * the user supplied and the internal event handlers, adds the worker to the
 * pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Internal 'error' handling: flag the worker node as not ready, close its
  // channel, emit the pool error event, optionally start a replacement and
  // hand its queued tasks over.
  const handleWorkerError = (error: Error): void => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool itself is still starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  }

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 890
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered
 * to destroy the worker node on kill messages, the `checkActive` message is
 * sent to the worker and the worker info is flagged as dynamic. The worker
 * info is also flagged as ready right away when the strategy policy says to
 * use dynamic workers.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The worker node key is resolved again on each message from the
    // worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    if (localWorkerNodeKey === -1) {
      // The worker id does not match any worker node of the pool:
      // there is no usage to inspect and nothing to destroy, and indexing
      // the worker nodes with -1 would throw inside the listener.
      return
    }
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: destroy at once on hard kill
    // behavior, otherwise only once the worker node has nothing left to do
    // (no executing task and, with the tasks queue enabled, no queued task).
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Destruction failures are deliberately ignored here.
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
927
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the payload carried by the messages handed to the listener.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
940
/**
 * Hook called once a worker node has been newly created.
 * Subclasses can override it.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Then send the startup message, followed by the worker statistics message.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
955
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
962
/**
 * Sends the worker statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const runTime =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const elu = strategyContext.getTaskStatisticsRequirements().elu.aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
a2ed5053
JB
980
/**
 * Redistributes the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node with no queued task is
 * preferred: the task is executed on it right away if it is executing fewer tasks
 * than the tasks queue concurrency, and is enqueued on it otherwise. When every other
 * ready worker node already has queued tasks, the task is enqueued on the one with
 * the fewest queued tasks.
 *
 * If there is no other ready worker node, the remaining tasks are left queued on the
 * given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        // Empty tasks queue: run the task at once if the concurrency limit allows it.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No other ready worker node: moving a task from the worker node back onto
      // itself would never shrink its tasks queue and this loop would never end.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1024
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the worker id of the message, then dispatches it:
 * a message with a `ready` field is handled as a worker ready response, otherwise
 * a message with an `id` field is handled as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1042
/**
 * Handles a worker ready response: stores the ready flag carried by the message
 * in the information of the worker that sent it, then emits the pool `ready` event
 * if an emitter is set and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1051
/**
 * Handles a task execution response: settles the promise registered for the task,
 * runs the after task execution hook, then executes the next queued task of the
 * worker node if the tasks queue is enabled and its concurrency allows it.
 *
 * Messages whose id matches no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message only.
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1078
/**
 * Emits the pool `busy` event when the pool is busy and, for a dynamic pool,
 * the pool `full` event when the pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1089
/**
 * Gets the information of the worker node stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1099
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1120
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1133
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1144
/**
 * Enqueues the given task on the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the worker node `enqueueTask()` (presumably the resulting tasks queue size - to be confirmed against `WorkerNode`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1148
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1152
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1156
/**
 * Flushes the tasks queue of the worker node identified by its key: every queued
 * task is dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  // The queue size is read again before each task is dequeued.
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1166
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool, in worker node key order.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1172}