refactor: cleanup http client example
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/**
 * @inheritDoc
 *
 * Only instantiated by the constructor when the `enableEvents` pool option is `true`.
 */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key and the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Whether the pool is starting or not.
 *
 * Set to `true` by the constructor only while the initial worker nodes are being created.
 */
private readonly starting: boolean
/**
 * The start timestamp of the pool.
 *
 * Taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp
94
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook
 * and then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the arguments fails validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate the arguments before any worker gets created.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // Besides validating, this also writes the default value of the unset options into `this.opts`.
  this.checkPoolOptions(this.opts)

  // Presumably bound so that these methods keep `this` when handed over as callbacks - TODO confirm.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The worker 'error' handler does not restart a worker while `starting` is true.
  this.starting = true
  this.startPool()
  this.starting = false

  // Taken once the initial worker nodes are created; used as the origin of the utilization computation.
  this.startTimestamp = performance.now()
}
141
/**
 * Ensures that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non blank string or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isBlankOrNotString =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isBlankOrNotString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
/**
 * Ensures that the number of workers is a safe, non negative integer,
 * and strictly positive when the pool type is fixed.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Ensures that the size boundaries of a dynamic pool are consistent.
 * Nothing is checked when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, lower than or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Validates the pool options and writes the default value of each unset option into `this.opts`.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
225
/**
 * Ensures that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/**
 * Validates the tasks queue options. Unset options (`null` or `undefined`) are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const staticWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  // The count is evaluated again before each creation.
  while (staticWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
299
/**
 * @inheritDoc
 *
 * The `utilization`, `runTime` and `waitTime` entries are only present when the worker choice
 * strategy requires the matching aggregates; the queue entries only when the tasks queue is enabled.
 */
public get info (): PoolInfo {
  const nodes = this.workerNodes
  const context = this.workerChoiceStrategyContext
  // Small folds shared by the counters below.
  const countNodes = (
    matches: (node: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    nodes.reduce((count, node) => (matches(node) ? count + 1 : count), 0)
  const sumOver = (
    pick: (node: IWorkerNode<Worker, Data>) => number
  ): number => nodes.reduce((total, node) => total + pick(node), 0)
  const lowest = (pick: (node: IWorkerNode<Worker, Data>) => number): number =>
    Math.min(...nodes.map(node => pick(node)))
  const highest = (
    pick: (node: IWorkerNode<Worker, Data>) => number
  ): number => Math.max(...nodes.map(node => pick(node)))
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(context.getTaskStatisticsRequirements().runTime.aggregate &&
      context.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: countNodes(node => node.usage.tasks.executing === 0),
    busyWorkerNodes: countNodes(node => node.usage.tasks.executing > 0),
    executedTasks: sumOver(node => node.usage.tasks.executed),
    executingTasks: sumOver(node => node.usage.tasks.executing),
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOver(node => node.usage.tasks.queued)
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOver(node => node.usage.tasks?.maxQueued ?? 0)
    }),
    failedTasks: sumOver(node => node.usage.tasks.failed),
    ...(context.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          lowest(node => node.usage.runTime?.minimum ?? Infinity)
        ),
        maximum: round(
          highest(node => node.usage.runTime?.maximum ?? -Infinity)
        ),
        // NOTE(review): the divisor is 0 until a task has been executed - confirm `round` and consumers cope with a non finite average.
        average: round(
          sumOver(node => node.usage.runTime?.aggregate ?? 0) /
            sumOver(node => node.usage.tasks?.executed ?? 0)
        ),
        ...(context.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(nodes.map(node => node.usage.runTime?.median ?? 0))
          )
        })
      }
    }),
    ...(context.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          lowest(node => node.usage.waitTime?.minimum ?? Infinity)
        ),
        maximum: round(
          highest(node => node.usage.waitTime?.maximum ?? -Infinity)
        ),
        // NOTE(review): same zero divisor remark as for the run time average.
        average: round(
          sumOver(node => node.usage.waitTime?.aggregate ?? 0) /
            sumOver(node => node.usage.tasks?.executed ?? 0)
        ),
        ...(context.getTaskStatisticsRequirements().waitTime.median && {
          median: round(
            median(nodes.map(node => node.usage.waitTime?.median ?? 0))
          )
        })
      }
    })
  }
}
08f3f44c 440
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
455
/**
 * The approximate pool utilization.
 *
 * Ratio between the cumulated tasks run and wait times of all the worker nodes
 * and the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
476
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the pool size checks
 * and by the dynamic worker creation condition.
 */
