test: check that event listeners are removed at destroy
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
f1f77f45 38 TaskStatistics,
4d159167 39 WorkerInfo,
9f95d5eb 40 WorkerNodeEventDetail,
4d159167
JB
41 WorkerType,
42 WorkerUsage
e102732c 43} from './worker'
a35560ba 44import {
008512c7 45 type MeasurementStatisticsRequirements,
f0d7f803 46 Measurements,
a35560ba 47 WorkerChoiceStrategies,
a20f0ba5
JB
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
bdaf31cd
JB
50} from './selection-strategies/selection-strategies-types'
51import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 52import { version } from './version'
4b628b48 53import { WorkerNode } from './worker-node'
bde6b5d7
JB
54import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
bfc75cca
JB
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
bde6b5d7 59} from './utils'
23ccf9d7 60
729c563d 61/**
ea7a90d3 62 * Base class that implements some shared logic for all poolifier pools.
729c563d 63 *
38e795c1 64 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 67 */
c97c7edb 68export abstract class AbstractPool<
f06e48d8 69 Worker extends IWorker,
d3c8a1a8
S
70 Data = unknown,
71 Response = unknown
c4855468 72> implements IPool<Worker, Data, Response> {
afc003b2 73 /** @inheritDoc */
4b628b48 74 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 75
afc003b2 76 /** @inheritDoc */
f80125ca 77 public emitter?: EventEmitterAsyncResource
7c0ba920 78
fcd39179
JB
79 /**
80 * Dynamic pool maximum size property placeholder.
81 */
82 protected readonly max?: number
83
be0676b3 84 /**
419e8121 85 * The task execution response promise map:
2740a743 86 * - `key`: The message id of each submitted task.
a3445496 87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 88 *
a3445496 89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 90 */
501aea93
JB
91 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
92 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 93
a35560ba 94 /**
51fe3d3c 95 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
96 */
97 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
98 Worker,
99 Data,
100 Response
a35560ba
S
101 >
102
72ae84a2
JB
103 /**
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
107 */
108 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
109
15b176e0
JB
110 /**
111 * Whether the pool is started or not.
112 */
113 private started: boolean
47352846
JB
114 /**
115 * Whether the pool is starting or not.
116 */
117 private starting: boolean
711623b8
JB
118 /**
119 * Whether the pool is destroying or not.
120 */
121 private destroying: boolean
55082af9
JB
122 /**
123 * Whether the pool ready event has been emitted or not.
124 */
125 private readyEventEmitted: boolean
afe0d5bf
JB
126 /**
127 * The start timestamp of the pool.
128 */
129 private readonly startTimestamp
130
729c563d
S
/**
 * Creates a new poolifier pool.
 *
 * Validates the arguments, creates the worker choice strategy context, runs
 * the setup hook and, when the `startWorkers` option is `true`, starts the
 * pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `this.isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also writes the default values
  // of the unset options back into `this.opts`.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Pre-bind the task dispatching methods to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  // The event emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used by the `utilization` getter.
  this.startTimestamp = performance.now()
}
183
8d3782fa
JB
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused an empty set of workers.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
201
/**
 * Validates the pool options and stores the default value of each unset
 * option into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate what was given, then apply the default.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: given values override the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is on.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
230
0d80593b
JB
/**
 * Validates the worker choice strategy options. `null` or `undefined`
 * options pass every check.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is not a known measurement.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }

  // One weight is expected per worker node the pool can hold.
  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
277
/**
 * Creates the pool event emitter, named after the pool type and the worker
 * type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
283
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Adds up one numeric figure taken from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the measurement history of every worker node.
  const collectHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  // Builds the pool-wide statistics of one measurement. Average and median
  // are only present when the worker choice strategy requires them.
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime',
    requirements: MeasurementStatisticsRequirements
  ): NonNullable<PoolInfo['runTime']> => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    ),
    ...(requirements.average && {
      average: round(average(collectHistory(measurement)))
    }),
    ...(requirements.median && {
      median: round(median(collectHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is on.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    ...(tasksQueueEnabled && {
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: buildMeasurementInfo('runTime', runTimeRequirements)
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: buildMeasurementInfo('waitTime', waitTimeRequirements)
    })
  }
}
08f3f44c 439
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` when the number of ready, non
 * dynamic worker nodes has reached the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
454
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time divided by the time elapsed since the start timestamp multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  // Run time and wait time are accumulated separately and only added at the
  // end.
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
475
8881ae32 476 /**
aa9eede8 477 * The pool type.
8881ae32
JB
478 *
479 * If it is `'dynamic'`, it provides the `max` property.
480 */
481 protected abstract get type (): PoolType
482
184855e6 483 /**
aa9eede8 484 * The worker type.
184855e6
JB
485 */
486 protected abstract get worker (): WorkerType
487
/**
 * The pool minimum size, i.e. the number of workers given to the constructor.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
494
/**
 * The pool maximum size: the `max` property when it is defined, the number
 * of workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  const { max } = this
  return max != null ? max : this.numberOfWorkers
}
a35560ba 501
6b813701
JB
/**
 * Checks that the message received from a worker carries the id of a worker
 * known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
517
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
529
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
541
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send each worker a statistics
  // message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
560
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the default ones.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
574
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Turning the queue off: detach the stealing handlers and flush the
    // tasks queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
588
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
    // Always detach the handlers first: `on()` does not de-duplicate, so
    // calling this method several times with stealing enabled would
    // otherwise register the same handler once per call on each worker node.
    this.unsetTaskStealing()
    if (this.opts.tasksQueueOptions.taskStealing === true) {
      this.setTaskStealing()
    }
    this.unsetTasksStealingOnBackPressure()
    if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
      this.setTasksStealingOnBackPressure()
    }
  } else if (this.opts.tasksQueueOptions != null) {
    // Tasks queue disabled: drop the stale options.
    delete this.opts.tasksQueueOptions
  }
}
610
9b38ab2d
JB
/**
 * Builds the tasks queue options: the given options override the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The tasks queue options with the defaults filled in.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // The default queue size is the square of the pool maximum size.
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
624
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
630
d6ca1416
JB
/**
 * Registers the idle worker node event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
639
/**
 * Removes the idle worker node event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idleWorkerNode', this.handleIdleWorkerNodeEvent)
  }
}
648
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleBackPressureEvent)
  }
}
657
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleBackPressureEvent)
  }
}
666
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes has reached the pool
 * maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 675
c319c66b
JB
676 /**
677 * Whether the pool is busy or not.
678 *
679 * The pool busyness boolean status.
680 */
681 protected abstract get busy (): boolean
7c0ba920 682
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue `concurrency`
 * option, otherwise a ready worker node executing no task makes the pool
 * not busy.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowQuota = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowQuota
  }
  const hasFreeWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasFreeWorkerNode
}
706
/**
 * Sends a task function operation message to one worker node and waits for
 * its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success, rejected with an error when it reports a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not an operation status from this worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The answer has been handled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
747
/**
 * Sends a task function operation message to every worker node and waits
 * until as many operation statuses as there are worker nodes have been
 * received.
 *
 * NOTE(review): with no worker node in the pool no response can arrive and
 * the returned promise never settles - confirm whether callers rely on that.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every response reports success, rejected with an error built from the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only operation status messages are collected.
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${
              errorResponse?.workerId as number
            } with error: '${
              errorResponse?.workerError?.message as string
            }'`
          )
        )
      }
      // The same listener was registered on every worker node: remove it
      // from all of them, not only from the node that answered last,
      // otherwise it leaks on every other worker node.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
802
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
815
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // From here on `name` is known to be a string.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source text.
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return operationStatus
}
838
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions recorded in the pool side map can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
854
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node reporting a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
867
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return operationStatus
}
875
adee6053
JB
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
881
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue is empty and it executes fewer tasks than the tasks queue
 * `concurrency` option.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
889
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects.
    let failure: Error | undefined
    if (!this.started) {
      failure = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      failure = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      failure = new TypeError('name argument must be a string')
    } else if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      failure = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      failure = new TypeError('transferList argument must be an array')
    }
    if (failure != null) {
      reject(failure)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Stores the promise callbacks under the task id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 947
47352846
JB
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  // Create worker nodes until the pool holds `numberOfWorkers` non dynamic
  // ones.
  while (
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length <
    this.numberOfWorkers
  ) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
972
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  try {
    await Promise.all(
      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
        await this.destroyWorkerNode(workerNodeKey)
      })
    )
    this.emitter?.emit(PoolEvents.destroy, this.info)
    this.emitter?.emitDestroy()
    this.emitter?.removeAllListeners()
    this.readyEventEmitted = false
    this.started = false
  } finally {
    // Always leave the destroying state, even if a worker node destruction rejects:
    // otherwise the pool stays flagged as destroying and destroy() can never be retried.
    this.destroying = false
  }
}
997
/**
 * Sends the kill message to the worker given its worker node key and waits for its answer.
 * The returned promise resolves on a `'success'` kill answer and rejects on a `'failure'` one.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      if (message.kill !== 'success' && message.kill !== 'failure') {
        // Not a kill answer: keep listening.
        return
      }
      // The kill answer has been received: the listener is no longer needed.
      // Worker nodes are spliced out of the worker nodes array when their worker exits,
      // so the key is resolved again from the worker id instead of reusing the captured one.
      const currentWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      if (currentWorkerNodeKey !== -1) {
        this.deregisterWorkerMessageListener(
          currentWorkerNodeKey,
          killMessageListener
        )
      }
      if (message.kill === 'success') {
        resolve()
      } else {
        reject(
          new Error(
            `Kill message handling failed on worker ${
              message.workerId as number
            }`
          )
        )
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1021
/**
 * Destroys the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1028
/**
 * Hook meant to run setup code before the worker nodes are created in the abstract constructor.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1038
/**
 * Tells whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1043
/**
 * Hook run right before a task is executed on a worker node.
 * Increments the executing tasks counters and records the task wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    const nodeUsage = workerNode.usage
    ++nodeUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(nodeUsage, task)
  }
  // Per task function usage.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionUsage != null) {
      ++taskFunctionUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionUsage, task)
    }
  }
}
1073
/**
 * Hook run once a task execution response has been received from a worker node.
 * Updates the task statistics, run time and ELU usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    const nodeUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(nodeUsage, message)
    this.updateRunTimeWorkerUsage(nodeUsage, message)
    this.updateEluWorkerUsage(nodeUsage, message)
  }
  // Per task function usage.
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionUsage, message)
      this.updateEluWorkerUsage(taskFunctionUsage, message)
    }
  }
}
1107
/**
 * Tells whether the per task function worker usage of a worker node shall be updated.
 * It is the case when the worker information exposes more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1122
/**
 * Updates the tasks counters of the given worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1140
/**
 * Updates the run time measurement statistics of the given worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1154
/**
 * Updates the wait time measurement statistics of the given worker usage.
 * The wait time is the delay between the task timestamp and now; it is zero if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime
  )
}
1167
/**
 * Updates the event loop utilization (ELU) measurement statistics of the given worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // Average the previous utilization with the new one, or seed it on first measurement.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1200
/**
 * Picks the worker node that will handle the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is chosen directly
 * if the strategy policy allows dynamic worker usage. Otherwise the worker choice strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1219
/**
 * Checks the conditions for a dynamic worker creation:
 * the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1228
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the destination worker node.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1241
/**
 * Instantiates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1248
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes
 * and runs the after worker node setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User provided handlers, defaulting to no-ops.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    this.flagWorkerNodeAsNotReady(failedWorkerNodeKey)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    this.emitter?.emit(PoolEvents.error, error)
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    // Replace the failed worker only on a fully started pool that is not shutting down.
    const shallRestartWorker =
      this.started &&
      !this.starting &&
      !this.destroying &&
      this.opts.restartWorkerOnError === true
    if (shallRestartWorker) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
be0676b3 1293
/**
 * Creates a worker node, then configures it as a dynamic one: kill message handling,
 * active check, task functions registration and readiness flagging.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker:
    // a hard kill is always honored, a soft one only when the worker node has nothing left to do.
    const shallDestroyWorkerNode =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        workerUsage.tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    if (shallDestroyWorkerNode) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Register the task functions known by the pool on the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1349
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1362
/**
 * Registers a one-time message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1375
/**
 * Deregisters a message listener callback from the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1388
/**
 * Hook run after a worker node has been newly created: registers the pool message listener,
 * sends the startup and statistics messages and, if the tasks queue is enabled,
 * subscribes to the task stealing related worker node events.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener.bind(this)
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idleWorkerNode',
      this.handleIdleWorkerNodeEvent
    )
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleBackPressureEvent
    )
  }
}
1420
/**
 * Sends the startup message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1427
/**
 * Sends the statistics message to the worker identified by the given worker node key.
 * The message carries the run time and ELU `aggregate` requirements of the worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
a2ed5053
JB
1444
/**
 * Moves every queued task of the given worker node to the other worker nodes.
 * Each task goes to the ready worker node with the fewest queued tasks, the source worker node excluded.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // Nothing to do for an unknown worker node, or without another worker node to redistribute to.
  if (
    this.workerNodes[workerNodeKey] == null ||
    this.workerNodes.length <= 1
  ) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // Never pick the source worker node as destination: its tasks would be
    // dequeued then handed back to itself, possibly forever.
    let destinationWorkerNodeKey = workerNodeKey === 0 ? 1 : 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidateKey !== workerNodeKey &&
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1465
/**
 * Increments the stolen tasks counters of a worker node: the worker node wide one and,
 * when applicable, the one of the given task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.stolen
  }
}
1484
/**
 * Increments the sequentially stolen tasks counter of a worker node, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage != null) {
    ++nodeUsage.tasks.sequentiallyStolen
  }
}
1493
/**
 * Increments the sequentially stolen tasks counter of a task function on a worker node,
 * when the task function worker usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    ++taskFunctionUsage.tasks.sequentiallyStolen
  }
}
1509
/**
 * Resets to zero the sequentially stolen tasks counter of a worker node, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const nodeUsage = this.workerNodes[workerNodeKey]?.usage
  if (nodeUsage != null) {
    nodeUsage.tasks.sequentiallyStolen = 0
  }
}
1518
/**
 * Resets to zero the sequentially stolen tasks counter of a task function on a worker node,
 * when the task function worker usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionUsage != null) {
    taskFunctionUsage.tasks.sequentiallyStolen = 0
  }
}
1534
/**
 * Handles the `idleWorkerNode` worker node event: steals a task for the idle worker node,
 * maintains the sequentially stolen counters, then schedules itself again after a delay
 * computed from the sequentially stolen tasks count.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleIdleWorkerNodeEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      'WorkerNode event detail workerNodeKey attribute must be defined'
    )
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work again after a stealing sequence: reset the counters and stop.
  const stealingSequenceEnded =
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  if (stealingSequenceEnded) {
    const taskFunctionNames = this.workerNodes[workerNodeKey].info
      .taskFunctionNames as string[]
    for (const taskName of taskFunctionNames) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const stolenTaskName = stolenTask.name as string
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)?.tasks as TaskStatistics
    // Keep counting while the same task function is stolen in a row, restart otherwise.
    const sameTaskFunctionSequence =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (sameTaskFunctionSequence) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  const delay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(delay)
    .then(() => {
      this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(EMPTY_FUNCTION)
}
1595
/**
 * Steals a task for the given worker node from the ready worker node
 * with the most queued tasks, the given worker node excluded.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, or `undefined` if there is no task to steal.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      // Compare the worker nodes themselves: an index in the sorted copy is not a worker node key.
      candidateWorkerNode =>
        candidateWorkerNode.info.ready &&
        candidateWorkerNode !== destinationWorkerNode &&
        candidateWorkerNode.usage.tasks.queued > 0
    )
  if (sourceWorkerNode != null) {
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (this.shallExecuteTask(workerNodeKey)) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    this.updateTaskStolenStatisticsWorkerUsage(
      workerNodeKey,
      task.name as string
    )
    return task
  }
}
1626
/**
 * Handles the `backPressure` worker node event: moves queued tasks from the worker node
 * under back pressure to the other ready worker nodes, least loaded first.
 *
 * @param eventDetail - The worker node event detail; its `workerId` identifies the worker node under back pressure.
 */
