fix: ensure pool event full is emitted only once
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
96 /**
97 * Constructs a new poolifier pool.
98 *
38e795c1 99 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 100 * @param filePath - Path to the worker file.
38e795c1 101 * @param opts - Options for the pool.
729c563d 102 */
c97c7edb 103 public constructor (
b4213b7f
JB
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
c97c7edb 107 ) {
78cea37e 108 if (!this.isMain()) {
04f45163 109 throw new Error(
8c6d4acf 110 'Cannot start a pool from a worker with the same type as the pool'
04f45163 111 )
c97c7edb 112 }
8d3782fa 113 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 114 this.checkFilePath(this.filePath)
7c0ba920 115 this.checkPoolOptions(this.opts)
1086026a 116
7254e419
JB
117 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
118 this.executeTask = this.executeTask.bind(this)
119 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 120 this.dequeueTask = this.dequeueTask.bind(this)
a1763c54
JB
121 this.checkAndEmitTaskExecutionEvents =
122 this.checkAndEmitTaskExecutionEvents.bind(this)
123 this.checkAndEmitTaskQueuingEvents =
124 this.checkAndEmitTaskQueuingEvents.bind(this)
33e6bb4c
JB
125 this.checkAndEmitDynamicWorkerCreationEvents =
126 this.checkAndEmitDynamicWorkerCreationEvents.bind(this)
1086026a 127
6bd72cd0 128 if (this.opts.enableEvents === true) {
7c0ba920
JB
129 this.emitter = new PoolEmitter()
130 }
d59df138
JB
131 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
132 Worker,
133 Data,
134 Response
da309861
JB
135 >(
136 this,
137 this.opts.workerChoiceStrategy,
138 this.opts.workerChoiceStrategyOptions
139 )
b6b32453
JB
140
141 this.setupHook()
142
075e51d1 143 this.starting = true
e761c033 144 this.startPool()
075e51d1 145 this.starting = false
afe0d5bf
JB
146
147 this.startTimestamp = performance.now()
c97c7edb
S
148 }
149
a35560ba 150 private checkFilePath (filePath: string): void {
ffcbbad8
JB
151 if (
152 filePath == null ||
3d6dd312 153 typeof filePath !== 'string' ||
ffcbbad8
JB
154 (typeof filePath === 'string' && filePath.trim().length === 0)
155 ) {
c510fea7
APA
156 throw new Error('Please specify a file with a worker implementation')
157 }
3d6dd312
JB
158 if (!existsSync(filePath)) {
159 throw new Error(`Cannot find the worker file '${filePath}'`)
160 }
c510fea7
APA
161 }
162
8d3782fa
JB
163 private checkNumberOfWorkers (numberOfWorkers: number): void {
164 if (numberOfWorkers == null) {
165 throw new Error(
166 'Cannot instantiate a pool without specifying the number of workers'
167 )
78cea37e 168 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 169 throw new TypeError(
0d80593b 170 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
171 )
172 } else if (numberOfWorkers < 0) {
473c717a 173 throw new RangeError(
8d3782fa
JB
174 'Cannot instantiate a pool with a negative number of workers'
175 )
6b27d407 176 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
177 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
178 }
179 }
180
181 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 182 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
183 if (max == null) {
184 throw new Error(
185 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
186 )
187 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
188 throw new TypeError(
189 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
190 )
191 } else if (min > max) {
079de991
JB
192 throw new RangeError(
193 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
194 )
b97d82d8 195 } else if (max === 0) {
079de991 196 throw new RangeError(
d640b48b 197 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
198 )
199 } else if (min === max) {
200 throw new RangeError(
201 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
202 )
203 }
8d3782fa
JB
204 }
205 }
206
7c0ba920 207 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
208 if (isPlainObject(opts)) {
209 this.opts.workerChoiceStrategy =
210 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
211 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
212 this.opts.workerChoiceStrategyOptions = {
213 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
214 ...opts.workerChoiceStrategyOptions
215 }
49be33fe
JB
216 this.checkValidWorkerChoiceStrategyOptions(
217 this.opts.workerChoiceStrategyOptions
218 )
1f68cede 219 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
220 this.opts.enableEvents = opts.enableEvents ?? true
221 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
222 if (this.opts.enableTasksQueue) {
223 this.checkValidTasksQueueOptions(
224 opts.tasksQueueOptions as TasksQueueOptions
225 )
226 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
227 opts.tasksQueueOptions as TasksQueueOptions
228 )
229 }
230 } else {
231 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 232 }
aee46736
JB
233 }
234
235 private checkValidWorkerChoiceStrategy (
236 workerChoiceStrategy: WorkerChoiceStrategy
237 ): void {
238 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 239 throw new Error(
aee46736 240 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
241 )
242 }
7c0ba920
JB
243 }
244
0d80593b
JB
245 private checkValidWorkerChoiceStrategyOptions (
246 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
247 ): void {
248 if (!isPlainObject(workerChoiceStrategyOptions)) {
249 throw new TypeError(
250 'Invalid worker choice strategy options: must be a plain object'
251 )
252 }
8990357d
JB
253 if (
254 workerChoiceStrategyOptions.choiceRetries != null &&
255 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
256 ) {
257 throw new TypeError(
258 'Invalid worker choice strategy options: choice retries must be an integer'
259 )
260 }
261 if (
262 workerChoiceStrategyOptions.choiceRetries != null &&
263 workerChoiceStrategyOptions.choiceRetries <= 0
264 ) {
265 throw new RangeError(
266 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
267 )
268 }
49be33fe
JB
269 if (
270 workerChoiceStrategyOptions.weights != null &&
6b27d407 271 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
272 ) {
273 throw new Error(
274 'Invalid worker choice strategy options: must have a weight for each worker node'
275 )
276 }
f0d7f803
JB
277 if (
278 workerChoiceStrategyOptions.measurement != null &&
279 !Object.values(Measurements).includes(
280 workerChoiceStrategyOptions.measurement
281 )
282 ) {
283 throw new Error(
284 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
285 )
286 }
0d80593b
JB
287 }
288
a20f0ba5
JB
289 private checkValidTasksQueueOptions (
290 tasksQueueOptions: TasksQueueOptions
291 ): void {
0d80593b
JB
292 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
293 throw new TypeError('Invalid tasks queue options: must be a plain object')
294 }
f0d7f803
JB
295 if (
296 tasksQueueOptions?.concurrency != null &&
297 !Number.isSafeInteger(tasksQueueOptions.concurrency)
298 ) {
299 throw new TypeError(
300 'Invalid worker tasks concurrency: must be an integer'
301 )
302 }
303 if (
304 tasksQueueOptions?.concurrency != null &&
305 tasksQueueOptions.concurrency <= 0
306 ) {
a20f0ba5 307 throw new Error(
5137e2ae 308 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
a20f0ba5
JB
309 )
310 }
311 }
312
e761c033
JB
313 private startPool (): void {
314 while (
315 this.workerNodes.reduce(
316 (accumulator, workerNode) =>
317 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
318 0
319 ) < this.numberOfWorkers
320 ) {
aa9eede8 321 this.createAndSetupWorkerNode()
e761c033
JB
322 }
323 }
324
08f3f44c 325 /** @inheritDoc */
6b27d407
JB
326 public get info (): PoolInfo {
327 return {
23ccf9d7 328 version,
6b27d407 329 type: this.type,
184855e6 330 worker: this.worker,
2431bdb4
JB
331 ready: this.ready,
332 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
333 minSize: this.minSize,
334 maxSize: this.maxSize,
c05f0d50
JB
335 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
336 .runTime.aggregate &&
1305e9a8
JB
337 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
338 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
339 workerNodes: this.workerNodes.length,
340 idleWorkerNodes: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
f59e1027 342 workerNode.usage.tasks.executing === 0
a4e07f72
JB
343 ? accumulator + 1
344 : accumulator,
6b27d407
JB
345 0
346 ),
347 busyWorkerNodes: this.workerNodes.reduce(
348 (accumulator, workerNode) =>
f59e1027 349 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
350 0
351 ),
a4e07f72 352 executedTasks: this.workerNodes.reduce(
6b27d407 353 (accumulator, workerNode) =>
f59e1027 354 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
355 0
356 ),
357 executingTasks: this.workerNodes.reduce(
358 (accumulator, workerNode) =>
f59e1027 359 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
360 0
361 ),
daf86646
JB
362 ...(this.opts.enableTasksQueue === true && {
363 queuedTasks: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + workerNode.usage.tasks.queued,
366 0
367 )
368 }),
369 ...(this.opts.enableTasksQueue === true && {
370 maxQueuedTasks: this.workerNodes.reduce(
371 (accumulator, workerNode) =>
372 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
373 0
374 )
375 }),
a1763c54
JB
376 ...(this.opts.enableTasksQueue === true && {
377 backPressure: this.hasBackPressure()
378 }),
a4e07f72
JB
379 failedTasks: this.workerNodes.reduce(
380 (accumulator, workerNode) =>
f59e1027 381 accumulator + workerNode.usage.tasks.failed,
a4e07f72 382 0
1dcf8b7b
JB
383 ),
384 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
385 .runTime.aggregate && {
386 runTime: {
98e72cda
JB
387 minimum: round(
388 Math.min(
389 ...this.workerNodes.map(
8ebe6c30 390 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 391 )
1dcf8b7b
JB
392 )
393 ),
98e72cda
JB
394 maximum: round(
395 Math.max(
396 ...this.workerNodes.map(
8ebe6c30 397 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 398 )
1dcf8b7b 399 )
98e72cda
JB
400 ),
401 average: round(
402 this.workerNodes.reduce(
403 (accumulator, workerNode) =>
404 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
405 0
406 ) /
407 this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + (workerNode.usage.tasks?.executed ?? 0),
410 0
411 )
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .runTime.median && {
415 median: round(
416 median(
417 this.workerNodes.map(
8ebe6c30 418 (workerNode) => workerNode.usage.runTime?.median ?? 0
98e72cda
JB
419 )
420 )
421 )
422 })
1dcf8b7b
JB
423 }
424 }),
425 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 .waitTime.aggregate && {
427 waitTime: {
98e72cda
JB
428 minimum: round(
429 Math.min(
430 ...this.workerNodes.map(
8ebe6c30 431 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 432 )
1dcf8b7b
JB
433 )
434 ),
98e72cda
JB
435 maximum: round(
436 Math.max(
437 ...this.workerNodes.map(
8ebe6c30 438 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 439 )
1dcf8b7b 440 )
98e72cda
JB
441 ),
442 average: round(
443 this.workerNodes.reduce(
444 (accumulator, workerNode) =>
445 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
446 0
447 ) /
448 this.workerNodes.reduce(
449 (accumulator, workerNode) =>
450 accumulator + (workerNode.usage.tasks?.executed ?? 0),
451 0
452 )
453 ),
454 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
455 .waitTime.median && {
456 median: round(
457 median(
458 this.workerNodes.map(
8ebe6c30 459 (workerNode) => workerNode.usage.waitTime?.median ?? 0
98e72cda
JB
460 )
461 )
462 )
463 })
1dcf8b7b
JB
464 }
465 })
6b27d407
JB
466 }
467 }
08f3f44c 468
aa9eede8
JB
469 /**
470 * The pool readiness boolean status.
471 */
2431bdb4
JB
472 private get ready (): boolean {
473 return (
b97d82d8
JB
474 this.workerNodes.reduce(
475 (accumulator, workerNode) =>
476 !workerNode.info.dynamic && workerNode.info.ready
477 ? accumulator + 1
478 : accumulator,
479 0
480 ) >= this.minSize
2431bdb4
JB
481 )
482 }
483
afe0d5bf 484 /**
aa9eede8 485 * The approximate pool utilization.
afe0d5bf
JB
486 *
487 * @returns The pool utilization.
488 */
489 private get utilization (): number {
8e5ca040 490 const poolTimeCapacity =
fe7d90db 491 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
492 const totalTasksRunTime = this.workerNodes.reduce(
493 (accumulator, workerNode) =>
71514351 494 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
495 0
496 )
497 const totalTasksWaitTime = this.workerNodes.reduce(
498 (accumulator, workerNode) =>
71514351 499 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
500 0
501 )
8e5ca040 502 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
503 }
504
8881ae32 505 /**
aa9eede8 506 * The pool type.
8881ae32
JB
507 *
508 * If it is `'dynamic'`, it provides the `max` property.
509 */
510 protected abstract get type (): PoolType
511
184855e6 512 /**
aa9eede8 513 * The worker type.
184855e6
JB
514 */
515 protected abstract get worker (): WorkerType
516
c2ade475 517 /**
aa9eede8 518 * The pool minimum size.
c2ade475 519 */
6b27d407 520 protected abstract get minSize (): number
ff733df7
JB
521
522 /**
aa9eede8 523 * The pool maximum size.
ff733df7 524 */
6b27d407 525 protected abstract get maxSize (): number
a35560ba 526
6b813701
JB
527 /**
528 * Checks if the worker id sent in the received message from a worker is valid.
529 *
530 * @param message - The received message.
531 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
532 */
21f710aa 533 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
534 if (message.workerId == null) {
535 throw new Error('Worker message received without worker id')
536 } else if (
21f710aa 537 message.workerId != null &&
aad6fb64 538 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
539 ) {
540 throw new Error(
541 `Worker message received from unknown worker '${message.workerId}'`
542 )
543 }
544 }
545
ffcbbad8 546 /**
f06e48d8 547 * Gets the given worker its worker node key.
ffcbbad8
JB
548 *
549 * @param worker - The worker.
f59e1027 550 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 551 */
aad6fb64 552 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 553 return this.workerNodes.findIndex(
8ebe6c30 554 (workerNode) => workerNode.worker === worker
f06e48d8 555 )
bf9549ae
JB
556 }
557
aa9eede8
JB
558 /**
559 * Gets the worker node key given its worker id.
560 *
561 * @param workerId - The worker id.
aad6fb64 562 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 563 */
aad6fb64
JB
564 private getWorkerNodeKeyByWorkerId (workerId: number): number {
565 return this.workerNodes.findIndex(
8ebe6c30 566 (workerNode) => workerNode.info.id === workerId
aad6fb64 567 )
aa9eede8
JB
568 }
569
afc003b2 570 /** @inheritDoc */
a35560ba 571 public setWorkerChoiceStrategy (
59219cbb
JB
572 workerChoiceStrategy: WorkerChoiceStrategy,
573 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 574 ): void {
aee46736 575 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 576 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
577 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
578 this.opts.workerChoiceStrategy
579 )
580 if (workerChoiceStrategyOptions != null) {
581 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
582 }
aa9eede8 583 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 584 workerNode.resetUsage()
9edb9717 585 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 586 }
a20f0ba5
JB
587 }
588
589 /** @inheritDoc */
590 public setWorkerChoiceStrategyOptions (
591 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
592 ): void {
0d80593b 593 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
594 this.opts.workerChoiceStrategyOptions = {
595 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
596 ...workerChoiceStrategyOptions
597 }
a20f0ba5
JB
598 this.workerChoiceStrategyContext.setOptions(
599 this.opts.workerChoiceStrategyOptions
a35560ba
S
600 )
601 }
602
a20f0ba5 603 /** @inheritDoc */
8f52842f
JB
604 public enableTasksQueue (
605 enable: boolean,
606 tasksQueueOptions?: TasksQueueOptions
607 ): void {
a20f0ba5 608 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 609 this.flushTasksQueues()
a20f0ba5
JB
610 }
611 this.opts.enableTasksQueue = enable
8f52842f 612 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
613 }
614
615 /** @inheritDoc */
8f52842f 616 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 617 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
618 this.checkValidTasksQueueOptions(tasksQueueOptions)
619 this.opts.tasksQueueOptions =
620 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 621 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
622 delete this.opts.tasksQueueOptions
623 }
624 }
625
626 private buildTasksQueueOptions (
627 tasksQueueOptions: TasksQueueOptions
628 ): TasksQueueOptions {
629 return {
630 concurrency: tasksQueueOptions?.concurrency ?? 1
631 }
632 }
633
c319c66b
JB
634 /**
635 * Whether the pool is full or not.
636 *
637 * The pool filling boolean status.
638 */
dea903a8
JB
639 protected get full (): boolean {
640 return this.workerNodes.length >= this.maxSize
641 }
c2ade475 642
c319c66b
JB
643 /**
644 * Whether the pool is busy or not.
645 *
646 * The pool busyness boolean status.
647 */
648 protected abstract get busy (): boolean
7c0ba920 649
6c6afb84 650 /**
3d76750a 651 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
652 *
653 * @returns Worker nodes busyness boolean status.
654 */
c2ade475 655 protected internalBusy (): boolean {
3d76750a
JB
656 if (this.opts.enableTasksQueue === true) {
657 return (
658 this.workerNodes.findIndex(
8ebe6c30 659 (workerNode) =>
3d76750a
JB
660 workerNode.info.ready &&
661 workerNode.usage.tasks.executing <
662 (this.opts.tasksQueueOptions?.concurrency as number)
663 ) === -1
664 )
665 } else {
666 return (
667 this.workerNodes.findIndex(
8ebe6c30 668 (workerNode) =>
3d76750a
JB
669 workerNode.info.ready && workerNode.usage.tasks.executing === 0
670 ) === -1
671 )
672 }
cb70b19d
JB
673 }
674
90d7d101
JB
675 /** @inheritDoc */
676 public listTaskFunctions (): string[] {
f2dbbf95
JB
677 for (const workerNode of this.workerNodes) {
678 if (
679 Array.isArray(workerNode.info.taskFunctions) &&
680 workerNode.info.taskFunctions.length > 0
681 ) {
682 return workerNode.info.taskFunctions
683 }
90d7d101 684 }
f2dbbf95 685 return []
90d7d101
JB
686 }
687
afc003b2 688 /** @inheritDoc */
7d91a8cd
JB
689 public async execute (
690 data?: Data,
691 name?: string,
692 transferList?: TransferListItem[]
693 ): Promise<Response> {
52b71763 694 return await new Promise<Response>((resolve, reject) => {
7d91a8cd
JB
695 if (name != null && typeof name !== 'string') {
696 reject(new TypeError('name argument must be a string'))
697 }
90d7d101
JB
698 if (
699 name != null &&
700 typeof name === 'string' &&
701 name.trim().length === 0
702 ) {
f58b60b9 703 reject(new TypeError('name argument must not be an empty string'))
90d7d101 704 }
b558f6b5
JB
705 if (transferList != null && !Array.isArray(transferList)) {
706 reject(new TypeError('transferList argument must be an array'))
707 }
708 const timestamp = performance.now()
709 const workerNodeKey = this.chooseWorkerNode()
a5d15204 710 const workerInfo = this.getWorkerInfo(workerNodeKey)
cea399c8
JB
711 if (
712 name != null &&
a5d15204
JB
713 Array.isArray(workerInfo.taskFunctions) &&
714 !workerInfo.taskFunctions.includes(name)
cea399c8 715 ) {
90d7d101
JB
716 reject(
717 new Error(`Task function '${name}' is not registered in the pool`)
718 )
719 }
501aea93 720 const task: Task<Data> = {
52b71763
JB
721 name: name ?? DEFAULT_TASK_NAME,
722 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
723 data: data ?? ({} as Data),
7d91a8cd 724 transferList,
52b71763 725 timestamp,
a5d15204 726 workerId: workerInfo.id as number,
7629bdf1 727 taskId: randomUUID()
52b71763 728 }
7629bdf1 729 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
730 resolve,
731 reject,
501aea93 732 workerNodeKey
2e81254d 733 })
52b71763 734 if (
4e377863
JB
735 this.opts.enableTasksQueue === false ||
736 (this.opts.enableTasksQueue === true &&
737 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 738 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 739 ) {
501aea93 740 this.executeTask(workerNodeKey, task)
4e377863
JB
741 } else {
742 this.enqueueTask(workerNodeKey, task)
52b71763 743 }
2e81254d 744 })
280c2a77 745 }
c97c7edb 746
afc003b2 747 /** @inheritDoc */
c97c7edb 748 public async destroy (): Promise<void> {
1fbcaa7c 749 await Promise.all(
81c02522 750 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 751 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
752 })
753 )
33e6bb4c 754 this.emitter?.emit(PoolEvents.destroy, this.info)
c97c7edb
S
755 }
756
1e3214b6
JB
757 protected async sendKillMessageToWorker (
758 workerNodeKey: number,
759 workerId: number
760 ): Promise<void> {
9edb9717 761 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
762 this.registerWorkerMessageListener(workerNodeKey, (message) => {
763 if (message.kill === 'success') {
764 resolve()
765 } else if (message.kill === 'failure') {
e1af34e6 766 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
767 }
768 })
9edb9717 769 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 770 })
1e3214b6
JB
771 }
772
4a6952ff 773 /**
aa9eede8 774 * Terminates the worker node given its worker node key.
4a6952ff 775 *
aa9eede8 776 * @param workerNodeKey - The worker node key.
4a6952ff 777 */
81c02522 778 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 779
729c563d 780 /**
6677a3d3
JB
781 * Setup hook to execute code before worker nodes are created in the abstract constructor.
782 * Can be overridden.
afc003b2
JB
783 *
784 * @virtual
729c563d 785 */
280c2a77 786 protected setupHook (): void {
d99ba5a8 787 // Intentionally empty
280c2a77 788 }
c97c7edb 789
729c563d 790 /**
280c2a77
S
791 * Should return whether the worker is the main worker or not.
792 */
793 protected abstract isMain (): boolean
794
795 /**
2e81254d 796 * Hook executed before the worker task execution.
bf9549ae 797 * Can be overridden.
729c563d 798 *
f06e48d8 799 * @param workerNodeKey - The worker node key.
1c6fe997 800 * @param task - The task to execute.
729c563d 801 */
1c6fe997
JB
802 protected beforeTaskExecutionHook (
803 workerNodeKey: number,
804 task: Task<Data>
805 ): void {
f59e1027 806 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
807 ++workerUsage.tasks.executing
808 this.updateWaitTimeWorkerUsage(workerUsage, task)
db0e38ee
JB
809 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
810 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 811 workerNodeKey
db0e38ee
JB
812 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
813 ++taskFunctionWorkerUsage.tasks.executing
814 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 815 }
c97c7edb
S
816 }
817
c01733f1 818 /**
2e81254d 819 * Hook executed after the worker task execution.
bf9549ae 820 * Can be overridden.
c01733f1 821 *
501aea93 822 * @param workerNodeKey - The worker node key.
38e795c1 823 * @param message - The received message.
c01733f1 824 */
2e81254d 825 protected afterTaskExecutionHook (
501aea93 826 workerNodeKey: number,
2740a743 827 message: MessageValue<Response>
bf9549ae 828 ): void {
ff128cc9 829 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
830 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
831 this.updateRunTimeWorkerUsage(workerUsage, message)
832 this.updateEluWorkerUsage(workerUsage, message)
db0e38ee
JB
833 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
834 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 835 workerNodeKey
db0e38ee 836 ].getTaskFunctionWorkerUsage(
b558f6b5
JB
837 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
838 ) as WorkerUsage
db0e38ee
JB
839 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
840 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
841 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
842 }
843 }
844
db0e38ee
JB
845 /**
846 * Whether the worker node shall update its task function worker usage or not.
847 *
848 * @param workerNodeKey - The worker node key.
849 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
850 */
851 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 852 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 853 return (
a5d15204 854 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 855 workerInfo.taskFunctions.length > 2
b558f6b5 856 )
f1c06930
JB
857 }
858
859 private updateTaskStatisticsWorkerUsage (
860 workerUsage: WorkerUsage,
861 message: MessageValue<Response>
862 ): void {
a4e07f72 863 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
864 if (
865 workerTaskStatistics.executing != null &&
866 workerTaskStatistics.executing > 0
867 ) {
868 --workerTaskStatistics.executing
869 } else if (
870 workerTaskStatistics.executing != null &&
871 workerTaskStatistics.executing < 0
872 ) {
873 throw new Error(
cf6e3991 874 'Worker usage statistic for tasks executing cannot be negative'
5bb5be17
JB
875 )
876 }
98e72cda
JB
877 if (message.taskError == null) {
878 ++workerTaskStatistics.executed
879 } else {
a4e07f72 880 ++workerTaskStatistics.failed
2740a743 881 }
f8eb0a2a
JB
882 }
883
a4e07f72
JB
884 private updateRunTimeWorkerUsage (
885 workerUsage: WorkerUsage,
f8eb0a2a
JB
886 message: MessageValue<Response>
887 ): void {
e4f20deb
JB
888 updateMeasurementStatistics(
889 workerUsage.runTime,
890 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
891 message.taskPerformance?.runTime ?? 0,
892 workerUsage.tasks.executed
893 )
f8eb0a2a
JB
894 }
895
a4e07f72
JB
896 private updateWaitTimeWorkerUsage (
897 workerUsage: WorkerUsage,
1c6fe997 898 task: Task<Data>
f8eb0a2a 899 ): void {
1c6fe997
JB
900 const timestamp = performance.now()
901 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
902 updateMeasurementStatistics(
903 workerUsage.waitTime,
904 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
905 taskWaitTime,
906 workerUsage.tasks.executed
907 )
c01733f1 908 }
909
a4e07f72 910 private updateEluWorkerUsage (
5df69fab 911 workerUsage: WorkerUsage,
62c15a68
JB
912 message: MessageValue<Response>
913 ): void {
008512c7
JB
914 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
915 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
916 updateMeasurementStatistics(
917 workerUsage.elu.active,
008512c7 918 eluTaskStatisticsRequirements,
e4f20deb
JB
919 message.taskPerformance?.elu?.active ?? 0,
920 workerUsage.tasks.executed
921 )
922 updateMeasurementStatistics(
923 workerUsage.elu.idle,
008512c7 924 eluTaskStatisticsRequirements,
e4f20deb
JB
925 message.taskPerformance?.elu?.idle ?? 0,
926 workerUsage.tasks.executed
927 )
008512c7 928 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 929 if (message.taskPerformance?.elu != null) {
f7510105
JB
930 if (workerUsage.elu.utilization != null) {
931 workerUsage.elu.utilization =
932 (workerUsage.elu.utilization +
933 message.taskPerformance.elu.utilization) /
934 2
935 } else {
936 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
937 }
62c15a68
JB
938 }
939 }
940 }
941
280c2a77 942 /**
f06e48d8 943 * Chooses a worker node for the next task.
280c2a77 944 *
6c6afb84 945 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 946 *
aa9eede8 947 * @returns The chosen worker node key
280c2a77 948 */
6c6afb84 949 private chooseWorkerNode (): number {
930dcf12 950 if (this.shallCreateDynamicWorker()) {
aa9eede8 951 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
952 if (
953 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
954 ) {
aa9eede8 955 return workerNodeKey
6c6afb84 956 }
17393ac8 957 }
930dcf12
JB
958 return this.workerChoiceStrategyContext.execute()
959 }
960
6c6afb84
JB
961 /**
962 * Conditions for dynamic worker creation.
963 *
964 * @returns Whether to create a dynamic worker or not.
965 */
966 private shallCreateDynamicWorker (): boolean {
930dcf12 967 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
968 }
969
280c2a77 970 /**
aa9eede8 971 * Sends a message to worker given its worker node key.
280c2a77 972 *
aa9eede8 973 * @param workerNodeKey - The worker node key.
38e795c1 974 * @param message - The message.
7d91a8cd 975 * @param transferList - The optional array of transferable objects.
280c2a77
S
976 */
977 protected abstract sendToWorker (
aa9eede8 978 workerNodeKey: number,
7d91a8cd
JB
979 message: MessageValue<Data>,
980 transferList?: TransferListItem[]
280c2a77
S
981 ): void
982
729c563d 983 /**
41344292 984 * Creates a new worker.
6c6afb84
JB
985 *
986 * @returns Newly created worker.
729c563d 987 */
280c2a77 988 protected abstract createWorker (): Worker
c97c7edb 989
4a6952ff 990 /**
aa9eede8 991 * Creates a new, completely set up worker node.
4a6952ff 992 *
aa9eede8 993 * @returns New, completely set up worker node key.
4a6952ff 994 */
aa9eede8 995 protected createAndSetupWorkerNode (): number {
bdacc2d2 996 const worker = this.createWorker()
280c2a77 997
fd04474e 998 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 999 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1000 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1001 worker.on('error', (error) => {
aad6fb64 1002 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
9b106837
JB
1003 const workerInfo = this.getWorkerInfo(workerNodeKey)
1004 workerInfo.ready = false
0dc838e3 1005 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1006 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 1007 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 1008 if (workerInfo.dynamic) {
aa9eede8 1009 this.createAndSetupDynamicWorkerNode()
8a1260a3 1010 } else {
aa9eede8 1011 this.createAndSetupWorkerNode()
8a1260a3 1012 }
5baee0d7 1013 }
19dbc45b 1014 if (this.opts.enableTasksQueue === true) {
9b106837 1015 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1016 }
5baee0d7 1017 })
a35560ba 1018 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1019 worker.once('exit', () => {
f06e48d8 1020 this.removeWorkerNode(worker)
a974afa6 1021 })
280c2a77 1022
aa9eede8 1023 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1024
aa9eede8 1025 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1026
aa9eede8 1027 return workerNodeKey
c97c7edb 1028 }
be0676b3 1029
930dcf12 1030 /**
aa9eede8 1031 * Creates a new, completely set up dynamic worker node.
930dcf12 1032 *
aa9eede8 1033 * @returns New, completely set up dynamic worker node key.
930dcf12 1034 */
aa9eede8
JB
1035 protected createAndSetupDynamicWorkerNode (): number {
1036 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1037 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1038 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1039 message.workerId
aad6fb64 1040 )
aa9eede8 1041 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1042 // Kill message received from worker
930dcf12
JB
1043 if (
1044 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1045 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1046 ((this.opts.enableTasksQueue === false &&
aa9eede8 1047 workerUsage.tasks.executing === 0) ||
7b56f532 1048 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1049 workerUsage.tasks.executing === 0 &&
1050 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1051 ) {
5270d253
JB
1052 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1053 this.emitter?.emit(PoolEvents.error, error)
1054 })
930dcf12
JB
1055 }
1056 })
aa9eede8 1057 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1058 this.sendToWorker(workerNodeKey, {
b0a4db63 1059 checkActive: true,
21f710aa
JB
1060 workerId: workerInfo.id as number
1061 })
b5e113f6
JB
1062 workerInfo.dynamic = true
1063 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1064 workerInfo.ready = true
1065 }
33e6bb4c 1066 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1067 return workerNodeKey
930dcf12
JB
1068 }
1069
a2ed5053 1070 /**
aa9eede8 1071 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1072 *
aa9eede8 1073 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1074 * @param listener - The message listener callback.
1075 */
85aeb3f3
JB
1076 protected abstract registerWorkerMessageListener<
1077 Message extends Data | Response
aa9eede8
JB
1078 >(
1079 workerNodeKey: number,
1080 listener: (message: MessageValue<Message>) => void
1081 ): void
a2ed5053
JB
1082
1083 /**
aa9eede8 1084 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1085 * Can be overridden.
1086 *
aa9eede8 1087 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1088 */
aa9eede8 1089 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1090 // Listen to worker messages.
aa9eede8 1091 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1092 // Send the startup message to worker.
aa9eede8 1093 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1094 // Send the statistics message to worker.
1095 this.sendStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
1096 }
1097
85aeb3f3 1098 /**
aa9eede8
JB
1099 * Sends the startup message to worker given its worker node key.
1100 *
1101 * @param workerNodeKey - The worker node key.
1102 */
1103 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1104
1105 /**
9edb9717 1106 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1107 *
aa9eede8 1108 * @param workerNodeKey - The worker node key.
85aeb3f3 1109 */
9edb9717 1110 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1111 this.sendToWorker(workerNodeKey, {
1112 statistics: {
1113 runTime:
1114 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1115 .runTime.aggregate,
1116 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1117 .elu.aggregate
1118 },
1119 workerId: this.getWorkerInfo(workerNodeKey).id as number
1120 })
1121 }
a2ed5053
JB
1122
1123 private redistributeQueuedTasks (workerNodeKey: number): void {
1124 while (this.tasksQueueSize(workerNodeKey) > 0) {
1125 let targetWorkerNodeKey: number = workerNodeKey
1126 let minQueuedTasks = Infinity
10ecf8fd 1127 let executeTask = false
a2ed5053
JB
1128 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1129 const workerInfo = this.getWorkerInfo(workerNodeId)
1130 if (
1131 workerNodeId !== workerNodeKey &&
1132 workerInfo.ready &&
1133 workerNode.usage.tasks.queued === 0
1134 ) {
a5ed75b7
JB
1135 if (
1136 this.workerNodes[workerNodeId].usage.tasks.executing <
1137 (this.opts.tasksQueueOptions?.concurrency as number)
1138 ) {
10ecf8fd
JB
1139 executeTask = true
1140 }
a2ed5053
JB
1141 targetWorkerNodeKey = workerNodeId
1142 break
1143 }
1144 if (
1145 workerNodeId !== workerNodeKey &&
1146 workerInfo.ready &&
1147 workerNode.usage.tasks.queued < minQueuedTasks
1148 ) {
1149 minQueuedTasks = workerNode.usage.tasks.queued
1150 targetWorkerNodeKey = workerNodeId
1151 }
1152 }
10ecf8fd
JB
1153 if (executeTask) {
1154 this.executeTask(
1155 targetWorkerNodeKey,
1156 this.dequeueTask(workerNodeKey) as Task<Data>
1157 )
1158 } else {
1159 this.enqueueTask(
1160 targetWorkerNodeKey,
1161 this.dequeueTask(workerNodeKey) as Task<Data>
1162 )
1163 }
a2ed5053
JB
1164 }
1165 }
1166
be0676b3 1167 /**
aa9eede8 1168 * This method is the listener registered for each worker message.
be0676b3 1169 *
bdacc2d2 1170 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1171 */
1172 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1173 return (message) => {
21f710aa 1174 this.checkMessageWorkerId(message)
a5d15204 1175 if (message.ready != null && message.taskFunctions != null) {
81c02522 1176 // Worker ready response received from worker
10e2aa7e 1177 this.handleWorkerReadyResponse(message)
7629bdf1 1178 } else if (message.taskId != null) {
81c02522 1179 // Task execution response received from worker
6b272951 1180 this.handleTaskExecutionResponse(message)
90d7d101
JB
1181 } else if (message.taskFunctions != null) {
1182 // Task functions message received from worker
b558f6b5
JB
1183 this.getWorkerInfo(
1184 this.getWorkerNodeKeyByWorkerId(message.workerId)
1185 ).taskFunctions = message.taskFunctions
6b272951
JB
1186 }
1187 }
1188 }
1189
10e2aa7e 1190 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1191 if (message.ready === false) {
1192 throw new Error(`Worker ${message.workerId} failed to initialize`)
1193 }
a5d15204 1194 const workerInfo = this.getWorkerInfo(
aad6fb64 1195 this.getWorkerNodeKeyByWorkerId(message.workerId)
a5d15204
JB
1196 )
1197 workerInfo.ready = message.ready as boolean
1198 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1199 if (this.emitter != null && this.ready) {
1200 this.emitter.emit(PoolEvents.ready, this.info)
1201 }
6b272951
JB
1202 }
1203
1204 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1205 const { taskId, taskError, data } = message
1206 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1207 if (promiseResponse != null) {
5441aea6
JB
1208 if (taskError != null) {
1209 this.emitter?.emit(PoolEvents.taskError, taskError)
1210 promiseResponse.reject(taskError.message)
6b272951 1211 } else {
5441aea6 1212 promiseResponse.resolve(data as Response)
6b272951 1213 }
501aea93
JB
1214 const workerNodeKey = promiseResponse.workerNodeKey
1215 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1216 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1217 if (
1218 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1219 this.tasksQueueSize(workerNodeKey) > 0 &&
1220 this.workerNodes[workerNodeKey].usage.tasks.executing <
1221 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1222 ) {
1223 this.executeTask(
1224 workerNodeKey,
1225 this.dequeueTask(workerNodeKey) as Task<Data>
1226 )
be0676b3 1227 }
6b272951 1228 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1229 }
be0676b3 1230 }
7c0ba920 1231
a1763c54 1232 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1233 if (this.busy) {
1234 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1235 }
1236 }
1237
1238 private checkAndEmitTaskQueuingEvents (): void {
1239 if (this.hasBackPressure()) {
1240 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1241 }
1242 }
1243
33e6bb4c
JB
1244 private checkAndEmitDynamicWorkerCreationEvents (): void {
1245 if (this.type === PoolTypes.dynamic) {
1246 if (this.full) {
1247 this.emitter?.emit(PoolEvents.full, this.info)
1248 }
1249 }
1250 }
1251
8a1260a3 1252 /**
aa9eede8 1253 * Gets the worker information given its worker node key.
8a1260a3
JB
1254 *
1255 * @param workerNodeKey - The worker node key.
3f09ed9f 1256 * @returns The worker information.
8a1260a3 1257 */
aa9eede8 1258 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1259 return this.workerNodes[workerNodeKey].info
e221309a
JB
1260 }
1261
a05c10de 1262 /**
b0a4db63 1263 * Adds the given worker in the pool worker nodes.
ea7a90d3 1264 *
38e795c1 1265 * @param worker - The worker.
aa9eede8
JB
1266 * @returns The added worker node key.
1267 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1268 */
b0a4db63 1269 private addWorkerNode (worker: Worker): number {
671d5154
JB
1270 const workerNode = new WorkerNode<Worker, Data>(
1271 worker,
1272 this.worker,
1273 this.maxSize
1274 )
b97d82d8 1275 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1276 if (this.starting) {
1277 workerNode.info.ready = true
1278 }
aa9eede8 1279 this.workerNodes.push(workerNode)
aad6fb64 1280 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1281 if (workerNodeKey === -1) {
a5d15204 1282 throw new Error('Worker node added not found')
aa9eede8
JB
1283 }
1284 return workerNodeKey
ea7a90d3 1285 }
c923ce56 1286
51fe3d3c 1287 /**
f06e48d8 1288 * Removes the given worker from the pool worker nodes.
51fe3d3c 1289 *
f06e48d8 1290 * @param worker - The worker.
51fe3d3c 1291 */
416fd65c 1292 private removeWorkerNode (worker: Worker): void {
aad6fb64 1293 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1294 if (workerNodeKey !== -1) {
1295 this.workerNodes.splice(workerNodeKey, 1)
1296 this.workerChoiceStrategyContext.remove(workerNodeKey)
1297 }
51fe3d3c 1298 }
adc3c320 1299
e2b31e32
JB
1300 /** @inheritDoc */
1301 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1302 return (
e2b31e32
JB
1303 this.opts.enableTasksQueue === true &&
1304 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1305 )
1306 }
1307
1308 private hasBackPressure (): boolean {
1309 return (
1310 this.opts.enableTasksQueue === true &&
1311 this.workerNodes.findIndex(
1312 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1313 ) === -1
9e844245 1314 )
e2b31e32
JB
1315 }
1316
b0a4db63 1317 /**
aa9eede8 1318 * Executes the given task on the worker given its worker node key.
b0a4db63 1319 *
aa9eede8 1320 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1321 * @param task - The task to execute.
1322 */
2e81254d 1323 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1324 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1325 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1326 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1327 }
1328
f9f00b5f 1329 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1330 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1331 this.checkAndEmitTaskQueuingEvents()
1332 return tasksQueueSize
adc3c320
JB
1333 }
1334
416fd65c 1335 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1336 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1337 }
1338
416fd65c 1339 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1340 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1341 }
1342
81c02522 1343 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1344 while (this.tasksQueueSize(workerNodeKey) > 0) {
1345 this.executeTask(
1346 workerNodeKey,
1347 this.dequeueTask(workerNodeKey) as Task<Data>
1348 )
ff733df7 1349 }
4b628b48 1350 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1351 }
1352
ef41a6e6
JB
1353 private flushTasksQueues (): void {
1354 for (const [workerNodeKey] of this.workerNodes.entries()) {
1355 this.flushTasksQueue(workerNodeKey)
1356 }
1357 }
c97c7edb 1358}