protected abstract get type (): PoolType

/**
 * The worker type.
 *
 * Reported as is in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size.
 *
 * Number of ready, non dynamic worker nodes required for the pool to be reported as ready.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size.
 *
 * Used by the pool fullness status, the utilization computation and the weights validation.
 */
protected abstract get maxSize (): number
a35560ba 498
/**
 * Checks that the worker id carried by a message received from a worker belongs to a pool worker node.
 * A message without worker id is accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    return
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  if (senderWorkerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
515
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
527
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
539
/**
 * @inheritDoc
 *
 * The usage of every worker node is reset and the worker statistics message is sent again to each worker.
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }
}
558
/**
 * @inheritDoc
 *
 * The options are validated before being stored and handed over to the worker choice strategy context.
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
569
/**
 * @inheritDoc
 *
 * The tasks queues are flushed when an enabled tasks queue gets disabled.
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
581
/**
 * @inheritDoc
 *
 * The options are only stored while the tasks queue is enabled; otherwise any stored tasks queue options are removed.
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
592
/**
 * Builds the tasks queue options with the default value of each unset option.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly unset.
 * @returns The tasks queue options, with a concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
600
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 609
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status. Left to the pool implementations.
 */
protected abstract get busy (): boolean
7c0ba920 616
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency, otherwise it is a single task.
 * Worker nodes that are not ready are ignored.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  for (const workerNode of this.workerNodes) {
    const hasSpareCapacity = tasksQueueEnabled
      ? workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      : workerNode.info.ready && workerNode.usage.tasks.executing === 0
    if (hasSpareCapacity) {
      // One ready worker node below its quota is enough for the pool not to be busy.
      return false
    }
  }
  return true
}
641
/**
 * @inheritDoc
 *
 * The task is either executed right away on the chosen worker node or enqueued on it,
 * depending on the tasks queue settings and on the number of tasks the worker node is executing.
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const submissionTimestamp = performance.now()
    const chosenWorkerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(chosenWorkerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the task response can settle the promise later on.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey: chosenWorkerNodeKey
    })
    const tasksQueueSetting = this.opts.enableTasksQueue
    const isBelowConcurrency = (): boolean =>
      this.workerNodes[chosenWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    const runImmediately =
      tasksQueueSetting === false ||
      (tasksQueueSetting === true && isBelowConcurrency())
    if (runImmediately) {
      this.executeTask(chosenWorkerNodeKey, task)
    } else {
      this.enqueueTask(chosenWorkerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 673
/**
 * @inheritDoc
 *
 * All the worker nodes are destroyed concurrently.
 */
