chore: v2.6.32
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The pending task execution responses.
 *
 * - `key`: The id of each submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of the task, along with the key of the worker node the task was submitted to.
 *
 * When a message is received from a worker, the matching entry gives access to the promise callbacks bound to that task.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 *
 * When left undefined, `maxSize` falls back to `numberOfWorkers`.
 */
protected readonly max?: number

/**
 * Whether the pool is starting or not.
 *
 * Only `true` while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean

/**
 * Whether the pool is started or not.
 *
 * Set to `true` at the end of construction and back to `false` by `destroy()`.
 */
private started: boolean

/**
 * The start timestamp of the pool, read from `performance.now()` at the end of construction.
 */
private readonly startTimestamp: number
104
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook and finally creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker, or if one of the argument checks fails.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is raised only while the worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
152
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are covered by the `typeof` test.
  const hasUsableFilePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!hasUsableFilePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
165
8d3782fa
JB
/**
 * Checks that the number of workers is a safe, non negative integer, and non zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
183
/**
 * Checks the minimum and maximum sizes of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
209
/**
 * Checks the pool options and writes the defaulted values to `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, round robin by default.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options, layered over the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
237
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownWorkerChoiceStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownWorkerChoiceStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
247
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries is lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
291
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or the concurrency or queue maximum size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency or queue maximum size is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const queueMaxSize = tasksQueueOptions?.queueMaxSize
  if (queueMaxSize != null) {
    if (!Number.isSafeInteger(queueMaxSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (queueMaxSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${queueMaxSize} is a negative integer or zero`
      )
    }
  }
}
331
e761c033
JB
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
343
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  const workerNodes = this.workerNodes
  const strategyContext = this.workerChoiceStrategyContext
  // Number of worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: PoolWorkerNode) => boolean
  ): number => workerNodes.filter(predicate).length
  // Sum of a numeric value picked from each worker node.
  const sumWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    workerNodes.reduce((total, workerNode) => total + pick(workerNode), 0)
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumWorkerNodes((workerNode) => workerNode.usage.tasks.failed),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) /
            sumWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) /
            sumWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime.median && {
          median: round(
            median(
              workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 487
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
502
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time over all worker nodes, divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
523
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
530
/**
 * The worker type.
 */
protected abstract get worker (): WorkerType
535
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
ff733df7
JB
542
/**
 * The pool maximum size: `max` when it is defined, the number of workers given at construction otherwise.
 */
protected get maxSize (): number {
  return this.max != null ? this.max : this.numberOfWorkers
}
a35560ba 549
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
568
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
580
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
592
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
611
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered over the defaults.
  const mergedWorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedWorkerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
625
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when the queue goes from enabled to disabled.
  const isDisablingTasksQueue = this.opts.enableTasksQueue === true && !enable
  if (isDisablingTasksQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
637
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueMaxSize(
    this.opts.tasksQueueOptions.queueMaxSize as number
  )
}
651
20c6f652
JB
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param queueMaxSize - The tasks queue maximum size.
 */
private setTasksQueueMaxSize (queueMaxSize: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = queueMaxSize
  })
}
657
a20f0ba5
JB
/**
 * Builds the tasks queue options by layering the given options over the defaults.
 *
 * The defaults are a queue maximum size equal to the square of the pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    queueMaxSize: this.maxSize ** 2,
    concurrency: 1
  }
  return {
    ...defaultTasksQueueOptions,
    ...tasksQueueOptions
  }
}
669
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 678
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 685
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the configured tasks concurrency; otherwise a worker node is at quota as soon as it executes one task. Only ready worker nodes are considered.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    const hasWorkerNodeBelowQuota = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready && workerNode.usage.tasks.executing < concurrency
    )
    return !hasWorkerNodeBelowQuota
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    (workerNode) =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
710
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
723
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the promise executor. Every validation
    // failure must return explicitly, otherwise the invalid task would still
    // be registered in the response map and submitted to a worker node.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node choice; it is later used
    // to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    // The task function name can only be checked against the task functions
    // list of the chosen worker node.
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the task can be settled later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the tasks queue is disabled or the worker node
    // is below its tasks concurrency, enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 785
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
796
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * The returned promise resolves when the worker answers with a `'success'` kill status and rejects when it answers with a `'failure'` kill status.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
812
/**
 * Terminates the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 819
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 829
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to build the pool when this returns `false`.
 */
protected abstract isMain (): boolean
834
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics, on the worker node usage and, when applicable, on the usage of the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
864
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage and, when applicable, on the usage of the task function named in the message task performance.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
898
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker node advertises more than two task functions.
 * NOTE(review): the threshold of two presumably accounts for default task function entries in the list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
