fix: fix continuous tasks stealing on idle start at worker node idling
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
9f95d5eb 39 WorkerNodeEventDetail,
4d159167
JB
40 WorkerType,
41 WorkerUsage
e102732c 42} from './worker'
a35560ba 43import {
008512c7 44 type MeasurementStatisticsRequirements,
f0d7f803 45 Measurements,
a35560ba 46 WorkerChoiceStrategies,
a20f0ba5
JB
47 type WorkerChoiceStrategy,
48 type WorkerChoiceStrategyOptions
bdaf31cd
JB
49} from './selection-strategies/selection-strategies-types'
50import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 51import { version } from './version'
4b628b48 52import { WorkerNode } from './worker-node'
bde6b5d7
JB
53import {
54 checkFilePath,
55 checkValidTasksQueueOptions,
bfc75cca
JB
56 checkValidWorkerChoiceStrategy,
57 updateMeasurementStatistics
bde6b5d7 58} from './utils'
23ccf9d7 59
729c563d 60/**
ea7a90d3 61 * Base class that implements some shared logic for all poolifier pools.
729c563d 62 *
38e795c1 63 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
64 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
65 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 66 */
c97c7edb 67export abstract class AbstractPool<
f06e48d8 68 Worker extends IWorker,
d3c8a1a8
S
69 Data = unknown,
70 Response = unknown
c4855468 71> implements IPool<Worker, Data, Response> {
afc003b2 72 /** @inheritDoc */
4b628b48 73 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 74
afc003b2 75 /** @inheritDoc */
f80125ca 76 public emitter?: EventEmitterAsyncResource
7c0ba920 77
fcd39179
JB
78 /**
79 * Dynamic pool maximum size property placeholder.
80 */
81 protected readonly max?: number
82
be0676b3 83 /**
419e8121 84 * The task execution response promise map:
2740a743 85 * - `key`: The message id of each submitted task.
a3445496 86 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 87 *
a3445496 88 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 89 */
501aea93
JB
90 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
91 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 92
a35560ba 93 /**
51fe3d3c 94 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
95 */
96 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
97 Worker,
98 Data,
99 Response
a35560ba
S
100 >
101
72ae84a2
JB
102 /**
103 * The task functions added at runtime map:
104 * - `key`: The task function name.
105 * - `value`: The task function itself.
106 */
107 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
108
15b176e0
JB
109 /**
110 * Whether the pool is started or not.
111 */
112 private started: boolean
47352846
JB
113 /**
114 * Whether the pool is starting or not.
115 */
116 private starting: boolean
711623b8
JB
117 /**
118 * Whether the pool is destroying or not.
119 */
120 private destroying: boolean
55082af9
JB
121 /**
122 * Whether the pool ready event has been emitted or not.
123 */
124 private readyEventEmitted: boolean
afe0d5bf
JB
125 /**
126 * The start timestamp of the pool.
127 */
128 private readonly startTimestamp
129
729c563d
S
130 /**
131 * Constructs a new poolifier pool.
132 *
38e795c1 133 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 134 * @param filePath - Path to the worker file.
38e795c1 135 * @param opts - Options for the pool.
729c563d 136 */
c97c7edb 137 public constructor (
b4213b7f
JB
138 protected readonly numberOfWorkers: number,
139 protected readonly filePath: string,
140 protected readonly opts: PoolOptions<Worker>
c97c7edb 141 ) {
78cea37e 142 if (!this.isMain()) {
04f45163 143 throw new Error(
8c6d4acf 144 'Cannot start a pool from a worker with the same type as the pool'
04f45163 145 )
c97c7edb 146 }
bde6b5d7 147 checkFilePath(this.filePath)
8003c026 148 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 149 this.checkPoolOptions(this.opts)
1086026a 150
7254e419
JB
151 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
152 this.executeTask = this.executeTask.bind(this)
153 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 154
6bd72cd0 155 if (this.opts.enableEvents === true) {
b5604034 156 this.initializeEventEmitter()
7c0ba920 157 }
d59df138
JB
158 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
159 Worker,
160 Data,
161 Response
da309861
JB
162 >(
163 this,
164 this.opts.workerChoiceStrategy,
165 this.opts.workerChoiceStrategyOptions
166 )
b6b32453
JB
167
168 this.setupHook()
169
72ae84a2
JB
170 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
171
47352846 172 this.started = false
075e51d1 173 this.starting = false
711623b8 174 this.destroying = false
55082af9 175 this.readyEventEmitted = false
47352846
JB
176 if (this.opts.startWorkers === true) {
177 this.start()
178 }
afe0d5bf
JB
179
180 this.startTimestamp = performance.now()
c97c7edb
S
181 }
182
8d3782fa
JB
183 private checkNumberOfWorkers (numberOfWorkers: number): void {
184 if (numberOfWorkers == null) {
185 throw new Error(
186 'Cannot instantiate a pool without specifying the number of workers'
187 )
78cea37e 188 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 189 throw new TypeError(
0d80593b 190 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
191 )
192 } else if (numberOfWorkers < 0) {
473c717a 193 throw new RangeError(
8d3782fa
JB
194 'Cannot instantiate a pool with a negative number of workers'
195 )
6b27d407 196 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
197 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
198 }
199 }
200
7c0ba920 201 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 202 if (isPlainObject(opts)) {
47352846 203 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 204 checkValidWorkerChoiceStrategy(
2324f8c9
JB
205 opts.workerChoiceStrategy as WorkerChoiceStrategy
206 )
0d80593b
JB
207 this.opts.workerChoiceStrategy =
208 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
209 this.checkValidWorkerChoiceStrategyOptions(
210 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
211 )
8990357d
JB
212 this.opts.workerChoiceStrategyOptions = {
213 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
214 ...opts.workerChoiceStrategyOptions
215 }
1f68cede 216 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
217 this.opts.enableEvents = opts.enableEvents ?? true
218 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
219 if (this.opts.enableTasksQueue) {
bde6b5d7 220 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
221 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 }
225 } else {
226 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 227 }
aee46736
JB
228 }
229
0d80593b
JB
230 private checkValidWorkerChoiceStrategyOptions (
231 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
232 ): void {
2324f8c9
JB
233 if (
234 workerChoiceStrategyOptions != null &&
235 !isPlainObject(workerChoiceStrategyOptions)
236 ) {
0d80593b
JB
237 throw new TypeError(
238 'Invalid worker choice strategy options: must be a plain object'
239 )
240 }
8990357d 241 if (
2324f8c9 242 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 243 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
244 ) {
245 throw new TypeError(
8c0b113f 246 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
247 )
248 }
249 if (
2324f8c9 250 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 251 workerChoiceStrategyOptions.retries < 0
8990357d
JB
252 ) {
253 throw new RangeError(
8c0b113f 254 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
255 )
256 }
49be33fe 257 if (
2324f8c9 258 workerChoiceStrategyOptions?.weights != null &&
6b27d407 259 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
260 ) {
261 throw new Error(
262 'Invalid worker choice strategy options: must have a weight for each worker node'
263 )
264 }
f0d7f803 265 if (
2324f8c9 266 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
267 !Object.values(Measurements).includes(
268 workerChoiceStrategyOptions.measurement
269 )
270 ) {
271 throw new Error(
272 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
273 )
274 }
0d80593b
JB
275 }
276
b5604034 277 private initializeEventEmitter (): void {
5b7d00b4 278 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
279 name: `poolifier:${this.type}-${this.worker}-pool`
280 })
281 }
282
08f3f44c 283 /** @inheritDoc */
6b27d407
JB
284 public get info (): PoolInfo {
285 return {
23ccf9d7 286 version,
6b27d407 287 type: this.type,
184855e6 288 worker: this.worker,
47352846 289 started: this.started,
2431bdb4
JB
290 ready: this.ready,
291 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
292 minSize: this.minSize,
293 maxSize: this.maxSize,
c05f0d50
JB
294 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
295 .runTime.aggregate &&
1305e9a8
JB
296 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
297 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
298 workerNodes: this.workerNodes.length,
299 idleWorkerNodes: this.workerNodes.reduce(
300 (accumulator, workerNode) =>
f59e1027 301 workerNode.usage.tasks.executing === 0
a4e07f72
JB
302 ? accumulator + 1
303 : accumulator,
6b27d407
JB
304 0
305 ),
306 busyWorkerNodes: this.workerNodes.reduce(
307 (accumulator, workerNode) =>
f59e1027 308 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
309 0
310 ),
a4e07f72 311 executedTasks: this.workerNodes.reduce(
6b27d407 312 (accumulator, workerNode) =>
f59e1027 313 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
314 0
315 ),
316 executingTasks: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
f59e1027 318 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
319 0
320 ),
daf86646
JB
321 ...(this.opts.enableTasksQueue === true && {
322 queuedTasks: this.workerNodes.reduce(
323 (accumulator, workerNode) =>
324 accumulator + workerNode.usage.tasks.queued,
325 0
326 )
327 }),
328 ...(this.opts.enableTasksQueue === true && {
329 maxQueuedTasks: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
331 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
332 0
333 )
334 }),
a1763c54
JB
335 ...(this.opts.enableTasksQueue === true && {
336 backPressure: this.hasBackPressure()
337 }),
68cbdc84
JB
338 ...(this.opts.enableTasksQueue === true && {
339 stolenTasks: this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 accumulator + workerNode.usage.tasks.stolen,
342 0
343 )
344 }),
a4e07f72
JB
345 failedTasks: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
f59e1027 347 accumulator + workerNode.usage.tasks.failed,
a4e07f72 348 0
1dcf8b7b
JB
349 ),
350 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
351 .runTime.aggregate && {
352 runTime: {
98e72cda 353 minimum: round(
90d6701c 354 min(
98e72cda 355 ...this.workerNodes.map(
041dc05b 356 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 357 )
1dcf8b7b
JB
358 )
359 ),
98e72cda 360 maximum: round(
90d6701c 361 max(
98e72cda 362 ...this.workerNodes.map(
041dc05b 363 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 364 )
1dcf8b7b 365 )
98e72cda 366 ),
3baa0837
JB
367 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
368 .runTime.average && {
369 average: round(
370 average(
371 this.workerNodes.reduce<number[]>(
372 (accumulator, workerNode) =>
373 accumulator.concat(workerNode.usage.runTime.history),
374 []
375 )
98e72cda 376 )
dc021bcc 377 )
3baa0837 378 }),
98e72cda
JB
379 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
380 .runTime.median && {
381 median: round(
382 median(
3baa0837
JB
383 this.workerNodes.reduce<number[]>(
384 (accumulator, workerNode) =>
385 accumulator.concat(workerNode.usage.runTime.history),
386 []
98e72cda
JB
387 )
388 )
389 )
390 })
1dcf8b7b
JB
391 }
392 }),
393 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
394 .waitTime.aggregate && {
395 waitTime: {
98e72cda 396 minimum: round(
90d6701c 397 min(
98e72cda 398 ...this.workerNodes.map(
041dc05b 399 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 400 )
1dcf8b7b
JB
401 )
402 ),
98e72cda 403 maximum: round(
90d6701c 404 max(
98e72cda 405 ...this.workerNodes.map(
041dc05b 406 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 407 )
1dcf8b7b 408 )
98e72cda 409 ),
3baa0837
JB
410 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
411 .waitTime.average && {
412 average: round(
413 average(
414 this.workerNodes.reduce<number[]>(
415 (accumulator, workerNode) =>
416 accumulator.concat(workerNode.usage.waitTime.history),
417 []
418 )
98e72cda 419 )
dc021bcc 420 )
3baa0837 421 }),
98e72cda
JB
422 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
423 .waitTime.median && {
424 median: round(
425 median(
3baa0837
JB
426 this.workerNodes.reduce<number[]>(
427 (accumulator, workerNode) =>
428 accumulator.concat(workerNode.usage.waitTime.history),
429 []
98e72cda
JB
430 )
431 )
432 )
433 })
1dcf8b7b
JB
434 }
435 })
6b27d407
JB
436 }
437 }
08f3f44c 438
aa9eede8
JB
439 /**
440 * The pool readiness boolean status.
441 */
2431bdb4
JB
442 private get ready (): boolean {
443 return (
b97d82d8
JB
444 this.workerNodes.reduce(
445 (accumulator, workerNode) =>
446 !workerNode.info.dynamic && workerNode.info.ready
447 ? accumulator + 1
448 : accumulator,
449 0
450 ) >= this.minSize
2431bdb4
JB
451 )
452 }
453
afe0d5bf 454 /**
aa9eede8 455 * The approximate pool utilization.
afe0d5bf
JB
456 *
457 * @returns The pool utilization.
458 */
459 private get utilization (): number {
8e5ca040 460 const poolTimeCapacity =
fe7d90db 461 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
462 const totalTasksRunTime = this.workerNodes.reduce(
463 (accumulator, workerNode) =>
71514351 464 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
465 0
466 )
467 const totalTasksWaitTime = this.workerNodes.reduce(
468 (accumulator, workerNode) =>
71514351 469 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
470 0
471 )
8e5ca040 472 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
473 }
474
8881ae32 475 /**
aa9eede8 476 * The pool type.
8881ae32
JB
477 *
478 * If it is `'dynamic'`, it provides the `max` property.
479 */
480 protected abstract get type (): PoolType
481
184855e6 482 /**
aa9eede8 483 * The worker type.
184855e6
JB
484 */
485 protected abstract get worker (): WorkerType
486
c2ade475 487 /**
aa9eede8 488 * The pool minimum size.
c2ade475 489 */
8735b4e5
JB
490 protected get minSize (): number {
491 return this.numberOfWorkers
492 }
ff733df7
JB
493
494 /**
aa9eede8 495 * The pool maximum size.
ff733df7 496 */
8735b4e5
JB
497 protected get maxSize (): number {
498 return this.max ?? this.numberOfWorkers
499 }
a35560ba 500
6b813701
JB
501 /**
502 * Checks if the worker id sent in the received message from a worker is valid.
503 *
504 * @param message - The received message.
505 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
506 */
4d159167 507 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
508 if (message.workerId == null) {
509 throw new Error('Worker message received without worker id')
8e5a7cf9 510 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
511 throw new Error(
512 `Worker message received from unknown worker '${message.workerId}'`
513 )
514 }
515 }
516
ffcbbad8 517 /**
f06e48d8 518 * Gets the given worker its worker node key.
ffcbbad8
JB
519 *
520 * @param worker - The worker.
f59e1027 521 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 522 */
aad6fb64 523 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 524 return this.workerNodes.findIndex(
041dc05b 525 workerNode => workerNode.worker === worker
f06e48d8 526 )
bf9549ae
JB
527 }
528
aa9eede8
JB
529 /**
530 * Gets the worker node key given its worker id.
531 *
532 * @param workerId - The worker id.
aad6fb64 533 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 534 */
72ae84a2 535 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 536 return this.workerNodes.findIndex(
041dc05b 537 workerNode => workerNode.info.id === workerId
aad6fb64 538 )
aa9eede8
JB
539 }
540
afc003b2 541 /** @inheritDoc */
a35560ba 542 public setWorkerChoiceStrategy (
59219cbb
JB
543 workerChoiceStrategy: WorkerChoiceStrategy,
544 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 545 ): void {
bde6b5d7 546 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 547 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
548 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
549 this.opts.workerChoiceStrategy
550 )
551 if (workerChoiceStrategyOptions != null) {
552 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
553 }
aa9eede8 554 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 555 workerNode.resetUsage()
9edb9717 556 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 557 }
a20f0ba5
JB
558 }
559
560 /** @inheritDoc */
561 public setWorkerChoiceStrategyOptions (
562 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
563 ): void {
0d80593b 564 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
565 this.opts.workerChoiceStrategyOptions = {
566 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
567 ...workerChoiceStrategyOptions
568 }
a20f0ba5
JB
569 this.workerChoiceStrategyContext.setOptions(
570 this.opts.workerChoiceStrategyOptions
a35560ba
S
571 )
572 }
573
a20f0ba5 574 /** @inheritDoc */
8f52842f
JB
575 public enableTasksQueue (
576 enable: boolean,
577 tasksQueueOptions?: TasksQueueOptions
578 ): void {
a20f0ba5 579 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
580 this.unsetTaskStealing()
581 this.unsetTasksStealingOnBackPressure()
ef41a6e6 582 this.flushTasksQueues()
a20f0ba5
JB
583 }
584 this.opts.enableTasksQueue = enable
8f52842f 585 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
586 }
587
588 /** @inheritDoc */
8f52842f 589 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 590 if (this.opts.enableTasksQueue === true) {
bde6b5d7 591 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
592 this.opts.tasksQueueOptions =
593 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 594 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
595 if (this.opts.tasksQueueOptions.taskStealing === true) {
596 this.setTaskStealing()
597 } else {
598 this.unsetTaskStealing()
599 }
600 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
601 this.setTasksStealingOnBackPressure()
602 } else {
603 this.unsetTasksStealingOnBackPressure()
604 }
5baee0d7 605 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
606 delete this.opts.tasksQueueOptions
607 }
608 }
609
9b38ab2d
JB
610 private buildTasksQueueOptions (
611 tasksQueueOptions: TasksQueueOptions
612 ): TasksQueueOptions {
613 return {
614 ...{
615 size: Math.pow(this.maxSize, 2),
616 concurrency: 1,
617 taskStealing: true,
618 tasksStealingOnBackPressure: true
619 },
620 ...tasksQueueOptions
621 }
622 }
623
5b49e864 624 private setTasksQueueSize (size: number): void {
20c6f652 625 for (const workerNode of this.workerNodes) {
ff3f866a 626 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
627 }
628 }
629
d6ca1416
JB
630 private setTaskStealing (): void {
631 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 632 this.workerNodes[workerNodeKey].addEventListener(
65542a35
JB
633 'idleWorkerNode',
634 this.handleIdleWorkerNodeEvent as EventListener
9f95d5eb 635 )
d6ca1416
JB
636 }
637 }
638
639 private unsetTaskStealing (): void {
640 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 641 this.workerNodes[workerNodeKey].removeEventListener(
65542a35
JB
642 'idleWorkerNode',
643 this.handleIdleWorkerNodeEvent as EventListener
9f95d5eb 644 )
d6ca1416
JB
645 }
646 }
647
648 private setTasksStealingOnBackPressure (): void {
649 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 650 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 651 'backPressure',
9b85e2d0 652 this.handleBackPressureEvent as EventListener
9f95d5eb 653 )
d6ca1416
JB
654 }
655 }
656
657 private unsetTasksStealingOnBackPressure (): void {
658 for (const [workerNodeKey] of this.workerNodes.entries()) {
9f95d5eb 659 this.workerNodes[workerNodeKey].removeEventListener(
b5e75be8 660 'backPressure',
9b85e2d0 661 this.handleBackPressureEvent as EventListener
9f95d5eb 662 )
d6ca1416
JB
663 }
664 }
665
c319c66b
JB
666 /**
667 * Whether the pool is full or not.
668 *
669 * The pool filling boolean status.
670 */
dea903a8
JB
671 protected get full (): boolean {
672 return this.workerNodes.length >= this.maxSize
673 }
c2ade475 674
c319c66b
JB
675 /**
676 * Whether the pool is busy or not.
677 *
678 * The pool busyness boolean status.
679 */
680 protected abstract get busy (): boolean
7c0ba920 681
6c6afb84 682 /**
3d76750a 683 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
684 *
685 * @returns Worker nodes busyness boolean status.
686 */
c2ade475 687 protected internalBusy (): boolean {
3d76750a
JB
688 if (this.opts.enableTasksQueue === true) {
689 return (
690 this.workerNodes.findIndex(
041dc05b 691 workerNode =>
3d76750a
JB
692 workerNode.info.ready &&
693 workerNode.usage.tasks.executing <
694 (this.opts.tasksQueueOptions?.concurrency as number)
695 ) === -1
696 )
3d76750a 697 }
419e8121
JB
698 return (
699 this.workerNodes.findIndex(
700 workerNode =>
701 workerNode.info.ready && workerNode.usage.tasks.executing === 0
702 ) === -1
703 )
cb70b19d
JB
704 }
705
e81c38f2 706 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
707 workerNodeKey: number,
708 message: MessageValue<Data>
709 ): Promise<boolean> {
72ae84a2 710 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
711 const taskFunctionOperationListener = (
712 message: MessageValue<Response>
713 ): void => {
714 this.checkMessageWorkerId(message)
715 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 716 if (
ae036c3e
JB
717 message.taskFunctionOperationStatus != null &&
718 message.workerId === workerId
72ae84a2 719 ) {
ae036c3e
JB
720 if (message.taskFunctionOperationStatus) {
721 resolve(true)
722 } else if (!message.taskFunctionOperationStatus) {
723 reject(
724 new Error(
725 `Task function operation '${
726 message.taskFunctionOperation as string
727 }' failed on worker ${message.workerId} with error: '${
728 message.workerError?.message as string
729 }'`
730 )
72ae84a2 731 )
ae036c3e
JB
732 }
733 this.deregisterWorkerMessageListener(
734 this.getWorkerNodeKeyByWorkerId(message.workerId),
735 taskFunctionOperationListener
72ae84a2
JB
736 )
737 }
ae036c3e
JB
738 }
739 this.registerWorkerMessageListener(
740 workerNodeKey,
741 taskFunctionOperationListener
742 )
72ae84a2
JB
743 this.sendToWorker(workerNodeKey, message)
744 })
745 }
746
747 private async sendTaskFunctionOperationToWorkers (
adee6053 748 message: MessageValue<Data>
e81c38f2
JB
749 ): Promise<boolean> {
750 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
751 const responsesReceived = new Array<MessageValue<Response>>()
752 const taskFunctionOperationsListener = (
753 message: MessageValue<Response>
754 ): void => {
755 this.checkMessageWorkerId(message)
756 if (message.taskFunctionOperationStatus != null) {
757 responsesReceived.push(message)
758 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 759 if (
e81c38f2
JB
760 responsesReceived.every(
761 message => message.taskFunctionOperationStatus === true
762 )
763 ) {
764 resolve(true)
765 } else if (
e81c38f2
JB
766 responsesReceived.some(
767 message => message.taskFunctionOperationStatus === false
768 )
769 ) {
b0b55f57
JB
770 const errorResponse = responsesReceived.find(
771 response => response.taskFunctionOperationStatus === false
772 )
e81c38f2
JB
773 reject(
774 new Error(
b0b55f57 775 `Task function operation '${
e81c38f2 776 message.taskFunctionOperation as string
b0b55f57
JB
777 }' failed on worker ${
778 errorResponse?.workerId as number
779 } with error: '${
780 errorResponse?.workerError?.message as string
781 }'`
e81c38f2
JB
782 )
783 )
784 }
ae036c3e
JB
785 this.deregisterWorkerMessageListener(
786 this.getWorkerNodeKeyByWorkerId(message.workerId),
787 taskFunctionOperationsListener
788 )
e81c38f2 789 }
ae036c3e
JB
790 }
791 }
792 for (const [workerNodeKey] of this.workerNodes.entries()) {
793 this.registerWorkerMessageListener(
794 workerNodeKey,
795 taskFunctionOperationsListener
796 )
72ae84a2 797 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
798 }
799 })
6703b9f4
JB
800 }
801
802 /** @inheritDoc */
803 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
804 for (const workerNode of this.workerNodes) {
805 if (
806 Array.isArray(workerNode.info.taskFunctionNames) &&
807 workerNode.info.taskFunctionNames.includes(name)
808 ) {
809 return true
810 }
811 }
812 return false
6703b9f4
JB
813 }
814
815 /** @inheritDoc */
e81c38f2
JB
816 public async addTaskFunction (
817 name: string,
3feeab69 818 fn: TaskFunction<Data, Response>
e81c38f2 819 ): Promise<boolean> {
3feeab69
JB
820 if (typeof name !== 'string') {
821 throw new TypeError('name argument must be a string')
822 }
823 if (typeof name === 'string' && name.trim().length === 0) {
824 throw new TypeError('name argument must not be an empty string')
825 }
826 if (typeof fn !== 'function') {
827 throw new TypeError('fn argument must be a function')
828 }
adee6053 829 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
830 taskFunctionOperation: 'add',
831 taskFunctionName: name,
3feeab69 832 taskFunction: fn.toString()
6703b9f4 833 })
adee6053
JB
834 this.taskFunctions.set(name, fn)
835 return opResult
6703b9f4
JB
836 }
837
838 /** @inheritDoc */
e81c38f2 839 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
840 if (!this.taskFunctions.has(name)) {
841 throw new Error(
16248b23 842 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
843 )
844 }
adee6053 845 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
846 taskFunctionOperation: 'remove',
847 taskFunctionName: name
848 })
adee6053
JB
849 this.deleteTaskFunctionWorkerUsages(name)
850 this.taskFunctions.delete(name)
851 return opResult
6703b9f4
JB
852 }
853
90d7d101 854 /** @inheritDoc */
6703b9f4 855 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
856 for (const workerNode of this.workerNodes) {
857 if (
6703b9f4
JB
858 Array.isArray(workerNode.info.taskFunctionNames) &&
859 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 860 ) {
6703b9f4 861 return workerNode.info.taskFunctionNames
f2dbbf95 862 }
90d7d101 863 }
f2dbbf95 864 return []
90d7d101
JB
865 }
866
6703b9f4 867 /** @inheritDoc */
e81c38f2 868 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 869 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
870 taskFunctionOperation: 'default',
871 taskFunctionName: name
872 })
6703b9f4
JB
873 }
874
adee6053
JB
875 private deleteTaskFunctionWorkerUsages (name: string): void {
876 for (const workerNode of this.workerNodes) {
877 workerNode.deleteTaskFunctionWorkerUsage(name)
878 }
879 }
880
375f7504
JB
881 private shallExecuteTask (workerNodeKey: number): boolean {
882 return (
883 this.tasksQueueSize(workerNodeKey) === 0 &&
884 this.workerNodes[workerNodeKey].usage.tasks.executing <
885 (this.opts.tasksQueueOptions?.concurrency as number)
886 )
887 }
888
afc003b2 889 /** @inheritDoc */
7d91a8cd
JB
890 public async execute (
891 data?: Data,
892 name?: string,
893 transferList?: TransferListItem[]
894 ): Promise<Response> {
52b71763 895 return await new Promise<Response>((resolve, reject) => {
15b176e0 896 if (!this.started) {
47352846 897 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 898 return
15b176e0 899 }
711623b8
JB
900 if (this.destroying) {
901 reject(new Error('Cannot execute a task on destroying pool'))
902 return
903 }
7d91a8cd
JB
904 if (name != null && typeof name !== 'string') {
905 reject(new TypeError('name argument must be a string'))
9d2d0da1 906 return
7d91a8cd 907 }
90d7d101
JB
908 if (
909 name != null &&
910 typeof name === 'string' &&
911 name.trim().length === 0
912 ) {
f58b60b9 913 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 914 return
90d7d101 915 }
b558f6b5
JB
916 if (transferList != null && !Array.isArray(transferList)) {
917 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 918 return
b558f6b5
JB
919 }
920 const timestamp = performance.now()
921 const workerNodeKey = this.chooseWorkerNode()
501aea93 922 const task: Task<Data> = {
52b71763
JB
923 name: name ?? DEFAULT_TASK_NAME,
924 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
925 data: data ?? ({} as Data),
7d91a8cd 926 transferList,
52b71763 927 timestamp,
7629bdf1 928 taskId: randomUUID()
52b71763 929 }
7629bdf1 930 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
931 resolve,
932 reject,
501aea93 933 workerNodeKey
2e81254d 934 })
52b71763 935 if (
4e377863
JB
936 this.opts.enableTasksQueue === false ||
937 (this.opts.enableTasksQueue === true &&
375f7504 938 this.shallExecuteTask(workerNodeKey))
52b71763 939 ) {
501aea93 940 this.executeTask(workerNodeKey, task)
4e377863
JB
941 } else {
942 this.enqueueTask(workerNodeKey, task)
52b71763 943 }
2e81254d 944 })
280c2a77 945 }
c97c7edb 946
47352846
JB
947 /** @inheritdoc */
948 public start (): void {
711623b8
JB
949 if (this.started) {
950 throw new Error('Cannot start an already started pool')
951 }
952 if (this.starting) {
953 throw new Error('Cannot start an already starting pool')
954 }
955 if (this.destroying) {
956 throw new Error('Cannot start a destroying pool')
957 }
47352846
JB
958 this.starting = true
959 while (
960 this.workerNodes.reduce(
961 (accumulator, workerNode) =>
962 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
963 0
964 ) < this.numberOfWorkers
965 ) {
966 this.createAndSetupWorkerNode()
967 }
968 this.starting = false
969 this.started = true
970 }
971
afc003b2 972 /** @inheritDoc */
c97c7edb 973 public async destroy (): Promise<void> {
711623b8
JB
974 if (!this.started) {
975 throw new Error('Cannot destroy an already destroyed pool')
976 }
977 if (this.starting) {
978 throw new Error('Cannot destroy an starting pool')
979 }
980 if (this.destroying) {
981 throw new Error('Cannot destroy an already destroying pool')
982 }
983 this.destroying = true
1fbcaa7c 984 await Promise.all(
81c02522 985 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 986 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
987 })
988 )
33e6bb4c 989 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 990 this.emitter?.emitDestroy()
55082af9 991 this.readyEventEmitted = false
711623b8 992 this.destroying = false
15b176e0 993 this.started = false
c97c7edb
S
994 }
995
1e3214b6 996 protected async sendKillMessageToWorker (
72ae84a2 997 workerNodeKey: number
1e3214b6 998 ): Promise<void> {
9edb9717 999 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
1000 const killMessageListener = (message: MessageValue<Response>): void => {
1001 this.checkMessageWorkerId(message)
1e3214b6
JB
1002 if (message.kill === 'success') {
1003 resolve()
1004 } else if (message.kill === 'failure') {
72ae84a2
JB
1005 reject(
1006 new Error(
ae036c3e 1007 `Kill message handling failed on worker ${
72ae84a2 1008 message.workerId as number
ae036c3e 1009 }`
72ae84a2
JB
1010 )
1011 )
1e3214b6 1012 }
ae036c3e 1013 }
51d9cfbd 1014 // FIXME: should be registered only once
ae036c3e 1015 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1016 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1017 })
1e3214b6
JB
1018 }
1019
4a6952ff 1020 /**
aa9eede8 1021 * Terminates the worker node given its worker node key.
4a6952ff 1022 *
aa9eede8 1023 * @param workerNodeKey - The worker node key.
4a6952ff 1024 */
81c02522 1025 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1026
729c563d 1027 /**
6677a3d3
JB
1028 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1029 * Can be overridden.
afc003b2
JB
1030 *
1031 * @virtual
729c563d 1032 */
280c2a77 1033 protected setupHook (): void {
965df41c 1034 /* Intentionally empty */
280c2a77 1035 }
c97c7edb 1036
729c563d 1037 /**
280c2a77
S
1038 * Should return whether the worker is the main worker or not.
1039 */
1040 protected abstract isMain (): boolean
1041
1042 /**
2e81254d 1043 * Hook executed before the worker task execution.
bf9549ae 1044 * Can be overridden.
729c563d 1045 *
f06e48d8 1046 * @param workerNodeKey - The worker node key.
1c6fe997 1047 * @param task - The task to execute.
729c563d 1048 */
1c6fe997
JB
1049 protected beforeTaskExecutionHook (
1050 workerNodeKey: number,
1051 task: Task<Data>
1052 ): void {
94407def
JB
1053 if (this.workerNodes[workerNodeKey]?.usage != null) {
1054 const workerUsage = this.workerNodes[workerNodeKey].usage
1055 ++workerUsage.tasks.executing
1056 this.updateWaitTimeWorkerUsage(workerUsage, task)
1057 }
1058 if (
1059 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1060 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1061 task.name as string
1062 ) != null
1063 ) {
db0e38ee 1064 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1065 workerNodeKey
db0e38ee 1066 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1067 ++taskFunctionWorkerUsage.tasks.executing
1068 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1069 }
c97c7edb
S
1070 }
1071
c01733f1 1072 /**
2e81254d 1073 * Hook executed after the worker task execution.
bf9549ae 1074 * Can be overridden.
c01733f1 1075 *
501aea93 1076 * @param workerNodeKey - The worker node key.
38e795c1 1077 * @param message - The received message.
c01733f1 1078 */
2e81254d 1079 protected afterTaskExecutionHook (
501aea93 1080 workerNodeKey: number,
2740a743 1081 message: MessageValue<Response>
bf9549ae 1082 ): void {
94407def
JB
1083 if (this.workerNodes[workerNodeKey]?.usage != null) {
1084 const workerUsage = this.workerNodes[workerNodeKey].usage
1085 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1086 this.updateRunTimeWorkerUsage(workerUsage, message)
1087 this.updateEluWorkerUsage(workerUsage, message)
1088 }
1089 if (
1090 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1091 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1092 message.taskPerformance?.name as string
94407def
JB
1093 ) != null
1094 ) {
db0e38ee 1095 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1096 workerNodeKey
db0e38ee 1097 ].getTaskFunctionWorkerUsage(
0628755c 1098 message.taskPerformance?.name as string
b558f6b5 1099 ) as WorkerUsage
db0e38ee
JB
1100 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1101 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1102 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1103 }
1104 }
1105
db0e38ee
JB
1106 /**
1107 * Whether the worker node shall update its task function worker usage or not.
1108 *
1109 * @param workerNodeKey - The worker node key.
1110 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1111 */
1112 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1113 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1114 return (
94407def 1115 workerInfo != null &&
6703b9f4
JB
1116 Array.isArray(workerInfo.taskFunctionNames) &&
1117 workerInfo.taskFunctionNames.length > 2
b558f6b5 1118 )
f1c06930
JB
1119 }
1120
1121 private updateTaskStatisticsWorkerUsage (
1122 workerUsage: WorkerUsage,
1123 message: MessageValue<Response>
1124 ): void {
a4e07f72 1125 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1126 if (
1127 workerTaskStatistics.executing != null &&
1128 workerTaskStatistics.executing > 0
1129 ) {
1130 --workerTaskStatistics.executing
5bb5be17 1131 }
6703b9f4 1132 if (message.workerError == null) {
98e72cda
JB
1133 ++workerTaskStatistics.executed
1134 } else {
a4e07f72 1135 ++workerTaskStatistics.failed
2740a743 1136 }
f8eb0a2a
JB
1137 }
1138
a4e07f72
JB
1139 private updateRunTimeWorkerUsage (
1140 workerUsage: WorkerUsage,
f8eb0a2a
JB
1141 message: MessageValue<Response>
1142 ): void {
6703b9f4 1143 if (message.workerError != null) {
dc021bcc
JB
1144 return
1145 }
e4f20deb
JB
1146 updateMeasurementStatistics(
1147 workerUsage.runTime,
1148 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1149 message.taskPerformance?.runTime ?? 0
e4f20deb 1150 )
f8eb0a2a
JB
1151 }
1152
a4e07f72
JB
1153 private updateWaitTimeWorkerUsage (
1154 workerUsage: WorkerUsage,
1c6fe997 1155 task: Task<Data>
f8eb0a2a 1156 ): void {
1c6fe997
JB
1157 const timestamp = performance.now()
1158 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1159 updateMeasurementStatistics(
1160 workerUsage.waitTime,
1161 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1162 taskWaitTime
e4f20deb 1163 )
c01733f1 1164 }
1165
a4e07f72 1166 private updateEluWorkerUsage (
5df69fab 1167 workerUsage: WorkerUsage,
62c15a68
JB
1168 message: MessageValue<Response>
1169 ): void {
6703b9f4 1170 if (message.workerError != null) {
dc021bcc
JB
1171 return
1172 }
008512c7
JB
1173 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1174 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1175 updateMeasurementStatistics(
1176 workerUsage.elu.active,
008512c7 1177 eluTaskStatisticsRequirements,
dc021bcc 1178 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1179 )
1180 updateMeasurementStatistics(
1181 workerUsage.elu.idle,
008512c7 1182 eluTaskStatisticsRequirements,
dc021bcc 1183 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1184 )
008512c7 1185 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1186 if (message.taskPerformance?.elu != null) {
f7510105
JB
1187 if (workerUsage.elu.utilization != null) {
1188 workerUsage.elu.utilization =
1189 (workerUsage.elu.utilization +
1190 message.taskPerformance.elu.utilization) /
1191 2
1192 } else {
1193 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1194 }
62c15a68
JB
1195 }
1196 }
1197 }
1198
280c2a77 1199 /**
f06e48d8 1200 * Chooses a worker node for the next task.
280c2a77 1201 *
6c6afb84 1202 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1203 *
aa9eede8 1204 * @returns The chosen worker node key
280c2a77 1205 */
6c6afb84 1206 private chooseWorkerNode (): number {
930dcf12 1207 if (this.shallCreateDynamicWorker()) {
aa9eede8 1208 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1209 if (
b1aae695 1210 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1211 ) {
aa9eede8 1212 return workerNodeKey
6c6afb84 1213 }
17393ac8 1214 }
930dcf12
JB
1215 return this.workerChoiceStrategyContext.execute()
1216 }
1217
6c6afb84
JB
1218 /**
1219 * Conditions for dynamic worker creation.
1220 *
1221 * @returns Whether to create a dynamic worker or not.
1222 */
1223 private shallCreateDynamicWorker (): boolean {
930dcf12 1224 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1225 }
1226
280c2a77 1227 /**
aa9eede8 1228 * Sends a message to worker given its worker node key.
280c2a77 1229 *
aa9eede8 1230 * @param workerNodeKey - The worker node key.
38e795c1 1231 * @param message - The message.
7d91a8cd 1232 * @param transferList - The optional array of transferable objects.
280c2a77
S
1233 */
1234 protected abstract sendToWorker (
aa9eede8 1235 workerNodeKey: number,
7d91a8cd
JB
1236 message: MessageValue<Data>,
1237 transferList?: TransferListItem[]
280c2a77
S
1238 ): void
1239
729c563d 1240 /**
41344292 1241 * Creates a new worker.
6c6afb84
JB
1242 *
1243 * @returns Newly created worker.
729c563d 1244 */
280c2a77 1245 protected abstract createWorker (): Worker
c97c7edb 1246
4a6952ff 1247 /**
aa9eede8 1248 * Creates a new, completely set up worker node.
4a6952ff 1249 *
aa9eede8 1250 * @returns New, completely set up worker node key.
4a6952ff 1251 */
aa9eede8 1252 protected createAndSetupWorkerNode (): number {
bdacc2d2 1253 const worker = this.createWorker()
280c2a77 1254
fd04474e 1255 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1256 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1257 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1258 worker.on('error', error => {
aad6fb64 1259 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
23b11f60 1260 this.flagWorkerNodeAsNotReady(workerNodeKey)
46b0bb09 1261 const workerInfo = this.getWorkerInfo(workerNodeKey)
2a69b8c5 1262 this.emitter?.emit(PoolEvents.error, error)
cce7d657 1263 this.workerNodes[workerNodeKey].closeChannel()
15b176e0 1264 if (
b6bfca01 1265 this.started &&
9b38ab2d 1266 !this.starting &&
711623b8 1267 !this.destroying &&
9b38ab2d 1268 this.opts.restartWorkerOnError === true
15b176e0 1269 ) {
9b106837 1270 if (workerInfo.dynamic) {
aa9eede8 1271 this.createAndSetupDynamicWorkerNode()
8a1260a3 1272 } else {
aa9eede8 1273 this.createAndSetupWorkerNode()
8a1260a3 1274 }
5baee0d7 1275 }
6d59c3a3 1276 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1277 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1278 }
5baee0d7 1279 })
a35560ba 1280 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1281 worker.once('exit', () => {
f06e48d8 1282 this.removeWorkerNode(worker)
a974afa6 1283 })
280c2a77 1284
aa9eede8 1285 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1286
aa9eede8 1287 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1288
aa9eede8 1289 return workerNodeKey
c97c7edb 1290 }
be0676b3 1291
930dcf12 1292 /**
aa9eede8 1293 * Creates a new, completely set up dynamic worker node.
930dcf12 1294 *
aa9eede8 1295 * @returns New, completely set up dynamic worker node key.
930dcf12 1296 */
aa9eede8
JB
1297 protected createAndSetupDynamicWorkerNode (): number {
1298 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1299 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1300 this.checkMessageWorkerId(message)
aa9eede8
JB
1301 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1302 message.workerId
aad6fb64 1303 )
aa9eede8 1304 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1305 // Kill message received from worker
930dcf12
JB
1306 if (
1307 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1308 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1309 ((this.opts.enableTasksQueue === false &&
aa9eede8 1310 workerUsage.tasks.executing === 0) ||
7b56f532 1311 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1312 workerUsage.tasks.executing === 0 &&
1313 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1314 ) {
f3827d5d 1315 // Flag the worker node as not ready immediately
ae3ab61d 1316 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1317 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1318 this.emitter?.emit(PoolEvents.error, error)
1319 })
930dcf12
JB
1320 }
1321 })
46b0bb09 1322 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1323 this.sendToWorker(workerNodeKey, {
e9dd5b66 1324 checkActive: true
21f710aa 1325 })
72ae84a2
JB
1326 if (this.taskFunctions.size > 0) {
1327 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1328 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1329 taskFunctionOperation: 'add',
1330 taskFunctionName,
1331 taskFunction: taskFunction.toString()
1332 }).catch(error => {
1333 this.emitter?.emit(PoolEvents.error, error)
1334 })
1335 }
1336 }
b5e113f6 1337 workerInfo.dynamic = true
b1aae695
JB
1338 if (
1339 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1340 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1341 ) {
b5e113f6
JB
1342 workerInfo.ready = true
1343 }
33e6bb4c 1344 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1345 return workerNodeKey
930dcf12
JB
1346 }
1347
a2ed5053 1348 /**
aa9eede8 1349 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1350 *
aa9eede8 1351 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1352 * @param listener - The message listener callback.
1353 */
85aeb3f3
JB
1354 protected abstract registerWorkerMessageListener<
1355 Message extends Data | Response
aa9eede8
JB
1356 >(
1357 workerNodeKey: number,
1358 listener: (message: MessageValue<Message>) => void
1359 ): void
a2ed5053 1360
ae036c3e
JB
1361 /**
1362 * Registers once a listener callback on the worker given its worker node key.
1363 *
1364 * @param workerNodeKey - The worker node key.
1365 * @param listener - The message listener callback.
1366 */
1367 protected abstract registerOnceWorkerMessageListener<
1368 Message extends Data | Response
1369 >(
1370 workerNodeKey: number,
1371 listener: (message: MessageValue<Message>) => void
1372 ): void
1373
1374 /**
1375 * Deregisters a listener callback on the worker given its worker node key.
1376 *
1377 * @param workerNodeKey - The worker node key.
1378 * @param listener - The message listener callback.
1379 */
1380 protected abstract deregisterWorkerMessageListener<
1381 Message extends Data | Response
1382 >(
1383 workerNodeKey: number,
1384 listener: (message: MessageValue<Message>) => void
1385 ): void
1386
a2ed5053 1387 /**
aa9eede8 1388 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1389 * Can be overridden.
1390 *
aa9eede8 1391 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1392 */
aa9eede8 1393 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1394 // Listen to worker messages.
fcd39179
JB
1395 this.registerWorkerMessageListener(
1396 workerNodeKey,
1397 this.workerMessageListener.bind(this)
1398 )
85aeb3f3 1399 // Send the startup message to worker.
aa9eede8 1400 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1401 // Send the statistics message to worker.
1402 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1403 if (this.opts.enableTasksQueue === true) {
dbd73092 1404 if (this.opts.tasksQueueOptions?.taskStealing === true) {
9f95d5eb 1405 this.workerNodes[workerNodeKey].addEventListener(
65542a35
JB
1406 'idleWorkerNode',
1407 this.handleIdleWorkerNodeEvent as EventListener
9f95d5eb 1408 )
47352846
JB
1409 }
1410 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
9f95d5eb 1411 this.workerNodes[workerNodeKey].addEventListener(
b5e75be8 1412 'backPressure',
9b85e2d0 1413 this.handleBackPressureEvent as EventListener
9f95d5eb 1414 )
47352846 1415 }
72695f86 1416 }
d2c73f82
JB
1417 }
1418
85aeb3f3 1419 /**
aa9eede8
JB
1420 * Sends the startup message to worker given its worker node key.
1421 *
1422 * @param workerNodeKey - The worker node key.
1423 */
1424 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1425
1426 /**
9edb9717 1427 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1428 *
aa9eede8 1429 * @param workerNodeKey - The worker node key.
85aeb3f3 1430 */
9edb9717 1431 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1432 this.sendToWorker(workerNodeKey, {
1433 statistics: {
1434 runTime:
1435 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1436 .runTime.aggregate,
1437 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1438 .elu.aggregate
72ae84a2 1439 }
aa9eede8
JB
1440 })
1441 }
a2ed5053
JB
1442
1443 private redistributeQueuedTasks (workerNodeKey: number): void {
1444 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1445 const destinationWorkerNodeKey = this.workerNodes.reduce(
1446 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1447 return workerNode.info.ready &&
1448 workerNode.usage.tasks.queued <
1449 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1450 ? workerNodeKey
1451 : minWorkerNodeKey
1452 },
1453 0
1454 )
72ae84a2 1455 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1456 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1457 this.executeTask(destinationWorkerNodeKey, task)
1458 } else {
1459 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1460 }
1461 }
1462 }
1463
b1838604
JB
1464 private updateTaskStolenStatisticsWorkerUsage (
1465 workerNodeKey: number,
b1838604
JB
1466 taskName: string
1467 ): void {
1a880eca 1468 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1469 if (workerNode?.usage != null) {
1470 ++workerNode.usage.tasks.stolen
1471 }
1472 if (
1473 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1474 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1475 ) {
1476 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1477 taskName
1478 ) as WorkerUsage
1479 ++taskFunctionWorkerUsage.tasks.stolen
1480 }
1481 }
1482
463226a4
JB
1483 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
1484 workerNodeKey: number,
1485 taskName: string
1486 ): void {
1487 const workerNode = this.workerNodes[workerNodeKey]
1488 if (workerNode?.usage != null) {
1489 ++workerNode.usage.tasks.sequentiallyStolen
1490 }
1491 if (
1492 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1493 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1494 ) {
1495 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1496 taskName
1497 ) as WorkerUsage
1498 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1499 }
1500 }
1501
1502 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
1503 workerNodeKey: number,
1504 taskName: string
1505 ): void {
1506 const workerNode = this.workerNodes[workerNodeKey]
1507 if (workerNode?.usage != null) {
1508 workerNode.usage.tasks.sequentiallyStolen = 0
1509 }
1510 if (
1511 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1512 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1513 ) {
1514 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1515 taskName
1516 ) as WorkerUsage
1517 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1518 }
1519 }
1520
65542a35 1521 private readonly handleIdleWorkerNodeEvent = (
463226a4
JB
1522 event: CustomEvent<WorkerNodeEventDetail>,
1523 previousStolenTask?: Task<Data>
9f95d5eb 1524 ): void => {
463226a4
JB
1525 const { workerNodeKey } = event.detail
1526 if (workerNodeKey == null) {
1527 throw new Error(
1528 'WorkerNode event detail workerNodeKey attribute must be defined'
1529 )
1530 }
1531 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1532 if (
1533 previousStolenTask != null &&
1534 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1535 (workerNodeTasksUsage.executing > 0 ||
1536 this.tasksQueueSize(workerNodeKey) > 0)
1537 ) {
1538 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(
1539 workerNodeKey,
1540 previousStolenTask.name as string
1541 )
1542 return
1543 }
1544 const stolenTask = this.workerNodeStealTask(workerNodeKey)
1545 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1546 .then(() => {
1547 this.handleIdleWorkerNodeEvent(event, stolenTask)
1548 return undefined
1549 })
1550 .catch(EMPTY_FUNCTION)
1551 }
1552
1553 private readonly workerNodeStealTask = (
1554 workerNodeKey: number
1555 ): Task<Data> | undefined => {
dd951876 1556 const workerNodes = this.workerNodes
a6b3272b 1557 .slice()
dd951876
JB
1558 .sort(
1559 (workerNodeA, workerNodeB) =>
1560 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1561 )
f201a0cd 1562 const sourceWorkerNode = workerNodes.find(
463226a4
JB
1563 (sourceWorkerNode, sourceWorkerNodeKey) =>
1564 sourceWorkerNode.info.ready &&
1565 sourceWorkerNodeKey !== workerNodeKey &&
1566 sourceWorkerNode.usage.tasks.queued > 0
f201a0cd
JB
1567 )
1568 if (sourceWorkerNode != null) {
72ae84a2 1569 const task = sourceWorkerNode.popTask() as Task<Data>
463226a4
JB
1570 if (this.shallExecuteTask(workerNodeKey)) {
1571 this.executeTask(workerNodeKey, task)
f201a0cd 1572 } else {
463226a4 1573 this.enqueueTask(workerNodeKey, task)
72695f86 1574 }
463226a4
JB
1575 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(
1576 workerNodeKey,
1577 task.name as string
1578 )
f201a0cd 1579 this.updateTaskStolenStatisticsWorkerUsage(
463226a4 1580 workerNodeKey,
f201a0cd
JB
1581 task.name as string
1582 )
463226a4 1583 return task
72695f86
JB
1584 }
1585 }
1586
9f95d5eb
JB
1587 private readonly handleBackPressureEvent = (
1588 event: CustomEvent<WorkerNodeEventDetail>
1589 ): void => {
b5e75be8 1590 const { workerId } = event.detail
f778c355
JB
1591 const sizeOffset = 1
1592 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1593 return
1594 }
72695f86 1595 const sourceWorkerNode =
b5e75be8 1596 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1597 const workerNodes = this.workerNodes
a6b3272b 1598 .slice()
72695f86
JB
1599 .sort(
1600 (workerNodeA, workerNodeB) =>
1601 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1602 )
1603 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1604 if (
0bc68267 1605 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1606 workerNode.info.ready &&
b5e75be8 1607 workerNode.info.id !== workerId &&
0bc68267 1608 workerNode.usage.tasks.queued <
f778c355 1609 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1610 ) {
72ae84a2 1611 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1612 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1613 this.executeTask(workerNodeKey, task)
4de3d785 1614 } else {
dd951876 1615 this.enqueueTask(workerNodeKey, task)
4de3d785 1616 }
b1838604
JB
1617 this.updateTaskStolenStatisticsWorkerUsage(
1618 workerNodeKey,
b1838604
JB
1619 task.name as string
1620 )
10ecf8fd 1621 }
a2ed5053
JB
1622 }
1623 }
1624
be0676b3 1625 /**
fcd39179 1626 * This method is the message listener registered on each worker.
be0676b3 1627 */
fcd39179
JB
1628 protected workerMessageListener (message: MessageValue<Response>): void {
1629 this.checkMessageWorkerId(message)
1630 if (message.ready != null && message.taskFunctionNames != null) {
1631 // Worker ready response received from worker
1632 this.handleWorkerReadyResponse(message)
1633 } else if (message.taskId != null) {
1634 // Task execution response received from worker
1635 this.handleTaskExecutionResponse(message)
1636 } else if (message.taskFunctionNames != null) {
1637 // Task function names message received from worker
1638 this.getWorkerInfo(
1639 this.getWorkerNodeKeyByWorkerId(message.workerId)
1640 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1641 }
1642 }
1643
10e2aa7e 1644 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
463226a4
JB
1645 const { workerId, ready, taskFunctionNames } = message
1646 if (ready === false) {
1647 throw new Error(`Worker ${workerId as number} failed to initialize`)
f05ed93c 1648 }
a5d15204 1649 const workerInfo = this.getWorkerInfo(
463226a4 1650 this.getWorkerNodeKeyByWorkerId(workerId)
46b0bb09 1651 )
463226a4
JB
1652 workerInfo.ready = ready as boolean
1653 workerInfo.taskFunctionNames = taskFunctionNames
55082af9
JB
1654 if (!this.readyEventEmitted && this.ready) {
1655 this.readyEventEmitted = true
1656 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1657 }
6b272951
JB
1658 }
1659
1660 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
463226a4 1661 const { workerId, taskId, workerError, data } = message
5441aea6 1662 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1663 if (promiseResponse != null) {
51af50ef 1664 const { resolve, reject, workerNodeKey } = promiseResponse
6703b9f4
JB
1665 if (workerError != null) {
1666 this.emitter?.emit(PoolEvents.taskError, workerError)
51af50ef 1667 reject(workerError.message)
6b272951 1668 } else {
51af50ef 1669 resolve(data as Response)
6b272951 1670 }
501aea93 1671 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1672 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1673 this.promiseResponseMap.delete(taskId as string)
463226a4
JB
1674 if (this.opts.enableTasksQueue === true) {
1675 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1676 if (
1677 this.tasksQueueSize(workerNodeKey) > 0 &&
1678 workerNodeTasksUsage.executing <
1679 (this.opts.tasksQueueOptions?.concurrency as number)
1680 ) {
1681 this.executeTask(
1682 workerNodeKey,
1683 this.dequeueTask(workerNodeKey) as Task<Data>
1684 )
1685 }
1686 if (
1687 workerNodeTasksUsage.executing === 0 &&
1688 this.tasksQueueSize(workerNodeKey) === 0 &&
1689 workerNodeTasksUsage.sequentiallyStolen === 0
1690 ) {
1691 this.workerNodes[workerNodeKey].dispatchEvent(
1692 new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
1693 detail: { workerId: workerId as number, workerNodeKey }
1694 })
1695 )
1696 }
be0676b3
APA
1697 }
1698 }
be0676b3 1699 }
7c0ba920 1700
a1763c54 1701 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1702 if (this.busy) {
1703 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1704 }
1705 }
1706
1707 private checkAndEmitTaskQueuingEvents (): void {
1708 if (this.hasBackPressure()) {
1709 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1710 }
1711 }
1712
33e6bb4c
JB
1713 private checkAndEmitDynamicWorkerCreationEvents (): void {
1714 if (this.type === PoolTypes.dynamic) {
1715 if (this.full) {
1716 this.emitter?.emit(PoolEvents.full, this.info)
1717 }
1718 }
1719 }
1720
8a1260a3 1721 /**
aa9eede8 1722 * Gets the worker information given its worker node key.
8a1260a3
JB
1723 *
1724 * @param workerNodeKey - The worker node key.
3f09ed9f 1725 * @returns The worker information.
8a1260a3 1726 */
46b0bb09 1727 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1728 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1729 }
1730
a05c10de 1731 /**
b0a4db63 1732 * Adds the given worker in the pool worker nodes.
ea7a90d3 1733 *
38e795c1 1734 * @param worker - The worker.
aa9eede8
JB
1735 * @returns The added worker node key.
1736 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1737 */
b0a4db63 1738 private addWorkerNode (worker: Worker): number {
671d5154
JB
1739 const workerNode = new WorkerNode<Worker, Data>(
1740 worker,
ff3f866a 1741 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1742 )
b97d82d8 1743 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1744 if (this.starting) {
1745 workerNode.info.ready = true
1746 }
aa9eede8 1747 this.workerNodes.push(workerNode)
aad6fb64 1748 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1749 if (workerNodeKey === -1) {
86ed0598 1750 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1751 }
1752 return workerNodeKey
ea7a90d3 1753 }
c923ce56 1754
51fe3d3c 1755 /**
f06e48d8 1756 * Removes the given worker from the pool worker nodes.
51fe3d3c 1757 *
f06e48d8 1758 * @param worker - The worker.
51fe3d3c 1759 */
416fd65c 1760 private removeWorkerNode (worker: Worker): void {
aad6fb64 1761 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1762 if (workerNodeKey !== -1) {
1763 this.workerNodes.splice(workerNodeKey, 1)
1764 this.workerChoiceStrategyContext.remove(workerNodeKey)
1765 }
51fe3d3c 1766 }
adc3c320 1767
ae3ab61d
JB
1768 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1769 this.getWorkerInfo(workerNodeKey).ready = false
1770 }
1771
e2b31e32
JB
1772 /** @inheritDoc */
1773 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1774 return (
e2b31e32
JB
1775 this.opts.enableTasksQueue === true &&
1776 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1777 )
1778 }
1779
1780 private hasBackPressure (): boolean {
1781 return (
1782 this.opts.enableTasksQueue === true &&
1783 this.workerNodes.findIndex(
041dc05b 1784 workerNode => !workerNode.hasBackPressure()
a1763c54 1785 ) === -1
9e844245 1786 )
e2b31e32
JB
1787 }
1788
b0a4db63 1789 /**
aa9eede8 1790 * Executes the given task on the worker given its worker node key.
b0a4db63 1791 *
aa9eede8 1792 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1793 * @param task - The task to execute.
1794 */
2e81254d 1795 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1796 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1797 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1798 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1799 }
1800
f9f00b5f 1801 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1802 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1803 this.checkAndEmitTaskQueuingEvents()
1804 return tasksQueueSize
adc3c320
JB
1805 }
1806
416fd65c 1807 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1808 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1809 }
1810
416fd65c 1811 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1812 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1813 }
1814
81c02522 1815 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1816 while (this.tasksQueueSize(workerNodeKey) > 0) {
1817 this.executeTask(
1818 workerNodeKey,
1819 this.dequeueTask(workerNodeKey) as Task<Data>
1820 )
ff733df7 1821 }
4b628b48 1822 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1823 }
1824
ef41a6e6
JB
1825 private flushTasksQueues (): void {
1826 for (const [workerNodeKey] of this.workerNodes.entries()) {
1827 this.flushTasksQueue(workerNodeKey)
1828 }
1829 }
c97c7edb 1830}