feat: add dynamic pool sizes type check
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
7c89e6a4 13 isAsyncFunction,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
e102732c
JB
31import type {
32 IWorker,
4b628b48 33 IWorkerNode,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
96 /**
97 * Constructs a new poolifier pool.
98 *
38e795c1 99 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 100 * @param filePath - Path to the worker file.
38e795c1 101 * @param opts - Options for the pool.
729c563d 102 */
c97c7edb 103 public constructor (
b4213b7f
JB
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
c97c7edb 107 ) {
78cea37e 108 if (!this.isMain()) {
c97c7edb
S
109 throw new Error('Cannot start a pool from a worker!')
110 }
8d3782fa 111 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 112 this.checkFilePath(this.filePath)
7c0ba920 113 this.checkPoolOptions(this.opts)
1086026a 114
7254e419
JB
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 118 this.dequeueTask = this.dequeueTask.bind(this)
7254e419 119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 120
6bd72cd0 121 if (this.opts.enableEvents === true) {
7c0ba920
JB
122 this.emitter = new PoolEmitter()
123 }
d59df138
JB
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
da309861
JB
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
b6b32453
JB
133
134 this.setupHook()
135
075e51d1 136 this.starting = true
e761c033 137 this.startPool()
075e51d1 138 this.starting = false
afe0d5bf
JB
139
140 this.startTimestamp = performance.now()
c97c7edb
S
141 }
142
a35560ba 143 private checkFilePath (filePath: string): void {
ffcbbad8
JB
144 if (
145 filePath == null ||
3d6dd312 146 typeof filePath !== 'string' ||
ffcbbad8
JB
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
c510fea7
APA
149 throw new Error('Please specify a file with a worker implementation')
150 }
3d6dd312
JB
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
c510fea7
APA
154 }
155
8d3782fa
JB
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
78cea37e 161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 162 throw new TypeError(
0d80593b 163 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
164 )
165 } else if (numberOfWorkers < 0) {
473c717a 166 throw new RangeError(
8d3782fa
JB
167 'Cannot instantiate a pool with a negative number of workers'
168 )
6b27d407 169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 175 if (this.type === PoolTypes.dynamic) {
2761efb4
JB
176 if (!Number.isSafeInteger(max)) {
177 throw new TypeError(
178 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
179 )
180 } else if (min > max) {
079de991
JB
181 throw new RangeError(
182 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
183 )
b97d82d8 184 } else if (max === 0) {
079de991 185 throw new RangeError(
b97d82d8 186 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
187 )
188 } else if (min === max) {
189 throw new RangeError(
190 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
191 )
192 }
8d3782fa
JB
193 }
194 }
195
7c0ba920 196 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
197 if (isPlainObject(opts)) {
198 this.opts.workerChoiceStrategy =
199 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
200 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
201 this.opts.workerChoiceStrategyOptions =
202 opts.workerChoiceStrategyOptions ??
203 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
204 this.checkValidWorkerChoiceStrategyOptions(
205 this.opts.workerChoiceStrategyOptions
206 )
1f68cede 207 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
208 this.opts.enableEvents = opts.enableEvents ?? true
209 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
210 if (this.opts.enableTasksQueue) {
211 this.checkValidTasksQueueOptions(
212 opts.tasksQueueOptions as TasksQueueOptions
213 )
214 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
215 opts.tasksQueueOptions as TasksQueueOptions
216 )
217 }
218 } else {
219 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 220 }
aee46736
JB
221 }
222
223 private checkValidWorkerChoiceStrategy (
224 workerChoiceStrategy: WorkerChoiceStrategy
225 ): void {
226 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 227 throw new Error(
aee46736 228 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
229 )
230 }
7c0ba920
JB
231 }
232
0d80593b
JB
233 private checkValidWorkerChoiceStrategyOptions (
234 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
235 ): void {
236 if (!isPlainObject(workerChoiceStrategyOptions)) {
237 throw new TypeError(
238 'Invalid worker choice strategy options: must be a plain object'
239 )
240 }
49be33fe
JB
241 if (
242 workerChoiceStrategyOptions.weights != null &&
6b27d407 243 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
244 ) {
245 throw new Error(
246 'Invalid worker choice strategy options: must have a weight for each worker node'
247 )
248 }
f0d7f803
JB
249 if (
250 workerChoiceStrategyOptions.measurement != null &&
251 !Object.values(Measurements).includes(
252 workerChoiceStrategyOptions.measurement
253 )
254 ) {
255 throw new Error(
256 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
257 )
258 }
0d80593b
JB
259 }
260
a20f0ba5
JB
261 private checkValidTasksQueueOptions (
262 tasksQueueOptions: TasksQueueOptions
263 ): void {
0d80593b
JB
264 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
265 throw new TypeError('Invalid tasks queue options: must be a plain object')
266 }
f0d7f803
JB
267 if (
268 tasksQueueOptions?.concurrency != null &&
269 !Number.isSafeInteger(tasksQueueOptions.concurrency)
270 ) {
271 throw new TypeError(
272 'Invalid worker tasks concurrency: must be an integer'
273 )
274 }
275 if (
276 tasksQueueOptions?.concurrency != null &&
277 tasksQueueOptions.concurrency <= 0
278 ) {
a20f0ba5 279 throw new Error(
f0d7f803 280 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
281 )
282 }
283 }
284
e761c033
JB
285 private startPool (): void {
286 while (
287 this.workerNodes.reduce(
288 (accumulator, workerNode) =>
289 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
290 0
291 ) < this.numberOfWorkers
292 ) {
aa9eede8 293 this.createAndSetupWorkerNode()
e761c033
JB
294 }
295 }
296
08f3f44c 297 /** @inheritDoc */
6b27d407
JB
298 public get info (): PoolInfo {
299 return {
23ccf9d7 300 version,
6b27d407 301 type: this.type,
184855e6 302 worker: this.worker,
2431bdb4
JB
303 ready: this.ready,
304 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
305 minSize: this.minSize,
306 maxSize: this.maxSize,
c05f0d50
JB
307 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
308 .runTime.aggregate &&
1305e9a8
JB
309 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
310 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
311 workerNodes: this.workerNodes.length,
312 idleWorkerNodes: this.workerNodes.reduce(
313 (accumulator, workerNode) =>
f59e1027 314 workerNode.usage.tasks.executing === 0
a4e07f72
JB
315 ? accumulator + 1
316 : accumulator,
6b27d407
JB
317 0
318 ),
319 busyWorkerNodes: this.workerNodes.reduce(
320 (accumulator, workerNode) =>
f59e1027 321 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
322 0
323 ),
a4e07f72 324 executedTasks: this.workerNodes.reduce(
6b27d407 325 (accumulator, workerNode) =>
f59e1027 326 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
327 0
328 ),
329 executingTasks: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
f59e1027 331 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
332 0
333 ),
334 queuedTasks: this.workerNodes.reduce(
df593701 335 (accumulator, workerNode) =>
f59e1027 336 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
337 0
338 ),
339 maxQueuedTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
b25a42cd 341 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 342 0
a4e07f72
JB
343 ),
344 failedTasks: this.workerNodes.reduce(
345 (accumulator, workerNode) =>
f59e1027 346 accumulator + workerNode.usage.tasks.failed,
a4e07f72 347 0
1dcf8b7b
JB
348 ),
349 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
350 .runTime.aggregate && {
351 runTime: {
98e72cda
JB
352 minimum: round(
353 Math.min(
354 ...this.workerNodes.map(
355 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
356 )
1dcf8b7b
JB
357 )
358 ),
98e72cda
JB
359 maximum: round(
360 Math.max(
361 ...this.workerNodes.map(
362 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
363 )
1dcf8b7b 364 )
98e72cda
JB
365 ),
366 average: round(
367 this.workerNodes.reduce(
368 (accumulator, workerNode) =>
369 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
370 0
371 ) /
372 this.workerNodes.reduce(
373 (accumulator, workerNode) =>
374 accumulator + (workerNode.usage.tasks?.executed ?? 0),
375 0
376 )
377 ),
378 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
379 .runTime.median && {
380 median: round(
381 median(
382 this.workerNodes.map(
383 workerNode => workerNode.usage.runTime?.median ?? 0
384 )
385 )
386 )
387 })
1dcf8b7b
JB
388 }
389 }),
390 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
391 .waitTime.aggregate && {
392 waitTime: {
98e72cda
JB
393 minimum: round(
394 Math.min(
395 ...this.workerNodes.map(
396 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
397 )
1dcf8b7b
JB
398 )
399 ),
98e72cda
JB
400 maximum: round(
401 Math.max(
402 ...this.workerNodes.map(
403 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
404 )
1dcf8b7b 405 )
98e72cda
JB
406 ),
407 average: round(
408 this.workerNodes.reduce(
409 (accumulator, workerNode) =>
410 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
411 0
412 ) /
413 this.workerNodes.reduce(
414 (accumulator, workerNode) =>
415 accumulator + (workerNode.usage.tasks?.executed ?? 0),
416 0
417 )
418 ),
419 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
420 .waitTime.median && {
421 median: round(
422 median(
423 this.workerNodes.map(
424 workerNode => workerNode.usage.waitTime?.median ?? 0
425 )
426 )
427 )
428 })
1dcf8b7b
JB
429 }
430 })
6b27d407
JB
431 }
432 }
08f3f44c 433
aa9eede8
JB
434 /**
435 * The pool readiness boolean status.
436 */
2431bdb4
JB
437 private get ready (): boolean {
438 return (
b97d82d8
JB
439 this.workerNodes.reduce(
440 (accumulator, workerNode) =>
441 !workerNode.info.dynamic && workerNode.info.ready
442 ? accumulator + 1
443 : accumulator,
444 0
445 ) >= this.minSize
2431bdb4
JB
446 )
447 }
448
afe0d5bf 449 /**
aa9eede8 450 * The approximate pool utilization.
afe0d5bf
JB
451 *
452 * @returns The pool utilization.
453 */
454 private get utilization (): number {
8e5ca040 455 const poolTimeCapacity =
fe7d90db 456 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
457 const totalTasksRunTime = this.workerNodes.reduce(
458 (accumulator, workerNode) =>
71514351 459 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
460 0
461 )
462 const totalTasksWaitTime = this.workerNodes.reduce(
463 (accumulator, workerNode) =>
71514351 464 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
465 0
466 )
8e5ca040 467 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
468 }
469
8881ae32 470 /**
aa9eede8 471 * The pool type.
8881ae32
JB
472 *
473 * If it is `'dynamic'`, it provides the `max` property.
474 */
475 protected abstract get type (): PoolType
476
184855e6 477 /**
aa9eede8 478 * The worker type.
184855e6
JB
479 */
480 protected abstract get worker (): WorkerType
481
c2ade475 482 /**
aa9eede8 483 * The pool minimum size.
c2ade475 484 */
6b27d407 485 protected abstract get minSize (): number
ff733df7
JB
486
487 /**
aa9eede8 488 * The pool maximum size.
ff733df7 489 */
6b27d407 490 protected abstract get maxSize (): number
a35560ba 491
6b813701
JB
492 /**
493 * Checks if the worker id sent in the received message from a worker is valid.
494 *
495 * @param message - The received message.
496 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
497 */
21f710aa
JB
498 private checkMessageWorkerId (message: MessageValue<Response>): void {
499 if (
500 message.workerId != null &&
aa9eede8 501 this.getWorkerNodeKeyByWorkerId(message.workerId) == null
21f710aa
JB
502 ) {
503 throw new Error(
504 `Worker message received from unknown worker '${message.workerId}'`
505 )
506 }
507 }
508
ffcbbad8 509 /**
f06e48d8 510 * Gets the given worker its worker node key.
ffcbbad8
JB
511 *
512 * @param worker - The worker.
f59e1027 513 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 514 */
aa9eede8 515 private getWorkerNodeKey (worker: Worker): number {
f06e48d8
JB
516 return this.workerNodes.findIndex(
517 workerNode => workerNode.worker === worker
518 )
bf9549ae
JB
519 }
520
aa9eede8
JB
521 /**
522 * Gets the worker node key given its worker id.
523 *
524 * @param workerId - The worker id.
525 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
526 */
527 private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
528 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
529 if (workerNode.info.id === workerId) {
530 return workerNodeKey
531 }
532 }
533 }
534
afc003b2 535 /** @inheritDoc */
a35560ba 536 public setWorkerChoiceStrategy (
59219cbb
JB
537 workerChoiceStrategy: WorkerChoiceStrategy,
538 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 539 ): void {
aee46736 540 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 541 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
542 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
543 this.opts.workerChoiceStrategy
544 )
545 if (workerChoiceStrategyOptions != null) {
546 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
547 }
aa9eede8 548 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 549 workerNode.resetUsage()
aa9eede8 550 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
59219cbb 551 }
a20f0ba5
JB
552 }
553
554 /** @inheritDoc */
555 public setWorkerChoiceStrategyOptions (
556 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
557 ): void {
0d80593b 558 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
559 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
560 this.workerChoiceStrategyContext.setOptions(
561 this.opts.workerChoiceStrategyOptions
a35560ba
S
562 )
563 }
564
a20f0ba5 565 /** @inheritDoc */
8f52842f
JB
566 public enableTasksQueue (
567 enable: boolean,
568 tasksQueueOptions?: TasksQueueOptions
569 ): void {
a20f0ba5 570 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 571 this.flushTasksQueues()
a20f0ba5
JB
572 }
573 this.opts.enableTasksQueue = enable
8f52842f 574 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
575 }
576
577 /** @inheritDoc */
8f52842f 578 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 579 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
580 this.checkValidTasksQueueOptions(tasksQueueOptions)
581 this.opts.tasksQueueOptions =
582 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 583 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
584 delete this.opts.tasksQueueOptions
585 }
586 }
587
588 private buildTasksQueueOptions (
589 tasksQueueOptions: TasksQueueOptions
590 ): TasksQueueOptions {
591 return {
592 concurrency: tasksQueueOptions?.concurrency ?? 1
593 }
594 }
595
c319c66b
JB
596 /**
597 * Whether the pool is full or not.
598 *
599 * The pool filling boolean status.
600 */
dea903a8
JB
601 protected get full (): boolean {
602 return this.workerNodes.length >= this.maxSize
603 }
c2ade475 604
c319c66b
JB
605 /**
606 * Whether the pool is busy or not.
607 *
608 * The pool busyness boolean status.
609 */
610 protected abstract get busy (): boolean
7c0ba920 611
6c6afb84
JB
612 /**
613 * Whether worker nodes are executing at least one task.
614 *
615 * @returns Worker nodes busyness boolean status.
616 */
c2ade475 617 protected internalBusy (): boolean {
e0ae6100
JB
618 return (
619 this.workerNodes.findIndex(workerNode => {
f59e1027 620 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
621 }) === -1
622 )
cb70b19d
JB
623 }
624
afc003b2 625 /** @inheritDoc */
a86b6df1 626 public async execute (data?: Data, name?: string): Promise<Response> {
52b71763
JB
627 return await new Promise<Response>((resolve, reject) => {
628 const timestamp = performance.now()
629 const workerNodeKey = this.chooseWorkerNode()
501aea93 630 const task: Task<Data> = {
52b71763
JB
631 name: name ?? DEFAULT_TASK_NAME,
632 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
633 data: data ?? ({} as Data),
634 timestamp,
635 workerId: this.getWorkerInfo(workerNodeKey).id as number,
636 id: randomUUID()
637 }
501aea93 638 this.promiseResponseMap.set(task.id as string, {
2e81254d
JB
639 resolve,
640 reject,
501aea93 641 workerNodeKey
2e81254d 642 })
52b71763
JB
643 if (
644 this.opts.enableTasksQueue === true &&
645 (this.busy ||
646 this.workerNodes[workerNodeKey].usage.tasks.executing >=
647 ((this.opts.tasksQueueOptions as TasksQueueOptions)
648 .concurrency as number))
649 ) {
501aea93 650 this.enqueueTask(workerNodeKey, task)
52b71763 651 } else {
501aea93 652 this.executeTask(workerNodeKey, task)
52b71763
JB
653 }
654 this.checkAndEmitEvents()
2e81254d 655 })
280c2a77 656 }
c97c7edb 657
afc003b2 658 /** @inheritDoc */
c97c7edb 659 public async destroy (): Promise<void> {
1fbcaa7c 660 await Promise.all(
875a7c37
JB
661 this.workerNodes.map(async (workerNode, workerNodeKey) => {
662 this.flushTasksQueue(workerNodeKey)
47aacbaa 663 // FIXME: wait for tasks to be finished
920278a2
JB
664 const workerExitPromise = new Promise<void>(resolve => {
665 workerNode.worker.on('exit', () => {
666 resolve()
667 })
668 })
aa9eede8 669 await this.destroyWorkerNode(workerNodeKey)
920278a2 670 await workerExitPromise
1fbcaa7c
JB
671 })
672 )
c97c7edb
S
673 }
674
4a6952ff 675 /**
aa9eede8 676 * Terminates the worker node given its worker node key.
4a6952ff 677 *
aa9eede8 678 * @param workerNodeKey - The worker node key.
4a6952ff 679 */
aa9eede8
JB
680 protected abstract destroyWorkerNode (
681 workerNodeKey: number
682 ): void | Promise<void>
c97c7edb 683
729c563d 684 /**
6677a3d3
JB
685 * Setup hook to execute code before worker nodes are created in the abstract constructor.
686 * Can be overridden.
afc003b2
JB
687 *
688 * @virtual
729c563d 689 */
280c2a77 690 protected setupHook (): void {
d99ba5a8 691 // Intentionally empty
280c2a77 692 }
c97c7edb 693
729c563d 694 /**
280c2a77
S
695 * Should return whether the worker is the main worker or not.
696 */
697 protected abstract isMain (): boolean
698
699 /**
2e81254d 700 * Hook executed before the worker task execution.
bf9549ae 701 * Can be overridden.
729c563d 702 *
f06e48d8 703 * @param workerNodeKey - The worker node key.
1c6fe997 704 * @param task - The task to execute.
729c563d 705 */
1c6fe997
JB
706 protected beforeTaskExecutionHook (
707 workerNodeKey: number,
708 task: Task<Data>
709 ): void {
f59e1027 710 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
711 ++workerUsage.tasks.executing
712 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 713 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
714 task.name as string
715 ) as WorkerUsage
eb8afc8a
JB
716 ++taskWorkerUsage.tasks.executing
717 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
718 }
719
c01733f1 720 /**
2e81254d 721 * Hook executed after the worker task execution.
bf9549ae 722 * Can be overridden.
c01733f1 723 *
501aea93 724 * @param workerNodeKey - The worker node key.
38e795c1 725 * @param message - The received message.
c01733f1 726 */
2e81254d 727 protected afterTaskExecutionHook (
501aea93 728 workerNodeKey: number,
2740a743 729 message: MessageValue<Response>
bf9549ae 730 ): void {
ff128cc9 731 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
732 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
733 this.updateRunTimeWorkerUsage(workerUsage, message)
734 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 735 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 736 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 737 ) as WorkerUsage
eb8afc8a
JB
738 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
739 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
740 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
741 }
742
743 private updateTaskStatisticsWorkerUsage (
744 workerUsage: WorkerUsage,
745 message: MessageValue<Response>
746 ): void {
a4e07f72
JB
747 const workerTaskStatistics = workerUsage.tasks
748 --workerTaskStatistics.executing
98e72cda
JB
749 if (message.taskError == null) {
750 ++workerTaskStatistics.executed
751 } else {
a4e07f72 752 ++workerTaskStatistics.failed
2740a743 753 }
f8eb0a2a
JB
754 }
755
a4e07f72
JB
756 private updateRunTimeWorkerUsage (
757 workerUsage: WorkerUsage,
f8eb0a2a
JB
758 message: MessageValue<Response>
759 ): void {
e4f20deb
JB
760 updateMeasurementStatistics(
761 workerUsage.runTime,
762 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
763 message.taskPerformance?.runTime ?? 0,
764 workerUsage.tasks.executed
765 )
f8eb0a2a
JB
766 }
767
a4e07f72
JB
768 private updateWaitTimeWorkerUsage (
769 workerUsage: WorkerUsage,
1c6fe997 770 task: Task<Data>
f8eb0a2a 771 ): void {
1c6fe997
JB
772 const timestamp = performance.now()
773 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
774 updateMeasurementStatistics(
775 workerUsage.waitTime,
776 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
777 taskWaitTime,
778 workerUsage.tasks.executed
779 )
c01733f1 780 }
781
a4e07f72 782 private updateEluWorkerUsage (
5df69fab 783 workerUsage: WorkerUsage,
62c15a68
JB
784 message: MessageValue<Response>
785 ): void {
008512c7
JB
786 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
787 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
788 updateMeasurementStatistics(
789 workerUsage.elu.active,
008512c7 790 eluTaskStatisticsRequirements,
e4f20deb
JB
791 message.taskPerformance?.elu?.active ?? 0,
792 workerUsage.tasks.executed
793 )
794 updateMeasurementStatistics(
795 workerUsage.elu.idle,
008512c7 796 eluTaskStatisticsRequirements,
e4f20deb
JB
797 message.taskPerformance?.elu?.idle ?? 0,
798 workerUsage.tasks.executed
799 )
008512c7 800 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 801 if (message.taskPerformance?.elu != null) {
f7510105
JB
802 if (workerUsage.elu.utilization != null) {
803 workerUsage.elu.utilization =
804 (workerUsage.elu.utilization +
805 message.taskPerformance.elu.utilization) /
806 2
807 } else {
808 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
809 }
62c15a68
JB
810 }
811 }
812 }
813
280c2a77 814 /**
f06e48d8 815 * Chooses a worker node for the next task.
280c2a77 816 *
6c6afb84 817 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 818 *
aa9eede8 819 * @returns The chosen worker node key
280c2a77 820 */
6c6afb84 821 private chooseWorkerNode (): number {
930dcf12 822 if (this.shallCreateDynamicWorker()) {
aa9eede8 823 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
824 if (
825 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
826 ) {
aa9eede8 827 return workerNodeKey
6c6afb84 828 }
17393ac8 829 }
930dcf12
JB
830 return this.workerChoiceStrategyContext.execute()
831 }
832
6c6afb84
JB
833 /**
834 * Conditions for dynamic worker creation.
835 *
836 * @returns Whether to create a dynamic worker or not.
837 */
838 private shallCreateDynamicWorker (): boolean {
930dcf12 839 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
840 }
841
280c2a77 842 /**
aa9eede8 843 * Sends a message to worker given its worker node key.
280c2a77 844 *
aa9eede8 845 * @param workerNodeKey - The worker node key.
38e795c1 846 * @param message - The message.
280c2a77
S
847 */
848 protected abstract sendToWorker (
aa9eede8 849 workerNodeKey: number,
280c2a77
S
850 message: MessageValue<Data>
851 ): void
852
729c563d 853 /**
41344292 854 * Creates a new worker.
6c6afb84
JB
855 *
856 * @returns Newly created worker.
729c563d 857 */
280c2a77 858 protected abstract createWorker (): Worker
c97c7edb 859
4a6952ff 860 /**
aa9eede8 861 * Creates a new, completely set up worker node.
4a6952ff 862 *
aa9eede8 863 * @returns New, completely set up worker node key.
4a6952ff 864 */
aa9eede8 865 protected createAndSetupWorkerNode (): number {
bdacc2d2 866 const worker = this.createWorker()
280c2a77 867
35cf1c03 868 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 869 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 870 worker.on('error', error => {
9b106837
JB
871 const workerNodeKey = this.getWorkerNodeKey(worker)
872 const workerInfo = this.getWorkerInfo(workerNodeKey)
873 workerInfo.ready = false
0dc838e3 874 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 875 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 876 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 877 if (workerInfo.dynamic) {
aa9eede8 878 this.createAndSetupDynamicWorkerNode()
8a1260a3 879 } else {
aa9eede8 880 this.createAndSetupWorkerNode()
8a1260a3 881 }
5baee0d7 882 }
19dbc45b 883 if (this.opts.enableTasksQueue === true) {
9b106837 884 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 885 }
5baee0d7 886 })
a35560ba
S
887 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
888 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 889 worker.once('exit', () => {
f06e48d8 890 this.removeWorkerNode(worker)
a974afa6 891 })
280c2a77 892
aa9eede8 893 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 894
aa9eede8 895 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 896
aa9eede8 897 return workerNodeKey
c97c7edb 898 }
be0676b3 899
930dcf12 900 /**
aa9eede8 901 * Creates a new, completely set up dynamic worker node.
930dcf12 902 *
aa9eede8 903 * @returns New, completely set up dynamic worker node key.
930dcf12 904 */
aa9eede8
JB
905 protected createAndSetupDynamicWorkerNode (): number {
906 const workerNodeKey = this.createAndSetupWorkerNode()
907 this.registerWorkerMessageListener(workerNodeKey, message => {
908 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
909 message.workerId
910 ) as number
911 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
930dcf12
JB
912 if (
913 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
914 (message.kill != null &&
915 ((this.opts.enableTasksQueue === false &&
aa9eede8 916 workerUsage.tasks.executing === 0) ||
7b56f532 917 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
918 workerUsage.tasks.executing === 0 &&
919 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12
JB
920 ) {
921 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
7c89e6a4
JB
922 const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
923 if (isAsyncFunction(destroyWorkerNodeBounded)) {
924 (
925 destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
926 )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
927 } else {
928 (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
929 localWorkerNodeKey
930 )
931 }
930dcf12
JB
932 }
933 })
aa9eede8 934 const workerInfo = this.getWorkerInfo(workerNodeKey)
b0a4db63 935 workerInfo.dynamic = true
b97d82d8
JB
936 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
937 workerInfo.ready = true
938 }
aa9eede8 939 this.sendToWorker(workerNodeKey, {
b0a4db63 940 checkActive: true,
21f710aa
JB
941 workerId: workerInfo.id as number
942 })
aa9eede8 943 return workerNodeKey
930dcf12
JB
944 }
945
a2ed5053 946 /**
aa9eede8 947 * Registers a listener callback on the worker given its worker node key.
a2ed5053 948 *
aa9eede8 949 * @param workerNodeKey - The worker node key.
a2ed5053
JB
950 * @param listener - The message listener callback.
951 */
85aeb3f3
JB
952 protected abstract registerWorkerMessageListener<
953 Message extends Data | Response
aa9eede8
JB
954 >(
955 workerNodeKey: number,
956 listener: (message: MessageValue<Message>) => void
957 ): void
a2ed5053
JB
958
959 /**
aa9eede8 960 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
961 * Can be overridden.
962 *
aa9eede8 963 * @param workerNodeKey - The newly created worker node key.
a2ed5053 964 */
aa9eede8 965 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 966 // Listen to worker messages.
aa9eede8 967 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 968 // Send the startup message to worker.
aa9eede8
JB
969 this.sendStartupMessageToWorker(workerNodeKey)
970 // Send the worker statistics message to worker.
971 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
972 }
973
85aeb3f3 974 /**
aa9eede8
JB
975 * Sends the startup message to worker given its worker node key.
976 *
977 * @param workerNodeKey - The worker node key.
978 */
979 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
980
981 /**
982 * Sends the worker statistics message to worker given its worker node key.
85aeb3f3 983 *
aa9eede8 984 * @param workerNodeKey - The worker node key.
85aeb3f3 985 */
aa9eede8
JB
986 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
987 this.sendToWorker(workerNodeKey, {
988 statistics: {
989 runTime:
990 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
991 .runTime.aggregate,
992 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
993 .elu.aggregate
994 },
995 workerId: this.getWorkerInfo(workerNodeKey).id as number
996 })
997 }
a2ed5053
JB
998
999 private redistributeQueuedTasks (workerNodeKey: number): void {
1000 while (this.tasksQueueSize(workerNodeKey) > 0) {
1001 let targetWorkerNodeKey: number = workerNodeKey
1002 let minQueuedTasks = Infinity
10ecf8fd 1003 let executeTask = false
a2ed5053
JB
1004 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1005 const workerInfo = this.getWorkerInfo(workerNodeId)
1006 if (
1007 workerNodeId !== workerNodeKey &&
1008 workerInfo.ready &&
1009 workerNode.usage.tasks.queued === 0
1010 ) {
10ecf8fd
JB
1011 if (workerNode.usage.tasks.executing === 0) {
1012 executeTask = true
1013 }
a2ed5053
JB
1014 targetWorkerNodeKey = workerNodeId
1015 break
1016 }
1017 if (
1018 workerNodeId !== workerNodeKey &&
1019 workerInfo.ready &&
1020 workerNode.usage.tasks.queued < minQueuedTasks
1021 ) {
1022 minQueuedTasks = workerNode.usage.tasks.queued
1023 targetWorkerNodeKey = workerNodeId
1024 }
1025 }
10ecf8fd
JB
1026 if (executeTask) {
1027 this.executeTask(
1028 targetWorkerNodeKey,
1029 this.dequeueTask(workerNodeKey) as Task<Data>
1030 )
1031 } else {
1032 this.enqueueTask(
1033 targetWorkerNodeKey,
1034 this.dequeueTask(workerNodeKey) as Task<Data>
1035 )
1036 }
a2ed5053
JB
1037 }
1038 }
1039
be0676b3 1040 /**
aa9eede8 1041 * This method is the listener registered for each worker message.
be0676b3 1042 *
bdacc2d2 1043 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1044 */
1045 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1046 return message => {
21f710aa 1047 this.checkMessageWorkerId(message)
d2c73f82 1048 if (message.ready != null) {
10e2aa7e
JB
1049 // Worker ready response received
1050 this.handleWorkerReadyResponse(message)
f59e1027 1051 } else if (message.id != null) {
a3445496 1052 // Task execution response received
6b272951
JB
1053 this.handleTaskExecutionResponse(message)
1054 }
1055 }
1056 }
1057
10e2aa7e 1058 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
aa9eede8
JB
1059 this.getWorkerInfo(
1060 this.getWorkerNodeKeyByWorkerId(message.workerId) as number
e221309a 1061 ).ready = message.ready as boolean
2431bdb4
JB
1062 if (this.emitter != null && this.ready) {
1063 this.emitter.emit(PoolEvents.ready, this.info)
1064 }
6b272951
JB
1065 }
1066
1067 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1068 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1069 if (promiseResponse != null) {
1070 if (message.taskError != null) {
2a69b8c5 1071 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1072 promiseResponse.reject(message.taskError.message)
1073 } else {
1074 promiseResponse.resolve(message.data as Response)
1075 }
501aea93
JB
1076 const workerNodeKey = promiseResponse.workerNodeKey
1077 this.afterTaskExecutionHook(workerNodeKey, message)
6b272951 1078 this.promiseResponseMap.delete(message.id as string)
6b272951
JB
1079 if (
1080 this.opts.enableTasksQueue === true &&
1081 this.tasksQueueSize(workerNodeKey) > 0
1082 ) {
1083 this.executeTask(
1084 workerNodeKey,
1085 this.dequeueTask(workerNodeKey) as Task<Data>
1086 )
be0676b3 1087 }
6b272951 1088 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1089 }
be0676b3 1090 }
7c0ba920 1091
ff733df7 1092 private checkAndEmitEvents (): void {
1f68cede 1093 if (this.emitter != null) {
ff733df7 1094 if (this.busy) {
2845f2a5 1095 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1096 }
6b27d407 1097 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1098 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1099 }
164d950a
JB
1100 }
1101 }
1102
8a1260a3 1103 /**
aa9eede8 1104 * Gets the worker information given its worker node key.
8a1260a3
JB
1105 *
1106 * @param workerNodeKey - The worker node key.
3f09ed9f 1107 * @returns The worker information.
8a1260a3 1108 */
aa9eede8 1109 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1110 return this.workerNodes[workerNodeKey].info
e221309a
JB
1111 }
1112
a05c10de 1113 /**
b0a4db63 1114 * Adds the given worker in the pool worker nodes.
ea7a90d3 1115 *
38e795c1 1116 * @param worker - The worker.
aa9eede8
JB
1117 * @returns The added worker node key.
1118 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1119 */
b0a4db63 1120 private addWorkerNode (worker: Worker): number {
cc3ab78b 1121 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1122 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1123 if (this.starting) {
1124 workerNode.info.ready = true
1125 }
aa9eede8
JB
1126 this.workerNodes.push(workerNode)
1127 const workerNodeKey = this.getWorkerNodeKey(worker)
1128 if (workerNodeKey === -1) {
1129 throw new Error('Worker node not found')
1130 }
1131 return workerNodeKey
ea7a90d3 1132 }
c923ce56 1133
51fe3d3c 1134 /**
f06e48d8 1135 * Removes the given worker from the pool worker nodes.
51fe3d3c 1136 *
f06e48d8 1137 * @param worker - The worker.
51fe3d3c 1138 */
416fd65c 1139 private removeWorkerNode (worker: Worker): void {
f06e48d8 1140 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1141 if (workerNodeKey !== -1) {
1142 this.workerNodes.splice(workerNodeKey, 1)
1143 this.workerChoiceStrategyContext.remove(workerNodeKey)
1144 }
51fe3d3c 1145 }
adc3c320 1146
b0a4db63 1147 /**
aa9eede8 1148 * Executes the given task on the worker given its worker node key.
b0a4db63 1149 *
aa9eede8 1150 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1151 * @param task - The task to execute.
1152 */
2e81254d 1153 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1154 this.beforeTaskExecutionHook(workerNodeKey, task)
aa9eede8 1155 this.sendToWorker(workerNodeKey, task)
2e81254d
JB
1156 }
1157
f9f00b5f 1158 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1159 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1160 }
1161
416fd65c 1162 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1163 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1164 }
1165
416fd65c 1166 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1167 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1168 }
1169
416fd65c 1170 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1171 while (this.tasksQueueSize(workerNodeKey) > 0) {
1172 this.executeTask(
1173 workerNodeKey,
1174 this.dequeueTask(workerNodeKey) as Task<Data>
1175 )
ff733df7 1176 }
4b628b48 1177 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1178 }
1179
ef41a6e6
JB
1180 private flushTasksQueues (): void {
1181 for (const [workerNodeKey] of this.workerNodes.entries()) {
1182 this.flushTasksQueue(workerNodeKey)
1183 }
1184 }
c97c7edb 1185}