913
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * The executing counter is decremented without going below zero, then either the executed or the failed counter is incremented depending on the presence of a task error in the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
931
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time reported in the message, zero when none is reported.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
943
a4e07f72
JB
/**
 * Updates the wait time measurement statistics of a worker usage.
 *
 * @param workerUsage - The worker usage whose wait time statistics are updated.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a zero wait time.
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime,
    workerUsage.tasks.executed
  )
}
957
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * @param workerUsage - The worker usage whose ELU statistics are updated.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing ELU performance data counts as zero active and idle time.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The first utilization is taken as is, the next ones are averaged with the previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
989
/**
 * Picks the worker node that will run the next task.
 *
 * When a dynamic worker shall be created, it is created first and, if the strategy
 * policy requests dynamic worker usage, it is the one chosen. Otherwise the choice is
 * delegated to the worker choice strategy context.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1008
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1017
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1030
/**
 * Creates a new worker. Implemented by the concrete pool classes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1037
/**
 * Creates a worker, wires its event listeners and registers it as a pool worker node.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, falling back to a no-op when not provided.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Only replace the failed worker once the pool is started and not in its starting phase.
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting && this.started
    if (shallRestartWorker && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestartWorker) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is dropped from the pool on the first exit event.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1081
/**
 * Creates a worker node, then turns it into a dynamic one: a kill message listener
 * is registered, the active check is requested from the worker and the worker
 * information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const { tasks } = this.workerNodes[senderWorkerNodeKey].usage
    // Idle means no executing task and, with the tasks queue enabled, an empty queue too.
    const isIdle = (): boolean =>
      tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker: hard kills always apply, soft ones only when idle
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  // The strategy policy may require the dynamic worker to be flagged ready right away.
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1124
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @typeParam Message - Type of the message payload, either the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1137
/**
 * Hook run right after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Route the worker messages to the pool listener.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Startup message first, statistics message next.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // With tasks queuing enabled, back pressure on the worker node triggers tasks stealing.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1156
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1163
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the run time and ELU `aggregate` requirements of the
 * current worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
  })
}
a2ed5053
JB
1181
/**
 * Redistributes the tasks queued on a worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node with
 * no queued task or, failing that, the other ready worker node with the fewest
 * queued tasks. The task is executed right away when the destination executes
 * fewer tasks than the tasks queue concurrency allows, otherwise it is enqueued there.
 *
 * The pool worker nodes are iterated directly (not a filtered copy) so that the
 * iteration index is always a valid worker node key.
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks stay in the source queue instead of being re-enqueued on the source forever.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Never pick the source worker node nor a worker node that is not ready.
      if (workerNodeId === workerNodeKey || !workerNode.info.ready) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop here to avoid looping on the source queue.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // The execute or enqueue decision is based on the chosen destination capacity only.
    if (
      this.workerNodes[destinationWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1222
/**
 * Moves queued tasks away from a worker node under back pressure.
 *
 * The other worker nodes are visited from the least to the most queued one. Each
 * ready worker node without back pressure receives one task popped from the source
 * worker node, as long as the source still has queued tasks. The task is executed
 * right away when the destination executes fewer tasks than the tasks queue
 * concurrency allows, otherwise it is enqueued there.
 *
 * The destination worker node key is resolved against the pool worker nodes, since
 * the position in the filtered and sorted copy is not a worker node key.
 *
 * @param workerId - The id of the worker whose worker node has back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // filter() returns a new array, so sorting it leaves the pool worker nodes order untouched.
  const candidateWorkerNodes = this.workerNodes
    .filter((workerNode) => workerNode.info.id !== workerId)
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of candidateWorkerNodes) {
    if (
      workerNode.info.ready &&
      sourceWorkerNode.usage.tasks.queued > 0 &&
      !workerNode.hasBackPressure()
    ) {
      // Real worker node key of the destination within the pool.
      const destinationWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (
        workerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(destinationWorkerNodeKey, task)
      } else {
        this.enqueueTask(destinationWorkerNodeKey, task)
      }
    }
  }
}
1255
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message by kind:
 * worker ready response, task execution response or task functions message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = message.taskFunctions
    }
  }
}
1280
/**
 * Handles the ready response sent by a worker: the worker information ready flag
 * and task functions are updated, then the pool ready event is emitted if the
 * pool has an emitter and is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports a failed initialization.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey) as WorkerInfo
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1294
/**
 * Handles a task execution response sent by a worker.
 *
 * The pending promise of the task is settled, the after task execution hook is run
 * and, with tasks queuing enabled, the next queued task of the worker node is
 * executed if its concurrency allows it. Responses without a pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1322
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1328
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1334
33e6bb4c
JB
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1342
/**
 * Gets the worker information of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1352
/**
 * Wraps the given worker into a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without an explicit tasks queue maximum size, the squared pool maximum size is used.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.queueMaxSize ?? this.maxSize ** 2
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
c923ce56 1377
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1390
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only makes sense with tasks queuing enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1398
/**
 * Tells whether the pool has back pressure: tasks queuing is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1407
/**
 * Executes the given task on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook runs before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message, sent along with its transfer list.
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1419
/**
 * Enqueues the given task on the worker node identified by its key, then emits
 * the task queuing events if applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1425
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1429
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1433
/**
 * Flushes the tasks queue of the worker node identified by its key: every queued
 * task is executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1443
ef41a6e6
JB
/**
 * Flushes the tasks queue of every pool worker node.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1449}