private readonly handleBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  const { workerId } = eventDetail
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  if (sourceWorkerNode == null) {
    // The worker node under back pressure is no longer in the pool.
    return
  }
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Look the key up in the pool worker nodes: an index in the sorted copy is not a worker node key.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1664
/**
 * Message listener registered on each worker.
 * Dispatches worker ready responses, task execution responses and task function names messages.
 *
 * @param message - The message received from the worker.
 */
protected workerMessageListener (message: MessageValue<Response>): void {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames != null) {
    // Task function names message received from worker
    const workerInfo = this.getWorkerInfo(
      this.getWorkerNodeKeyByWorkerId(workerId)
    )
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1684
/**
 * Handles a worker ready response: stores the readiness and task function names
 * in the worker information, then emits the pool `ready` event once the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  // The pool ready event is emitted only once per pool start.
  if (!this.readyEventEmitted && this.ready) {
    this.readyEventEmitted = true
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1700
/**
 * Handles a task execution response: settles the pending task promise, updates the usage
 * statistics and the worker choice strategy, then, if the tasks queue is enabled,
 * runs the next queued task or signals that the worker node is idle.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  const pendingResponse = this.promiseResponseMap.get(taskId as string)
  if (pendingResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey } = pendingResponse
  if (workerError == null) {
    resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    reject(workerError.message)
  }
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Run the next queued task if the concurrency limit allows it.
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  ) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  // Nothing executing, nothing queued and no stealing sequence in progress: the worker node is idle.
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
      workerId: workerId as number,
      workerNodeKey
    })
  }
}
7c0ba920 1740
/**
 * Emits the pool `busy` event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1746
/**
 * Emits the pool `backPressure` event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1752
/**
 * Emits the pool `full` event if the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1760
/**
 * Retrieves the information of the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1770
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1794
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1807
/**
 * Flags the worker node identified by the given worker node key as not ready.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = false
}
1811
e2b31e32
JB
1812 /** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queuing enabled, no back pressure is ever reported.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1819
/**
 * Whether the pool as a whole has back pressure: tasks queuing is enabled
 * and no worker node reports being free of back pressure.
 *
 * @returns `true` if every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  // Stops at the first worker node without back pressure.
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1828
b0a4db63 1829 /**
aa9eede8 1830 * Executes the given task on the worker given its worker node key.
b0a4db63 1831 *
aa9eede8 1832 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1833 * @param task - The task to execute.
1834 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Run the pre-execution hook before the task is handed to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  // Event checks happen only once the task has been sent.
  this.checkAndEmitTaskExecutionEvents()
}
1840
/**
 * Enqueues the given task on the worker node at the given key, then runs
 * the task queuing event checks.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1846
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1850
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()` call.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1854
/**
 * Executes every task queued on the worker node at the given key, then
 * clears that worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  // The size is re-read on each iteration; a task is only dequeued while
  // the queue reports a positive size, hence the type assertion below.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1864
ef41a6e6
JB
/**
 * Flushes the tasks queue of each worker node in the pool, in worker node
 * key order.
 */
private flushTasksQueues (): void {
  // The length is re-read on every iteration, so the loop follows the
  // worker nodes array as it is at each step.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1870}