fix: fix pool ready event emission
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
bfc75cca 20 round
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
6703b9f4 23import type { TaskFunction } from '../worker/task-functions'
c4855468 24import {
65d7a1c9 25 type IPool,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
4d159167
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
9f95d5eb 37 WorkerNodeEventDetail,
4d159167
JB
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
bde6b5d7
JB
51import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
bfc75cca
JB
54 checkValidWorkerChoiceStrategy,
55 updateMeasurementStatistics
bde6b5d7 56} from './utils'
23ccf9d7 57
729c563d 58/**
ea7a90d3 59 * Base class that implements some shared logic for all poolifier pools.
729c563d 60 *
38e795c1 61 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
62 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
63 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 64 */
c97c7edb 65export abstract class AbstractPool<
f06e48d8 66 Worker extends IWorker,
d3c8a1a8
S
67 Data = unknown,
68 Response = unknown
c4855468 69> implements IPool<Worker, Data, Response> {
afc003b2 70 /** @inheritDoc */
4b628b48 71 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 72
afc003b2 73 /** @inheritDoc */
f80125ca 74 public emitter?: EventEmitterAsyncResource
7c0ba920 75
fcd39179
JB
76 /**
77 * Dynamic pool maximum size property placeholder.
78 */
79 protected readonly max?: number
80
be0676b3 81 /**
419e8121 82 * The task execution response promise map:
2740a743 83 * - `key`: The message id of each submitted task.
a3445496 84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 85 *
a3445496 86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 87 */
501aea93
JB
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 90
a35560ba 91 /**
51fe3d3c 92 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
93 */
94 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
95 Worker,
96 Data,
97 Response
a35560ba
S
98 >
99
72ae84a2
JB
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
15b176e0
JB
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
47352846
JB
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
711623b8
JB
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
55082af9
JB
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
afe0d5bf
JB
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
729c563d
S
128 /**
129 * Constructs a new poolifier pool.
130 *
38e795c1 131 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 132 * @param filePath - Path to the worker file.
38e795c1 133 * @param opts - Options for the pool.
729c563d 134 */
c97c7edb 135 public constructor (
b4213b7f
JB
136 protected readonly numberOfWorkers: number,
137 protected readonly filePath: string,
138 protected readonly opts: PoolOptions<Worker>
c97c7edb 139 ) {
78cea37e 140 if (!this.isMain()) {
04f45163 141 throw new Error(
8c6d4acf 142 'Cannot start a pool from a worker with the same type as the pool'
04f45163 143 )
c97c7edb 144 }
bde6b5d7 145 checkFilePath(this.filePath)
8003c026 146 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 147 this.checkPoolOptions(this.opts)
1086026a 148
7254e419
JB
149 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
150 this.executeTask = this.executeTask.bind(this)
151 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 152
6bd72cd0 153 if (this.opts.enableEvents === true) {
b5604034 154 this.initializeEventEmitter()
7c0ba920 155 }
d59df138
JB
156 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
157 Worker,
158 Data,
159 Response
da309861
JB
160 >(
161 this,
162 this.opts.workerChoiceStrategy,
163 this.opts.workerChoiceStrategyOptions
164 )
b6b32453
JB
165
166 this.setupHook()
167
72ae84a2
JB
168 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
169
47352846 170 this.started = false
075e51d1 171 this.starting = false
711623b8 172 this.destroying = false
55082af9 173 this.readyEventEmitted = false
47352846
JB
174 if (this.opts.startWorkers === true) {
175 this.start()
176 }
afe0d5bf
JB
177
178 this.startTimestamp = performance.now()
c97c7edb
S
179 }
180
8d3782fa
JB
181 private checkNumberOfWorkers (numberOfWorkers: number): void {
182 if (numberOfWorkers == null) {
183 throw new Error(
184 'Cannot instantiate a pool without specifying the number of workers'
185 )
78cea37e 186 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 187 throw new TypeError(
0d80593b 188 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
189 )
190 } else if (numberOfWorkers < 0) {
473c717a 191 throw new RangeError(
8d3782fa
JB
192 'Cannot instantiate a pool with a negative number of workers'
193 )
6b27d407 194 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
195 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
196 }
197 }
198
7c0ba920 199 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 200 if (isPlainObject(opts)) {
47352846 201 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 202 checkValidWorkerChoiceStrategy(
2324f8c9
JB
203 opts.workerChoiceStrategy as WorkerChoiceStrategy
204 )
0d80593b
JB
205 this.opts.workerChoiceStrategy =
206 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
207 this.checkValidWorkerChoiceStrategyOptions(
208 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
209 )
8990357d
JB
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
1f68cede 214 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
215 this.opts.enableEvents = opts.enableEvents ?? true
216 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
217 if (this.opts.enableTasksQueue) {
bde6b5d7 218 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
219 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
220 opts.tasksQueueOptions as TasksQueueOptions
221 )
222 }
223 } else {
224 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 225 }
aee46736
JB
226 }
227
0d80593b
JB
228 private checkValidWorkerChoiceStrategyOptions (
229 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
230 ): void {
2324f8c9
JB
231 if (
232 workerChoiceStrategyOptions != null &&
233 !isPlainObject(workerChoiceStrategyOptions)
234 ) {
0d80593b
JB
235 throw new TypeError(
236 'Invalid worker choice strategy options: must be a plain object'
237 )
238 }
8990357d 239 if (
2324f8c9 240 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 241 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
242 ) {
243 throw new TypeError(
8c0b113f 244 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
245 )
246 }
247 if (
2324f8c9 248 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 249 workerChoiceStrategyOptions.retries < 0
8990357d
JB
250 ) {
251 throw new RangeError(
8c0b113f 252 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
253 )
254 }
49be33fe 255 if (
2324f8c9 256 workerChoiceStrategyOptions?.weights != null &&
6b27d407 257 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
258 ) {
259 throw new Error(
260 'Invalid worker choice strategy options: must have a weight for each worker node'
261 )
262 }
f0d7f803 263 if (
2324f8c9 264 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
265 !Object.values(Measurements).includes(
266 workerChoiceStrategyOptions.measurement
267 )
268 ) {
269 throw new Error(
270 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
271 )
272 }
0d80593b
JB
273 }
274
b5604034 275 private initializeEventEmitter (): void {
5b7d00b4 276 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
277 name: `poolifier:${this.type}-${this.worker}-pool`
278 })
279 }
280
08f3f44c 281 /** @inheritDoc */
6b27d407
JB
282 public get info (): PoolInfo {
283 return {
23ccf9d7 284 version,
6b27d407 285 type: this.type,
184855e6 286 worker: this.worker,
47352846 287 started: this.started,
2431bdb4
JB
288 ready: this.ready,
289 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
290 minSize: this.minSize,
291 maxSize: this.maxSize,
c05f0d50
JB
292 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
293 .runTime.aggregate &&
1305e9a8
JB
294 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
295 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
296 workerNodes: this.workerNodes.length,
297 idleWorkerNodes: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
f59e1027 299 workerNode.usage.tasks.executing === 0
a4e07f72
JB
300 ? accumulator + 1
301 : accumulator,
6b27d407
JB
302 0
303 ),
304 busyWorkerNodes: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
f59e1027 306 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
307 0
308 ),
a4e07f72 309 executedTasks: this.workerNodes.reduce(
6b27d407 310 (accumulator, workerNode) =>
f59e1027 311 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
312 0
313 ),
314 executingTasks: this.workerNodes.reduce(
315 (accumulator, workerNode) =>
f59e1027 316 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
317 0
318 ),
daf86646
JB
319 ...(this.opts.enableTasksQueue === true && {
320 queuedTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 accumulator + workerNode.usage.tasks.queued,
323 0
324 )
325 }),
326 ...(this.opts.enableTasksQueue === true && {
327 maxQueuedTasks: this.workerNodes.reduce(
328 (accumulator, workerNode) =>
329 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
330 0
331 )
332 }),
a1763c54
JB
333 ...(this.opts.enableTasksQueue === true && {
334 backPressure: this.hasBackPressure()
335 }),
68cbdc84
JB
336 ...(this.opts.enableTasksQueue === true && {
337 stolenTasks: this.workerNodes.reduce(
338 (accumulator, workerNode) =>
339 accumulator + workerNode.usage.tasks.stolen,
340 0
341 )
342 }),
a4e07f72
JB
343 failedTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
f59e1027 345 accumulator + workerNode.usage.tasks.failed,
a4e07f72 346 0
1dcf8b7b
JB
347 ),
348 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
349 .runTime.aggregate && {
350 runTime: {
98e72cda 351 minimum: round(
90d6701c 352 min(
98e72cda 353 ...this.workerNodes.map(
041dc05b 354 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 355 )
1dcf8b7b
JB
356 )
357 ),
98e72cda 358 maximum: round(
90d6701c 359 max(
98e72cda 360 ...this.workerNodes.map(
041dc05b 361 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 362 )
1dcf8b7b 363 )
98e72cda 364 ),
3baa0837
JB
365 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
366 .runTime.average && {
367 average: round(
368 average(
369 this.workerNodes.reduce<number[]>(
370 (accumulator, workerNode) =>
371 accumulator.concat(workerNode.usage.runTime.history),
372 []
373 )
98e72cda 374 )
dc021bcc 375 )
3baa0837 376 }),
98e72cda
JB
377 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
378 .runTime.median && {
379 median: round(
380 median(
3baa0837
JB
381 this.workerNodes.reduce<number[]>(
382 (accumulator, workerNode) =>
383 accumulator.concat(workerNode.usage.runTime.history),
384 []
98e72cda
JB
385 )
386 )
387 )
388 })
1dcf8b7b
JB
389 }
390 }),
391 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
392 .waitTime.aggregate && {
393 waitTime: {
98e72cda 394 minimum: round(
90d6701c 395 min(
98e72cda 396 ...this.workerNodes.map(
041dc05b 397 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 398 )
1dcf8b7b
JB
399 )
400 ),
98e72cda 401 maximum: round(
90d6701c 402 max(
98e72cda 403 ...this.workerNodes.map(
041dc05b 404 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 405 )
1dcf8b7b 406 )
98e72cda 407 ),
3baa0837
JB
408 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
409 .waitTime.average && {
410 average: round(
411 average(
412 this.workerNodes.reduce<number[]>(
413 (accumulator, workerNode) =>
414 accumulator.concat(workerNode.usage.waitTime.history),
415 []
416 )
98e72cda 417 )
dc021bcc 418 )
3baa0837 419 }),
98e72cda
JB
420 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
421 .waitTime.median && {
422 median: round(
423 median(
3baa0837
JB
424 this.workerNodes.reduce<number[]>(
425 (accumulator, workerNode) =>
426 accumulator.concat(workerNode.usage.waitTime.history),
427 []
98e72cda
JB
428 )
429 )
430 )
431 })
1dcf8b7b
JB
432 }
433 })
6b27d407
JB
434 }
435 }
08f3f44c 436
aa9eede8
JB
437 /**
438 * The pool readiness boolean status.
439 */
2431bdb4
JB
440 private get ready (): boolean {
441 return (
b97d82d8
JB
442 this.workerNodes.reduce(
443 (accumulator, workerNode) =>
444 !workerNode.info.dynamic && workerNode.info.ready
445 ? accumulator + 1
446 : accumulator,
447 0
448 ) >= this.minSize
2431bdb4
JB
449 )
450 }
451
afe0d5bf 452 /**
aa9eede8 453 * The approximate pool utilization.
afe0d5bf
JB
454 *
455 * @returns The pool utilization.
456 */
457 private get utilization (): number {
8e5ca040 458 const poolTimeCapacity =
fe7d90db 459 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
460 const totalTasksRunTime = this.workerNodes.reduce(
461 (accumulator, workerNode) =>
71514351 462 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
463 0
464 )
465 const totalTasksWaitTime = this.workerNodes.reduce(
466 (accumulator, workerNode) =>
71514351 467 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
468 0
469 )
8e5ca040 470 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
471 }
472
8881ae32 473 /**
aa9eede8 474 * The pool type.
8881ae32
JB
475 *
476 * If it is `'dynamic'`, it provides the `max` property.
477 */
478 protected abstract get type (): PoolType
479
184855e6 480 /**
aa9eede8 481 * The worker type.
184855e6
JB
482 */
483 protected abstract get worker (): WorkerType
484
c2ade475 485 /**
aa9eede8 486 * The pool minimum size.
c2ade475 487 */
8735b4e5
JB
488 protected get minSize (): number {
489 return this.numberOfWorkers
490 }
ff733df7
JB
491
492 /**
aa9eede8 493 * The pool maximum size.
ff733df7 494 */
8735b4e5
JB
495 protected get maxSize (): number {
496 return this.max ?? this.numberOfWorkers
497 }
a35560ba 498
6b813701
JB
499 /**
500 * Checks if the worker id sent in the received message from a worker is valid.
501 *
502 * @param message - The received message.
503 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
504 */
4d159167 505 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
506 if (message.workerId == null) {
507 throw new Error('Worker message received without worker id')
8e5a7cf9 508 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
509 throw new Error(
510 `Worker message received from unknown worker '${message.workerId}'`
511 )
512 }
513 }
514
ffcbbad8 515 /**
f06e48d8 516 * Gets the given worker its worker node key.
ffcbbad8
JB
517 *
518 * @param worker - The worker.
f59e1027 519 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 520 */
aad6fb64 521 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 522 return this.workerNodes.findIndex(
041dc05b 523 workerNode => workerNode.worker === worker
f06e48d8 524 )
bf9549ae
JB
525 }
526
aa9eede8
JB
527 /**
528 * Gets the worker node key given its worker id.
529 *
530 * @param workerId - The worker id.
aad6fb64 531 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 532 */
72ae84a2 533 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 534 return this.workerNodes.findIndex(
041dc05b 535 workerNode => workerNode.info.id === workerId
aad6fb64 536 )
aa9eede8
JB
537 }
538
afc003b2 539 /** @inheritDoc */
a35560ba 540 public setWorkerChoiceStrategy (
59219cbb
JB
541 workerChoiceStrategy: WorkerChoiceStrategy,
542 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 543 ): void {
bde6b5d7 544 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 545 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
546 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
547 this.opts.workerChoiceStrategy
548 )
549 if (workerChoiceStrategyOptions != null) {
550 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
551 }
aa9eede8 552 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 553 workerNode.resetUsage()
9edb9717 554 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 555 }
a20f0ba5
JB
556 }
557
558 /** @inheritDoc */
559 public setWorkerChoiceStrategyOptions (
560 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
561 ): void {
0d80593b 562 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
563 this.opts.workerChoiceStrategyOptions = {
564 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
565 ...workerChoiceStrategyOptions
566 }
a20f0ba5
JB
567 this.workerChoiceStrategyContext.setOptions(
568 this.opts.workerChoiceStrategyOptions
a35560ba
S
569 )
570 }
571
a20f0ba5 572 /** @inheritDoc */
8f52842f
JB
573 public enableTasksQueue (
574 enable: boolean,
575 tasksQueueOptions?: TasksQueueOptions
576 ): void {
a20f0ba5 577 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
578 this.unsetTaskStealing()
579 this.unsetTasksStealingOnBackPressure()
ef41a6e6 580 this.flushTasksQueues()
a20f0ba5
JB
581 }
582 this.opts.enableTasksQueue = enable
8f52842f 583 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
584 }
585
586 /** @inheritDoc */
8f52842f 587 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 588 if (this.opts.enableTasksQueue === true) {
bde6b5d7 589 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
590 this.opts.tasksQueueOptions =
591 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 592 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
593 if (this.opts.tasksQueueOptions.taskStealing === true) {
594 this.setTaskStealing()
595 } else {
596 this.unsetTaskStealing()
597 }
598 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
599 this.setTasksStealingOnBackPressure()
600 } else {
601 this.unsetTasksStealingOnBackPressure()
602 }
5baee0d7 603 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
604 delete this.opts.tasksQueueOptions
605 }
606 }
607
9b38ab2d
JB
608 private buildTasksQueueOptions (
609 tasksQueueOptions: TasksQueueOptions
610 ): TasksQueueOptions {
611 return {
612 ...{
613 size: Math.pow(this.maxSize, 2),
614 concurrency: 1,
615 taskStealing: true,
616 tasksStealingOnBackPressure: true
617 },
618 ...tasksQueueOptions
619 }
620 }
621
5b49e864 622 private setTasksQueueSize (size: number): void {
20c6f652 623 for (const workerNode of this.workerNodes) {
ff3f866a 624 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
625 }
626 }
627
d6ca1416
JB
628 private setTaskStealing (): void {
629 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 630 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 631 'emptyQueue',
9b85e2d0 632 this.handleEmptyQueueEvent as EventListener
9f95d5eb 633 )
d6ca1416
JB
634 }
635 }
636
637 private unsetTaskStealing (): void {
638 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 639 this.workerNodes[workerNodeKey].removeEventListener(
b5e75be8 640 'emptyQueue',
9b85e2d0 641 this.handleEmptyQueueEvent as EventListener
9f95d5eb 642 )
d6ca1416
JB
643 }
644 }
645
646 private setTasksStealingOnBackPressure (): void {
647 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 648 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 649 'backPressure',
9b85e2d0 650 this.handleBackPressureEvent as EventListener
9f95d5eb 651 )
d6ca1416
JB
652 }
653 }
654
655 private unsetTasksStealingOnBackPressure (): void {
656 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 657 this.workerNodes[workerNodeKey].removeEventListener(
b5e75be8 658 'backPressure',
9b85e2d0 659 this.handleBackPressureEvent as EventListener
9f95d5eb 660 )
d6ca1416
JB
661 }
662 }
663
c319c66b
JB
664 /**
665 * Whether the pool is full or not.
666 *
667 * The pool filling boolean status.
668 */
dea903a8
JB
669 protected get full (): boolean {
670 return this.workerNodes.length >= this.maxSize
671 }
c2ade475 672
c319c66b
JB
673 /**
674 * Whether the pool is busy or not.
675 *
676 * The pool busyness boolean status.
677 */
678 protected abstract get busy (): boolean
7c0ba920 679
6c6afb84 680 /**
3d76750a 681 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
682 *
683 * @returns Worker nodes busyness boolean status.
684 */
c2ade475 685 protected internalBusy (): boolean {
3d76750a
JB
686 if (this.opts.enableTasksQueue === true) {
687 return (
688 this.workerNodes.findIndex(
041dc05b 689 workerNode =>
3d76750a
JB
690 workerNode.info.ready &&
691 workerNode.usage.tasks.executing <
692 (this.opts.tasksQueueOptions?.concurrency as number)
693 ) === -1
694 )
3d76750a 695 }
419e8121
JB
696 return (
697 this.workerNodes.findIndex(
698 workerNode =>
699 workerNode.info.ready && workerNode.usage.tasks.executing === 0
700 ) === -1
701 )
cb70b19d
JB
702 }
703
e81c38f2 704 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
705 workerNodeKey: number,
706 message: MessageValue<Data>
707 ): Promise<boolean> {
72ae84a2 708 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
709 const taskFunctionOperationListener = (
710 message: MessageValue<Response>
711 ): void => {
712 this.checkMessageWorkerId(message)
713 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 714 if (
ae036c3e
JB
715 message.taskFunctionOperationStatus != null &&
716 message.workerId === workerId
72ae84a2 717 ) {
ae036c3e
JB
718 if (message.taskFunctionOperationStatus) {
719 resolve(true)
720 } else if (!message.taskFunctionOperationStatus) {
721 reject(
722 new Error(
723 `Task function operation '${
724 message.taskFunctionOperation as string
725 }' failed on worker ${message.workerId} with error: '${
726 message.workerError?.message as string
727 }'`
728 )
72ae84a2 729 )
ae036c3e
JB
730 }
731 this.deregisterWorkerMessageListener(
732 this.getWorkerNodeKeyByWorkerId(message.workerId),
733 taskFunctionOperationListener
72ae84a2
JB
734 )
735 }
ae036c3e
JB
736 }
737 this.registerWorkerMessageListener(
738 workerNodeKey,
739 taskFunctionOperationListener
740 )
72ae84a2
JB
741 this.sendToWorker(workerNodeKey, message)
742 })
743 }
744
745 private async sendTaskFunctionOperationToWorkers (
adee6053 746 message: MessageValue<Data>
e81c38f2
JB
747 ): Promise<boolean> {
748 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
749 const responsesReceived = new Array<MessageValue<Response>>()
750 const taskFunctionOperationsListener = (
751 message: MessageValue<Response>
752 ): void => {
753 this.checkMessageWorkerId(message)
754 if (message.taskFunctionOperationStatus != null) {
755 responsesReceived.push(message)
756 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 757 if (
e81c38f2
JB
758 responsesReceived.every(
759 message => message.taskFunctionOperationStatus === true
760 )
761 ) {
762 resolve(true)
763 } else if (
e81c38f2
JB
764 responsesReceived.some(
765 message => message.taskFunctionOperationStatus === false
766 )
767 ) {
b0b55f57
JB
768 const errorResponse = responsesReceived.find(
769 response => response.taskFunctionOperationStatus === false
770 )
e81c38f2
JB
771 reject(
772 new Error(
b0b55f57 773 `Task function operation '${
e81c38f2 774 message.taskFunctionOperation as string
b0b55f57
JB
775 }' failed on worker ${
776 errorResponse?.workerId as number
777 } with error: '${
778 errorResponse?.workerError?.message as string
779 }'`
e81c38f2
JB
780 )
781 )
782 }
ae036c3e
JB
783 this.deregisterWorkerMessageListener(
784 this.getWorkerNodeKeyByWorkerId(message.workerId),
785 taskFunctionOperationsListener
786 )
e81c38f2 787 }
ae036c3e
JB
788 }
789 }
790 for (const [workerNodeKey] of this.workerNodes.entries()) {
791 this.registerWorkerMessageListener(
792 workerNodeKey,
793 taskFunctionOperationsListener
794 )
72ae84a2 795 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
796 }
797 })
6703b9f4
JB
798 }
799
800 /** @inheritDoc */
801 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
802 for (const workerNode of this.workerNodes) {
803 if (
804 Array.isArray(workerNode.info.taskFunctionNames) &&
805 workerNode.info.taskFunctionNames.includes(name)
806 ) {
807 return true
808 }
809 }
810 return false
6703b9f4
JB
811 }
812
813 /** @inheritDoc */
e81c38f2
JB
814 public async addTaskFunction (
815 name: string,
3feeab69 816 fn: TaskFunction<Data, Response>
e81c38f2 817 ): Promise<boolean> {
3feeab69
JB
818 if (typeof name !== 'string') {
819 throw new TypeError('name argument must be a string')
820 }
821 if (typeof name === 'string' && name.trim().length === 0) {
822 throw new TypeError('name argument must not be an empty string')
823 }
824 if (typeof fn !== 'function') {
825 throw new TypeError('fn argument must be a function')
826 }
adee6053 827 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
828 taskFunctionOperation: 'add',
829 taskFunctionName: name,
3feeab69 830 taskFunction: fn.toString()
6703b9f4 831 })
adee6053
JB
832 this.taskFunctions.set(name, fn)
833 return opResult
6703b9f4
JB
834 }
835
836 /** @inheritDoc */
e81c38f2 837 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
838 if (!this.taskFunctions.has(name)) {
839 throw new Error(
16248b23 840 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
841 )
842 }
adee6053 843 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
844 taskFunctionOperation: 'remove',
845 taskFunctionName: name
846 })
adee6053
JB
847 this.deleteTaskFunctionWorkerUsages(name)
848 this.taskFunctions.delete(name)
849 return opResult
6703b9f4
JB
850 }
851
90d7d101 852 /** @inheritDoc */
6703b9f4 853 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
854 for (const workerNode of this.workerNodes) {
855 if (
6703b9f4
JB
856 Array.isArray(workerNode.info.taskFunctionNames) &&
857 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 858 ) {
6703b9f4 859 return workerNode.info.taskFunctionNames
f2dbbf95 860 }
90d7d101 861 }
f2dbbf95 862 return []
90d7d101
JB
863 }
864
6703b9f4 865 /** @inheritDoc */
e81c38f2 866 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 867 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
868 taskFunctionOperation: 'default',
869 taskFunctionName: name
870 })
6703b9f4
JB
871 }
872
adee6053
JB
873 private deleteTaskFunctionWorkerUsages (name: string): void {
874 for (const workerNode of this.workerNodes) {
875 workerNode.deleteTaskFunctionWorkerUsage(name)
876 }
877 }
878
375f7504
JB
879 private shallExecuteTask (workerNodeKey: number): boolean {
880 return (
881 this.tasksQueueSize(workerNodeKey) === 0 &&
882 this.workerNodes[workerNodeKey].usage.tasks.executing <
883 (this.opts.tasksQueueOptions?.concurrency as number)
884 )
885 }
886
afc003b2 887 /** @inheritDoc */
7d91a8cd
JB
888 public async execute (
889 data?: Data,
890 name?: string,
891 transferList?: TransferListItem[]
892 ): Promise<Response> {
52b71763 893 return await new Promise<Response>((resolve, reject) => {
15b176e0 894 if (!this.started) {
47352846 895 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 896 return
15b176e0 897 }
711623b8
JB
898 if (this.destroying) {
899 reject(new Error('Cannot execute a task on destroying pool'))
900 return
901 }
7d91a8cd
JB
902 if (name != null && typeof name !== 'string') {
903 reject(new TypeError('name argument must be a string'))
9d2d0da1 904 return
7d91a8cd 905 }
90d7d101
JB
906 if (
907 name != null &&
908 typeof name === 'string' &&
909 name.trim().length === 0
910 ) {
f58b60b9 911 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 912 return
90d7d101 913 }
b558f6b5
JB
914 if (transferList != null && !Array.isArray(transferList)) {
915 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 916 return
b558f6b5
JB
917 }
918 const timestamp = performance.now()
919 const workerNodeKey = this.chooseWorkerNode()
501aea93 920 const task: Task<Data> = {
52b71763
JB
921 name: name ?? DEFAULT_TASK_NAME,
922 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
923 data: data ?? ({} as Data),
7d91a8cd 924 transferList,
52b71763 925 timestamp,
7629bdf1 926 taskId: randomUUID()
52b71763 927 }
7629bdf1 928 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
929 resolve,
930 reject,
501aea93 931 workerNodeKey
2e81254d 932 })
52b71763 933 if (
4e377863
JB
934 this.opts.enableTasksQueue === false ||
935 (this.opts.enableTasksQueue === true &&
375f7504 936 this.shallExecuteTask(workerNodeKey))
52b71763 937 ) {
501aea93 938 this.executeTask(workerNodeKey, task)
4e377863
JB
939 } else {
940 this.enqueueTask(workerNodeKey, task)
52b71763 941 }
2e81254d 942 })
280c2a77 943 }
c97c7edb 944
47352846
JB
945 /** @inheritdoc */
946 public start (): void {
711623b8
JB
947 if (this.started) {
948 throw new Error('Cannot start an already started pool')
949 }
950 if (this.starting) {
951 throw new Error('Cannot start an already starting pool')
952 }
953 if (this.destroying) {
954 throw new Error('Cannot start a destroying pool')
955 }
47352846
JB
956 this.starting = true
957 while (
958 this.workerNodes.reduce(
959 (accumulator, workerNode) =>
960 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
961 0
962 ) < this.numberOfWorkers
963 ) {
964 this.createAndSetupWorkerNode()
965 }
966 this.starting = false
967 this.started = true
968 }
969
afc003b2 970 /** @inheritDoc */
c97c7edb 971 public async destroy (): Promise<void> {
711623b8
JB
972 if (!this.started) {
973 throw new Error('Cannot destroy an already destroyed pool')
974 }
975 if (this.starting) {
976 throw new Error('Cannot destroy an starting pool')
977 }
978 if (this.destroying) {
979 throw new Error('Cannot destroy an already destroying pool')
980 }
981 this.destroying = true
1fbcaa7c 982 await Promise.all(
81c02522 983 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 984 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
985 })
986 )
33e6bb4c 987 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 988 this.emitter?.emitDestroy()
55082af9 989 this.readyEventEmitted = false
711623b8 990 this.destroying = false
15b176e0 991 this.started = false
c97c7edb
S
992 }
993
1e3214b6 994 protected async sendKillMessageToWorker (
72ae84a2 995 workerNodeKey: number
1e3214b6 996 ): Promise<void> {
9edb9717 997 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
998 const killMessageListener = (message: MessageValue<Response>): void => {
999 this.checkMessageWorkerId(message)
1e3214b6
JB
1000 if (message.kill === 'success') {
1001 resolve()
1002 } else if (message.kill === 'failure') {
72ae84a2
JB
1003 reject(
1004 new Error(
ae036c3e 1005 `Kill message handling failed on worker ${
72ae84a2 1006 message.workerId as number
ae036c3e 1007 }`
72ae84a2
JB
1008 )
1009 )
1e3214b6 1010 }
ae036c3e 1011 }
51d9cfbd 1012 // FIXME: should be registered only once
ae036c3e 1013 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1014 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1015 })
1e3214b6
JB
1016 }
1017
4a6952ff 1018 /**
aa9eede8 1019 * Terminates the worker node given its worker node key.
4a6952ff 1020 *
aa9eede8 1021 * @param workerNodeKey - The worker node key.
4a6952ff 1022 */
81c02522 1023 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1024
729c563d 1025 /**
6677a3d3
JB
1026 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1027 * Can be overridden.
afc003b2
JB
1028 *
1029 * @virtual
729c563d 1030 */
280c2a77 1031 protected setupHook (): void {
965df41c 1032 /* Intentionally empty */
280c2a77 1033 }
c97c7edb 1034
729c563d 1035 /**
280c2a77
S
1036 * Should return whether the worker is the main worker or not.
1037 */
1038 protected abstract isMain (): boolean
1039
1040 /**
2e81254d 1041 * Hook executed before the worker task execution.
bf9549ae 1042 * Can be overridden.
729c563d 1043 *
f06e48d8 1044 * @param workerNodeKey - The worker node key.
1c6fe997 1045 * @param task - The task to execute.
729c563d 1046 */
1c6fe997
JB
1047 protected beforeTaskExecutionHook (
1048 workerNodeKey: number,
1049 task: Task<Data>
1050 ): void {
94407def
JB
1051 if (this.workerNodes[workerNodeKey]?.usage != null) {
1052 const workerUsage = this.workerNodes[workerNodeKey].usage
1053 ++workerUsage.tasks.executing
1054 this.updateWaitTimeWorkerUsage(workerUsage, task)
1055 }
1056 if (
1057 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1058 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1059 task.name as string
1060 ) != null
1061 ) {
db0e38ee 1062 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1063 workerNodeKey
db0e38ee 1064 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1065 ++taskFunctionWorkerUsage.tasks.executing
1066 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1067 }
c97c7edb
S
1068 }
1069
c01733f1 1070 /**
2e81254d 1071 * Hook executed after the worker task execution.
bf9549ae 1072 * Can be overridden.
c01733f1 1073 *
501aea93 1074 * @param workerNodeKey - The worker node key.
38e795c1 1075 * @param message - The received message.
c01733f1 1076 */
2e81254d 1077 protected afterTaskExecutionHook (
501aea93 1078 workerNodeKey: number,
2740a743 1079 message: MessageValue<Response>
bf9549ae 1080 ): void {
94407def
JB
1081 if (this.workerNodes[workerNodeKey]?.usage != null) {
1082 const workerUsage = this.workerNodes[workerNodeKey].usage
1083 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1084 this.updateRunTimeWorkerUsage(workerUsage, message)
1085 this.updateEluWorkerUsage(workerUsage, message)
1086 }
1087 if (
1088 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1089 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1090 message.taskPerformance?.name as string
94407def
JB
1091 ) != null
1092 ) {
db0e38ee 1093 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1094 workerNodeKey
db0e38ee 1095 ].getTaskFunctionWorkerUsage(
0628755c 1096 message.taskPerformance?.name as string
b558f6b5 1097 ) as WorkerUsage
db0e38ee
JB
1098 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1099 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1100 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1101 }
1102 }
1103
db0e38ee
JB
1104 /**
1105 * Whether the worker node shall update its task function worker usage or not.
1106 *
1107 * @param workerNodeKey - The worker node key.
1108 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1109 */
1110 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1111 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1112 return (
94407def 1113 workerInfo != null &&
6703b9f4
JB
1114 Array.isArray(workerInfo.taskFunctionNames) &&
1115 workerInfo.taskFunctionNames.length > 2
b558f6b5 1116 )
f1c06930
JB
1117 }
1118
1119 private updateTaskStatisticsWorkerUsage (
1120 workerUsage: WorkerUsage,
1121 message: MessageValue<Response>
1122 ): void {
a4e07f72 1123 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1124 if (
1125 workerTaskStatistics.executing != null &&
1126 workerTaskStatistics.executing > 0
1127 ) {
1128 --workerTaskStatistics.executing
5bb5be17 1129 }
6703b9f4 1130 if (message.workerError == null) {
98e72cda
JB
1131 ++workerTaskStatistics.executed
1132 } else {
a4e07f72 1133 ++workerTaskStatistics.failed
2740a743 1134 }
f8eb0a2a
JB
1135 }
1136
a4e07f72
JB
1137 private updateRunTimeWorkerUsage (
1138 workerUsage: WorkerUsage,
f8eb0a2a
JB
1139 message: MessageValue<Response>
1140 ): void {
6703b9f4 1141 if (message.workerError != null) {
dc021bcc
JB
1142 return
1143 }
e4f20deb
JB
1144 updateMeasurementStatistics(
1145 workerUsage.runTime,
1146 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1147 message.taskPerformance?.runTime ?? 0
e4f20deb 1148 )
f8eb0a2a
JB
1149 }
1150
a4e07f72
JB
1151 private updateWaitTimeWorkerUsage (
1152 workerUsage: WorkerUsage,
1c6fe997 1153 task: Task<Data>
f8eb0a2a 1154 ): void {
1c6fe997
JB
1155 const timestamp = performance.now()
1156 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1157 updateMeasurementStatistics(
1158 workerUsage.waitTime,
1159 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1160 taskWaitTime
e4f20deb 1161 )
c01733f1 1162 }
1163
a4e07f72 1164 private updateEluWorkerUsage (
5df69fab 1165 workerUsage: WorkerUsage,
62c15a68
JB
1166 message: MessageValue<Response>
1167 ): void {
6703b9f4 1168 if (message.workerError != null) {
dc021bcc
JB
1169 return
1170 }
008512c7
JB
1171 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1172 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1173 updateMeasurementStatistics(
1174 workerUsage.elu.active,
008512c7 1175 eluTaskStatisticsRequirements,
dc021bcc 1176 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1177 )
1178 updateMeasurementStatistics(
1179 workerUsage.elu.idle,
008512c7 1180 eluTaskStatisticsRequirements,
dc021bcc 1181 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1182 )
008512c7 1183 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1184 if (message.taskPerformance?.elu != null) {
f7510105
JB
1185 if (workerUsage.elu.utilization != null) {
1186 workerUsage.elu.utilization =
1187 (workerUsage.elu.utilization +
1188 message.taskPerformance.elu.utilization) /
1189 2
1190 } else {
1191 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1192 }
62c15a68
JB
1193 }
1194 }
1195 }
1196
280c2a77 1197 /**
f06e48d8 1198 * Chooses a worker node for the next task.
280c2a77 1199 *
6c6afb84 1200 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1201 *
aa9eede8 1202 * @returns The chosen worker node key
280c2a77 1203 */
6c6afb84 1204 private chooseWorkerNode (): number {
930dcf12 1205 if (this.shallCreateDynamicWorker()) {
aa9eede8 1206 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1207 if (
b1aae695 1208 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1209 ) {
aa9eede8 1210 return workerNodeKey
6c6afb84 1211 }
17393ac8 1212 }
930dcf12
JB
1213 return this.workerChoiceStrategyContext.execute()
1214 }
1215
6c6afb84
JB
1216 /**
1217 * Conditions for dynamic worker creation.
1218 *
1219 * @returns Whether to create a dynamic worker or not.
1220 */
1221 private shallCreateDynamicWorker (): boolean {
930dcf12 1222 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1223 }
1224
280c2a77 1225 /**
aa9eede8 1226 * Sends a message to worker given its worker node key.
280c2a77 1227 *
aa9eede8 1228 * @param workerNodeKey - The worker node key.
38e795c1 1229 * @param message - The message.
7d91a8cd 1230 * @param transferList - The optional array of transferable objects.
280c2a77
S
1231 */
1232 protected abstract sendToWorker (
aa9eede8 1233 workerNodeKey: number,
7d91a8cd
JB
1234 message: MessageValue<Data>,
1235 transferList?: TransferListItem[]
280c2a77
S
1236 ): void
1237
729c563d 1238 /**
41344292 1239 * Creates a new worker.
6c6afb84
JB
1240 *
1241 * @returns Newly created worker.
729c563d 1242 */
280c2a77 1243 protected abstract createWorker (): Worker
c97c7edb 1244
4a6952ff 1245 /**
aa9eede8 1246 * Creates a new, completely set up worker node.
4a6952ff 1247 *
aa9eede8 1248 * @returns New, completely set up worker node key.
4a6952ff 1249 */
aa9eede8 1250 protected createAndSetupWorkerNode (): number {
bdacc2d2 1251 const worker = this.createWorker()
280c2a77 1252
fd04474e 1253 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1254 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1255 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1256 worker.on('error', error => {
aad6fb64 1257 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
23b11f60 1258 this.flagWorkerNodeAsNotReady(workerNodeKey)
46b0bb09 1259 const workerInfo = this.getWorkerInfo(workerNodeKey)
2a69b8c5 1260 this.emitter?.emit(PoolEvents.error, error)
cce7d657 1261 this.workerNodes[workerNodeKey].closeChannel()
15b176e0 1262 if (
b6bfca01 1263 this.started &&
9b38ab2d 1264 !this.starting &&
711623b8 1265 !this.destroying &&
9b38ab2d 1266 this.opts.restartWorkerOnError === true
15b176e0 1267 ) {
9b106837 1268 if (workerInfo.dynamic) {
aa9eede8 1269 this.createAndSetupDynamicWorkerNode()
8a1260a3 1270 } else {
aa9eede8 1271 this.createAndSetupWorkerNode()
8a1260a3 1272 }
5baee0d7 1273 }
6d59c3a3 1274 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1275 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1276 }
5baee0d7 1277 })
a35560ba 1278 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1279 worker.once('exit', () => {
f06e48d8 1280 this.removeWorkerNode(worker)
a974afa6 1281 })
280c2a77 1282
aa9eede8 1283 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1284
aa9eede8 1285 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1286
aa9eede8 1287 return workerNodeKey
c97c7edb 1288 }
be0676b3 1289
930dcf12 1290 /**
aa9eede8 1291 * Creates a new, completely set up dynamic worker node.
930dcf12 1292 *
aa9eede8 1293 * @returns New, completely set up dynamic worker node key.
930dcf12 1294 */
aa9eede8
JB
1295 protected createAndSetupDynamicWorkerNode (): number {
1296 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1297 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1298 this.checkMessageWorkerId(message)
aa9eede8
JB
1299 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1300 message.workerId
aad6fb64 1301 )
aa9eede8 1302 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1303 // Kill message received from worker
930dcf12
JB
1304 if (
1305 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1306 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1307 ((this.opts.enableTasksQueue === false &&
aa9eede8 1308 workerUsage.tasks.executing === 0) ||
7b56f532 1309 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1310 workerUsage.tasks.executing === 0 &&
1311 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1312 ) {
f3827d5d 1313 // Flag the worker node as not ready immediately
ae3ab61d 1314 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1315 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1316 this.emitter?.emit(PoolEvents.error, error)
1317 })
930dcf12
JB
1318 }
1319 })
46b0bb09 1320 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1321 this.sendToWorker(workerNodeKey, {
e9dd5b66 1322 checkActive: true
21f710aa 1323 })
72ae84a2
JB
1324 if (this.taskFunctions.size > 0) {
1325 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1326 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1327 taskFunctionOperation: 'add',
1328 taskFunctionName,
1329 taskFunction: taskFunction.toString()
1330 }).catch(error => {
1331 this.emitter?.emit(PoolEvents.error, error)
1332 })
1333 }
1334 }
b5e113f6 1335 workerInfo.dynamic = true
b1aae695
JB
1336 if (
1337 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1338 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1339 ) {
b5e113f6
JB
1340 workerInfo.ready = true
1341 }
33e6bb4c 1342 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1343 return workerNodeKey
930dcf12
JB
1344 }
1345
a2ed5053 1346 /**
aa9eede8 1347 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1348 *
aa9eede8 1349 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1350 * @param listener - The message listener callback.
1351 */
85aeb3f3
JB
1352 protected abstract registerWorkerMessageListener<
1353 Message extends Data | Response
aa9eede8
JB
1354 >(
1355 workerNodeKey: number,
1356 listener: (message: MessageValue<Message>) => void
1357 ): void
a2ed5053 1358
ae036c3e
JB
1359 /**
1360 * Registers once a listener callback on the worker given its worker node key.
1361 *
1362 * @param workerNodeKey - The worker node key.
1363 * @param listener - The message listener callback.
1364 */
1365 protected abstract registerOnceWorkerMessageListener<
1366 Message extends Data | Response
1367 >(
1368 workerNodeKey: number,
1369 listener: (message: MessageValue<Message>) => void
1370 ): void
1371
1372 /**
1373 * Deregisters a listener callback on the worker given its worker node key.
1374 *
1375 * @param workerNodeKey - The worker node key.
1376 * @param listener - The message listener callback.
1377 */
1378 protected abstract deregisterWorkerMessageListener<
1379 Message extends Data | Response
1380 >(
1381 workerNodeKey: number,
1382 listener: (message: MessageValue<Message>) => void
1383 ): void
1384
a2ed5053 1385 /**
aa9eede8 1386 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1387 * Can be overridden.
1388 *
aa9eede8 1389 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1390 */
aa9eede8 1391 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1392 // Listen to worker messages.
fcd39179
JB
1393 this.registerWorkerMessageListener(
1394 workerNodeKey,
1395 this.workerMessageListener.bind(this)
1396 )
85aeb3f3 1397 // Send the startup message to worker.
aa9eede8 1398 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1399 // Send the statistics message to worker.
1400 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1401 if (this.opts.enableTasksQueue === true) {
dbd73092 1402 if (this.opts.tasksQueueOptions?.taskStealing === true) {
9f95d5eb 1403 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 1404 'emptyQueue',
9b85e2d0 1405 this.handleEmptyQueueEvent as EventListener
9f95d5eb 1406 )
47352846
JB
1407 }
1408 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
9f95d5eb 1409 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 1410 'backPressure',
9b85e2d0 1411 this.handleBackPressureEvent as EventListener
9f95d5eb 1412 )
47352846 1413 }
72695f86 1414 }
d2c73f82
JB
1415 }
1416
85aeb3f3 1417 /**
aa9eede8
JB
1418 * Sends the startup message to worker given its worker node key.
1419 *
1420 * @param workerNodeKey - The worker node key.
1421 */
1422 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1423
1424 /**
9edb9717 1425 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1426 *
aa9eede8 1427 * @param workerNodeKey - The worker node key.
85aeb3f3 1428 */
9edb9717 1429 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1430 this.sendToWorker(workerNodeKey, {
1431 statistics: {
1432 runTime:
1433 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1434 .runTime.aggregate,
1435 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1436 .elu.aggregate
72ae84a2 1437 }
aa9eede8
JB
1438 })
1439 }
a2ed5053
JB
1440
1441 private redistributeQueuedTasks (workerNodeKey: number): void {
1442 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1443 const destinationWorkerNodeKey = this.workerNodes.reduce(
1444 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1445 return workerNode.info.ready &&
1446 workerNode.usage.tasks.queued <
1447 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1448 ? workerNodeKey
1449 : minWorkerNodeKey
1450 },
1451 0
1452 )
72ae84a2 1453 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1454 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1455 this.executeTask(destinationWorkerNodeKey, task)
1456 } else {
1457 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1458 }
1459 }
1460 }
1461
b1838604
JB
1462 private updateTaskStolenStatisticsWorkerUsage (
1463 workerNodeKey: number,
b1838604
JB
1464 taskName: string
1465 ): void {
1a880eca 1466 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1467 if (workerNode?.usage != null) {
1468 ++workerNode.usage.tasks.stolen
1469 }
1470 if (
1471 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1472 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1473 ) {
1474 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1475 taskName
1476 ) as WorkerUsage
1477 ++taskFunctionWorkerUsage.tasks.stolen
1478 }
1479 }
1480
9f95d5eb
JB
1481 private readonly handleEmptyQueueEvent = (
1482 event: CustomEvent<WorkerNodeEventDetail>
1483 ): void => {
b5e75be8
JB
1484 const { workerId } = event.detail
1485 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1486 const workerNodes = this.workerNodes
a6b3272b 1487 .slice()
dd951876
JB
1488 .sort(
1489 (workerNodeA, workerNodeB) =>
1490 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1491 )
f201a0cd 1492 const sourceWorkerNode = workerNodes.find(
041dc05b 1493 workerNode =>
f201a0cd 1494 workerNode.info.ready &&
b5e75be8 1495 workerNode.info.id !== workerId &&
f201a0cd
JB
1496 workerNode.usage.tasks.queued > 0
1497 )
1498 if (sourceWorkerNode != null) {
72ae84a2 1499 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1500 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1501 this.executeTask(destinationWorkerNodeKey, task)
1502 } else {
1503 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1504 }
f201a0cd
JB
1505 this.updateTaskStolenStatisticsWorkerUsage(
1506 destinationWorkerNodeKey,
1507 task.name as string
1508 )
72695f86
JB
1509 }
1510 }
1511
9f95d5eb
JB
1512 private readonly handleBackPressureEvent = (
1513 event: CustomEvent<WorkerNodeEventDetail>
1514 ): void => {
b5e75be8 1515 const { workerId } = event.detail
f778c355
JB
1516 const sizeOffset = 1
1517 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1518 return
1519 }
72695f86 1520 const sourceWorkerNode =
b5e75be8 1521 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1522 const workerNodes = this.workerNodes
a6b3272b 1523 .slice()
72695f86
JB
1524 .sort(
1525 (workerNodeA, workerNodeB) =>
1526 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1527 )
1528 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1529 if (
0bc68267 1530 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1531 workerNode.info.ready &&
b5e75be8 1532 workerNode.info.id !== workerId &&
0bc68267 1533 workerNode.usage.tasks.queued <
f778c355 1534 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1535 ) {
72ae84a2 1536 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1537 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1538 this.executeTask(workerNodeKey, task)
4de3d785 1539 } else {
dd951876 1540 this.enqueueTask(workerNodeKey, task)
4de3d785 1541 }
b1838604
JB
1542 this.updateTaskStolenStatisticsWorkerUsage(
1543 workerNodeKey,
b1838604
JB
1544 task.name as string
1545 )
10ecf8fd 1546 }
a2ed5053
JB
1547 }
1548 }
1549
be0676b3 1550 /**
fcd39179 1551 * This method is the message listener registered on each worker.
be0676b3 1552 */
fcd39179
JB
1553 protected workerMessageListener (message: MessageValue<Response>): void {
1554 this.checkMessageWorkerId(message)
1555 if (message.ready != null && message.taskFunctionNames != null) {
1556 // Worker ready response received from worker
1557 this.handleWorkerReadyResponse(message)
1558 } else if (message.taskId != null) {
1559 // Task execution response received from worker
1560 this.handleTaskExecutionResponse(message)
1561 } else if (message.taskFunctionNames != null) {
1562 // Task function names message received from worker
1563 this.getWorkerInfo(
1564 this.getWorkerNodeKeyByWorkerId(message.workerId)
1565 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1566 }
1567 }
1568
10e2aa7e 1569 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1570 if (message.ready === false) {
72ae84a2
JB
1571 throw new Error(
1572 `Worker ${message.workerId as number} failed to initialize`
1573 )
f05ed93c 1574 }
a5d15204 1575 const workerInfo = this.getWorkerInfo(
aad6fb64 1576 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1577 )
a5d15204 1578 workerInfo.ready = message.ready as boolean
6703b9f4 1579 workerInfo.taskFunctionNames = message.taskFunctionNames
55082af9
JB
1580 if (!this.readyEventEmitted && this.ready) {
1581 this.readyEventEmitted = true
1582 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1583 }
6b272951
JB
1584 }
1585
1586 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1587 const { taskId, workerError, data } = message
5441aea6 1588 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1589 if (promiseResponse != null) {
51af50ef 1590 const { resolve, reject, workerNodeKey } = promiseResponse
6703b9f4
JB
1591 if (workerError != null) {
1592 this.emitter?.emit(PoolEvents.taskError, workerError)
51af50ef 1593 reject(workerError.message)
6b272951 1594 } else {
51af50ef 1595 resolve(data as Response)
6b272951 1596 }
501aea93 1597 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1598 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1599 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1600 if (
1601 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1602 this.tasksQueueSize(workerNodeKey) > 0 &&
1603 this.workerNodes[workerNodeKey].usage.tasks.executing <
1604 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1605 ) {
1606 this.executeTask(
1607 workerNodeKey,
1608 this.dequeueTask(workerNodeKey) as Task<Data>
1609 )
be0676b3
APA
1610 }
1611 }
be0676b3 1612 }
7c0ba920 1613
a1763c54 1614 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1615 if (this.busy) {
1616 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1617 }
1618 }
1619
1620 private checkAndEmitTaskQueuingEvents (): void {
1621 if (this.hasBackPressure()) {
1622 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1623 }
1624 }
1625
33e6bb4c
JB
1626 private checkAndEmitDynamicWorkerCreationEvents (): void {
1627 if (this.type === PoolTypes.dynamic) {
1628 if (this.full) {
1629 this.emitter?.emit(PoolEvents.full, this.info)
1630 }
1631 }
1632 }
1633
8a1260a3 1634 /**
aa9eede8 1635 * Gets the worker information given its worker node key.
8a1260a3
JB
1636 *
1637 * @param workerNodeKey - The worker node key.
3f09ed9f 1638 * @returns The worker information.
8a1260a3 1639 */
46b0bb09 1640 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1641 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1642 }
1643
a05c10de 1644 /**
b0a4db63 1645 * Adds the given worker in the pool worker nodes.
ea7a90d3 1646 *
38e795c1 1647 * @param worker - The worker.
aa9eede8
JB
1648 * @returns The added worker node key.
1649 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1650 */
b0a4db63 1651 private addWorkerNode (worker: Worker): number {
671d5154
JB
1652 const workerNode = new WorkerNode<Worker, Data>(
1653 worker,
ff3f866a 1654 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1655 )
b97d82d8 1656 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1657 if (this.starting) {
1658 workerNode.info.ready = true
1659 }
aa9eede8 1660 this.workerNodes.push(workerNode)
aad6fb64 1661 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1662 if (workerNodeKey === -1) {
86ed0598 1663 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1664 }
1665 return workerNodeKey
ea7a90d3 1666 }
c923ce56 1667
51fe3d3c 1668 /**
f06e48d8 1669 * Removes the given worker from the pool worker nodes.
51fe3d3c 1670 *
f06e48d8 1671 * @param worker - The worker.
51fe3d3c 1672 */
416fd65c 1673 private removeWorkerNode (worker: Worker): void {
aad6fb64 1674 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1675 if (workerNodeKey !== -1) {
1676 this.workerNodes.splice(workerNodeKey, 1)
1677 this.workerChoiceStrategyContext.remove(workerNodeKey)
1678 }
51fe3d3c 1679 }
adc3c320 1680
ae3ab61d
JB
1681 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1682 this.getWorkerInfo(workerNodeKey).ready = false
1683 }
1684
e2b31e32
JB
1685 /** @inheritDoc */
1686 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1687 return (
e2b31e32
JB
1688 this.opts.enableTasksQueue === true &&
1689 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1690 )
1691 }
1692
1693 private hasBackPressure (): boolean {
1694 return (
1695 this.opts.enableTasksQueue === true &&
1696 this.workerNodes.findIndex(
041dc05b 1697 workerNode => !workerNode.hasBackPressure()
a1763c54 1698 ) === -1
9e844245 1699 )
e2b31e32
JB
1700 }
1701
b0a4db63 1702 /**
aa9eede8 1703 * Executes the given task on the worker given its worker node key.
b0a4db63 1704 *
aa9eede8 1705 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1706 * @param task - The task to execute.
1707 */
2e81254d 1708 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1709 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1710 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1711 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1712 }
1713
f9f00b5f 1714 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1715 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1716 this.checkAndEmitTaskQueuingEvents()
1717 return tasksQueueSize
adc3c320
JB
1718 }
1719
416fd65c 1720 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1721 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1722 }
1723
416fd65c 1724 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1725 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1726 }
1727
81c02522 1728 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1729 while (this.tasksQueueSize(workerNodeKey) > 0) {
1730 this.executeTask(
1731 workerNodeKey,
1732 this.dequeueTask(workerNodeKey) as Task<Data>
1733 )
ff733df7 1734 }
4b628b48 1735 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1736 }
1737
ef41a6e6
JB
1738 private flushTasksQueues (): void {
1739 for (const [workerNodeKey] of this.workerNodes.entries()) {
1740 this.flushTasksQueue(workerNodeKey)
1741 }
1742 }
c97c7edb 1743}