refactor: use object destructuration in worker message listener
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
f1f77f45 38 TaskStatistics,
4d159167 39 WorkerInfo,
9f95d5eb 40 WorkerNodeEventDetail,
4d159167
JB
41 WorkerType,
42 WorkerUsage
e102732c 43} from './worker'
a35560ba 44import {
008512c7 45 type MeasurementStatisticsRequirements,
f0d7f803 46 Measurements,
a35560ba 47 WorkerChoiceStrategies,
a20f0ba5
JB
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
bdaf31cd
JB
50} from './selection-strategies/selection-strategies-types'
51import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 52import { version } from './version'
4b628b48 53import { WorkerNode } from './worker-node'
bde6b5d7
JB
54import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
bfc75cca
JB
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
bde6b5d7 59} from './utils'
23ccf9d7 60
729c563d 61/**
ea7a90d3 62 * Base class that implements some shared logic for all poolifier pools.
729c563d 63 *
38e795c1 64 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 67 */
c97c7edb 68export abstract class AbstractPool<
f06e48d8 69 Worker extends IWorker,
d3c8a1a8
S
70 Data = unknown,
71 Response = unknown
c4855468 72> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Dynamic pool maximum size property placeholder.
 *
 * When it is not set, `maxSize` falls back to `numberOfWorkers`.
 */
protected readonly max?: number

/**
 * The task execution response promise map:
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the task id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean

/**
 * Whether the pool is starting or not.
 */
private starting: boolean

/**
 * Whether the pool is destroying or not.
 */
private destroying: boolean

/**
 * Whether the pool ready event has been emitted or not.
 */
private readyEventEmitted: boolean

/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 */
private readonly startTimestamp: number
130
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates its arguments, creates the optional event emitter and the worker
 * choice strategy context, runs the setup hook and, when `opts.startWorkers`
 * is `true`, starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also fills in the options defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // These methods are bound to the pool instance so they can be handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Taken last, i.e. after the optional synchronous start() above.
  this.startTimestamp = performance.now()
}
183
8d3782fa
JB
/**
 * Checks that the given number of workers is usable to instantiate the pool.
 *
 * @param numberOfWorkers - The number of workers to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
201
/**
 * Validates the pool options and fills in the defaults of the unset ones on `this.opts`:
 * `startWorkers` (`true`), `workerChoiceStrategy` (round robin),
 * `workerChoiceStrategyOptions`, `restartWorkerOnError` (`true`),
 * `enableEvents` (`true`) and `enableTasksQueue` (`false`).
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // The strategy is validated before its default is applied.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
230
0d80593b
JB
/**
 * Validates the worker choice strategy options. `null` or `undefined` options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not hold one entry per worker node (pool maximum size) or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }

  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
277
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
283
/** @inheritDoc */
public get info (): PoolInfo {
  // Read once instead of on every use.
  // NOTE(review): assumes getTaskStatisticsRequirements() is a side effect free read - confirm.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) =>
        predicate(workerNode) ? accumulator + 1 : accumulator,
      0
    )

  // Sums a numeric value selected on each worker node.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )

  // Concatenates the measurement history of all the worker nodes.
  const collectHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (accumulator, workerNode) =>
        accumulator.concat(workerNode.usage[measurement].history),
      []
    )

  // Builds the pool wide statistics of a measurement: minimum and maximum
  // always, average and median only when the strategy requires them.
  const buildMeasurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    requirements: MeasurementStatisticsRequirements
  ) => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(requirements.average && {
      average: round(average(collectHistory(measurement)))
    }),
    ...(requirements.median && {
      median: round(median(collectHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related information is only exposed when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementStatistics(
        'runTime',
        taskStatisticsRequirements.runTime
      )
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementStatistics(
        'waitTime',
        taskStatisticsRequirements.waitTime
      )
    })
  }
}
08f3f44c 439
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready when the number of ready, non dynamic worker nodes
 * is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
454
/**
 * The approximate pool utilization.
 *
 * Computed as the sum of the worker nodes aggregated run time and wait time,
 * divided by the time elapsed since the pool start timestamp multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
475
/**
 * The pool type, implemented by the concrete pool classes.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The worker type, implemented by the concrete pool classes.
 */
