build: silence linter on TS code examples
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The task id of each submitted task (a random UUID generated in `execute()`).
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the task id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 *
 * It is created in the constructor and queried to choose worker nodes and to
 * read the task statistics requirements.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Whether the pool is starting or not.
 *
 * Set to `true` only while the constructor runs `startPool()`; the worker
 * error handler does not restart workers while it is `true`.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool.
 *
 * Taken from `performance.now()` at the end of the constructor and used to
 * compute the pool utilization.
 */
private readonly startTimestamp
94
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and
 * the worker choice strategy context are created, the setup hook is run and
 * finally the worker nodes are started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker or if an argument is invalid.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to the pool instance so that they keep their `this` when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // `checkPoolOptions()` defaults `enableEvents` to `true` when it is not set.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The `starting` flag prevents the worker error handler from restarting workers during the initial startup.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point for the pool utilization computation.
  this.startTimestamp = performance.now()
}
141
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are not strings either, so a single type test rejects them too.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  // Each guard throws, so the checks are evaluated in order and the first failing one wins.
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is missing.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  // Each guard throws, so the checks are evaluated in order and the first failing one wins.
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Validates the pool options and fills in the default values of the unset ones.
 *
 * The defaults are written to `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only validated and built when the tasks queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
225
/**
 * Validates a worker choice strategy.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is not one of the `WorkerChoiceStrategies` values.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  // Only destructure once the options are known to be an object.
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    const numberOfWeights = Object.keys(weights).length
    if (numberOfWeights !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${measurement}'`
      )
    }
  }
}
263
/**
 * Validates tasks queue options. Missing options or a missing concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate: the default concurrency is applied elsewhere.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
/**
 * Starts the pool by creating worker nodes until the number of non dynamic
 * worker nodes reaches the configured number of workers.
 */
private startPool (): void {
  // Recounted on each iteration since creating a worker node grows `this.workerNodes`.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
299
/** @inheritDoc */
public get info (): PoolInfo {
  // The requirements are read once instead of once per conditional property.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a per worker node value over all the worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + valueOf(workerNode),
      0
    )

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length

  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )

  // Builds the pool wide statistics of one measurement from the worker nodes usage.
  const buildMeasurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    requirements: MeasurementStatisticsRequirements
  ): NonNullable<PoolInfo['runTime']> => {
    const aggregate = sumOverWorkerNodes(
      workerNode => workerNode.usage[measurement]?.aggregate ?? 0
    )
    return {
      // NOTE: stays `Infinity` / `-Infinity` while no worker node has a measurement yet.
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      // Guard against `0 / 0`, which would report `NaN` before the first task is executed.
      average: round(executedTasks === 0 ? 0 : aggregate / executedTasks),
      ...(requirements.median && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => workerNode.usage[measurement]?.median ?? 0
            )
          )
        )
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related counters are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: buildMeasurementStatistics('runTime', runTimeRequirements)
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: buildMeasurementStatistics('waitTime', waitTimeRequirements)
    })
  }
}
08f3f44c 440
/**
 * The pool readiness boolean status.
 *
 * The pool is ready when the number of ready, non dynamic worker nodes is at
 * least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
455
/**
 * The approximate pool utilization.
 *
 * It is the tasks run time plus wait time summed over the worker nodes,
 * divided by the time elapsed since the pool start multiplied by the pool
 * maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
476
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * It is compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the
 * pool size checks and by the dynamic worker creation condition.
 */
protected abstract get type (): PoolType

/**
 * The worker type.
 *
 * Reported as is in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size.
 *
 * The pool is ready once that many non dynamic worker nodes are ready.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size.
 *
 * The pool is full once it holds that many worker nodes.
 */
protected abstract get maxSize (): number
a35560ba 498
/**
 * Checks that the worker id carried by a message received from a worker
 * belongs to one of the pool worker nodes. A message without worker id is accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
515
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
527
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
539
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategy = workerChoiceStrategy
  workerChoiceStrategyContext.setWorkerChoiceStrategy(
    opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of each worker node, then send it the worker statistics message again.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
558
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
569
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue gets disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
581
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the previously set options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
}
592
/**
 * Builds the tasks queue options with the default value of the unset ones.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `undefined`.
 * @returns The tasks queue options, with a concurrency of `1` by default.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
600
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: numberOfWorkerNodes } = this.workerNodes
  return numberOfWorkerNodes >= this.maxSize
}
c2ade475 609
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be implemented by each concrete pool.
 */
