Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The pending task executions map:
 * - `key`: The id of each submitted task.
 * - `value`: The task promise `resolve` and `reject` callbacks together with the key of the worker node the task was submitted to.
 *
 * An entry is added for every task submitted through `execute()`.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Response>
>()

/**
 * Worker choice strategy context referencing the worker choice algorithm implementation in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<Worker, Data, Response>

/**
 * Dynamic pool maximum size placeholder.
 *
 * When it is not set, the `maxSize` getter falls back to `numberOfWorkers`.
 */
protected readonly max?: number

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean

/**
 * Whether the pool is starting or not.
 */
private starting: boolean

/**
 * The start timestamp of the pool: the `performance.now()` value taken at the end of the constructor.
 */
private readonly startTimestamp
114
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the
 * setup hook and starts the pool when the `startWorkers` option is `true`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. `checkPoolOptions()` also writes the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached when these methods are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Taken last: the utilization getter measures the elapsed time from here.
  this.startTimestamp = performance.now()
}
165
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a string, is blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `typeof` also rejects `null` and `undefined`, so no separate nullish test is needed.
  const isMissing =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileExists = existsSync(filePath)
  if (!fileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
8d3782fa
JB
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Checks the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, inferior or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Checks the pool options and writes the defaults of the unset ones to `this.opts`.
 *
 * @param opts - The pool options to check. The constructor passes `this.opts`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Each option is validated before its default is applied.
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options only matter when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
253
/**
 * Checks that the worker choice strategy, when given, is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  // An unset strategy is valid: the caller applies a default afterwards.
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
266
0d80593b
JB
/**
 * Checks the worker choice strategy options. Unset options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // The number of weights is compared against the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
313
/**
 * Checks the tasks queue options. Unset options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
350
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  const { workerNodes, workerChoiceStrategyContext: strategyContext } = this

  // Number of worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: PoolWorkerNode) => boolean
  ): number =>
    workerNodes.reduce(
      (count, workerNode) => (predicate(workerNode) ? count + 1 : count),
      0
    )
  // Sum over all worker nodes of one tasks usage counter.
  const sumTasksCounters = (
    counter: (tasks: PoolWorkerNode['usage']['tasks']) => number
  ): number =>
    workerNodes.reduce(
      (total, workerNode) => total + counter(workerNode.usage.tasks),
      0
    )
  // Concatenation of the worker nodes history of one measurement.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumTasksCounters(tasks => tasks.executed),
    executingTasks: sumTasksCounters(tasks => tasks.executing),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumTasksCounters(tasks => tasks.queued),
      maxQueuedTasks: sumTasksCounters(tasks => tasks?.maxQueued ?? 0),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumTasksCounters(tasks => tasks.stolen)
    }),
    failedTasks: sumTasksCounters(tasks => tasks.failed),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime
          .average && {
          average: round(average(gatherHistory('runTime')))
        }),
        ...(strategyContext.getTaskStatisticsRequirements().runTime
          .median && {
          median: round(median(gatherHistory('runTime')))
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .average && {
          average: round(average(gatherHistory('waitTime')))
        }),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .median && {
          median: round(median(gatherHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 506
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once at least `minSize` worker nodes are both ready and not dynamic.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
521
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // A missing measurement counts for nothing.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
542
/**
 * The pool type, provided by the concrete pool implementation.
 *
 * The pool size checks compare it against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
549
/**
 * The worker type, provided by the concrete pool implementation.
 *
 * It is reported as the `worker` property of the pool information.
 */
protected abstract get worker (): WorkerType
554
/**
 * The pool minimum size: the number of workers given at pool construction.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
ff733df7
JB
561
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given at pool construction otherwise.
 */
protected get maxSize (): number {
  return this.max ?? this.numberOfWorkers
}
a35560ba 568
6b813701
JB
/**
 * Checks that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
587
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
599
aa9eede8
JB
/**
 * Gets the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
611
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
630
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults, they do not replace them.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
644
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Tasks stealing is unset and the tasks queues are flushed before the switch off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue options are dropped while the tasks queue is disabled.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const queueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = queueOptions
  this.setTasksQueueSize(queueOptions.size as number)
  if (queueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (queueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
680
9b38ab2d
JB
/**
 * Builds the tasks queue options by layering the given ones over the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The tasks queue options with every default filled in.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    // The default queue size is the square of the pool maximum size.
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    ...tasksQueueOptions
  }
}
694
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
700
d6ca1416
JB
/**
 * Sets the `onEmptyQueue` callback of every worker node to the pool task stealing handler.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    // A new bound handler is created for each worker node.
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
707
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
713
/**
 * Sets the `onBackPressure` callback of every worker node to the pool tasks stealing handler.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    // A new bound handler is created for each worker node.
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
720
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
726
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
c2ade475 735
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 742
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue `concurrency` option,
 * otherwise a worker node is at quota as soon as it executes one task.
 * Worker nodes that are not ready are ignored.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowQuota = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowQuota
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
766
/**
 * Sends a task function operation to one worker and waits for its status.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns `true` once the worker reports a successful operation.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports a failed operation.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  return await new Promise<boolean>((resolve, reject) => {
    // The listener is registered before the message is sent.
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Messages carrying another worker id are ignored.
      if (response.workerId !== workerId) {
        return
      }
      const status = response.taskFunctionOperationStatus
      if (status === true) {
        resolve(true)
      } else if (status === false) {
        reject(
          new Error(
            `Task function operation ${
              response.taskFunctionOperation as string
            } failed on worker ${workerId}`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
795
/**
 * Sends a task function operation to every worker and waits for all their statuses.
 *
 * The promise settles once as many statuses as there are worker nodes have been received.
 *
 * @param message - The task function operation message.
 * @returns `true` once every worker has reported a successful operation.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If at least one worker reports a failed operation. The error names a worker that actually failed.
 */
private async sendTaskFunctionOperationToWorkers (
  message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    // NOTE(review): with no worker node nothing is sent and this promise never settles - confirm whether callers can reach that state.
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(workerNodeKey, response => {
        // Only task function operation statuses are of interest here.
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        // Report the first response that failed: the last one received may
        // come from a worker on which the operation succeeded.
        const failedResponse = responsesReceived.find(
          responseReceived =>
            responseReceived.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                failedResponse.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
832
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // One worker node listing the name is enough.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
845
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent as its source text.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Registered on the pool side only once the workers have acknowledged it:
  // a rejected operation must not leave an entry for a task function that
  // the workers do not handle.
  this.taskFunctions.set(name, fn)
  return opResult
}
867
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed through it.
  if (!this.taskFunctions.has(name)) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Forgotten on the pool side only once the workers have acknowledged the
  // removal: after a rejected operation the entry is still there, so the
  // removal can be attempted again.
  this.taskFunctions.delete(name)
  return opResult
}
881
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node that reports a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
894
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: Omit<MessageValue<Data>, 'workerId'> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
902
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue `concurrency` option, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
910
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Invalid calls reject the returned promise instead of throwing.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The callbacks are stored under the task id before the task is dispatched.
    this.promiseResponseMap.set(taskId, { resolve, reject, workerNodeKey })

    const executeRightAway =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeRightAway) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 964
/** @inheritdoc */
public start (): void {
  this.starting = true
  // Count of the worker nodes not flagged as dynamic, re-evaluated at each iteration.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
980
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy every worker node concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
991
/**
 * Sends a kill message to the worker given its worker node key and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers with a `'success'` kill message and rejected when it answers with a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Worker ${message.workerId as number} kill message handling failed`
            )
          )
          break
        default:
          // Any other message is ignored.
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1012
/**
 * Terminates the worker node identified by the given worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the termination is done.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1019
/**
 * Hook meant to run setup code before the worker nodes are created in the abstract constructor.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1029
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1034
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics
 * of the worker node usage and, when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const nodeUsage = workerNode.usage
    ++nodeUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(nodeUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  )
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionUsage, task)
  }
}
1064
/**
 * Hook executed after the worker task execution.
 * Updates the task, run time and ELU statistics of the worker node usage and,
 * when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates, in order, to one usage object.
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    updateUsage(workerNode.usage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
    message.taskPerformance?.name as string
  )
  if (taskFunctionUsage != null) {
    updateUsage(taskFunctionUsage)
  }
}
1098
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return false
  }
  const names = info.taskFunctionNames
  // NOTE(review): more than two names are required; presumably the list
  // carries extra entries for the default task function - confirm.
  return Array.isArray(names) && names.length > 2
}
1113
/**
 * Updates the tasks counters of the given worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasksCounters = workerUsage.tasks
  // Never let the executing counter go below zero.
  if (tasksCounters.executing != null && tasksCounters.executing > 0) {
    --tasksCounters.executing
  }
  if (message.workerError != null) {
    ++tasksCounters.failed
  } else {
    ++tasksCounters.executed
  }
}
1131
/**
 * Updates the run time measurement statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    // A missing run time measurement is accounted as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1145
/**
 * Updates the wait time measurement statistics of the given worker usage.
 * The wait time is the time elapsed since the task timestamp, or 0 when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceSubmission = now - (task.timestamp ?? now)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    elapsedSinceSubmission
  )
}
1158
/**
 * Updates the ELU (active, idle and utilization) statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing ELU measurements are accounted as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the new one,
  // or the new one when there is no previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1191
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy allows dynamic worker usage. In every other
 * case the worker choice strategy context decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1210
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1219
/**
 * Sends a message to the worker identified by the given worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1232
/**
 * Creates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1239
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers (or no-op fallbacks) are registered first, then
 * the pool's own 'error' and 'exit' handlers, and finally the worker is added
 * to the worker nodes and the post setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    // Flag the worker node as not ready and close its channel before notifying.
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the worker with one of the same kind.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1283
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener destroying the worker
 * node on kill messages is registered, the worker is asked to check its
 * activity, the pool task functions are sent to it and it is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    // The key is looked up again from the worker id carried by the message.
    const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[currentWorkerNodeKey].usage
    // A soft kill is only honoured when the worker node has nothing left to
    // do: no executing task and, with tasks queuing enabled, an empty queue.
    const hasNoPendingWork = (): boolean =>
      usage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(currentWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    ) {
      this.destroyWorkerNode(currentWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function known by the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  dynamicWorkerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1336
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1349
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics
 * messages and, with tasks queuing enabled, wires the task stealing callbacks.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const queueOptions = this.opts.tasksQueueOptions
  if (queueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (queueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1374
/**
 * Sends the startup message to the worker identified by the given worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1381
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
a2ed5053
JB
1398
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * Each queued task goes to the ready worker node, other than the source one,
 * that has the fewest queued tasks. It is executed right away there if
 * possible, and enqueued otherwise.
 *
 * The source worker node is never a destination: handing a task back to it
 * would re-enqueue the task on the queue being drained and the loop would
 * never terminate. For the same reason the redistribution stops as soon as no
 * ready destination worker node exists.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No ready worker node can take the remaining tasks.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1419
/**
 * Increments the stolen tasks counter of the worker node usage and, when
 * applicable, of the usage of the given task function on that worker node.
 *
 * @param workerNodeKey - The key of the worker node that stole the task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  if (stealingWorkerNode?.usage != null) {
    ++stealingWorkerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage =
    stealingWorkerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1438
/**
 * Steals one task for the worker whose tasks queue got empty.
 * The task is taken from the ready worker node, other than the stealing one,
 * with the most queued tasks.
 *
 * @param workerId - The id of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const stealingWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Copy before sorting: the pool worker nodes order must not change.
  const byQueuedTasksDescending = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const victimWorkerNode = byQueuedTasksDescending.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (victimWorkerNode == null) {
    return
  }
  const stolenTask = victimWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(stealingWorkerNodeKey)) {
    this.executeTask(stealingWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(stealingWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    stealingWorkerNodeKey,
    stolenTask.name as string
  )
}
1466
/**
 * Offloads tasks from the worker under back pressure to the other worker nodes.
 *
 * The other ready worker nodes are visited from the least to the most loaded
 * one in terms of queued tasks. Each one whose queue holds fewer tasks than
 * the tasks queue size minus an offset of 1 takes one task popped from the
 * source worker node, as long as the source still has queued tasks.
 * Nothing is done when the tasks queue size is not greater than that offset.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Copy before sorting: the pool worker nodes order must not change.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // A position in the sorted copy is not a worker node key: resolve the
      // key against the pool worker nodes.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1501
/**
 * This method is the listener registered for each worker message.
 * Messages are dispatched on their content: worker ready response, task
 * execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctionNames != null) {
      // Task function names message received from worker
      const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      this.getWorkerInfo(senderWorkerNodeKey).taskFunctionNames =
        message.taskFunctionNames
    }
  }
}
1524
/**
 * Handles the ready response received from a worker: stores its ready flag
 * and task function names, then emits the pool ready event if the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1540
/**
 * Handles a task execution response received from a worker.
 * Settles the promise registered for the task, updates the worker usage and
 * the worker choice strategy, then starts the next queued task of the worker
 * node if tasks queuing is enabled and the concurrency allows it.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const pendingPromise = this.promiseResponseMap.get(taskId as string)
  if (pendingPromise == null) {
    return
  }
  if (workerError == null) {
    pendingPromise.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    // The promise is rejected with the error message, not the error object.
    pendingPromise.reject(workerError.message)
  }
  const { workerNodeKey } = pendingPromise
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
7c0ba920 1568
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1574
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1580
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1588
/**
 * Gets the information of the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1598
/**
 * Wraps the given worker into a worker node and adds it to the pool worker nodes.
 * The worker node tasks queue size defaults to the square of the pool maximum
 * size when no tasks queue size option is set.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1622
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy context.
 * Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1635
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful with tasks queuing enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1643
/**
 * Whether the pool has back pressure or not.
 *
 * @returns `true` if tasks queuing is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1652
/**
 * Executes the given task on the worker identified by the given worker node key:
 * runs the before execution hook, sends the task to the worker along with its
 * transfer list, then emits the task execution events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1664
/**
 * Enqueues the given task on the worker node tasks queue, then emits the task queuing events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1670
/**
 * Dequeues a task from the tasks queue of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1674
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker node tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1678
/**
 * Executes every task queued on the worker node identified by the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let queuedTasksCount = this.tasksQueueSize(workerNodeKey)
  while (queuedTasksCount > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    queuedTasksCount = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1688
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1694}