build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
d91689fd 20 once,
bfc75cca 21 round
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
4d159167
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
9f95d5eb 38 WorkerNodeEventDetail,
4d159167
JB
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
bde6b5d7
JB
52import {
53 checkFilePath,
54 checkValidTasksQueueOptions,
bfc75cca
JB
55 checkValidWorkerChoiceStrategy,
56 updateMeasurementStatistics
bde6b5d7 57} from './utils'
23ccf9d7 58
729c563d 59/**
ea7a90d3 60 * Base class that implements some shared logic for all poolifier pools.
729c563d 61 *
38e795c1 62 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
63 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
64 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 65 */
c97c7edb 66export abstract class AbstractPool<
f06e48d8 67 Worker extends IWorker,
d3c8a1a8
S
68 Data = unknown,
69 Response = unknown
c4855468 70> implements IPool<Worker, Data, Response> {
afc003b2 71 /** @inheritDoc */
4b628b48 72 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 73
afc003b2 74 /** @inheritDoc */
f80125ca 75 public emitter?: EventEmitterAsyncResource
7c0ba920 76
fcd39179
JB
77 /**
78 * Dynamic pool maximum size property placeholder.
79 */
80 protected readonly max?: number
81
be0676b3 82 /**
419e8121 83 * The task execution response promise map:
2740a743 84 * - `key`: The message id of each submitted task.
a3445496 85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 86 *
a3445496 87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 88 */
501aea93
JB
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 91
a35560ba 92 /**
51fe3d3c 93 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
94 */
95 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
96 Worker,
97 Data,
98 Response
a35560ba
S
99 >
100
72ae84a2
JB
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
15b176e0
JB
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
47352846
JB
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
711623b8
JB
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
afe0d5bf
JB
120 /**
121 * The start timestamp of the pool.
122 */
123 private readonly startTimestamp
124
729c563d
S
125 /**
126 * Constructs a new poolifier pool.
127 *
38e795c1 128 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 129 * @param filePath - Path to the worker file.
38e795c1 130 * @param opts - Options for the pool.
729c563d 131 */
c97c7edb 132 public constructor (
b4213b7f
JB
133 protected readonly numberOfWorkers: number,
134 protected readonly filePath: string,
135 protected readonly opts: PoolOptions<Worker>
c97c7edb 136 ) {
78cea37e 137 if (!this.isMain()) {
04f45163 138 throw new Error(
8c6d4acf 139 'Cannot start a pool from a worker with the same type as the pool'
04f45163 140 )
c97c7edb 141 }
bde6b5d7 142 checkFilePath(this.filePath)
8003c026 143 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 144 this.checkPoolOptions(this.opts)
1086026a 145
7254e419
JB
146 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
147 this.executeTask = this.executeTask.bind(this)
148 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 149
6bd72cd0 150 if (this.opts.enableEvents === true) {
b5604034 151 this.initializeEventEmitter()
7c0ba920 152 }
d59df138
JB
153 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
154 Worker,
155 Data,
156 Response
da309861
JB
157 >(
158 this,
159 this.opts.workerChoiceStrategy,
160 this.opts.workerChoiceStrategyOptions
161 )
b6b32453
JB
162
163 this.setupHook()
164
72ae84a2
JB
165 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
166
47352846 167 this.started = false
075e51d1 168 this.starting = false
711623b8 169 this.destroying = false
47352846
JB
170 if (this.opts.startWorkers === true) {
171 this.start()
172 }
afe0d5bf
JB
173
174 this.startTimestamp = performance.now()
c97c7edb
S
175 }
176
8d3782fa
JB
177 private checkNumberOfWorkers (numberOfWorkers: number): void {
178 if (numberOfWorkers == null) {
179 throw new Error(
180 'Cannot instantiate a pool without specifying the number of workers'
181 )
78cea37e 182 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 183 throw new TypeError(
0d80593b 184 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
185 )
186 } else if (numberOfWorkers < 0) {
473c717a 187 throw new RangeError(
8d3782fa
JB
188 'Cannot instantiate a pool with a negative number of workers'
189 )
6b27d407 190 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
191 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
192 }
193 }
194
7c0ba920 195 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 196 if (isPlainObject(opts)) {
47352846 197 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 198 checkValidWorkerChoiceStrategy(
2324f8c9
JB
199 opts.workerChoiceStrategy as WorkerChoiceStrategy
200 )
0d80593b
JB
201 this.opts.workerChoiceStrategy =
202 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
203 this.checkValidWorkerChoiceStrategyOptions(
204 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
205 )
8990357d
JB
206 this.opts.workerChoiceStrategyOptions = {
207 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
208 ...opts.workerChoiceStrategyOptions
209 }
1f68cede 210 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
211 this.opts.enableEvents = opts.enableEvents ?? true
212 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
213 if (this.opts.enableTasksQueue) {
bde6b5d7 214 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
215 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
216 opts.tasksQueueOptions as TasksQueueOptions
217 )
218 }
219 } else {
220 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 221 }
aee46736
JB
222 }
223
0d80593b
JB
224 private checkValidWorkerChoiceStrategyOptions (
225 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
226 ): void {
2324f8c9
JB
227 if (
228 workerChoiceStrategyOptions != null &&
229 !isPlainObject(workerChoiceStrategyOptions)
230 ) {
0d80593b
JB
231 throw new TypeError(
232 'Invalid worker choice strategy options: must be a plain object'
233 )
234 }
8990357d 235 if (
2324f8c9 236 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 237 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
238 ) {
239 throw new TypeError(
8c0b113f 240 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
241 )
242 }
243 if (
2324f8c9 244 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 245 workerChoiceStrategyOptions.retries < 0
8990357d
JB
246 ) {
247 throw new RangeError(
8c0b113f 248 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
249 )
250 }
49be33fe 251 if (
2324f8c9 252 workerChoiceStrategyOptions?.weights != null &&
6b27d407 253 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
254 ) {
255 throw new Error(
256 'Invalid worker choice strategy options: must have a weight for each worker node'
257 )
258 }
f0d7f803 259 if (
2324f8c9 260 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
261 !Object.values(Measurements).includes(
262 workerChoiceStrategyOptions.measurement
263 )
264 ) {
265 throw new Error(
266 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
267 )
268 }
0d80593b
JB
269 }
270
b5604034 271 private initializeEventEmitter (): void {
5b7d00b4 272 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
273 name: `poolifier:${this.type}-${this.worker}-pool`
274 })
275 }
276
08f3f44c 277 /** @inheritDoc */
6b27d407
JB
278 public get info (): PoolInfo {
279 return {
23ccf9d7 280 version,
6b27d407 281 type: this.type,
184855e6 282 worker: this.worker,
47352846 283 started: this.started,
2431bdb4
JB
284 ready: this.ready,
285 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
286 minSize: this.minSize,
287 maxSize: this.maxSize,
c05f0d50
JB
288 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
289 .runTime.aggregate &&
1305e9a8
JB
290 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
291 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
292 workerNodes: this.workerNodes.length,
293 idleWorkerNodes: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
f59e1027 295 workerNode.usage.tasks.executing === 0
a4e07f72
JB
296 ? accumulator + 1
297 : accumulator,
6b27d407
JB
298 0
299 ),
300 busyWorkerNodes: this.workerNodes.reduce(
301 (accumulator, workerNode) =>
f59e1027 302 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
303 0
304 ),
a4e07f72 305 executedTasks: this.workerNodes.reduce(
6b27d407 306 (accumulator, workerNode) =>
f59e1027 307 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
308 0
309 ),
310 executingTasks: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
f59e1027 312 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
313 0
314 ),
daf86646
JB
315 ...(this.opts.enableTasksQueue === true && {
316 queuedTasks: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 accumulator + workerNode.usage.tasks.queued,
319 0
320 )
321 }),
322 ...(this.opts.enableTasksQueue === true && {
323 maxQueuedTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
326 0
327 )
328 }),
a1763c54
JB
329 ...(this.opts.enableTasksQueue === true && {
330 backPressure: this.hasBackPressure()
331 }),
68cbdc84
JB
332 ...(this.opts.enableTasksQueue === true && {
333 stolenTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
335 accumulator + workerNode.usage.tasks.stolen,
336 0
337 )
338 }),
a4e07f72
JB
339 failedTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
f59e1027 341 accumulator + workerNode.usage.tasks.failed,
a4e07f72 342 0
1dcf8b7b
JB
343 ),
344 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
345 .runTime.aggregate && {
346 runTime: {
98e72cda 347 minimum: round(
90d6701c 348 min(
98e72cda 349 ...this.workerNodes.map(
041dc05b 350 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 351 )
1dcf8b7b
JB
352 )
353 ),
98e72cda 354 maximum: round(
90d6701c 355 max(
98e72cda 356 ...this.workerNodes.map(
041dc05b 357 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 358 )
1dcf8b7b 359 )
98e72cda 360 ),
3baa0837
JB
361 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
362 .runTime.average && {
363 average: round(
364 average(
365 this.workerNodes.reduce<number[]>(
366 (accumulator, workerNode) =>
367 accumulator.concat(workerNode.usage.runTime.history),
368 []
369 )
98e72cda 370 )
dc021bcc 371 )
3baa0837 372 }),
98e72cda
JB
373 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
374 .runTime.median && {
375 median: round(
376 median(
3baa0837
JB
377 this.workerNodes.reduce<number[]>(
378 (accumulator, workerNode) =>
379 accumulator.concat(workerNode.usage.runTime.history),
380 []
98e72cda
JB
381 )
382 )
383 )
384 })
1dcf8b7b
JB
385 }
386 }),
387 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 .waitTime.aggregate && {
389 waitTime: {
98e72cda 390 minimum: round(
90d6701c 391 min(
98e72cda 392 ...this.workerNodes.map(
041dc05b 393 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 394 )
1dcf8b7b
JB
395 )
396 ),
98e72cda 397 maximum: round(
90d6701c 398 max(
98e72cda 399 ...this.workerNodes.map(
041dc05b 400 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 401 )
1dcf8b7b 402 )
98e72cda 403 ),
3baa0837
JB
404 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
405 .waitTime.average && {
406 average: round(
407 average(
408 this.workerNodes.reduce<number[]>(
409 (accumulator, workerNode) =>
410 accumulator.concat(workerNode.usage.waitTime.history),
411 []
412 )
98e72cda 413 )
dc021bcc 414 )
3baa0837 415 }),
98e72cda
JB
416 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
417 .waitTime.median && {
418 median: round(
419 median(
3baa0837
JB
420 this.workerNodes.reduce<number[]>(
421 (accumulator, workerNode) =>
422 accumulator.concat(workerNode.usage.waitTime.history),
423 []
98e72cda
JB
424 )
425 )
426 )
427 })
1dcf8b7b
JB
428 }
429 })
6b27d407
JB
430 }
431 }
08f3f44c 432
aa9eede8
JB
433 /**
434 * The pool readiness boolean status.
435 */
2431bdb4
JB
436 private get ready (): boolean {
437 return (
b97d82d8
JB
438 this.workerNodes.reduce(
439 (accumulator, workerNode) =>
440 !workerNode.info.dynamic && workerNode.info.ready
441 ? accumulator + 1
442 : accumulator,
443 0
444 ) >= this.minSize
2431bdb4
JB
445 )
446 }
447
afe0d5bf 448 /**
aa9eede8 449 * The approximate pool utilization.
afe0d5bf
JB
450 *
451 * @returns The pool utilization.
452 */
453 private get utilization (): number {
8e5ca040 454 const poolTimeCapacity =
fe7d90db 455 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
456 const totalTasksRunTime = this.workerNodes.reduce(
457 (accumulator, workerNode) =>
71514351 458 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
459 0
460 )
461 const totalTasksWaitTime = this.workerNodes.reduce(
462 (accumulator, workerNode) =>
71514351 463 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
464 0
465 )
8e5ca040 466 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
467 }
468
8881ae32 469 /**
aa9eede8 470 * The pool type.
8881ae32
JB
471 *
472 * If it is `'dynamic'`, it provides the `max` property.
473 */
474 protected abstract get type (): PoolType
475
184855e6 476 /**
aa9eede8 477 * The worker type.
184855e6
JB
478 */
479 protected abstract get worker (): WorkerType
480
c2ade475 481 /**
aa9eede8 482 * The pool minimum size.
c2ade475 483 */
8735b4e5
JB
484 protected get minSize (): number {
485 return this.numberOfWorkers
486 }
ff733df7
JB
487
488 /**
aa9eede8 489 * The pool maximum size.
ff733df7 490 */
8735b4e5
JB
491 protected get maxSize (): number {
492 return this.max ?? this.numberOfWorkers
493 }
a35560ba 494
6b813701
JB
495 /**
496 * Checks if the worker id sent in the received message from a worker is valid.
497 *
498 * @param message - The received message.
499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
500 */
4d159167 501 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
502 if (message.workerId == null) {
503 throw new Error('Worker message received without worker id')
8e5a7cf9 504 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
505 throw new Error(
506 `Worker message received from unknown worker '${message.workerId}'`
507 )
508 }
509 }
510
ffcbbad8 511 /**
f06e48d8 512 * Gets the given worker its worker node key.
ffcbbad8
JB
513 *
514 * @param worker - The worker.
f59e1027 515 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 516 */
aad6fb64 517 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 518 return this.workerNodes.findIndex(
041dc05b 519 workerNode => workerNode.worker === worker
f06e48d8 520 )
bf9549ae
JB
521 }
522
aa9eede8
JB
523 /**
524 * Gets the worker node key given its worker id.
525 *
526 * @param workerId - The worker id.
aad6fb64 527 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 528 */
72ae84a2 529 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 530 return this.workerNodes.findIndex(
041dc05b 531 workerNode => workerNode.info.id === workerId
aad6fb64 532 )
aa9eede8
JB
533 }
534
afc003b2 535 /** @inheritDoc */
a35560ba 536 public setWorkerChoiceStrategy (
59219cbb
JB
537 workerChoiceStrategy: WorkerChoiceStrategy,
538 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 539 ): void {
bde6b5d7 540 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 541 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
542 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
543 this.opts.workerChoiceStrategy
544 )
545 if (workerChoiceStrategyOptions != null) {
546 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
547 }
aa9eede8 548 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 549 workerNode.resetUsage()
9edb9717 550 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 551 }
a20f0ba5
JB
552 }
553
554 /** @inheritDoc */
555 public setWorkerChoiceStrategyOptions (
556 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
557 ): void {
0d80593b 558 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
559 this.opts.workerChoiceStrategyOptions = {
560 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
561 ...workerChoiceStrategyOptions
562 }
a20f0ba5
JB
563 this.workerChoiceStrategyContext.setOptions(
564 this.opts.workerChoiceStrategyOptions
a35560ba
S
565 )
566 }
567
a20f0ba5 568 /** @inheritDoc */
8f52842f
JB
569 public enableTasksQueue (
570 enable: boolean,
571 tasksQueueOptions?: TasksQueueOptions
572 ): void {
a20f0ba5 573 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
574 this.unsetTaskStealing()
575 this.unsetTasksStealingOnBackPressure()
ef41a6e6 576 this.flushTasksQueues()
a20f0ba5
JB
577 }
578 this.opts.enableTasksQueue = enable
8f52842f 579 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
580 }
581
582 /** @inheritDoc */
8f52842f 583 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 584 if (this.opts.enableTasksQueue === true) {
bde6b5d7 585 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
586 this.opts.tasksQueueOptions =
587 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 588 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
589 if (this.opts.tasksQueueOptions.taskStealing === true) {
590 this.setTaskStealing()
591 } else {
592 this.unsetTaskStealing()
593 }
594 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
595 this.setTasksStealingOnBackPressure()
596 } else {
597 this.unsetTasksStealingOnBackPressure()
598 }
5baee0d7 599 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
600 delete this.opts.tasksQueueOptions
601 }
602 }
603
9b38ab2d
JB
604 private buildTasksQueueOptions (
605 tasksQueueOptions: TasksQueueOptions
606 ): TasksQueueOptions {
607 return {
608 ...{
609 size: Math.pow(this.maxSize, 2),
610 concurrency: 1,
611 taskStealing: true,
612 tasksStealingOnBackPressure: true
613 },
614 ...tasksQueueOptions
615 }
616 }
617
5b49e864 618 private setTasksQueueSize (size: number): void {
20c6f652 619 for (const workerNode of this.workerNodes) {
ff3f866a 620 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
621 }
622 }
623
d6ca1416
JB
624 private setTaskStealing (): void {
625 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
626 this.workerNodes[workerNodeKey].addEventListener(
627 'emptyqueue',
9b85e2d0 628 this.handleEmptyQueueEvent as EventListener
9f95d5eb 629 )
d6ca1416
JB
630 }
631 }
632
633 private unsetTaskStealing (): void {
634 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
635 this.workerNodes[workerNodeKey].removeEventListener(
636 'emptyqueue',
9b85e2d0 637 this.handleEmptyQueueEvent as EventListener
9f95d5eb 638 )
d6ca1416
JB
639 }
640 }
641
642 private setTasksStealingOnBackPressure (): void {
643 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
644 this.workerNodes[workerNodeKey].addEventListener(
645 'backpressure',
9b85e2d0 646 this.handleBackPressureEvent as EventListener
9f95d5eb 647 )
d6ca1416
JB
648 }
649 }
650
651 private unsetTasksStealingOnBackPressure (): void {
652 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb
JB
653 this.workerNodes[workerNodeKey].removeEventListener(
654 'backpressure',
9b85e2d0 655 this.handleBackPressureEvent as EventListener
9f95d5eb 656 )
d6ca1416
JB
657 }
658 }
659
c319c66b
JB
660 /**
661 * Whether the pool is full or not.
662 *
663 * The pool filling boolean status.
664 */
dea903a8
JB
665 protected get full (): boolean {
666 return this.workerNodes.length >= this.maxSize
667 }
c2ade475 668
c319c66b
JB
669 /**
670 * Whether the pool is busy or not.
671 *
672 * The pool busyness boolean status.
673 */
674 protected abstract get busy (): boolean
7c0ba920 675
6c6afb84 676 /**
3d76750a 677 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
678 *
679 * @returns Worker nodes busyness boolean status.
680 */
c2ade475 681 protected internalBusy (): boolean {
3d76750a
JB
682 if (this.opts.enableTasksQueue === true) {
683 return (
684 this.workerNodes.findIndex(
041dc05b 685 workerNode =>
3d76750a
JB
686 workerNode.info.ready &&
687 workerNode.usage.tasks.executing <
688 (this.opts.tasksQueueOptions?.concurrency as number)
689 ) === -1
690 )
3d76750a 691 }
419e8121
JB
692 return (
693 this.workerNodes.findIndex(
694 workerNode =>
695 workerNode.info.ready && workerNode.usage.tasks.executing === 0
696 ) === -1
697 )
cb70b19d
JB
698 }
699
e81c38f2 700 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
701 workerNodeKey: number,
702 message: MessageValue<Data>
703 ): Promise<boolean> {
72ae84a2 704 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
705 const taskFunctionOperationListener = (
706 message: MessageValue<Response>
707 ): void => {
708 this.checkMessageWorkerId(message)
709 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 710 if (
ae036c3e
JB
711 message.taskFunctionOperationStatus != null &&
712 message.workerId === workerId
72ae84a2 713 ) {
ae036c3e
JB
714 if (message.taskFunctionOperationStatus) {
715 resolve(true)
716 } else if (!message.taskFunctionOperationStatus) {
717 reject(
718 new Error(
719 `Task function operation '${
720 message.taskFunctionOperation as string
721 }' failed on worker ${message.workerId} with error: '${
722 message.workerError?.message as string
723 }'`
724 )
72ae84a2 725 )
ae036c3e
JB
726 }
727 this.deregisterWorkerMessageListener(
728 this.getWorkerNodeKeyByWorkerId(message.workerId),
729 taskFunctionOperationListener
72ae84a2
JB
730 )
731 }
ae036c3e
JB
732 }
733 this.registerWorkerMessageListener(
734 workerNodeKey,
735 taskFunctionOperationListener
736 )
72ae84a2
JB
737 this.sendToWorker(workerNodeKey, message)
738 })
739 }
740
741 private async sendTaskFunctionOperationToWorkers (
adee6053 742 message: MessageValue<Data>
e81c38f2
JB
743 ): Promise<boolean> {
744 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
745 const responsesReceived = new Array<MessageValue<Response>>()
746 const taskFunctionOperationsListener = (
747 message: MessageValue<Response>
748 ): void => {
749 this.checkMessageWorkerId(message)
750 if (message.taskFunctionOperationStatus != null) {
751 responsesReceived.push(message)
752 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 753 if (
e81c38f2
JB
754 responsesReceived.every(
755 message => message.taskFunctionOperationStatus === true
756 )
757 ) {
758 resolve(true)
759 } else if (
e81c38f2
JB
760 responsesReceived.some(
761 message => message.taskFunctionOperationStatus === false
762 )
763 ) {
b0b55f57
JB
764 const errorResponse = responsesReceived.find(
765 response => response.taskFunctionOperationStatus === false
766 )
e81c38f2
JB
767 reject(
768 new Error(
b0b55f57 769 `Task function operation '${
e81c38f2 770 message.taskFunctionOperation as string
b0b55f57
JB
771 }' failed on worker ${
772 errorResponse?.workerId as number
773 } with error: '${
774 errorResponse?.workerError?.message as string
775 }'`
e81c38f2
JB
776 )
777 )
778 }
ae036c3e
JB
779 this.deregisterWorkerMessageListener(
780 this.getWorkerNodeKeyByWorkerId(message.workerId),
781 taskFunctionOperationsListener
782 )
e81c38f2 783 }
ae036c3e
JB
784 }
785 }
786 for (const [workerNodeKey] of this.workerNodes.entries()) {
787 this.registerWorkerMessageListener(
788 workerNodeKey,
789 taskFunctionOperationsListener
790 )
72ae84a2 791 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
792 }
793 })
6703b9f4
JB
794 }
795
796 /** @inheritDoc */
797 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
798 for (const workerNode of this.workerNodes) {
799 if (
800 Array.isArray(workerNode.info.taskFunctionNames) &&
801 workerNode.info.taskFunctionNames.includes(name)
802 ) {
803 return true
804 }
805 }
806 return false
6703b9f4
JB
807 }
808
809 /** @inheritDoc */
e81c38f2
JB
810 public async addTaskFunction (
811 name: string,
3feeab69 812 fn: TaskFunction<Data, Response>
e81c38f2 813 ): Promise<boolean> {
3feeab69
JB
814 if (typeof name !== 'string') {
815 throw new TypeError('name argument must be a string')
816 }
817 if (typeof name === 'string' && name.trim().length === 0) {
818 throw new TypeError('name argument must not be an empty string')
819 }
820 if (typeof fn !== 'function') {
821 throw new TypeError('fn argument must be a function')
822 }
adee6053 823 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
824 taskFunctionOperation: 'add',
825 taskFunctionName: name,
3feeab69 826 taskFunction: fn.toString()
6703b9f4 827 })
adee6053
JB
828 this.taskFunctions.set(name, fn)
829 return opResult
6703b9f4
JB
830 }
831
832 /** @inheritDoc */
e81c38f2 833 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
834 if (!this.taskFunctions.has(name)) {
835 throw new Error(
16248b23 836 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
837 )
838 }
adee6053 839 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
840 taskFunctionOperation: 'remove',
841 taskFunctionName: name
842 })
adee6053
JB
843 this.deleteTaskFunctionWorkerUsages(name)
844 this.taskFunctions.delete(name)
845 return opResult
6703b9f4
JB
846 }
847
90d7d101 848 /** @inheritDoc */
6703b9f4 849 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
850 for (const workerNode of this.workerNodes) {
851 if (
6703b9f4
JB
852 Array.isArray(workerNode.info.taskFunctionNames) &&
853 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 854 ) {
6703b9f4 855 return workerNode.info.taskFunctionNames
f2dbbf95 856 }
90d7d101 857 }
f2dbbf95 858 return []
90d7d101
JB
859 }
860
6703b9f4 861 /** @inheritDoc */
e81c38f2 862 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 863 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
864 taskFunctionOperation: 'default',
865 taskFunctionName: name
866 })
6703b9f4
JB
867 }
868
adee6053
JB
869 private deleteTaskFunctionWorkerUsages (name: string): void {
870 for (const workerNode of this.workerNodes) {
871 workerNode.deleteTaskFunctionWorkerUsage(name)
872 }
873 }
874
375f7504
JB
875 private shallExecuteTask (workerNodeKey: number): boolean {
876 return (
877 this.tasksQueueSize(workerNodeKey) === 0 &&
878 this.workerNodes[workerNodeKey].usage.tasks.executing <
879 (this.opts.tasksQueueOptions?.concurrency as number)
880 )
881 }
882
afc003b2 883 /** @inheritDoc */
7d91a8cd
JB
884 public async execute (
885 data?: Data,
886 name?: string,
887 transferList?: TransferListItem[]
888 ): Promise<Response> {
52b71763 889 return await new Promise<Response>((resolve, reject) => {
15b176e0 890 if (!this.started) {
47352846 891 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 892 return
15b176e0 893 }
711623b8
JB
894 if (this.destroying) {
895 reject(new Error('Cannot execute a task on destroying pool'))
896 return
897 }
7d91a8cd
JB
898 if (name != null && typeof name !== 'string') {
899 reject(new TypeError('name argument must be a string'))
9d2d0da1 900 return
7d91a8cd 901 }
90d7d101
JB
902 if (
903 name != null &&
904 typeof name === 'string' &&
905 name.trim().length === 0
906 ) {
f58b60b9 907 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 908 return
90d7d101 909 }
b558f6b5
JB
910 if (transferList != null && !Array.isArray(transferList)) {
911 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 912 return
b558f6b5
JB
913 }
914 const timestamp = performance.now()
915 const workerNodeKey = this.chooseWorkerNode()
501aea93 916 const task: Task<Data> = {
52b71763
JB
917 name: name ?? DEFAULT_TASK_NAME,
918 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
919 data: data ?? ({} as Data),
7d91a8cd 920 transferList,
52b71763 921 timestamp,
7629bdf1 922 taskId: randomUUID()
52b71763 923 }
7629bdf1 924 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
925 resolve,
926 reject,
501aea93 927 workerNodeKey
2e81254d 928 })
52b71763 929 if (
4e377863
JB
930 this.opts.enableTasksQueue === false ||
931 (this.opts.enableTasksQueue === true &&
375f7504 932 this.shallExecuteTask(workerNodeKey))
52b71763 933 ) {
501aea93 934 this.executeTask(workerNodeKey, task)
4e377863
JB
935 } else {
936 this.enqueueTask(workerNodeKey, task)
52b71763 937 }
2e81254d 938 })
280c2a77 939 }
c97c7edb 940
47352846
JB
941 /** @inheritdoc */
942 public start (): void {
711623b8
JB
943 if (this.started) {
944 throw new Error('Cannot start an already started pool')
945 }
946 if (this.starting) {
947 throw new Error('Cannot start an already starting pool')
948 }
949 if (this.destroying) {
950 throw new Error('Cannot start a destroying pool')
951 }
47352846
JB
952 this.starting = true
953 while (
954 this.workerNodes.reduce(
955 (accumulator, workerNode) =>
956 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
957 0
958 ) < this.numberOfWorkers
959 ) {
960 this.createAndSetupWorkerNode()
961 }
962 this.starting = false
963 this.started = true
964 }
965
afc003b2 966 /** @inheritDoc */
c97c7edb 967 public async destroy (): Promise<void> {
711623b8
JB
968 if (!this.started) {
969 throw new Error('Cannot destroy an already destroyed pool')
970 }
971 if (this.starting) {
972 throw new Error('Cannot destroy an starting pool')
973 }
974 if (this.destroying) {
975 throw new Error('Cannot destroy an already destroying pool')
976 }
977 this.destroying = true
1fbcaa7c 978 await Promise.all(
81c02522 979 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 980 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
981 })
982 )
33e6bb4c 983 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 984 this.emitter?.emitDestroy()
711623b8 985 this.destroying = false
15b176e0 986 this.started = false
c97c7edb
S
987 }
988
1e3214b6 989 protected async sendKillMessageToWorker (
72ae84a2 990 workerNodeKey: number
1e3214b6 991 ): Promise<void> {
9edb9717 992 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
993 const killMessageListener = (message: MessageValue<Response>): void => {
994 this.checkMessageWorkerId(message)
1e3214b6
JB
995 if (message.kill === 'success') {
996 resolve()
997 } else if (message.kill === 'failure') {
72ae84a2
JB
998 reject(
999 new Error(
ae036c3e 1000 `Kill message handling failed on worker ${
72ae84a2 1001 message.workerId as number
ae036c3e 1002 }`
72ae84a2
JB
1003 )
1004 )
1e3214b6 1005 }
ae036c3e 1006 }
51d9cfbd 1007 // FIXME: should be registered only once
ae036c3e 1008 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1009 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1010 })
1e3214b6
JB
1011 }
1012
4a6952ff 1013 /**
aa9eede8 1014 * Terminates the worker node given its worker node key.
4a6952ff 1015 *
aa9eede8 1016 * @param workerNodeKey - The worker node key.
4a6952ff 1017 */
81c02522 1018 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1019
729c563d 1020 /**
6677a3d3
JB
1021 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1022 * Can be overridden.
afc003b2
JB
1023 *
1024 * @virtual
729c563d 1025 */
280c2a77 1026 protected setupHook (): void {
965df41c 1027 /* Intentionally empty */
280c2a77 1028 }
c97c7edb 1029
729c563d 1030 /**
280c2a77
S
1031 * Should return whether the worker is the main worker or not.
1032 */
1033 protected abstract isMain (): boolean
1034
1035 /**
2e81254d 1036 * Hook executed before the worker task execution.
bf9549ae 1037 * Can be overridden.
729c563d 1038 *
f06e48d8 1039 * @param workerNodeKey - The worker node key.
1c6fe997 1040 * @param task - The task to execute.
729c563d 1041 */
1c6fe997
JB
1042 protected beforeTaskExecutionHook (
1043 workerNodeKey: number,
1044 task: Task<Data>
1045 ): void {
94407def
JB
1046 if (this.workerNodes[workerNodeKey]?.usage != null) {
1047 const workerUsage = this.workerNodes[workerNodeKey].usage
1048 ++workerUsage.tasks.executing
1049 this.updateWaitTimeWorkerUsage(workerUsage, task)
1050 }
1051 if (
1052 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1053 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1054 task.name as string
1055 ) != null
1056 ) {
db0e38ee 1057 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1058 workerNodeKey
db0e38ee 1059 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1060 ++taskFunctionWorkerUsage.tasks.executing
1061 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1062 }
c97c7edb
S
1063 }
1064
c01733f1 1065 /**
2e81254d 1066 * Hook executed after the worker task execution.
bf9549ae 1067 * Can be overridden.
c01733f1 1068 *
501aea93 1069 * @param workerNodeKey - The worker node key.
38e795c1 1070 * @param message - The received message.
c01733f1 1071 */
2e81254d 1072 protected afterTaskExecutionHook (
501aea93 1073 workerNodeKey: number,
2740a743 1074 message: MessageValue<Response>
bf9549ae 1075 ): void {
94407def
JB
1076 if (this.workerNodes[workerNodeKey]?.usage != null) {
1077 const workerUsage = this.workerNodes[workerNodeKey].usage
1078 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1079 this.updateRunTimeWorkerUsage(workerUsage, message)
1080 this.updateEluWorkerUsage(workerUsage, message)
1081 }
1082 if (
1083 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1084 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1085 message.taskPerformance?.name as string
94407def
JB
1086 ) != null
1087 ) {
db0e38ee 1088 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1089 workerNodeKey
db0e38ee 1090 ].getTaskFunctionWorkerUsage(
0628755c 1091 message.taskPerformance?.name as string
b558f6b5 1092 ) as WorkerUsage
db0e38ee
JB
1093 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1094 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1095 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1096 }
1097 }
1098
db0e38ee
JB
1099 /**
1100 * Whether the worker node shall update its task function worker usage or not.
1101 *
1102 * @param workerNodeKey - The worker node key.
1103 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1104 */
1105 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1106 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1107 return (
94407def 1108 workerInfo != null &&
6703b9f4
JB
1109 Array.isArray(workerInfo.taskFunctionNames) &&
1110 workerInfo.taskFunctionNames.length > 2
b558f6b5 1111 )
f1c06930
JB
1112 }
1113
1114 private updateTaskStatisticsWorkerUsage (
1115 workerUsage: WorkerUsage,
1116 message: MessageValue<Response>
1117 ): void {
a4e07f72 1118 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1119 if (
1120 workerTaskStatistics.executing != null &&
1121 workerTaskStatistics.executing > 0
1122 ) {
1123 --workerTaskStatistics.executing
5bb5be17 1124 }
6703b9f4 1125 if (message.workerError == null) {
98e72cda
JB
1126 ++workerTaskStatistics.executed
1127 } else {
a4e07f72 1128 ++workerTaskStatistics.failed
2740a743 1129 }
f8eb0a2a
JB
1130 }
1131
a4e07f72
JB
1132 private updateRunTimeWorkerUsage (
1133 workerUsage: WorkerUsage,
f8eb0a2a
JB
1134 message: MessageValue<Response>
1135 ): void {
6703b9f4 1136 if (message.workerError != null) {
dc021bcc
JB
1137 return
1138 }
e4f20deb
JB
1139 updateMeasurementStatistics(
1140 workerUsage.runTime,
1141 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1142 message.taskPerformance?.runTime ?? 0
e4f20deb 1143 )
f8eb0a2a
JB
1144 }
1145
a4e07f72
JB
1146 private updateWaitTimeWorkerUsage (
1147 workerUsage: WorkerUsage,
1c6fe997 1148 task: Task<Data>
f8eb0a2a 1149 ): void {
1c6fe997
JB
1150 const timestamp = performance.now()
1151 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1152 updateMeasurementStatistics(
1153 workerUsage.waitTime,
1154 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1155 taskWaitTime
e4f20deb 1156 )
c01733f1 1157 }
1158
a4e07f72 1159 private updateEluWorkerUsage (
5df69fab 1160 workerUsage: WorkerUsage,
62c15a68
JB
1161 message: MessageValue<Response>
1162 ): void {
6703b9f4 1163 if (message.workerError != null) {
dc021bcc
JB
1164 return
1165 }
008512c7
JB
1166 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1167 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1168 updateMeasurementStatistics(
1169 workerUsage.elu.active,
008512c7 1170 eluTaskStatisticsRequirements,
dc021bcc 1171 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1172 )
1173 updateMeasurementStatistics(
1174 workerUsage.elu.idle,
008512c7 1175 eluTaskStatisticsRequirements,
dc021bcc 1176 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1177 )
008512c7 1178 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1179 if (message.taskPerformance?.elu != null) {
f7510105
JB
1180 if (workerUsage.elu.utilization != null) {
1181 workerUsage.elu.utilization =
1182 (workerUsage.elu.utilization +
1183 message.taskPerformance.elu.utilization) /
1184 2
1185 } else {
1186 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1187 }
62c15a68
JB
1188 }
1189 }
1190 }
1191
280c2a77 1192 /**
f06e48d8 1193 * Chooses a worker node for the next task.
280c2a77 1194 *
6c6afb84 1195 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1196 *
aa9eede8 1197 * @returns The chosen worker node key
280c2a77 1198 */
6c6afb84 1199 private chooseWorkerNode (): number {
930dcf12 1200 if (this.shallCreateDynamicWorker()) {
aa9eede8 1201 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1202 if (
b1aae695 1203 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1204 ) {
aa9eede8 1205 return workerNodeKey
6c6afb84 1206 }
17393ac8 1207 }
930dcf12
JB
1208 return this.workerChoiceStrategyContext.execute()
1209 }
1210
6c6afb84
JB
1211 /**
1212 * Conditions for dynamic worker creation.
1213 *
1214 * @returns Whether to create a dynamic worker or not.
1215 */
1216 private shallCreateDynamicWorker (): boolean {
930dcf12 1217 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1218 }
1219
280c2a77 1220 /**
aa9eede8 1221 * Sends a message to worker given its worker node key.
280c2a77 1222 *
aa9eede8 1223 * @param workerNodeKey - The worker node key.
38e795c1 1224 * @param message - The message.
7d91a8cd 1225 * @param transferList - The optional array of transferable objects.
280c2a77
S
1226 */
1227 protected abstract sendToWorker (
aa9eede8 1228 workerNodeKey: number,
7d91a8cd
JB
1229 message: MessageValue<Data>,
1230 transferList?: TransferListItem[]
280c2a77
S
1231 ): void
1232
729c563d 1233 /**
41344292 1234 * Creates a new worker.
6c6afb84
JB
1235 *
1236 * @returns Newly created worker.
729c563d 1237 */
280c2a77 1238 protected abstract createWorker (): Worker
c97c7edb 1239
4a6952ff 1240 /**
aa9eede8 1241 * Creates a new, completely set up worker node.
4a6952ff 1242 *
aa9eede8 1243 * @returns New, completely set up worker node key.
4a6952ff 1244 */
aa9eede8 1245 protected createAndSetupWorkerNode (): number {
bdacc2d2 1246 const worker = this.createWorker()
280c2a77 1247
fd04474e 1248 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1249 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1250 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1251 worker.on('error', error => {
aad6fb64 1252 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
23b11f60 1253 this.flagWorkerNodeAsNotReady(workerNodeKey)
46b0bb09 1254 const workerInfo = this.getWorkerInfo(workerNodeKey)
2a69b8c5 1255 this.emitter?.emit(PoolEvents.error, error)
cce7d657 1256 this.workerNodes[workerNodeKey].closeChannel()
15b176e0 1257 if (
b6bfca01 1258 this.started &&
9b38ab2d 1259 !this.starting &&
711623b8 1260 !this.destroying &&
9b38ab2d 1261 this.opts.restartWorkerOnError === true
15b176e0 1262 ) {
9b106837 1263 if (workerInfo.dynamic) {
aa9eede8 1264 this.createAndSetupDynamicWorkerNode()
8a1260a3 1265 } else {
aa9eede8 1266 this.createAndSetupWorkerNode()
8a1260a3 1267 }
5baee0d7 1268 }
6d59c3a3 1269 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1270 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1271 }
5baee0d7 1272 })
a35560ba 1273 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1274 worker.once('exit', () => {
f06e48d8 1275 this.removeWorkerNode(worker)
a974afa6 1276 })
280c2a77 1277
aa9eede8 1278 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1279
aa9eede8 1280 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1281
aa9eede8 1282 return workerNodeKey
c97c7edb 1283 }
be0676b3 1284
930dcf12 1285 /**
aa9eede8 1286 * Creates a new, completely set up dynamic worker node.
930dcf12 1287 *
aa9eede8 1288 * @returns New, completely set up dynamic worker node key.
930dcf12 1289 */
aa9eede8
JB
1290 protected createAndSetupDynamicWorkerNode (): number {
1291 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1292 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1293 this.checkMessageWorkerId(message)
aa9eede8
JB
1294 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1295 message.workerId
aad6fb64 1296 )
aa9eede8 1297 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1298 // Kill message received from worker
930dcf12
JB
1299 if (
1300 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1301 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1302 ((this.opts.enableTasksQueue === false &&
aa9eede8 1303 workerUsage.tasks.executing === 0) ||
7b56f532 1304 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1305 workerUsage.tasks.executing === 0 &&
1306 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1307 ) {
f3827d5d 1308 // Flag the worker node as not ready immediately
ae3ab61d 1309 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1310 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1311 this.emitter?.emit(PoolEvents.error, error)
1312 })
930dcf12
JB
1313 }
1314 })
46b0bb09 1315 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1316 this.sendToWorker(workerNodeKey, {
e9dd5b66 1317 checkActive: true
21f710aa 1318 })
72ae84a2
JB
1319 if (this.taskFunctions.size > 0) {
1320 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1321 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1322 taskFunctionOperation: 'add',
1323 taskFunctionName,
1324 taskFunction: taskFunction.toString()
1325 }).catch(error => {
1326 this.emitter?.emit(PoolEvents.error, error)
1327 })
1328 }
1329 }
b5e113f6 1330 workerInfo.dynamic = true
b1aae695
JB
1331 if (
1332 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1333 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1334 ) {
b5e113f6
JB
1335 workerInfo.ready = true
1336 }
33e6bb4c 1337 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1338 return workerNodeKey
930dcf12
JB
1339 }
1340
a2ed5053 1341 /**
aa9eede8 1342 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1343 *
aa9eede8 1344 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1345 * @param listener - The message listener callback.
1346 */
85aeb3f3
JB
1347 protected abstract registerWorkerMessageListener<
1348 Message extends Data | Response
aa9eede8
JB
1349 >(
1350 workerNodeKey: number,
1351 listener: (message: MessageValue<Message>) => void
1352 ): void
a2ed5053 1353
ae036c3e
JB
1354 /**
1355 * Registers once a listener callback on the worker given its worker node key.
1356 *
1357 * @param workerNodeKey - The worker node key.
1358 * @param listener - The message listener callback.
1359 */
1360 protected abstract registerOnceWorkerMessageListener<
1361 Message extends Data | Response
1362 >(
1363 workerNodeKey: number,
1364 listener: (message: MessageValue<Message>) => void
1365 ): void
1366
1367 /**
1368 * Deregisters a listener callback on the worker given its worker node key.
1369 *
1370 * @param workerNodeKey - The worker node key.
1371 * @param listener - The message listener callback.
1372 */
1373 protected abstract deregisterWorkerMessageListener<
1374 Message extends Data | Response
1375 >(
1376 workerNodeKey: number,
1377 listener: (message: MessageValue<Message>) => void
1378 ): void
1379
a2ed5053 1380 /**
aa9eede8 1381 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1382 * Can be overridden.
1383 *
aa9eede8 1384 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1385 */
aa9eede8 1386 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1387 // Listen to worker messages.
fcd39179
JB
1388 this.registerWorkerMessageListener(
1389 workerNodeKey,
1390 this.workerMessageListener.bind(this)
1391 )
85aeb3f3 1392 // Send the startup message to worker.
aa9eede8 1393 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1394 // Send the statistics message to worker.
1395 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1396 if (this.opts.enableTasksQueue === true) {
dbd73092 1397 if (this.opts.tasksQueueOptions?.taskStealing === true) {
9f95d5eb
JB
1398 this.workerNodes[workerNodeKey].addEventListener(
1399 'emptyqueue',
9b85e2d0 1400 this.handleEmptyQueueEvent as EventListener
9f95d5eb 1401 )
47352846
JB
1402 }
1403 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
9f95d5eb
JB
1404 this.workerNodes[workerNodeKey].addEventListener(
1405 'backpressure',
9b85e2d0 1406 this.handleBackPressureEvent as EventListener
9f95d5eb 1407 )
47352846 1408 }
72695f86 1409 }
d2c73f82
JB
1410 }
1411
85aeb3f3 1412 /**
aa9eede8
JB
1413 * Sends the startup message to worker given its worker node key.
1414 *
1415 * @param workerNodeKey - The worker node key.
1416 */
1417 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1418
1419 /**
9edb9717 1420 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1421 *
aa9eede8 1422 * @param workerNodeKey - The worker node key.
85aeb3f3 1423 */
9edb9717 1424 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1425 this.sendToWorker(workerNodeKey, {
1426 statistics: {
1427 runTime:
1428 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1429 .runTime.aggregate,
1430 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1431 .elu.aggregate
72ae84a2 1432 }
aa9eede8
JB
1433 })
1434 }
a2ed5053
JB
1435
1436 private redistributeQueuedTasks (workerNodeKey: number): void {
1437 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1438 const destinationWorkerNodeKey = this.workerNodes.reduce(
1439 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1440 return workerNode.info.ready &&
1441 workerNode.usage.tasks.queued <
1442 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1443 ? workerNodeKey
1444 : minWorkerNodeKey
1445 },
1446 0
1447 )
72ae84a2 1448 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1449 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1450 this.executeTask(destinationWorkerNodeKey, task)
1451 } else {
1452 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1453 }
1454 }
1455 }
1456
b1838604
JB
1457 private updateTaskStolenStatisticsWorkerUsage (
1458 workerNodeKey: number,
b1838604
JB
1459 taskName: string
1460 ): void {
1a880eca 1461 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1462 if (workerNode?.usage != null) {
1463 ++workerNode.usage.tasks.stolen
1464 }
1465 if (
1466 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1467 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1468 ) {
1469 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1470 taskName
1471 ) as WorkerUsage
1472 ++taskFunctionWorkerUsage.tasks.stolen
1473 }
1474 }
1475
9f95d5eb
JB
1476 private readonly handleEmptyQueueEvent = (
1477 event: CustomEvent<WorkerNodeEventDetail>
1478 ): void => {
1479 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1480 event.detail.workerId
1481 )
dd951876 1482 const workerNodes = this.workerNodes
a6b3272b 1483 .slice()
dd951876
JB
1484 .sort(
1485 (workerNodeA, workerNodeB) =>
1486 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1487 )
f201a0cd 1488 const sourceWorkerNode = workerNodes.find(
041dc05b 1489 workerNode =>
f201a0cd 1490 workerNode.info.ready &&
9f95d5eb 1491 workerNode.info.id !== event.detail.workerId &&
f201a0cd
JB
1492 workerNode.usage.tasks.queued > 0
1493 )
1494 if (sourceWorkerNode != null) {
72ae84a2 1495 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1496 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1497 this.executeTask(destinationWorkerNodeKey, task)
1498 } else {
1499 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1500 }
f201a0cd
JB
1501 this.updateTaskStolenStatisticsWorkerUsage(
1502 destinationWorkerNodeKey,
1503 task.name as string
1504 )
72695f86
JB
1505 }
1506 }
1507
9f95d5eb
JB
1508 private readonly handleBackPressureEvent = (
1509 event: CustomEvent<WorkerNodeEventDetail>
1510 ): void => {
f778c355
JB
1511 const sizeOffset = 1
1512 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1513 return
1514 }
72695f86 1515 const sourceWorkerNode =
9f95d5eb 1516 this.workerNodes[this.getWorkerNodeKeyByWorkerId(event.detail.workerId)]
72695f86 1517 const workerNodes = this.workerNodes
a6b3272b 1518 .slice()
72695f86
JB
1519 .sort(
1520 (workerNodeA, workerNodeB) =>
1521 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1522 )
1523 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1524 if (
0bc68267 1525 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1526 workerNode.info.ready &&
9f95d5eb 1527 workerNode.info.id !== event.detail.workerId &&
0bc68267 1528 workerNode.usage.tasks.queued <
f778c355 1529 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1530 ) {
72ae84a2 1531 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1532 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1533 this.executeTask(workerNodeKey, task)
4de3d785 1534 } else {
dd951876 1535 this.enqueueTask(workerNodeKey, task)
4de3d785 1536 }
b1838604
JB
1537 this.updateTaskStolenStatisticsWorkerUsage(
1538 workerNodeKey,
b1838604
JB
1539 task.name as string
1540 )
10ecf8fd 1541 }
a2ed5053
JB
1542 }
1543 }
1544
be0676b3 1545 /**
fcd39179 1546 * This method is the message listener registered on each worker.
be0676b3 1547 */
fcd39179
JB
1548 protected workerMessageListener (message: MessageValue<Response>): void {
1549 this.checkMessageWorkerId(message)
1550 if (message.ready != null && message.taskFunctionNames != null) {
1551 // Worker ready response received from worker
1552 this.handleWorkerReadyResponse(message)
1553 } else if (message.taskId != null) {
1554 // Task execution response received from worker
1555 this.handleTaskExecutionResponse(message)
1556 } else if (message.taskFunctionNames != null) {
1557 // Task function names message received from worker
1558 this.getWorkerInfo(
1559 this.getWorkerNodeKeyByWorkerId(message.workerId)
1560 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1561 }
1562 }
1563
10e2aa7e 1564 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1565 if (message.ready === false) {
72ae84a2
JB
1566 throw new Error(
1567 `Worker ${message.workerId as number} failed to initialize`
1568 )
f05ed93c 1569 }
a5d15204 1570 const workerInfo = this.getWorkerInfo(
aad6fb64 1571 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1572 )
a5d15204 1573 workerInfo.ready = message.ready as boolean
6703b9f4 1574 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d 1575 if (this.ready) {
d91689fd
JB
1576 const emitPoolReadyEventOnce = once(
1577 () => this.emitter?.emit(PoolEvents.ready, this.info),
1578 this
1579 )
1580 emitPoolReadyEventOnce()
2431bdb4 1581 }
6b272951
JB
1582 }
1583
1584 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1585 const { taskId, workerError, data } = message
5441aea6 1586 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1587 if (promiseResponse != null) {
51af50ef 1588 const { resolve, reject, workerNodeKey } = promiseResponse
6703b9f4
JB
1589 if (workerError != null) {
1590 this.emitter?.emit(PoolEvents.taskError, workerError)
51af50ef 1591 reject(workerError.message)
6b272951 1592 } else {
51af50ef 1593 resolve(data as Response)
6b272951 1594 }
501aea93 1595 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1596 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1597 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1598 if (
1599 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1600 this.tasksQueueSize(workerNodeKey) > 0 &&
1601 this.workerNodes[workerNodeKey].usage.tasks.executing <
1602 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1603 ) {
1604 this.executeTask(
1605 workerNodeKey,
1606 this.dequeueTask(workerNodeKey) as Task<Data>
1607 )
be0676b3
APA
1608 }
1609 }
be0676b3 1610 }
7c0ba920 1611
a1763c54 1612 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1613 if (this.busy) {
1614 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1615 }
1616 }
1617
1618 private checkAndEmitTaskQueuingEvents (): void {
1619 if (this.hasBackPressure()) {
1620 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1621 }
1622 }
1623
33e6bb4c
JB
1624 private checkAndEmitDynamicWorkerCreationEvents (): void {
1625 if (this.type === PoolTypes.dynamic) {
1626 if (this.full) {
1627 this.emitter?.emit(PoolEvents.full, this.info)
1628 }
1629 }
1630 }
1631
8a1260a3 1632 /**
aa9eede8 1633 * Gets the worker information given its worker node key.
8a1260a3
JB
1634 *
1635 * @param workerNodeKey - The worker node key.
3f09ed9f 1636 * @returns The worker information.
8a1260a3 1637 */
46b0bb09 1638 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1639 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1640 }
1641
a05c10de 1642 /**
b0a4db63 1643 * Adds the given worker in the pool worker nodes.
ea7a90d3 1644 *
38e795c1 1645 * @param worker - The worker.
aa9eede8
JB
1646 * @returns The added worker node key.
1647 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1648 */
b0a4db63 1649 private addWorkerNode (worker: Worker): number {
671d5154
JB
1650 const workerNode = new WorkerNode<Worker, Data>(
1651 worker,
ff3f866a 1652 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1653 )
b97d82d8 1654 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1655 if (this.starting) {
1656 workerNode.info.ready = true
1657 }
aa9eede8 1658 this.workerNodes.push(workerNode)
aad6fb64 1659 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1660 if (workerNodeKey === -1) {
86ed0598 1661 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1662 }
1663 return workerNodeKey
ea7a90d3 1664 }
c923ce56 1665
51fe3d3c 1666 /**
f06e48d8 1667 * Removes the given worker from the pool worker nodes.
51fe3d3c 1668 *
f06e48d8 1669 * @param worker - The worker.
51fe3d3c 1670 */
416fd65c 1671 private removeWorkerNode (worker: Worker): void {
aad6fb64 1672 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1673 if (workerNodeKey !== -1) {
1674 this.workerNodes.splice(workerNodeKey, 1)
1675 this.workerChoiceStrategyContext.remove(workerNodeKey)
1676 }
51fe3d3c 1677 }
adc3c320 1678
ae3ab61d
JB
1679 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1680 this.getWorkerInfo(workerNodeKey).ready = false
1681 }
1682
e2b31e32
JB
1683 /** @inheritDoc */
1684 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1685 return (
e2b31e32
JB
1686 this.opts.enableTasksQueue === true &&
1687 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1688 )
1689 }
1690
1691 private hasBackPressure (): boolean {
1692 return (
1693 this.opts.enableTasksQueue === true &&
1694 this.workerNodes.findIndex(
041dc05b 1695 workerNode => !workerNode.hasBackPressure()
a1763c54 1696 ) === -1
9e844245 1697 )
e2b31e32
JB
1698 }
1699
b0a4db63 1700 /**
aa9eede8 1701 * Executes the given task on the worker given its worker node key.
b0a4db63 1702 *
aa9eede8 1703 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1704 * @param task - The task to execute.
1705 */
2e81254d 1706 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1707 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1708 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1709 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1710 }
1711
f9f00b5f 1712 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1713 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1714 this.checkAndEmitTaskQueuingEvents()
1715 return tasksQueueSize
adc3c320
JB
1716 }
1717
416fd65c 1718 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1719 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1720 }
1721
416fd65c 1722 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1723 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1724 }
1725
81c02522 1726 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1727 while (this.tasksQueueSize(workerNodeKey) > 0) {
1728 this.executeTask(
1729 workerNodeKey,
1730 this.dequeueTask(workerNodeKey) as Task<Data>
1731 )
ff733df7 1732 }
4b628b48 1733 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1734 }
1735
ef41a6e6
JB
1736 private flushTasksQueues (): void {
1737 for (const [workerNodeKey] of this.workerNodes.entries()) {
1738 this.flushTasksQueue(workerNodeKey)
1739 }
1740 }
c97c7edb 1741}