perf: speed up dynamic worker termination
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
7c89e6a4 13 isAsyncFunction,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
e102732c
JB
31import type {
32 IWorker,
4b628b48 33 IWorkerNode,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
/**
 * Creates a poolifier pool: validates the inputs, instantiates the optional
 * event emitter and the worker choice strategy context, then starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` reports that the caller is not the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Inputs validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed over as plain callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The `starting` flag is raised only for the duration of the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point used by the pool utilization computation.
  this.startTimestamp = performance.now()
}
141
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or does not point to an existing file.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length !== 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
154
8d3782fa
JB
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero worker is only rejected for fixed pools.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
172
/**
 * Validates the size boundaries of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is nullish or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is inferior to the minimum one, equal to zero, or equal to the minimum one.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  // A nullish, NaN, infinite or fractional maximum makes every comparison below
  // evaluate to false, so it must be rejected explicitly.
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
190
/**
 * Validates the pool options and fills in the defaults of the unset ones.
 * The defaults are written to `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches: worker restart and events are on by default, tasks queue is off.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and normalized when the queue is enabled.
  this.checkValidTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
217
/**
 * Validates that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
227
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, their count is compared with the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
255
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are neither nullish nor a plain object, or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate without a concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
279
e761c033
JB
/**
 * Creates worker nodes until the count of non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recomputed at each iteration since every creation may change the worker nodes array.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
291
/**
 * Pool information snapshot, computed from the current worker nodes usage.
 *
 * `utilization` is only included when both the run time and wait time
 * aggregates are required by the worker choice strategy; `runTime` and
 * `waitTime` are only included when their respective aggregate is required.
 * The run time and wait time averages are `0` while no task has been executed.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Adds up a per worker node numeric value over all the pool worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + valueOf(workerNode),
      0
    )
  // Counts the pool worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Averages an aggregated measurement over the executed tasks.
  // Yields 0 instead of NaN (0 / 0) while no task has been executed.
  const averagePerExecutedTask = (aggregate: number): number => {
    const executedTasks = sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.executed ?? 0
    )
    return executedTasks > 0 ? aggregate / executedTasks : 0
  }
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        // Worker nodes without a measurement do not influence the minimum/maximum.
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 428
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
443
/**
 * The approximate pool utilization: the tasks run time plus wait time
 * aggregates of all worker nodes, divided by the time elapsed since the pool
 * start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
464
8881ae32 465 /**
aa9eede8 466 * The pool type.
8881ae32
JB
467 *
468 * If it is `'dynamic'`, it provides the `max` property.
469 */
470 protected abstract get type (): PoolType
471
184855e6 472 /**
aa9eede8 473 * The worker type.
184855e6
JB
474 */
475 protected abstract get worker (): WorkerType
476
c2ade475 477 /**
aa9eede8 478 * The pool minimum size.
c2ade475 479 */
6b27d407 480 protected abstract get minSize (): number
ff733df7
JB
481
482 /**
aa9eede8 483 * The pool maximum size.
ff733df7 484 */
6b27d407 485 protected abstract get maxSize (): number
a35560ba 486
6b813701
JB
/**
 * Ensures that the worker id carried by a received worker message, if any,
 * belongs to a worker node of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  if (message.workerId == null) {
    // No worker id in the message: nothing to check.
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(message.workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
503
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
515
aa9eede8
JB
/**
 * Looks up the worker node key of the worker node whose worker info id is the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number | undefined {
  const workerNodeKey = this.workerNodes.findIndex(
    workerNode => workerNode.info.id === workerId
  )
  return workerNodeKey !== -1 ? workerNodeKey : undefined
}
529
/**
 * Switches the worker choice strategy, optionally with new strategy options,
 * then resets the usage of every worker node and sends each worker the worker
 * statistics message.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  })
}
548
/**
 * Validates the given worker choice strategy options, stores them in the pool
 * options and forwards them to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
559
/**
 * Enables or disables the tasks queue. The tasks queues are flushed when the
 * queue goes from enabled to disabled.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
571
/**
 * Sets the tasks queue options when the tasks queue is enabled, removes any
 * stored tasks queue options otherwise.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the leftover options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
582
/**
 * Builds the tasks queue options with the defaults filled in.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns The tasks queue options, with a concurrency of `1` if none is given.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
590
c319c66b
JB
/**
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 599
c319c66b
JB
600 /**
601 * Whether the pool is busy or not.
602 *
603 * The pool busyness boolean status.
604 */
605 protected abstract get busy (): boolean
7c0ba920 606
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing task, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
619
/**
 * Submits a task to a chosen worker node. The returned promise is settled
 * through the callbacks stored in the promise response map under the task id.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    // Keep the settle callbacks reachable by task id.
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // With the tasks queue enabled, the task is queued when the pool is busy or
    // when the chosen worker node already executes `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, task)
    } else {
      this.executeTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 652
/**
 * Terminates every worker node: flushes its tasks queue, destroys it and
 * waits for its worker `'exit'` event.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const workerNodeTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The exit listener is registered before the termination is requested.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorkerNode(workerNodeKey)
      await workerExited
    }
  )
  await Promise.all(workerNodeTerminations)
}
669
4a6952ff 670 /**
aa9eede8 671 * Terminates the worker node given its worker node key.
4a6952ff 672 *
aa9eede8 673 * @param workerNodeKey - The worker node key.
4a6952ff 674 */
aa9eede8
JB
675 protected abstract destroyWorkerNode (
676 workerNodeKey: number
677 ): void | Promise<void>
c97c7edb 678
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default, can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
c97c7edb 688
729c563d 689 /**
280c2a77
S
690 * Should return whether the worker is the main worker or not.
691 */
692 protected abstract isMain (): boolean
693
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and updates the wait time statistics, for both the worker
 * node usage and its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage first.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  nodeUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Then the usage tracked for that task name.
  const taskNameUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  taskNameUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(taskNameUsage, task)
}
714
/**
 * Hook executed after the worker task execution: updates the tasks, run time
 * and ELU statistics, for both the worker node usage and its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates to one usage object.
  const recordExecution = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordExecution(this.workerNodes[workerNodeKey].usage)
  // The task name comes from the task performance, with the default task name as fallback.
  recordExecution(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
737
/**
 * Updates the tasks counters of a worker usage once a task is done: one less
 * executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    tasks.failed += 1
    return
  }
  tasks.executed += 1
}
750
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the received message (`0` if none is reported).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
762
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage. The task wait time is
 * the time elapsed since the task timestamp, or `0` if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const taskWaitTime = now - submittedAt
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
776
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU reported in the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Active and idle measurements, defaulting to 0 when the message carries no ELU.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the new one, or the new one at first.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
808
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the creation conditions are
 * met; it is chosen right away if the strategy policy allows using it.
 * Otherwise the choice is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
827
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
836
280c2a77 837 /**
aa9eede8 838 * Sends a message to worker given its worker node key.
280c2a77 839 *
aa9eede8 840 * @param workerNodeKey - The worker node key.
38e795c1 841 * @param message - The message.
280c2a77
S
842 */
843 protected abstract sendToWorker (
aa9eede8 844 workerNodeKey: number,
280c2a77
S
845 message: MessageValue<Data>
846 ): void
847
729c563d 848 /**
41344292 849 * Creates a new worker.
6c6afb84
JB
850 *
851 * @returns Newly created worker.
729c563d 852 */
280c2a77 853 protected abstract createWorker (): Worker
c97c7edb 854
/**
 * Creates a worker, registers the user supplied and the pool internal event
 * handlers on it, adds it to the pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, with a no-op fallback.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKey(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement of the same kind is created, except while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node leaves the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
be0676b3 894
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered
 * that destroys the worker node when a kill message is received, the worker
 * info is flagged as dynamic, and the worker is asked to check its activity.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is resolved again from the worker id instead of being captured.
    // NOTE(review): presumably because keys can change when worker nodes are removed - confirm against removeWorkerNode().
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    if (localWorkerNodeKey == null) {
      // The worker is not part of the pool worker nodes: there is nothing to
      // inspect nor to terminate, and indexing the worker nodes would throw.
      return
    }
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A hard kill is always honored; any other kill message only once the
    // worker node has no executing task and, with the tasks queue enabled,
    // no queued task either.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
      if (isAsyncFunction(destroyWorkerNodeBounded)) {
        // Asynchronous termination: rejections are deliberately ignored.
        (
          destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
        )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
      } else {
        (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
          localWorkerNodeKey
        )
      }
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.dynamic = true
  // The dynamic worker is flagged ready right away when the strategy policy uses it directly.
  if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
    workerInfo.ready = true
  }
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return workerNodeKey
}
940
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload: either pool data or pool response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
953
/**
 * Hook executed once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Then send the startup message to the worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // And finally send the worker statistics message to the worker.
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
968
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
975
/**
 * Sends the worker statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const strategyContext = this.workerChoiceStrategyContext
  const runTimeAggregate =
    strategyContext.getTaskStatisticsRequirements().runTime.aggregate
  const eluAggregate =
    strategyContext.getTaskStatisticsRequirements().elu.aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
a2ed5053
JB
993
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task is re-enqueued on the first other ready worker node without queued tasks or,
 * failing that, on the other ready worker node with the fewest queued tasks.
 * Redistribution stops as soon as no other ready worker node is available: the remaining
 * tasks are left in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other worker nodes flagged as ready are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty tasks queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible target: re-enqueueing on the same worker node would never drain its
      // tasks queue and this loop would spin forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1023
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the message either as a
 * worker ready response or as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1041
/**
 * Handles a worker ready response: stores the ready flag in the information of the worker
 * that sent it and emits the pool ready event once the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
    message.workerId
  ) as number
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = message.ready as boolean
  // Nothing to emit without an emitter or while the pool is not ready.
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1050
/**
 * Handles a task execution response: settles the promise matching the task id, runs the
 * after task execution hook, executes the next queued task if any and updates the worker
 * choice strategy context.
 * Messages whose id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const taskError = message.taskError
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message only.
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const tasksQueued =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (tasksQueued) {
    // The worker just finished a task: feed it with its next queued task.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1075
/**
 * Emits the pool busy event when the pool is busy and the pool full event when a dynamic
 * pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1086
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1096
/**
 * Wraps the given worker into a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1117
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy
 * context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKey(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1130
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The before task execution hook runs first, then the task is sent to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task)
}
1141
/**
 * Enqueues the given task in the tasks queue of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1145
/**
 * Dequeues a task from the tasks queue of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1149
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1153
/**
 * Executes every task queued on the worker node identified by its key, then clears its
 * tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  for (
    let pendingTasks = this.tasksQueueSize(workerNodeKey);
    pendingTasks > 0;
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1163
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1169}