protected abstract get worker (): WorkerType
487
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
494
/**
 * The pool maximum size: `max` when it is set, the number of workers otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
a35560ba 501
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is valid,
 * i.e. is set and matches one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
529
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node usage is reset and each worker is sent a statistics
  // message after the strategy change.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Unset options fall back to the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Switching the tasks queue off: detach the stealing listeners and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously set tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  // Attach or detach the stealing listeners according to the built options.
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
610
9b38ab2d
JB
/**
 * Builds the tasks queue options by overriding the defaults with the given options.
 *
 * Defaults: a queue size of the pool maximum size squared, a concurrency of 1,
 * task stealing and tasks stealing on back pressure enabled.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    ...tasksQueueOptions
  }
}
624
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  for (const node of this.workerNodes) {
    node.tasksQueueBackPressureSize = size
  }
}
630
d6ca1416
JB
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
}
639
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
}
648
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.addEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
657
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.removeEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
666
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 675
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, implemented by the concrete pool classes.
 */
protected abstract get busy (): boolean
7c0ba920 682
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a worker node is at quota as soon as it executes one task.
 * Only ready worker nodes are considered.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    const hasWorkerNodeBelowQuota = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing < concurrency
    )
    return !hasWorkerNodeBelowQuota
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
706
/**
 * Sends a task function operation message to one worker and waits for its status response.
 *
 * @param workerNodeKey - The worker node key of the worker to send the message to.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` when the worker reports a successful operation, rejecting with an error when it reports a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the status response of the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The operation is settled: this listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
747
/**
 * Sends a task function operation message to every worker and waits for all their status responses.
 *
 * The same listener is registered on every worker node. Once as many status
 * responses as worker nodes have been received, it is deregistered from
 * every worker node, not only from the one that answered last, so that no
 * stale listener stays attached.
 *
 * NOTE(review): with zero worker nodes no response can ever arrive and the
 * returned promise stays pending - confirm whether callers can hit that case.
 *
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` when every worker reports a successful operation, rejecting with an error when at least one reports a failed one.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // All responses are in: detach the listener from every worker node.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse.workerId as number
            } with error: '${
              errorResponse.workerError?.message as string
            }'`
          )
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
802
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
815
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Only recorded on the pool side once the workers operation has settled successfully.
  this.taskFunctions.set(name, fn)
  return operationStatus
}
838
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const handledByPool = this.taskFunctions.has(name)
  if (!handledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
854
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node holding a non empty names list.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
867
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultOperationMessage
  )
}
875
adee6053
JB
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  for (const node of this.workerNodes) {
    node.deleteTaskFunctionWorkerUsage(name)
  }
}
881
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued:
 * its tasks queue is empty and it executes fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
889
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments are checked first; any failure rejects the promise.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id together with the chosen worker node key.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 947
47352846
JB
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Number of worker nodes not flagged as dynamic, recomputed on each iteration.
  const nonDynamicWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
972
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently and wait for every one of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
996
/**
 * Sends a kill message to the worker given its worker node key and waits for its answer.
 *
 * The returned promise resolves when the worker answers with `kill: 'success'` and
 * rejects when it answers with `kill: 'failure'`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const onKillResponse = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillResponse)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1020
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1027
/**
 * Hook meant to run setup code before the worker nodes are created in the abstract constructor.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1037
/**
 * Should tell whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1042
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and records the task wait time, on the
 * worker node usage and, when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerNode.usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1072
/**
 * Hook executed after the worker task execution.
 * Updates the task statistics, the run time and the ELU, on the worker node usage
 * and, when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
1106
db0e38ee
JB
/**
 * Tells whether the task function worker usage of the worker node shall be updated.
 * This is the case when the worker information exists and lists more than two
 * task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1121
/**
 * Updates the tasks counters of the given worker usage once a task execution response is received:
 * decrements `executing` (never below zero) and increments `failed` or `executed`
 * depending on whether the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1139
a4e07f72
JB
/**
 * Updates the run time measurement statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
1153
a4e07f72
JB
/**
 * Updates the wait time measurement statistics of the given worker usage.
 * The wait time is the elapsed time between the task timestamp and now;
 * it is zero when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceTaskCreation = now - (task.timestamp ?? now)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    elapsedSinceTaskCreation
  )
}
1166
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is updated when the message carries a worker error.
 * When ELU aggregation is required and the message carries ELU data, the stored
 * utilization becomes the mean of the stored value and the received one, or the
 * received one if none was stored yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1199
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is returned
 * if the strategy policy enables dynamic worker usage. In every other case the choice
 * is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1218
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1227
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1240
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1247
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers are registered first, then the internal `error` handler
 * (flags the node as not ready, emits the pool error event, closes the node channel,
 * optionally restarts a worker and redistributes the queued tasks) and the internal
 * `exit` handler (removes the worker node).
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(erroredWorkerNodeKey)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      // Replace the errored worker by one of the same kind.
      if (!erroredWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(addedWorkerNodeKey)

  return addedWorkerNodeKey
}
be0676b3 1292
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered that
 * destroys the worker node when it receives a hard kill message, or a soft kill
 * message while the worker node has nothing left to execute. The worker is then asked
 * to check its activity, the pool task functions are sent to it and its information is
 * flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[localWorkerNodeKey].usage
    // No task executing and, when the tasks queue is enabled, no task queued.
    const hasNoPendingWork = (): boolean =>
      tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Iterating an empty map is a no-op, hence no size check is needed.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1348
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1361
ae036c3e
JB
/**
 * Registers once a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1374
/**
 * Deregisters a message listener callback from the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1387
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics messages and,
 * when the tasks queue is enabled, registers the task stealing event listeners
 * selected by the tasks queue options.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener.bind(this)
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent as EventListener
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].addEventListener(
      'backPressure',
      this.handleBackPressureEvent as EventListener
    )
  }
}
1419
/**
 * Sends the startup message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1426
/**
 * Sends the statistics message to the worker identified by the given worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
a2ed5053
JB
1443
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node, other than the
 * source one, with the fewest queued tasks (the first one wins on ties). The task is
 * executed right away on the destination if it shall execute it, enqueued there otherwise.
 *
 * The source worker node is never a destination: handing a task back to it would leave
 * its queue size unchanged and make the loop spin forever. For the same reason the
 * redistribution stops as soon as no eligible destination exists; the remaining tasks
 * then stay queued on the source worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node: nothing can be redistributed.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1464
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node usage and, when applicable,
 * of the usage of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1483
/**
 * Increments the sequentially stolen tasks counter of the worker node usage, if the usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage != null) {
    ++usage.tasks.sequentiallyStolen
  }
}
1492
/**
 * Increments the sequentially stolen tasks counter of the usage of the given task
 * function on the worker node, when the task function worker usage shall be updated
 * and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1508
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage, if the usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const usage = this.workerNodes[workerNodeKey]?.usage
  if (usage != null) {
    usage.tasks.sequentiallyStolen = 0
  }
}
1517
/**
 * Resets to zero the sequentially stolen tasks counter of the usage of the given task
 * function on the worker node, when the task function worker usage shall be updated
 * and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1533
/**
 * Handles the `idleWorkerNode` event of a worker node by making it steal tasks.
 *
 * When a task was stolen by the previous invocation and the worker node is no longer
 * idle (it executes or has queued tasks), the sequentially stolen counters are reset
 * and the handling stops. Otherwise a task is stolen, the task function sequentially
 * stolen counter is updated or reset, and the handler re-invokes itself after a delay
 * computed from the worker node sequentially stolen counter.
 *
 * @param event - The worker node event; its detail must carry the worker node key.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the event detail has no worker node key.
 */