protected abstract get busy (): boolean
7c0ba920 616
/**
 * Whether all the ready worker nodes are executing at least one task.
 *
 * @returns `true` if no ready worker node is free of executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasFreeReadyWorkerNode = this.workerNodes.some(
    ({ info, usage }) => info.ready && usage.tasks.executing === 0
  )
  return !hasFreeReadyWorkerNode
}
630
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerId = this.getWorkerInfo(workerNodeKey).id as number
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId,
      taskId
    }
    // Register the promise callbacks so that the worker response can settle the promise.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue, tasksQueueOptions } = this.opts
    // Without tasks queue the task is always executed right away; with it,
    // only while the worker node is below the configured concurrency.
    let executeRightAway = enableTasksQueue === false
    if (enableTasksQueue === true) {
      executeRightAway =
        this.workerNodes[workerNodeKey].usage.tasks.executing <
        (tasksQueueOptions?.concurrency as number)
    }
    if (executeRightAway) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 662
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the destruction of every worker node, then wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
}
671
/**
 * Terminates the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the termination has been handled by the concrete pool.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 678
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is
 * created and before the pool is started. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 688
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if it returns `false`.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
693
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * It increments the executing tasks counter and updates the wait time
 * statistics, first on the worker node usage and then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(this.workerNodes[workerNodeKey].usage)
  recordTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
714
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * It updates the tasks, run time and ELU statistics, first on the worker
 * node usage and then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  // Fall back on the default task name when the message carries no task name.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  recordTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      taskName
    ) as WorkerUsage
  )
}
737
/**
 * Updates the tasks counters of a worker usage once a task is finished:
 * one executing task less and one executed or failed task more.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  const hasFailed = message.taskError != null
  if (hasFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
750
/**
 * Updates the run time statistics of a worker usage with the run time of a finished task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // A message without task performance counts as a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
762
/**
 * Updates the wait time statistics of a worker usage with the time the task
 * has waited since its submission timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp counts as a zero wait time.
  const submissionTimestamp = task.timestamp ?? now
  const taskWaitTime = now - submissionTimestamp
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
776
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU of a finished task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  const workerElu = workerUsage.elu
  // A message without ELU counts as zero active and idle times.
  updateMeasurementStatistics(
    workerElu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerElu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The first utilization is taken as is, the following ones are averaged with the current value.
  workerElu.utilization =
    workerElu.utilization != null
      ? (workerElu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
808
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met. It is
 * chosen directly if the strategy policy uses dynamic workers, otherwise the
 * choice is delegated to the worker choice strategy.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
827
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * all its ready worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
836
/**
 * Sends a message to worker given its worker node key.
 *
 * To be implemented by each concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): void
847
/**
 * Creates a new worker.
 *
 * To be implemented by each concrete pool. The returned worker gets its event
 * handlers attached and is added to the pool worker nodes by `createAndSetupWorkerNode()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 854
/**
 * Creates a new, completely set up worker node.
 *
 * The worker is created, the user and internal event handlers are attached
 * to it, then it is added to the pool worker nodes and the after setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User provided handlers, defaulting to a no-op.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the worker node as not ready, close its
  // channel, emit the error event, then optionally restart a worker and
  // redistribute the queued tasks.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      // Replace the failed worker by a worker of the same kind.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Remove the worker node from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 894
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the
 * worker node when its worker asks to be killed, the worker is asked to check
 * its activity and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The worker node key is looked up again at message time instead of
    // reusing the key captured at creation time.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    // Ignore messages whose worker id matches no worker node: indexing the
    // worker nodes with `-1` would throw a TypeError inside the listener.
    if (localWorkerNodeKey === -1) {
      return
    }
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Best effort: a failed destruction is deliberately ignored.
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Flag the worker node as ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
931
/**
 * Attaches a message listener callback to the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message value.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
944
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Start listening to the messages sent by the worker.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // 2. Send the startup message to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Send the worker statistics message to the worker.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
959
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
966
/**
 * Sends to a worker the statistics message, built from the `aggregate` flags of the
 * run time and ELU task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
a2ed5053
JB
984
/**
 * Moves the tasks queued on a worker node to the other ready worker nodes.
 *
 * For each queued task the destination is the first other ready worker node with an
 * empty queue, or failing that the other ready worker node with the fewest queued tasks.
 * The task is executed right away only when the destination has an empty queue and runs
 * fewer tasks than the configured tasks queue concurrency; otherwise it is enqueued.
 *
 * If no other ready worker node exists, redistribution stops and the remaining tasks are
 * left in the source queue: moving a task back onto its own queue would never drain it
 * and the loop would spin forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible destinations.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty queue: best candidate, run the task immediately if concurrency allows.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        // Remember the least loaded queue seen so far.
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No destination available: stop instead of looping on the source queue.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1028
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the worker id carried by the message, then dispatches
 * worker ready responses and task execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1046
/**
 * Handles a worker ready response: stores the ready flag carried by the message in the
 * worker information, then emits the pool `ready` event if an emitter exists and the
 * pool is ready.
 *
 * @param message - The message received from the worker.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1055
/**
 * Handles a task execution response: settles the promise registered for the task id,
 * runs the after task execution hook, forgets the promise, executes the next queued
 * task of the worker node when the tasks queue is enabled and concurrency allows, and
 * finally updates the worker choice strategy context.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    // The promise is rejected with the error message, not the task error object.
    promiseResponse.reject(message.taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  if (
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    // A slot is free on this worker node: run its next queued task.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1084
/**
 * Emits the pool `busy` event when the pool is busy and, for dynamic pools only,
 * the `full` event when the pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1095
/**
 * Looks up the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1105
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created while the pool is starting are flagged as ready up front.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1126
/**
 * Drops the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing when the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1139
/**
 * Runs the before task execution hook, then sends the task to the worker
 * identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1150
/**
 * Enqueues a task on the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the tasks queue size, to be confirmed).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1154
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1158
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1162
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1172
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1178}