fix: fix back pressure detection
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
96 /**
97 * Constructs a new poolifier pool.
98 *
38e795c1 99 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 100 * @param filePath - Path to the worker file.
38e795c1 101 * @param opts - Options for the pool.
729c563d 102 */
c97c7edb 103 public constructor (
b4213b7f
JB
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
c97c7edb 107 ) {
78cea37e 108 if (!this.isMain()) {
04f45163 109 throw new Error(
8c6d4acf 110 'Cannot start a pool from a worker with the same type as the pool'
04f45163 111 )
c97c7edb 112 }
8d3782fa 113 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 114 this.checkFilePath(this.filePath)
7c0ba920 115 this.checkPoolOptions(this.opts)
1086026a 116
7254e419
JB
117 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
118 this.executeTask = this.executeTask.bind(this)
119 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 120 this.dequeueTask = this.dequeueTask.bind(this)
a1763c54
JB
121 this.checkAndEmitTaskExecutionEvents =
122 this.checkAndEmitTaskExecutionEvents.bind(this)
123 this.checkAndEmitTaskQueuingEvents =
124 this.checkAndEmitTaskQueuingEvents.bind(this)
1086026a 125
6bd72cd0 126 if (this.opts.enableEvents === true) {
7c0ba920
JB
127 this.emitter = new PoolEmitter()
128 }
d59df138
JB
129 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
130 Worker,
131 Data,
132 Response
da309861
JB
133 >(
134 this,
135 this.opts.workerChoiceStrategy,
136 this.opts.workerChoiceStrategyOptions
137 )
b6b32453
JB
138
139 this.setupHook()
140
075e51d1 141 this.starting = true
e761c033 142 this.startPool()
075e51d1 143 this.starting = false
afe0d5bf
JB
144
145 this.startTimestamp = performance.now()
c97c7edb
S
146 }
147
a35560ba 148 private checkFilePath (filePath: string): void {
ffcbbad8
JB
149 if (
150 filePath == null ||
3d6dd312 151 typeof filePath !== 'string' ||
ffcbbad8
JB
152 (typeof filePath === 'string' && filePath.trim().length === 0)
153 ) {
c510fea7
APA
154 throw new Error('Please specify a file with a worker implementation')
155 }
3d6dd312
JB
156 if (!existsSync(filePath)) {
157 throw new Error(`Cannot find the worker file '${filePath}'`)
158 }
c510fea7
APA
159 }
160
8d3782fa
JB
161 private checkNumberOfWorkers (numberOfWorkers: number): void {
162 if (numberOfWorkers == null) {
163 throw new Error(
164 'Cannot instantiate a pool without specifying the number of workers'
165 )
78cea37e 166 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 167 throw new TypeError(
0d80593b 168 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
169 )
170 } else if (numberOfWorkers < 0) {
473c717a 171 throw new RangeError(
8d3782fa
JB
172 'Cannot instantiate a pool with a negative number of workers'
173 )
6b27d407 174 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
175 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
176 }
177 }
178
179 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 180 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
181 if (max == null) {
182 throw new Error(
183 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
184 )
185 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
186 throw new TypeError(
187 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
188 )
189 } else if (min > max) {
079de991
JB
190 throw new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
192 )
b97d82d8 193 } else if (max === 0) {
079de991 194 throw new RangeError(
d640b48b 195 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
196 )
197 } else if (min === max) {
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
200 )
201 }
8d3782fa
JB
202 }
203 }
204
7c0ba920 205 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
206 if (isPlainObject(opts)) {
207 this.opts.workerChoiceStrategy =
208 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
209 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
49be33fe
JB
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
1f68cede 217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 230 }
aee46736
JB
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 237 throw new Error(
aee46736 238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
239 )
240 }
7c0ba920
JB
241 }
242
0d80593b
JB
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
8990357d
JB
251 if (
252 workerChoiceStrategyOptions.choiceRetries != null &&
253 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
254 ) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: choice retries must be an integer'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.choiceRetries != null &&
261 workerChoiceStrategyOptions.choiceRetries <= 0
262 ) {
263 throw new RangeError(
264 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
265 )
266 }
49be33fe
JB
267 if (
268 workerChoiceStrategyOptions.weights != null &&
6b27d407 269 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
270 ) {
271 throw new Error(
272 'Invalid worker choice strategy options: must have a weight for each worker node'
273 )
274 }
f0d7f803
JB
275 if (
276 workerChoiceStrategyOptions.measurement != null &&
277 !Object.values(Measurements).includes(
278 workerChoiceStrategyOptions.measurement
279 )
280 ) {
281 throw new Error(
282 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
283 )
284 }
0d80593b
JB
285 }
286
a20f0ba5
JB
287 private checkValidTasksQueueOptions (
288 tasksQueueOptions: TasksQueueOptions
289 ): void {
0d80593b
JB
290 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
291 throw new TypeError('Invalid tasks queue options: must be a plain object')
292 }
f0d7f803
JB
293 if (
294 tasksQueueOptions?.concurrency != null &&
295 !Number.isSafeInteger(tasksQueueOptions.concurrency)
296 ) {
297 throw new TypeError(
298 'Invalid worker tasks concurrency: must be an integer'
299 )
300 }
301 if (
302 tasksQueueOptions?.concurrency != null &&
303 tasksQueueOptions.concurrency <= 0
304 ) {
a20f0ba5 305 throw new Error(
5137e2ae 306 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}' is a negative integer or zero`
a20f0ba5
JB
307 )
308 }
309 }
310
e761c033
JB
311 private startPool (): void {
312 while (
313 this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
316 0
317 ) < this.numberOfWorkers
318 ) {
aa9eede8 319 this.createAndSetupWorkerNode()
e761c033
JB
320 }
321 }
322
08f3f44c 323 /** @inheritDoc */
6b27d407
JB
324 public get info (): PoolInfo {
325 return {
23ccf9d7 326 version,
6b27d407 327 type: this.type,
184855e6 328 worker: this.worker,
2431bdb4
JB
329 ready: this.ready,
330 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
331 minSize: this.minSize,
332 maxSize: this.maxSize,
c05f0d50
JB
333 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
334 .runTime.aggregate &&
1305e9a8
JB
335 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
336 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
337 workerNodes: this.workerNodes.length,
338 idleWorkerNodes: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
f59e1027 340 workerNode.usage.tasks.executing === 0
a4e07f72
JB
341 ? accumulator + 1
342 : accumulator,
6b27d407
JB
343 0
344 ),
345 busyWorkerNodes: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
f59e1027 347 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
348 0
349 ),
a4e07f72 350 executedTasks: this.workerNodes.reduce(
6b27d407 351 (accumulator, workerNode) =>
f59e1027 352 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
353 0
354 ),
355 executingTasks: this.workerNodes.reduce(
356 (accumulator, workerNode) =>
f59e1027 357 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
358 0
359 ),
daf86646
JB
360 ...(this.opts.enableTasksQueue === true && {
361 queuedTasks: this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 accumulator + workerNode.usage.tasks.queued,
364 0
365 )
366 }),
367 ...(this.opts.enableTasksQueue === true && {
368 maxQueuedTasks: this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
371 0
372 )
373 }),
a1763c54
JB
374 ...(this.opts.enableTasksQueue === true && {
375 backPressure: this.hasBackPressure()
376 }),
a4e07f72
JB
377 failedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
f59e1027 379 accumulator + workerNode.usage.tasks.failed,
a4e07f72 380 0
1dcf8b7b
JB
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.aggregate && {
384 runTime: {
98e72cda
JB
385 minimum: round(
386 Math.min(
387 ...this.workerNodes.map(
8ebe6c30 388 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 389 )
1dcf8b7b
JB
390 )
391 ),
98e72cda
JB
392 maximum: round(
393 Math.max(
394 ...this.workerNodes.map(
8ebe6c30 395 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 396 )
1dcf8b7b 397 )
98e72cda
JB
398 ),
399 average: round(
400 this.workerNodes.reduce(
401 (accumulator, workerNode) =>
402 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
403 0
404 ) /
405 this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + (workerNode.usage.tasks?.executed ?? 0),
408 0
409 )
410 ),
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .runTime.median && {
413 median: round(
414 median(
415 this.workerNodes.map(
8ebe6c30 416 (workerNode) => workerNode.usage.runTime?.median ?? 0
98e72cda
JB
417 )
418 )
419 )
420 })
1dcf8b7b
JB
421 }
422 }),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.aggregate && {
425 waitTime: {
98e72cda
JB
426 minimum: round(
427 Math.min(
428 ...this.workerNodes.map(
8ebe6c30 429 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 430 )
1dcf8b7b
JB
431 )
432 ),
98e72cda
JB
433 maximum: round(
434 Math.max(
435 ...this.workerNodes.map(
8ebe6c30 436 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 437 )
1dcf8b7b 438 )
98e72cda
JB
439 ),
440 average: round(
441 this.workerNodes.reduce(
442 (accumulator, workerNode) =>
443 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
444 0
445 ) /
446 this.workerNodes.reduce(
447 (accumulator, workerNode) =>
448 accumulator + (workerNode.usage.tasks?.executed ?? 0),
449 0
450 )
451 ),
452 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
453 .waitTime.median && {
454 median: round(
455 median(
456 this.workerNodes.map(
8ebe6c30 457 (workerNode) => workerNode.usage.waitTime?.median ?? 0
98e72cda
JB
458 )
459 )
460 )
461 })
1dcf8b7b
JB
462 }
463 })
6b27d407
JB
464 }
465 }
08f3f44c 466
aa9eede8
JB
467 /**
468 * The pool readiness boolean status.
469 */
2431bdb4
JB
470 private get ready (): boolean {
471 return (
b97d82d8
JB
472 this.workerNodes.reduce(
473 (accumulator, workerNode) =>
474 !workerNode.info.dynamic && workerNode.info.ready
475 ? accumulator + 1
476 : accumulator,
477 0
478 ) >= this.minSize
2431bdb4
JB
479 )
480 }
481
afe0d5bf 482 /**
aa9eede8 483 * The approximate pool utilization.
afe0d5bf
JB
484 *
485 * @returns The pool utilization.
486 */
487 private get utilization (): number {
8e5ca040 488 const poolTimeCapacity =
fe7d90db 489 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
490 const totalTasksRunTime = this.workerNodes.reduce(
491 (accumulator, workerNode) =>
71514351 492 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
493 0
494 )
495 const totalTasksWaitTime = this.workerNodes.reduce(
496 (accumulator, workerNode) =>
71514351 497 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
498 0
499 )
8e5ca040 500 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
501 }
502
8881ae32 503 /**
aa9eede8 504 * The pool type.
8881ae32
JB
505 *
506 * If it is `'dynamic'`, it provides the `max` property.
507 */
508 protected abstract get type (): PoolType
509
184855e6 510 /**
aa9eede8 511 * The worker type.
184855e6
JB
512 */
513 protected abstract get worker (): WorkerType
514
c2ade475 515 /**
aa9eede8 516 * The pool minimum size.
c2ade475 517 */
6b27d407 518 protected abstract get minSize (): number
ff733df7
JB
519
520 /**
aa9eede8 521 * The pool maximum size.
ff733df7 522 */
6b27d407 523 protected abstract get maxSize (): number
a35560ba 524
6b813701
JB
525 /**
526 * Checks if the worker id sent in the received message from a worker is valid.
527 *
528 * @param message - The received message.
529 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
530 */
21f710aa 531 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
532 if (message.workerId == null) {
533 throw new Error('Worker message received without worker id')
534 } else if (
21f710aa 535 message.workerId != null &&
aad6fb64 536 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
537 ) {
538 throw new Error(
539 `Worker message received from unknown worker '${message.workerId}'`
540 )
541 }
542 }
543
ffcbbad8 544 /**
f06e48d8 545 * Gets the given worker its worker node key.
ffcbbad8
JB
546 *
547 * @param worker - The worker.
f59e1027 548 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 549 */
aad6fb64 550 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 551 return this.workerNodes.findIndex(
8ebe6c30 552 (workerNode) => workerNode.worker === worker
f06e48d8 553 )
bf9549ae
JB
554 }
555
aa9eede8
JB
556 /**
557 * Gets the worker node key given its worker id.
558 *
559 * @param workerId - The worker id.
aad6fb64 560 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 561 */
aad6fb64
JB
562 private getWorkerNodeKeyByWorkerId (workerId: number): number {
563 return this.workerNodes.findIndex(
8ebe6c30 564 (workerNode) => workerNode.info.id === workerId
aad6fb64 565 )
aa9eede8
JB
566 }
567
afc003b2 568 /** @inheritDoc */
a35560ba 569 public setWorkerChoiceStrategy (
59219cbb
JB
570 workerChoiceStrategy: WorkerChoiceStrategy,
571 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 572 ): void {
aee46736 573 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 574 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
575 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
576 this.opts.workerChoiceStrategy
577 )
578 if (workerChoiceStrategyOptions != null) {
579 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
580 }
aa9eede8 581 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 582 workerNode.resetUsage()
9edb9717 583 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 584 }
a20f0ba5
JB
585 }
586
587 /** @inheritDoc */
588 public setWorkerChoiceStrategyOptions (
589 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
590 ): void {
0d80593b 591 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
592 this.opts.workerChoiceStrategyOptions = {
593 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
594 ...workerChoiceStrategyOptions
595 }
a20f0ba5
JB
596 this.workerChoiceStrategyContext.setOptions(
597 this.opts.workerChoiceStrategyOptions
a35560ba
S
598 )
599 }
600
a20f0ba5 601 /** @inheritDoc */
8f52842f
JB
602 public enableTasksQueue (
603 enable: boolean,
604 tasksQueueOptions?: TasksQueueOptions
605 ): void {
a20f0ba5 606 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 607 this.flushTasksQueues()
a20f0ba5
JB
608 }
609 this.opts.enableTasksQueue = enable
8f52842f 610 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
611 }
612
613 /** @inheritDoc */
8f52842f 614 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 615 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
616 this.checkValidTasksQueueOptions(tasksQueueOptions)
617 this.opts.tasksQueueOptions =
618 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 619 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
620 delete this.opts.tasksQueueOptions
621 }
622 }
623
624 private buildTasksQueueOptions (
625 tasksQueueOptions: TasksQueueOptions
626 ): TasksQueueOptions {
627 return {
628 concurrency: tasksQueueOptions?.concurrency ?? 1
629 }
630 }
631
c319c66b
JB
632 /**
633 * Whether the pool is full or not.
634 *
635 * The pool filling boolean status.
636 */
dea903a8
JB
637 protected get full (): boolean {
638 return this.workerNodes.length >= this.maxSize
639 }
c2ade475 640
c319c66b
JB
641 /**
642 * Whether the pool is busy or not.
643 *
644 * The pool busyness boolean status.
645 */
646 protected abstract get busy (): boolean
7c0ba920 647
6c6afb84 648 /**
3d76750a 649 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
650 *
651 * @returns Worker nodes busyness boolean status.
652 */
c2ade475 653 protected internalBusy (): boolean {
3d76750a
JB
654 if (this.opts.enableTasksQueue === true) {
655 return (
656 this.workerNodes.findIndex(
8ebe6c30 657 (workerNode) =>
3d76750a
JB
658 workerNode.info.ready &&
659 workerNode.usage.tasks.executing <
660 (this.opts.tasksQueueOptions?.concurrency as number)
661 ) === -1
662 )
663 } else {
664 return (
665 this.workerNodes.findIndex(
8ebe6c30 666 (workerNode) =>
3d76750a
JB
667 workerNode.info.ready && workerNode.usage.tasks.executing === 0
668 ) === -1
669 )
670 }
cb70b19d
JB
671 }
672
90d7d101
JB
673 /** @inheritDoc */
674 public listTaskFunctions (): string[] {
f2dbbf95
JB
675 for (const workerNode of this.workerNodes) {
676 if (
677 Array.isArray(workerNode.info.taskFunctions) &&
678 workerNode.info.taskFunctions.length > 0
679 ) {
680 return workerNode.info.taskFunctions
681 }
90d7d101 682 }
f2dbbf95 683 return []
90d7d101
JB
684 }
685
afc003b2 686 /** @inheritDoc */
7d91a8cd
JB
687 public async execute (
688 data?: Data,
689 name?: string,
690 transferList?: TransferListItem[]
691 ): Promise<Response> {
52b71763 692 return await new Promise<Response>((resolve, reject) => {
7d91a8cd
JB
693 if (name != null && typeof name !== 'string') {
694 reject(new TypeError('name argument must be a string'))
695 }
90d7d101
JB
696 if (
697 name != null &&
698 typeof name === 'string' &&
699 name.trim().length === 0
700 ) {
f58b60b9 701 reject(new TypeError('name argument must not be an empty string'))
90d7d101 702 }
b558f6b5
JB
703 if (transferList != null && !Array.isArray(transferList)) {
704 reject(new TypeError('transferList argument must be an array'))
705 }
706 const timestamp = performance.now()
707 const workerNodeKey = this.chooseWorkerNode()
a5d15204 708 const workerInfo = this.getWorkerInfo(workerNodeKey)
cea399c8
JB
709 if (
710 name != null &&
a5d15204
JB
711 Array.isArray(workerInfo.taskFunctions) &&
712 !workerInfo.taskFunctions.includes(name)
cea399c8 713 ) {
90d7d101
JB
714 reject(
715 new Error(`Task function '${name}' is not registered in the pool`)
716 )
717 }
501aea93 718 const task: Task<Data> = {
52b71763
JB
719 name: name ?? DEFAULT_TASK_NAME,
720 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
721 data: data ?? ({} as Data),
7d91a8cd 722 transferList,
52b71763 723 timestamp,
a5d15204 724 workerId: workerInfo.id as number,
7629bdf1 725 taskId: randomUUID()
52b71763 726 }
7629bdf1 727 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
728 resolve,
729 reject,
501aea93 730 workerNodeKey
2e81254d 731 })
52b71763 732 if (
4e377863
JB
733 this.opts.enableTasksQueue === false ||
734 (this.opts.enableTasksQueue === true &&
735 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 736 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 737 ) {
501aea93 738 this.executeTask(workerNodeKey, task)
4e377863
JB
739 } else {
740 this.enqueueTask(workerNodeKey, task)
52b71763 741 }
2e81254d 742 })
280c2a77 743 }
c97c7edb 744
afc003b2 745 /** @inheritDoc */
c97c7edb 746 public async destroy (): Promise<void> {
1fbcaa7c 747 await Promise.all(
81c02522 748 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 749 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
750 })
751 )
ef3891a3 752 this.emitter?.emit(PoolEvents.destroy)
c97c7edb
S
753 }
754
1e3214b6
JB
755 protected async sendKillMessageToWorker (
756 workerNodeKey: number,
757 workerId: number
758 ): Promise<void> {
9edb9717 759 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
760 this.registerWorkerMessageListener(workerNodeKey, (message) => {
761 if (message.kill === 'success') {
762 resolve()
763 } else if (message.kill === 'failure') {
e1af34e6 764 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
765 }
766 })
9edb9717 767 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 768 })
1e3214b6
JB
769 }
770
4a6952ff 771 /**
aa9eede8 772 * Terminates the worker node given its worker node key.
4a6952ff 773 *
aa9eede8 774 * @param workerNodeKey - The worker node key.
4a6952ff 775 */
81c02522 776 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 777
729c563d 778 /**
6677a3d3
JB
779 * Setup hook to execute code before worker nodes are created in the abstract constructor.
780 * Can be overridden.
afc003b2
JB
781 *
782 * @virtual
729c563d 783 */
280c2a77 784 protected setupHook (): void {
d99ba5a8 785 // Intentionally empty
280c2a77 786 }
c97c7edb 787
729c563d 788 /**
280c2a77
S
789 * Should return whether the worker is the main worker or not.
790 */
791 protected abstract isMain (): boolean
792
793 /**
2e81254d 794 * Hook executed before the worker task execution.
bf9549ae 795 * Can be overridden.
729c563d 796 *
f06e48d8 797 * @param workerNodeKey - The worker node key.
1c6fe997 798 * @param task - The task to execute.
729c563d 799 */
1c6fe997
JB
800 protected beforeTaskExecutionHook (
801 workerNodeKey: number,
802 task: Task<Data>
803 ): void {
f59e1027 804 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
805 ++workerUsage.tasks.executing
806 this.updateWaitTimeWorkerUsage(workerUsage, task)
db0e38ee
JB
807 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
808 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 809 workerNodeKey
db0e38ee
JB
810 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
811 ++taskFunctionWorkerUsage.tasks.executing
812 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 813 }
c97c7edb
S
814 }
815
c01733f1 816 /**
2e81254d 817 * Hook executed after the worker task execution.
bf9549ae 818 * Can be overridden.
c01733f1 819 *
501aea93 820 * @param workerNodeKey - The worker node key.
38e795c1 821 * @param message - The received message.
c01733f1 822 */
2e81254d 823 protected afterTaskExecutionHook (
501aea93 824 workerNodeKey: number,
2740a743 825 message: MessageValue<Response>
bf9549ae 826 ): void {
ff128cc9 827 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
828 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
829 this.updateRunTimeWorkerUsage(workerUsage, message)
830 this.updateEluWorkerUsage(workerUsage, message)
db0e38ee
JB
831 if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
832 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 833 workerNodeKey
db0e38ee 834 ].getTaskFunctionWorkerUsage(
b558f6b5
JB
835 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
836 ) as WorkerUsage
db0e38ee
JB
837 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
838 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
839 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
840 }
841 }
842
db0e38ee
JB
843 /**
844 * Whether the worker node shall update its task function worker usage or not.
845 *
846 * @param workerNodeKey - The worker node key.
847 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
848 */
849 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 850 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 851 return (
a5d15204 852 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 853 workerInfo.taskFunctions.length > 2
b558f6b5 854 )
f1c06930
JB
855 }
856
857 private updateTaskStatisticsWorkerUsage (
858 workerUsage: WorkerUsage,
859 message: MessageValue<Response>
860 ): void {
a4e07f72 861 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
862 if (
863 workerTaskStatistics.executing != null &&
864 workerTaskStatistics.executing > 0
865 ) {
866 --workerTaskStatistics.executing
867 } else if (
868 workerTaskStatistics.executing != null &&
869 workerTaskStatistics.executing < 0
870 ) {
871 throw new Error(
cf6e3991 872 'Worker usage statistic for tasks executing cannot be negative'
5bb5be17
JB
873 )
874 }
98e72cda
JB
875 if (message.taskError == null) {
876 ++workerTaskStatistics.executed
877 } else {
a4e07f72 878 ++workerTaskStatistics.failed
2740a743 879 }
f8eb0a2a
JB
880 }
881
a4e07f72
JB
882 private updateRunTimeWorkerUsage (
883 workerUsage: WorkerUsage,
f8eb0a2a
JB
884 message: MessageValue<Response>
885 ): void {
e4f20deb
JB
886 updateMeasurementStatistics(
887 workerUsage.runTime,
888 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
889 message.taskPerformance?.runTime ?? 0,
890 workerUsage.tasks.executed
891 )
f8eb0a2a
JB
892 }
893
a4e07f72
JB
894 private updateWaitTimeWorkerUsage (
895 workerUsage: WorkerUsage,
1c6fe997 896 task: Task<Data>
f8eb0a2a 897 ): void {
1c6fe997
JB
898 const timestamp = performance.now()
899 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
900 updateMeasurementStatistics(
901 workerUsage.waitTime,
902 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
903 taskWaitTime,
904 workerUsage.tasks.executed
905 )
c01733f1 906 }
907
a4e07f72 908 private updateEluWorkerUsage (
5df69fab 909 workerUsage: WorkerUsage,
62c15a68
JB
910 message: MessageValue<Response>
911 ): void {
008512c7
JB
912 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
913 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
914 updateMeasurementStatistics(
915 workerUsage.elu.active,
008512c7 916 eluTaskStatisticsRequirements,
e4f20deb
JB
917 message.taskPerformance?.elu?.active ?? 0,
918 workerUsage.tasks.executed
919 )
920 updateMeasurementStatistics(
921 workerUsage.elu.idle,
008512c7 922 eluTaskStatisticsRequirements,
e4f20deb
JB
923 message.taskPerformance?.elu?.idle ?? 0,
924 workerUsage.tasks.executed
925 )
008512c7 926 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 927 if (message.taskPerformance?.elu != null) {
f7510105
JB
928 if (workerUsage.elu.utilization != null) {
929 workerUsage.elu.utilization =
930 (workerUsage.elu.utilization +
931 message.taskPerformance.elu.utilization) /
932 2
933 } else {
934 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
935 }
62c15a68
JB
936 }
937 }
938 }
939
280c2a77 940 /**
f06e48d8 941 * Chooses a worker node for the next task.
280c2a77 942 *
6c6afb84 943 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 944 *
aa9eede8 945 * @returns The chosen worker node key
280c2a77 946 */
6c6afb84 947 private chooseWorkerNode (): number {
930dcf12 948 if (this.shallCreateDynamicWorker()) {
aa9eede8 949 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
950 if (
951 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
952 ) {
aa9eede8 953 return workerNodeKey
6c6afb84 954 }
17393ac8 955 }
930dcf12
JB
956 return this.workerChoiceStrategyContext.execute()
957 }
958
6c6afb84
JB
959 /**
960 * Conditions for dynamic worker creation.
961 *
962 * @returns Whether to create a dynamic worker or not.
963 */
964 private shallCreateDynamicWorker (): boolean {
930dcf12 965 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
966 }
967
280c2a77 968 /**
aa9eede8 969 * Sends a message to worker given its worker node key.
280c2a77 970 *
aa9eede8 971 * @param workerNodeKey - The worker node key.
38e795c1 972 * @param message - The message.
7d91a8cd 973 * @param transferList - The optional array of transferable objects.
280c2a77
S
974 */
975 protected abstract sendToWorker (
aa9eede8 976 workerNodeKey: number,
7d91a8cd
JB
977 message: MessageValue<Data>,
978 transferList?: TransferListItem[]
280c2a77
S
979 ): void
980
729c563d 981 /**
41344292 982 * Creates a new worker.
6c6afb84
JB
983 *
984 * @returns Newly created worker.
729c563d 985 */
280c2a77 986 protected abstract createWorker (): Worker
c97c7edb 987
4a6952ff 988 /**
aa9eede8 989 * Creates a new, completely set up worker node.
4a6952ff 990 *
aa9eede8 991 * @returns New, completely set up worker node key.
4a6952ff 992 */
aa9eede8 993 protected createAndSetupWorkerNode (): number {
bdacc2d2 994 const worker = this.createWorker()
280c2a77 995
fd04474e 996 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 997 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 998 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 999 worker.on('error', (error) => {
aad6fb64 1000 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
9b106837
JB
1001 const workerInfo = this.getWorkerInfo(workerNodeKey)
1002 workerInfo.ready = false
0dc838e3 1003 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1004 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 1005 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 1006 if (workerInfo.dynamic) {
aa9eede8 1007 this.createAndSetupDynamicWorkerNode()
8a1260a3 1008 } else {
aa9eede8 1009 this.createAndSetupWorkerNode()
8a1260a3 1010 }
5baee0d7 1011 }
19dbc45b 1012 if (this.opts.enableTasksQueue === true) {
9b106837 1013 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1014 }
5baee0d7 1015 })
a35560ba 1016 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1017 worker.once('exit', () => {
f06e48d8 1018 this.removeWorkerNode(worker)
a974afa6 1019 })
280c2a77 1020
aa9eede8 1021 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1022
aa9eede8 1023 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1024
aa9eede8 1025 return workerNodeKey
c97c7edb 1026 }
be0676b3 1027
930dcf12 1028 /**
aa9eede8 1029 * Creates a new, completely set up dynamic worker node.
930dcf12 1030 *
aa9eede8 1031 * @returns New, completely set up dynamic worker node key.
930dcf12 1032 */
aa9eede8
JB
1033 protected createAndSetupDynamicWorkerNode (): number {
1034 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1035 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1036 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1037 message.workerId
aad6fb64 1038 )
aa9eede8 1039 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1040 // Kill message received from worker
930dcf12
JB
1041 if (
1042 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1043 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1044 ((this.opts.enableTasksQueue === false &&
aa9eede8 1045 workerUsage.tasks.executing === 0) ||
7b56f532 1046 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1047 workerUsage.tasks.executing === 0 &&
1048 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1049 ) {
5270d253
JB
1050 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1051 this.emitter?.emit(PoolEvents.error, error)
1052 })
930dcf12
JB
1053 }
1054 })
aa9eede8 1055 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1056 this.sendToWorker(workerNodeKey, {
b0a4db63 1057 checkActive: true,
21f710aa
JB
1058 workerId: workerInfo.id as number
1059 })
b5e113f6
JB
1060 workerInfo.dynamic = true
1061 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
1062 workerInfo.ready = true
1063 }
aa9eede8 1064 return workerNodeKey
930dcf12
JB
1065 }
1066
a2ed5053 1067 /**
aa9eede8 1068 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1069 *
aa9eede8 1070 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1071 * @param listener - The message listener callback.
1072 */
85aeb3f3
JB
1073 protected abstract registerWorkerMessageListener<
1074 Message extends Data | Response
aa9eede8
JB
1075 >(
1076 workerNodeKey: number,
1077 listener: (message: MessageValue<Message>) => void
1078 ): void
a2ed5053
JB
1079
1080 /**
aa9eede8 1081 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1082 * Can be overridden.
1083 *
aa9eede8 1084 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1085 */
aa9eede8 1086 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1087 // Listen to worker messages.
aa9eede8 1088 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1089 // Send the startup message to worker.
aa9eede8 1090 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1091 // Send the statistics message to worker.
1092 this.sendStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
1093 }
1094
85aeb3f3 1095 /**
aa9eede8
JB
1096 * Sends the startup message to worker given its worker node key.
1097 *
1098 * @param workerNodeKey - The worker node key.
1099 */
1100 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1101
1102 /**
9edb9717 1103 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1104 *
aa9eede8 1105 * @param workerNodeKey - The worker node key.
85aeb3f3 1106 */
9edb9717 1107 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1108 this.sendToWorker(workerNodeKey, {
1109 statistics: {
1110 runTime:
1111 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1112 .runTime.aggregate,
1113 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1114 .elu.aggregate
1115 },
1116 workerId: this.getWorkerInfo(workerNodeKey).id as number
1117 })
1118 }
a2ed5053
JB
1119
1120 private redistributeQueuedTasks (workerNodeKey: number): void {
1121 while (this.tasksQueueSize(workerNodeKey) > 0) {
1122 let targetWorkerNodeKey: number = workerNodeKey
1123 let minQueuedTasks = Infinity
10ecf8fd 1124 let executeTask = false
a2ed5053
JB
1125 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1126 const workerInfo = this.getWorkerInfo(workerNodeId)
1127 if (
1128 workerNodeId !== workerNodeKey &&
1129 workerInfo.ready &&
1130 workerNode.usage.tasks.queued === 0
1131 ) {
a5ed75b7
JB
1132 if (
1133 this.workerNodes[workerNodeId].usage.tasks.executing <
1134 (this.opts.tasksQueueOptions?.concurrency as number)
1135 ) {
10ecf8fd
JB
1136 executeTask = true
1137 }
a2ed5053
JB
1138 targetWorkerNodeKey = workerNodeId
1139 break
1140 }
1141 if (
1142 workerNodeId !== workerNodeKey &&
1143 workerInfo.ready &&
1144 workerNode.usage.tasks.queued < minQueuedTasks
1145 ) {
1146 minQueuedTasks = workerNode.usage.tasks.queued
1147 targetWorkerNodeKey = workerNodeId
1148 }
1149 }
10ecf8fd
JB
1150 if (executeTask) {
1151 this.executeTask(
1152 targetWorkerNodeKey,
1153 this.dequeueTask(workerNodeKey) as Task<Data>
1154 )
1155 } else {
1156 this.enqueueTask(
1157 targetWorkerNodeKey,
1158 this.dequeueTask(workerNodeKey) as Task<Data>
1159 )
1160 }
a2ed5053
JB
1161 }
1162 }
1163
be0676b3 1164 /**
aa9eede8 1165 * This method is the listener registered for each worker message.
be0676b3 1166 *
bdacc2d2 1167 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1168 */
1169 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1170 return (message) => {
21f710aa 1171 this.checkMessageWorkerId(message)
a5d15204 1172 if (message.ready != null && message.taskFunctions != null) {
81c02522 1173 // Worker ready response received from worker
10e2aa7e 1174 this.handleWorkerReadyResponse(message)
7629bdf1 1175 } else if (message.taskId != null) {
81c02522 1176 // Task execution response received from worker
6b272951 1177 this.handleTaskExecutionResponse(message)
90d7d101
JB
1178 } else if (message.taskFunctions != null) {
1179 // Task functions message received from worker
b558f6b5
JB
1180 this.getWorkerInfo(
1181 this.getWorkerNodeKeyByWorkerId(message.workerId)
1182 ).taskFunctions = message.taskFunctions
6b272951
JB
1183 }
1184 }
1185 }
1186
10e2aa7e 1187 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1188 if (message.ready === false) {
1189 throw new Error(`Worker ${message.workerId} failed to initialize`)
1190 }
a5d15204 1191 const workerInfo = this.getWorkerInfo(
aad6fb64 1192 this.getWorkerNodeKeyByWorkerId(message.workerId)
a5d15204
JB
1193 )
1194 workerInfo.ready = message.ready as boolean
1195 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1196 if (this.emitter != null && this.ready) {
1197 this.emitter.emit(PoolEvents.ready, this.info)
1198 }
6b272951
JB
1199 }
1200
1201 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1202 const { taskId, taskError, data } = message
1203 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1204 if (promiseResponse != null) {
5441aea6
JB
1205 if (taskError != null) {
1206 this.emitter?.emit(PoolEvents.taskError, taskError)
1207 promiseResponse.reject(taskError.message)
6b272951 1208 } else {
5441aea6 1209 promiseResponse.resolve(data as Response)
6b272951 1210 }
501aea93
JB
1211 const workerNodeKey = promiseResponse.workerNodeKey
1212 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1213 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1214 if (
1215 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1216 this.tasksQueueSize(workerNodeKey) > 0 &&
1217 this.workerNodes[workerNodeKey].usage.tasks.executing <
1218 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1219 ) {
1220 this.executeTask(
1221 workerNodeKey,
1222 this.dequeueTask(workerNodeKey) as Task<Data>
1223 )
be0676b3 1224 }
6b272951 1225 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1226 }
be0676b3 1227 }
7c0ba920 1228
a1763c54 1229 private checkAndEmitTaskExecutionEvents (): void {
1f68cede 1230 if (this.emitter != null) {
ff733df7 1231 if (this.busy) {
2845f2a5 1232 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1233 }
6b27d407 1234 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1235 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1236 }
a1763c54
JB
1237 }
1238 }
1239
1240 private checkAndEmitTaskQueuingEvents (): void {
1241 if (this.hasBackPressure()) {
1242 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1243 }
1244 }
1245
8a1260a3 1246 /**
aa9eede8 1247 * Gets the worker information given its worker node key.
8a1260a3
JB
1248 *
1249 * @param workerNodeKey - The worker node key.
3f09ed9f 1250 * @returns The worker information.
8a1260a3 1251 */
aa9eede8 1252 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1253 return this.workerNodes[workerNodeKey].info
e221309a
JB
1254 }
1255
a05c10de 1256 /**
b0a4db63 1257 * Adds the given worker in the pool worker nodes.
ea7a90d3 1258 *
38e795c1 1259 * @param worker - The worker.
aa9eede8
JB
1260 * @returns The added worker node key.
1261 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1262 */
b0a4db63 1263 private addWorkerNode (worker: Worker): number {
671d5154
JB
1264 const workerNode = new WorkerNode<Worker, Data>(
1265 worker,
1266 this.worker,
1267 this.maxSize
1268 )
b97d82d8 1269 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1270 if (this.starting) {
1271 workerNode.info.ready = true
1272 }
aa9eede8 1273 this.workerNodes.push(workerNode)
aad6fb64 1274 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1275 if (workerNodeKey === -1) {
a5d15204 1276 throw new Error('Worker node added not found')
aa9eede8
JB
1277 }
1278 return workerNodeKey
ea7a90d3 1279 }
c923ce56 1280
51fe3d3c 1281 /**
f06e48d8 1282 * Removes the given worker from the pool worker nodes.
51fe3d3c 1283 *
f06e48d8 1284 * @param worker - The worker.
51fe3d3c 1285 */
416fd65c 1286 private removeWorkerNode (worker: Worker): void {
aad6fb64 1287 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1288 if (workerNodeKey !== -1) {
1289 this.workerNodes.splice(workerNodeKey, 1)
1290 this.workerChoiceStrategyContext.remove(workerNodeKey)
1291 }
51fe3d3c 1292 }
adc3c320 1293
e2b31e32
JB
1294 /** @inheritDoc */
1295 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1296 return (
e2b31e32
JB
1297 this.opts.enableTasksQueue === true &&
1298 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1299 )
1300 }
1301
1302 private hasBackPressure (): boolean {
1303 return (
1304 this.opts.enableTasksQueue === true &&
1305 this.workerNodes.findIndex(
1306 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1307 ) === -1
9e844245 1308 )
e2b31e32
JB
1309 }
1310
b0a4db63 1311 /**
aa9eede8 1312 * Executes the given task on the worker given its worker node key.
b0a4db63 1313 *
aa9eede8 1314 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1315 * @param task - The task to execute.
1316 */
2e81254d 1317 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1318 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1319 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1320 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1321 }
1322
f9f00b5f 1323 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1324 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1325 this.checkAndEmitTaskQueuingEvents()
1326 return tasksQueueSize
adc3c320
JB
1327 }
1328
416fd65c 1329 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1330 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1331 }
1332
416fd65c 1333 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1334 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1335 }
1336
81c02522 1337 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1338 while (this.tasksQueueSize(workerNodeKey) > 0) {
1339 this.executeTask(
1340 workerNodeKey,
1341 this.dequeueTask(workerNodeKey) as Task<Data>
1342 )
ff733df7 1343 }
4b628b48 1344 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1345 }
1346
ef41a6e6
JB
1347 private flushTasksQueues (): void {
1348 for (const [workerNodeKey] of this.workerNodes.entries()) {
1349 this.flushTasksQueue(workerNodeKey)
1350 }
1351 }
c97c7edb 1352}