private readonly handleIdleWorkerNodeEvent = (
  event: CustomEvent<WorkerNodeEventDetail>,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = event.detail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The task function names may not be known yet: iterate over nothing in that
    // case instead of throwing on a non-iterable value.
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames ?? []) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTask.name as string)?.tasks
    // Without a usage for the stolen task function there is nothing to update or reset.
    if (taskFunctionTasksWorkerUsage != null) {
      if (
        taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
        (previousStolenTask != null &&
          previousStolenTask.name === stolenTask.name &&
          taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
      ) {
        this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          stolenTask.name as string
        )
      } else {
        this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
          workerNodeKey,
          stolenTask.name as string
        )
      }
    }
  }
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleIdleWorkerNodeEvent(event, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1594
/**
 * Makes the given worker node steal one task from another worker node.
 *
 * The source is the first ready worker node, other than the stealing one, with queued
 * tasks, searched in descending order of queued tasks. The stolen task is executed
 * right away if the stealing worker node shall execute it, enqueued on it otherwise,
 * and the stolen statistics of the stealing worker node are updated.
 *
 * The candidates are searched in a sorted copy of the worker nodes, so the position
 * of a candidate in that copy is not its worker node key: the stealing worker node is
 * excluded by identity, not by index.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    candidateWorkerNode =>
      candidateWorkerNode.info.ready &&
      candidateWorkerNode !== stealingWorkerNode &&
      candidateWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(workerNodeKey)) {
    this.executeTask(workerNodeKey, task)
  } else {
    this.enqueueTask(workerNodeKey, task)
  }
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  this.updateTaskStolenStatisticsWorkerUsage(
    workerNodeKey,
    task.name as string
  )
  return task
}
1625
9f95d5eb
JB
/**
 * Handles the `backPressure` event of a worker node by handing its queued tasks over
 * to other worker nodes.
 *
 * Nothing is done when the configured tasks queue size is lower than or equal to one.
 * Otherwise the worker nodes are visited in ascending order of queued tasks and each
 * ready one, other than the source, whose queue holds fewer tasks than the configured
 * size minus one receives one task popped from the source, as long as the source has
 * queued tasks.
 *
 * The visit order is computed on the worker node keys rather than on a sorted copy of
 * the worker nodes, so that the key handed to the task execution, queuing and
 * statistics methods always designates the visited worker node.
 *
 * @param event - The worker node event; its detail carries the source worker id.
 */
private readonly handleBackPressureEvent = (
  event: CustomEvent<WorkerNodeEventDetail>
): void => {
  const { workerId } = event.detail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodeKeys = [...this.workerNodes.keys()].sort(
    (workerNodeKeyA, workerNodeKeyB) =>
      this.workerNodes[workerNodeKeyA].usage.tasks.queued -
      this.workerNodes[workerNodeKeyB].usage.tasks.queued
  )
  for (const workerNodeKey of workerNodeKeys) {
    const workerNode = this.workerNodes[workerNodeKey]
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1663
/**
 * This method is the message listener registered on each worker.
 * It dispatches the message to the worker ready response handler, to the task
 * execution response handler, or stores the received task function names.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1683
/**
 * Handles the worker ready response: stores the ready flag and the task function
 * names in the worker information, then emits the pool ready event the first time
 * the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready: false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId as number} failed to initialize`)
  }
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(message.workerId)
  )
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctionNames = message.taskFunctionNames
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.readyEventEmitted = true
  this.emitter?.emit(PoolEvents.ready, this.info)
}
1699
/**
 * Handles a task execution response: settles the promise registered for the task,
 * updates the worker usage and the worker choice strategy, then, when the tasks queue
 * is enabled, executes the next queued task if the concurrency allows it and
 * dispatches the `idleWorkerNode` event if the worker node has nothing left to do.
 * A message whose task id has no registered promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = promiseResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  const hasQueuedTasks = this.tasksQueueSize(workerNodeKey) > 0
  if (
    hasQueuedTasks &&
    workerNodeTasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  // The queue size is read again: the previous step may have dequeued a task.
  if (
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  ) {
    this.workerNodes[workerNodeKey].dispatchEvent(
      new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
        detail: { workerId: workerId as number, workerNodeKey }
      })
    )
  }
}
7c0ba920 1740
/**
 * Emits the pool busy event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1746
/**
 * Emits the pool back pressure event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1752
33e6bb4c
JB
/**
 * Emits the pool full event if the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1760
/**
 * Gets the worker information of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1770
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 * The worker node is created with the configured tasks queue size, defaulting to the
 * square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1794
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1807
ae3ab61d
JB
/**
 * Flags the worker node with the given key as not ready.
 *
 * `getWorkerInfo()` resolves the worker node with optional chaining and therefore
 * yields `undefined` at runtime for a key without a worker node (e.g. one already
 * removed). The assignment is guarded so that case is a no-op instead of a `TypeError`.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo: WorkerInfo | undefined = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.ready = false
  }
}
1811
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing enabled, back pressure is never reported.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1819
/**
 * Whether the pool as a whole has back pressure: tasks queuing is enabled and
 * no worker node reports being free of back pressure.
 *
 * @returns `true` if tasks queuing is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1828
/**
 * Runs the given task on the worker identified by its worker node key:
 * calls the before task execution hook, sends the task (with its transfer list)
 * to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read the transfer list only after the hook has run, as the original call did.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1840
/**
 * Enqueues the given task on the worker node with the given key, then checks and
 * emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call (its tasks queue size).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1846
/**
 * Dequeues a task from the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1850
/**
 * Gets the tasks queue size of the worker node with the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1854
/**
 * Flushes the tasks queue of the worker node with the given key: every queued task
 * is dequeued and executed on that worker node, then its tasks queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // The queue size was just checked, so a task is expected here.
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1864
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1870}