public async destroy (): Promise<void> {
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
}
682
/**
 * Terminates the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the termination has been carried out by the pool implementation.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 689
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 699
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
704
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  nodeUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  const nodeUsageForTask = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  nodeUsageForTask.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(nodeUsageForTask, task)
}
725
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks counters, the run time and the event loop utilization statistics,
 * first on the worker node usage, then on its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordExecution = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordExecution(this.workerNodes[workerNodeKey].usage)
  // The task name falls back to the default one when the message carries no task performance.
  const executedTaskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  recordExecution(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      executedTaskName
    ) as WorkerUsage
  )
}
748
/**
 * Updates the tasks counters of the given worker usage once a task execution has ended.
 *
 * The executing counter is only decremented while it is strictly positive:
 * `setWorkerChoiceStrategy()` calls `resetUsage()` on the worker nodes, possibly while tasks are
 * still in flight (presumably zeroing the counters - verify against the worker node implementation),
 * and an unconditional decrement would then drive the counter below zero, breaking the
 * `executing === 0` idleness checks.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message: a task error counts the task as failed, as executed otherwise.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
761
/**
 * Updates the run time statistics of the given worker usage with the run time carried by the message.
 * A message without task performance counts for a run time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
773
/**
 * Updates the wait time statistics of the given worker usage with the time the task has waited
 * since its submission timestamp. A task without timestamp counts for a wait time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitedTime = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    waitedTime,
    workerUsage.tasks.executed
  )
}
787
/**
 * Updates the event loop utilization statistics of the given worker usage.
 *
 * The active and idle measurements are always updated (with `0` when the message carries none).
 * The utilization is only updated when the strategy requires the aggregate and the message carries
 * an event loop utilization: it becomes the mean of the previous and the received values,
 * or the received value when there is no previous one.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
819
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is chosen right away
 * when the strategy policy uses dynamic workers, otherwise the worker choice strategy decides.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
838
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
847
/**
 * Sends a message to worker given its worker node key.
 *
 * Left to the pool implementations.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): void
858
/**
 * Creates a new worker.
 *
 * Left to the pool implementations. The event listeners are attached by the caller.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 865
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers and the pool internal 'error' and 'exit' listeners are attached
 * to the worker before it is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, defaulting to a no-op.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // Flag the worker node as not ready and close its channel.
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is starting; the replacement is of the same kind (dynamic or not).
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Runs after the possible replacement worker node has been created.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 905
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the worker node when a
 * kill message is received, and the worker is asked to check its activity.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The worker node key is resolved again from the worker id carried by each message.
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // No task executing and, with the tasks queue enabled, no task queued either.
    const hasNoPendingWork = (): boolean =>
      senderUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker
    const mustDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingWork())
    if (mustDestroy) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  // Flagged as ready right away when the strategy policy uses dynamic workers.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
942
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key. Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
955
/**
 * Hook invoked right after a worker node has been created.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker node, then sends it
 * the startup message followed by the worker statistics message.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Start listening to the messages sent by the worker.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // 2. Send the startup message to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Send the worker statistics message to the worker.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
970
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
977
/**
 * Sends the worker statistics message to the worker identified by its worker
 * node key. The message carries the `aggregate` flags of the `runTime` and
 * `elu` task statistics requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
a2ed5053
JB
995
/**
 * Moves the tasks queued on the given worker node to the other ready worker
 * nodes, one task at a time.
 *
 * For each task, the destination is the first other ready worker node with
 * an empty queue; the task is executed there right away if that node runs
 * fewer tasks than the configured tasks queue concurrency, otherwise it is
 * enqueued there. When every other ready worker node has queued tasks, the
 * task is enqueued on the one with the fewest queued tasks.
 *
 * If no other ready worker node exists, the redistribution stops and the
 * remaining tasks stay queued on the given worker node: moving a task back
 * onto its own queue would never shrink it and would loop forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only other worker nodes that are ready can receive tasks.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const { tasks } = workerNode.usage
      if (tasks.queued === 0) {
        // Empty queue: best candidate, run the task now if concurrency allows.
        executeTask =
          tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (tasks.queued < minQueuedTasks) {
        minQueuedTasks = tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No destination available: bail out instead of spinning forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1039
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the
 * message: a message with a `ready` field is handled as a worker ready
 * response, otherwise a message with a `taskId` field is handled as a task
 * execution response. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1057
/**
 * Handles a worker ready response: stores the received ready flag in the
 * information of the worker node matching the message worker id, then emits
 * the pool `ready` event if an emitter is set and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1066
/**
 * Handles a task execution response received from a worker.
 *
 * Messages whose task id has no pending promise response are ignored.
 * Otherwise the pending promise is rejected with the task error message
 * (after emitting the `taskError` event) or resolved with the message data,
 * the after task execution hook runs, the promise response entry is deleted,
 * the next queued task of the worker node is executed if the tasks queue is
 * enabled and concurrency allows it, and the worker choice strategy context
 * is updated.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  // Feed the worker with its next queued task, if any and if allowed.
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1095
/**
 * Emits the pool state events, if an emitter is set: `busy` when the pool is
 * busy, and `full` when the pool is of dynamic type and full. Both events
 * carry the pool information.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1106
/**
 * Gets the information of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1116
/**
 * Wraps the given worker in a worker node and appends it to the pool worker
 * nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created during pool startup are flagged as ready.
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1137
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1150
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1161
/**
 * Enqueues the given task on the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the resulting queue size, to be confirmed against the worker node implementation).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1165
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1169
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1173
/**
 * Flushes the tasks queue of the worker node identified by its key: every
 * queued task is dequeued and executed on that worker node, then the tasks
 * queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1183
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1189}