refactor: cleanup circular array UT
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
075e51d1 147 this.starting = true
e761c033 148 this.startPool()
075e51d1 149 this.starting = false
15b176e0 150 this.started = true
afe0d5bf
JB
151
152 this.startTimestamp = performance.now()
c97c7edb
S
153 }
154
a35560ba 155 private checkFilePath (filePath: string): void {
ffcbbad8
JB
156 if (
157 filePath == null ||
3d6dd312 158 typeof filePath !== 'string' ||
ffcbbad8
JB
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
c510fea7
APA
161 throw new Error('Please specify a file with a worker implementation')
162 }
3d6dd312
JB
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
c510fea7
APA
166 }
167
8d3782fa
JB
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
78cea37e 173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 174 throw new TypeError(
0d80593b 175 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
176 )
177 } else if (numberOfWorkers < 0) {
473c717a 178 throw new RangeError(
8d3782fa
JB
179 'Cannot instantiate a pool with a negative number of workers'
180 )
6b27d407 181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 187 if (this.type === PoolTypes.dynamic) {
a5ed75b7 188 if (max == null) {
e695d66f 189 throw new TypeError(
a5ed75b7
JB
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
079de991
JB
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
b97d82d8 200 } else if (max === 0) {
079de991 201 throw new RangeError(
d640b48b 202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
8d3782fa
JB
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
49be33fe
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 244 throw new Error(
aee46736 245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
246 )
247 }
7c0ba920
JB
248 }
249
0d80593b
JB
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
8990357d
JB
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
268 workerChoiceStrategyOptions.choiceRetries <= 0
269 ) {
270 throw new RangeError(
271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
272 )
273 }
49be33fe
JB
274 if (
275 workerChoiceStrategyOptions.weights != null &&
6b27d407 276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
f0d7f803
JB
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
0d80593b
JB
292 }
293
a20f0ba5 294 private checkValidTasksQueueOptions (
ff3f866a 295 tasksQueueOptions: Writable<TasksQueueOptions>
a20f0ba5 296 ): void {
0d80593b
JB
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
f0d7f803
JB
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
20c6f652 305 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
e695d66f 312 throw new RangeError(
20c6f652
JB
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
316 if (
317 tasksQueueOptions?.queueMaxSize != null &&
ff3f866a 318 tasksQueueOptions?.size != null
20c6f652 319 ) {
ff3f866a
JB
320 throw new Error(
321 'Invalid tasks queue options: cannot specify both queueMaxSize and size'
20c6f652
JB
322 )
323 }
ff3f866a
JB
324 if (tasksQueueOptions?.queueMaxSize != null) {
325 tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
326 }
20c6f652 327 if (
ff3f866a
JB
328 tasksQueueOptions?.size != null &&
329 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 330 ) {
ff3f866a
JB
331 throw new TypeError(
332 'Invalid worker node tasks queue max size: must be an integer'
333 )
334 }
335 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 336 throw new RangeError(
ff3f866a 337 `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
338 )
339 }
340 }
341
e761c033
JB
342 private startPool (): void {
343 while (
344 this.workerNodes.reduce(
345 (accumulator, workerNode) =>
346 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
347 0
348 ) < this.numberOfWorkers
349 ) {
aa9eede8 350 this.createAndSetupWorkerNode()
e761c033
JB
351 }
352 }
353
08f3f44c 354 /** @inheritDoc */
6b27d407
JB
355 public get info (): PoolInfo {
356 return {
23ccf9d7 357 version,
6b27d407 358 type: this.type,
184855e6 359 worker: this.worker,
2431bdb4
JB
360 ready: this.ready,
361 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
362 minSize: this.minSize,
363 maxSize: this.maxSize,
c05f0d50
JB
364 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .runTime.aggregate &&
1305e9a8
JB
366 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
367 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
368 workerNodes: this.workerNodes.length,
369 idleWorkerNodes: this.workerNodes.reduce(
370 (accumulator, workerNode) =>
f59e1027 371 workerNode.usage.tasks.executing === 0
a4e07f72
JB
372 ? accumulator + 1
373 : accumulator,
6b27d407
JB
374 0
375 ),
376 busyWorkerNodes: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
f59e1027 378 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
379 0
380 ),
a4e07f72 381 executedTasks: this.workerNodes.reduce(
6b27d407 382 (accumulator, workerNode) =>
f59e1027 383 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
384 0
385 ),
386 executingTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
f59e1027 388 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
389 0
390 ),
daf86646
JB
391 ...(this.opts.enableTasksQueue === true && {
392 queuedTasks: this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + workerNode.usage.tasks.queued,
395 0
396 )
397 }),
398 ...(this.opts.enableTasksQueue === true && {
399 maxQueuedTasks: this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
402 0
403 )
404 }),
a1763c54
JB
405 ...(this.opts.enableTasksQueue === true && {
406 backPressure: this.hasBackPressure()
407 }),
a4e07f72
JB
408 failedTasks: this.workerNodes.reduce(
409 (accumulator, workerNode) =>
f59e1027 410 accumulator + workerNode.usage.tasks.failed,
a4e07f72 411 0
1dcf8b7b
JB
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .runTime.aggregate && {
415 runTime: {
98e72cda
JB
416 minimum: round(
417 Math.min(
418 ...this.workerNodes.map(
8ebe6c30 419 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 420 )
1dcf8b7b
JB
421 )
422 ),
98e72cda
JB
423 maximum: round(
424 Math.max(
425 ...this.workerNodes.map(
8ebe6c30 426 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 427 )
1dcf8b7b 428 )
98e72cda 429 ),
3baa0837
JB
430 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
431 .runTime.average && {
432 average: round(
433 average(
434 this.workerNodes.reduce<number[]>(
435 (accumulator, workerNode) =>
436 accumulator.concat(workerNode.usage.runTime.history),
437 []
438 )
98e72cda 439 )
dc021bcc 440 )
3baa0837 441 }),
98e72cda
JB
442 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
443 .runTime.median && {
444 median: round(
445 median(
3baa0837
JB
446 this.workerNodes.reduce<number[]>(
447 (accumulator, workerNode) =>
448 accumulator.concat(workerNode.usage.runTime.history),
449 []
98e72cda
JB
450 )
451 )
452 )
453 })
1dcf8b7b
JB
454 }
455 }),
456 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
457 .waitTime.aggregate && {
458 waitTime: {
98e72cda
JB
459 minimum: round(
460 Math.min(
461 ...this.workerNodes.map(
8ebe6c30 462 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 463 )
1dcf8b7b
JB
464 )
465 ),
98e72cda
JB
466 maximum: round(
467 Math.max(
468 ...this.workerNodes.map(
8ebe6c30 469 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 470 )
1dcf8b7b 471 )
98e72cda 472 ),
3baa0837
JB
473 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
474 .waitTime.average && {
475 average: round(
476 average(
477 this.workerNodes.reduce<number[]>(
478 (accumulator, workerNode) =>
479 accumulator.concat(workerNode.usage.waitTime.history),
480 []
481 )
98e72cda 482 )
dc021bcc 483 )
3baa0837 484 }),
98e72cda
JB
485 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
486 .waitTime.median && {
487 median: round(
488 median(
3baa0837
JB
489 this.workerNodes.reduce<number[]>(
490 (accumulator, workerNode) =>
491 accumulator.concat(workerNode.usage.waitTime.history),
492 []
98e72cda
JB
493 )
494 )
495 )
496 })
1dcf8b7b
JB
497 }
498 })
6b27d407
JB
499 }
500 }
08f3f44c 501
aa9eede8
JB
502 /**
503 * The pool readiness boolean status.
504 */
2431bdb4
JB
505 private get ready (): boolean {
506 return (
b97d82d8
JB
507 this.workerNodes.reduce(
508 (accumulator, workerNode) =>
509 !workerNode.info.dynamic && workerNode.info.ready
510 ? accumulator + 1
511 : accumulator,
512 0
513 ) >= this.minSize
2431bdb4
JB
514 )
515 }
516
afe0d5bf 517 /**
aa9eede8 518 * The approximate pool utilization.
afe0d5bf
JB
519 *
520 * @returns The pool utilization.
521 */
522 private get utilization (): number {
8e5ca040 523 const poolTimeCapacity =
fe7d90db 524 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
525 const totalTasksRunTime = this.workerNodes.reduce(
526 (accumulator, workerNode) =>
71514351 527 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
528 0
529 )
530 const totalTasksWaitTime = this.workerNodes.reduce(
531 (accumulator, workerNode) =>
71514351 532 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
533 0
534 )
8e5ca040 535 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
536 }
537
8881ae32 538 /**
aa9eede8 539 * The pool type.
8881ae32
JB
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
184855e6 545 /**
aa9eede8 546 * The worker type.
184855e6
JB
547 */
548 protected abstract get worker (): WorkerType
549
c2ade475 550 /**
aa9eede8 551 * The pool minimum size.
c2ade475 552 */
8735b4e5
JB
553 protected get minSize (): number {
554 return this.numberOfWorkers
555 }
ff733df7
JB
556
557 /**
aa9eede8 558 * The pool maximum size.
ff733df7 559 */
8735b4e5
JB
560 protected get maxSize (): number {
561 return this.max ?? this.numberOfWorkers
562 }
a35560ba 563
6b813701
JB
564 /**
565 * Checks if the worker id sent in the received message from a worker is valid.
566 *
567 * @param message - The received message.
568 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
569 */
21f710aa 570 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
571 if (message.workerId == null) {
572 throw new Error('Worker message received without worker id')
573 } else if (
21f710aa 574 message.workerId != null &&
aad6fb64 575 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
576 ) {
577 throw new Error(
578 `Worker message received from unknown worker '${message.workerId}'`
579 )
580 }
581 }
582
ffcbbad8 583 /**
f06e48d8 584 * Gets the given worker its worker node key.
ffcbbad8
JB
585 *
586 * @param worker - The worker.
f59e1027 587 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 588 */
aad6fb64 589 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 590 return this.workerNodes.findIndex(
8ebe6c30 591 (workerNode) => workerNode.worker === worker
f06e48d8 592 )
bf9549ae
JB
593 }
594
aa9eede8
JB
595 /**
596 * Gets the worker node key given its worker id.
597 *
598 * @param workerId - The worker id.
aad6fb64 599 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 600 */
aad6fb64
JB
601 private getWorkerNodeKeyByWorkerId (workerId: number): number {
602 return this.workerNodes.findIndex(
8ebe6c30 603 (workerNode) => workerNode.info.id === workerId
aad6fb64 604 )
aa9eede8
JB
605 }
606
afc003b2 607 /** @inheritDoc */
a35560ba 608 public setWorkerChoiceStrategy (
59219cbb
JB
609 workerChoiceStrategy: WorkerChoiceStrategy,
610 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 611 ): void {
aee46736 612 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 613 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
614 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
615 this.opts.workerChoiceStrategy
616 )
617 if (workerChoiceStrategyOptions != null) {
618 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
619 }
aa9eede8 620 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 621 workerNode.resetUsage()
9edb9717 622 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 623 }
a20f0ba5
JB
624 }
625
626 /** @inheritDoc */
627 public setWorkerChoiceStrategyOptions (
628 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
629 ): void {
0d80593b 630 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
631 this.opts.workerChoiceStrategyOptions = {
632 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
633 ...workerChoiceStrategyOptions
634 }
a20f0ba5
JB
635 this.workerChoiceStrategyContext.setOptions(
636 this.opts.workerChoiceStrategyOptions
a35560ba
S
637 )
638 }
639
a20f0ba5 640 /** @inheritDoc */
8f52842f
JB
641 public enableTasksQueue (
642 enable: boolean,
643 tasksQueueOptions?: TasksQueueOptions
644 ): void {
a20f0ba5 645 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 646 this.flushTasksQueues()
a20f0ba5
JB
647 }
648 this.opts.enableTasksQueue = enable
8f52842f 649 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
650 }
651
652 /** @inheritDoc */
8f52842f 653 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 654 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
655 this.checkValidTasksQueueOptions(tasksQueueOptions)
656 this.opts.tasksQueueOptions =
657 this.buildTasksQueueOptions(tasksQueueOptions)
ff3f866a 658 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 659 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
660 delete this.opts.tasksQueueOptions
661 }
662 }
663
ff3f866a 664 private setTasksQueueMaxSize (size: number): void {
20c6f652 665 for (const workerNode of this.workerNodes) {
ff3f866a 666 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
667 }
668 }
669
a20f0ba5
JB
670 private buildTasksQueueOptions (
671 tasksQueueOptions: TasksQueueOptions
672 ): TasksQueueOptions {
673 return {
20c6f652 674 ...{
ff3f866a 675 size: Math.pow(this.maxSize, 2),
20c6f652
JB
676 concurrency: 1
677 },
678 ...tasksQueueOptions
a20f0ba5
JB
679 }
680 }
681
c319c66b
JB
682 /**
683 * Whether the pool is full or not.
684 *
685 * The pool filling boolean status.
686 */
dea903a8
JB
687 protected get full (): boolean {
688 return this.workerNodes.length >= this.maxSize
689 }
c2ade475 690
c319c66b
JB
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
7c0ba920 697
6c6afb84 698 /**
3d76750a 699 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
700 *
701 * @returns Worker nodes busyness boolean status.
702 */
c2ade475 703 protected internalBusy (): boolean {
3d76750a
JB
704 if (this.opts.enableTasksQueue === true) {
705 return (
706 this.workerNodes.findIndex(
8ebe6c30 707 (workerNode) =>
3d76750a
JB
708 workerNode.info.ready &&
709 workerNode.usage.tasks.executing <
710 (this.opts.tasksQueueOptions?.concurrency as number)
711 ) === -1
712 )
713 } else {
714 return (
715 this.workerNodes.findIndex(
8ebe6c30 716 (workerNode) =>
3d76750a
JB
717 workerNode.info.ready && workerNode.usage.tasks.executing === 0
718 ) === -1
719 )
720 }
cb70b19d
JB
721 }
722
90d7d101
JB
723 /** @inheritDoc */
724 public listTaskFunctions (): string[] {
f2dbbf95
JB
725 for (const workerNode of this.workerNodes) {
726 if (
727 Array.isArray(workerNode.info.taskFunctions) &&
728 workerNode.info.taskFunctions.length > 0
729 ) {
730 return workerNode.info.taskFunctions
731 }
90d7d101 732 }
f2dbbf95 733 return []
90d7d101
JB
734 }
735
afc003b2 736 /** @inheritDoc */
7d91a8cd
JB
737 public async execute (
738 data?: Data,
739 name?: string,
740 transferList?: TransferListItem[]
741 ): Promise<Response> {
52b71763 742 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
743 if (!this.started) {
744 reject(new Error('Cannot execute a task on destroyed pool'))
745 }
7d91a8cd
JB
746 if (name != null && typeof name !== 'string') {
747 reject(new TypeError('name argument must be a string'))
748 }
90d7d101
JB
749 if (
750 name != null &&
751 typeof name === 'string' &&
752 name.trim().length === 0
753 ) {
f58b60b9 754 reject(new TypeError('name argument must not be an empty string'))
90d7d101 755 }
b558f6b5
JB
756 if (transferList != null && !Array.isArray(transferList)) {
757 reject(new TypeError('transferList argument must be an array'))
758 }
759 const timestamp = performance.now()
760 const workerNodeKey = this.chooseWorkerNode()
94407def 761 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
cea399c8
JB
762 if (
763 name != null &&
a5d15204
JB
764 Array.isArray(workerInfo.taskFunctions) &&
765 !workerInfo.taskFunctions.includes(name)
cea399c8 766 ) {
90d7d101
JB
767 reject(
768 new Error(`Task function '${name}' is not registered in the pool`)
769 )
770 }
501aea93 771 const task: Task<Data> = {
52b71763
JB
772 name: name ?? DEFAULT_TASK_NAME,
773 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
774 data: data ?? ({} as Data),
7d91a8cd 775 transferList,
52b71763 776 timestamp,
a5d15204 777 workerId: workerInfo.id as number,
7629bdf1 778 taskId: randomUUID()
52b71763 779 }
7629bdf1 780 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
781 resolve,
782 reject,
501aea93 783 workerNodeKey
2e81254d 784 })
52b71763 785 if (
4e377863
JB
786 this.opts.enableTasksQueue === false ||
787 (this.opts.enableTasksQueue === true &&
033f1776 788 this.tasksQueueSize(workerNodeKey) === 0 &&
4e377863 789 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 790 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 791 ) {
501aea93 792 this.executeTask(workerNodeKey, task)
4e377863
JB
793 } else {
794 this.enqueueTask(workerNodeKey, task)
52b71763 795 }
2e81254d 796 })
280c2a77 797 }
c97c7edb 798
afc003b2 799 /** @inheritDoc */
c97c7edb 800 public async destroy (): Promise<void> {
1fbcaa7c 801 await Promise.all(
81c02522 802 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 803 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
804 })
805 )
33e6bb4c 806 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 807 this.started = false
c97c7edb
S
808 }
809
1e3214b6
JB
810 protected async sendKillMessageToWorker (
811 workerNodeKey: number,
812 workerId: number
813 ): Promise<void> {
9edb9717 814 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
815 this.registerWorkerMessageListener(workerNodeKey, (message) => {
816 if (message.kill === 'success') {
817 resolve()
818 } else if (message.kill === 'failure') {
e1af34e6 819 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
820 }
821 })
9edb9717 822 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 823 })
1e3214b6
JB
824 }
825
4a6952ff 826 /**
aa9eede8 827 * Terminates the worker node given its worker node key.
4a6952ff 828 *
aa9eede8 829 * @param workerNodeKey - The worker node key.
4a6952ff 830 */
81c02522 831 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 832
729c563d 833 /**
6677a3d3
JB
834 * Setup hook to execute code before worker nodes are created in the abstract constructor.
835 * Can be overridden.
afc003b2
JB
836 *
837 * @virtual
729c563d 838 */
280c2a77 839 protected setupHook (): void {
033f1776 840 /** Intentionally empty */
280c2a77 841 }
c97c7edb 842
729c563d 843 /**
280c2a77
S
844 * Should return whether the worker is the main worker or not.
845 */
846 protected abstract isMain (): boolean
847
848 /**
2e81254d 849 * Hook executed before the worker task execution.
bf9549ae 850 * Can be overridden.
729c563d 851 *
f06e48d8 852 * @param workerNodeKey - The worker node key.
1c6fe997 853 * @param task - The task to execute.
729c563d 854 */
1c6fe997
JB
855 protected beforeTaskExecutionHook (
856 workerNodeKey: number,
857 task: Task<Data>
858 ): void {
94407def
JB
859 if (this.workerNodes[workerNodeKey]?.usage != null) {
860 const workerUsage = this.workerNodes[workerNodeKey].usage
861 ++workerUsage.tasks.executing
862 this.updateWaitTimeWorkerUsage(workerUsage, task)
863 }
864 if (
865 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
866 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
867 task.name as string
868 ) != null
869 ) {
db0e38ee 870 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 871 workerNodeKey
db0e38ee 872 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
873 ++taskFunctionWorkerUsage.tasks.executing
874 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 875 }
c97c7edb
S
876 }
877
c01733f1 878 /**
2e81254d 879 * Hook executed after the worker task execution.
bf9549ae 880 * Can be overridden.
c01733f1 881 *
501aea93 882 * @param workerNodeKey - The worker node key.
38e795c1 883 * @param message - The received message.
c01733f1 884 */
2e81254d 885 protected afterTaskExecutionHook (
501aea93 886 workerNodeKey: number,
2740a743 887 message: MessageValue<Response>
bf9549ae 888 ): void {
94407def
JB
889 if (this.workerNodes[workerNodeKey]?.usage != null) {
890 const workerUsage = this.workerNodes[workerNodeKey].usage
891 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
892 this.updateRunTimeWorkerUsage(workerUsage, message)
893 this.updateEluWorkerUsage(workerUsage, message)
894 }
895 if (
896 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
897 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 898 message.taskPerformance?.name as string
94407def
JB
899 ) != null
900 ) {
db0e38ee 901 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 902 workerNodeKey
db0e38ee 903 ].getTaskFunctionWorkerUsage(
0628755c 904 message.taskPerformance?.name as string
b558f6b5 905 ) as WorkerUsage
db0e38ee
JB
906 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
907 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
908 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
909 }
910 }
911
db0e38ee
JB
912 /**
913 * Whether the worker node shall update its task function worker usage or not.
914 *
915 * @param workerNodeKey - The worker node key.
916 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
917 */
918 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 919 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 920 return (
94407def 921 workerInfo != null &&
a5d15204 922 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 923 workerInfo.taskFunctions.length > 2
b558f6b5 924 )
f1c06930
JB
925 }
926
927 private updateTaskStatisticsWorkerUsage (
928 workerUsage: WorkerUsage,
929 message: MessageValue<Response>
930 ): void {
a4e07f72 931 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
932 if (
933 workerTaskStatistics.executing != null &&
934 workerTaskStatistics.executing > 0
935 ) {
936 --workerTaskStatistics.executing
5bb5be17 937 }
98e72cda
JB
938 if (message.taskError == null) {
939 ++workerTaskStatistics.executed
940 } else {
a4e07f72 941 ++workerTaskStatistics.failed
2740a743 942 }
f8eb0a2a
JB
943 }
944
a4e07f72
JB
945 private updateRunTimeWorkerUsage (
946 workerUsage: WorkerUsage,
f8eb0a2a
JB
947 message: MessageValue<Response>
948 ): void {
dc021bcc
JB
949 if (message.taskError != null) {
950 return
951 }
e4f20deb
JB
952 updateMeasurementStatistics(
953 workerUsage.runTime,
954 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 955 message.taskPerformance?.runTime ?? 0
e4f20deb 956 )
f8eb0a2a
JB
957 }
958
a4e07f72
JB
959 private updateWaitTimeWorkerUsage (
960 workerUsage: WorkerUsage,
1c6fe997 961 task: Task<Data>
f8eb0a2a 962 ): void {
1c6fe997
JB
963 const timestamp = performance.now()
964 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
965 updateMeasurementStatistics(
966 workerUsage.waitTime,
967 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 968 taskWaitTime
e4f20deb 969 )
c01733f1 970 }
971
a4e07f72 972 private updateEluWorkerUsage (
5df69fab 973 workerUsage: WorkerUsage,
62c15a68
JB
974 message: MessageValue<Response>
975 ): void {
dc021bcc
JB
976 if (message.taskError != null) {
977 return
978 }
008512c7
JB
979 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
980 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
981 updateMeasurementStatistics(
982 workerUsage.elu.active,
008512c7 983 eluTaskStatisticsRequirements,
dc021bcc 984 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
985 )
986 updateMeasurementStatistics(
987 workerUsage.elu.idle,
008512c7 988 eluTaskStatisticsRequirements,
dc021bcc 989 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 990 )
008512c7 991 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 992 if (message.taskPerformance?.elu != null) {
f7510105
JB
993 if (workerUsage.elu.utilization != null) {
994 workerUsage.elu.utilization =
995 (workerUsage.elu.utilization +
996 message.taskPerformance.elu.utilization) /
997 2
998 } else {
999 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1000 }
62c15a68
JB
1001 }
1002 }
1003 }
1004
280c2a77 1005 /**
f06e48d8 1006 * Chooses a worker node for the next task.
280c2a77 1007 *
6c6afb84 1008 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1009 *
aa9eede8 1010 * @returns The chosen worker node key
280c2a77 1011 */
6c6afb84 1012 private chooseWorkerNode (): number {
930dcf12 1013 if (this.shallCreateDynamicWorker()) {
aa9eede8 1014 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1015 if (
b1aae695 1016 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1017 ) {
aa9eede8 1018 return workerNodeKey
6c6afb84 1019 }
17393ac8 1020 }
930dcf12
JB
1021 return this.workerChoiceStrategyContext.execute()
1022 }
1023
6c6afb84
JB
1024 /**
1025 * Conditions for dynamic worker creation.
1026 *
1027 * @returns Whether to create a dynamic worker or not.
1028 */
1029 private shallCreateDynamicWorker (): boolean {
930dcf12 1030 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1031 }
1032
280c2a77 1033 /**
aa9eede8 1034 * Sends a message to worker given its worker node key.
280c2a77 1035 *
aa9eede8 1036 * @param workerNodeKey - The worker node key.
38e795c1 1037 * @param message - The message.
7d91a8cd 1038 * @param transferList - The optional array of transferable objects.
280c2a77
S
1039 */
1040 protected abstract sendToWorker (
aa9eede8 1041 workerNodeKey: number,
7d91a8cd
JB
1042 message: MessageValue<Data>,
1043 transferList?: TransferListItem[]
280c2a77
S
1044 ): void
1045
729c563d 1046 /**
41344292 1047 * Creates a new worker.
6c6afb84
JB
1048 *
1049 * @returns Newly created worker.
729c563d 1050 */
280c2a77 1051 protected abstract createWorker (): Worker
c97c7edb 1052
4a6952ff 1053 /**
aa9eede8 1054 * Creates a new, completely set up worker node.
4a6952ff 1055 *
aa9eede8 1056 * @returns New, completely set up worker node key.
4a6952ff 1057 */
aa9eede8 1058 protected createAndSetupWorkerNode (): number {
bdacc2d2 1059 const worker = this.createWorker()
280c2a77 1060
fd04474e 1061 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1062 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1063 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1064 worker.on('error', (error) => {
aad6fb64 1065 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1066 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1067 workerInfo.ready = false
0dc838e3 1068 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1069 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1070 if (
1071 this.opts.restartWorkerOnError === true &&
1072 !this.starting &&
1073 this.started
1074 ) {
9b106837 1075 if (workerInfo.dynamic) {
aa9eede8 1076 this.createAndSetupDynamicWorkerNode()
8a1260a3 1077 } else {
aa9eede8 1078 this.createAndSetupWorkerNode()
8a1260a3 1079 }
5baee0d7 1080 }
19dbc45b 1081 if (this.opts.enableTasksQueue === true) {
9b106837 1082 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1083 }
5baee0d7 1084 })
a35560ba 1085 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1086 worker.once('exit', () => {
f06e48d8 1087 this.removeWorkerNode(worker)
a974afa6 1088 })
280c2a77 1089
aa9eede8 1090 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1091
aa9eede8 1092 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1093
aa9eede8 1094 return workerNodeKey
c97c7edb 1095 }
be0676b3 1096
930dcf12 1097 /**
aa9eede8 1098 * Creates a new, completely set up dynamic worker node.
930dcf12 1099 *
aa9eede8 1100 * @returns New, completely set up dynamic worker node key.
930dcf12 1101 */
aa9eede8
JB
1102 protected createAndSetupDynamicWorkerNode (): number {
1103 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1104 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1105 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1106 message.workerId
aad6fb64 1107 )
aa9eede8 1108 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1109 // Kill message received from worker
930dcf12
JB
1110 if (
1111 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1112 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1113 ((this.opts.enableTasksQueue === false &&
aa9eede8 1114 workerUsage.tasks.executing === 0) ||
7b56f532 1115 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1116 workerUsage.tasks.executing === 0 &&
1117 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1118 ) {
5270d253
JB
1119 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1120 this.emitter?.emit(PoolEvents.error, error)
1121 })
930dcf12
JB
1122 }
1123 })
94407def 1124 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1125 this.sendToWorker(workerNodeKey, {
b0a4db63 1126 checkActive: true,
21f710aa
JB
1127 workerId: workerInfo.id as number
1128 })
b5e113f6 1129 workerInfo.dynamic = true
b1aae695
JB
1130 if (
1131 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1132 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1133 ) {
b5e113f6
JB
1134 workerInfo.ready = true
1135 }
33e6bb4c 1136 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1137 return workerNodeKey
930dcf12
JB
1138 }
1139
a2ed5053 1140 /**
aa9eede8 1141 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1142 *
aa9eede8 1143 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1144 * @param listener - The message listener callback.
1145 */
85aeb3f3
JB
1146 protected abstract registerWorkerMessageListener<
1147 Message extends Data | Response
aa9eede8
JB
1148 >(
1149 workerNodeKey: number,
1150 listener: (message: MessageValue<Message>) => void
1151 ): void
a2ed5053
JB
1152
1153 /**
aa9eede8 1154 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1155 * Can be overridden.
1156 *
aa9eede8 1157 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1158 */
aa9eede8 1159 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1160 // Listen to worker messages.
aa9eede8 1161 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1162 // Send the startup message to worker.
aa9eede8 1163 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1164 // Send the statistics message to worker.
1165 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1166 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1167 this.workerNodes[workerNodeKey].onEmptyQueue =
1168 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1169 this.workerNodes[workerNodeKey].onBackPressure =
1170 this.tasksStealingOnBackPressure.bind(this)
1171 }
d2c73f82
JB
1172 }
1173
85aeb3f3 1174 /**
aa9eede8
JB
1175 * Sends the startup message to worker given its worker node key.
1176 *
1177 * @param workerNodeKey - The worker node key.
1178 */
1179 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1180
1181 /**
9edb9717 1182 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1183 *
aa9eede8 1184 * @param workerNodeKey - The worker node key.
85aeb3f3 1185 */
9edb9717 1186 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1187 this.sendToWorker(workerNodeKey, {
1188 statistics: {
1189 runTime:
1190 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1191 .runTime.aggregate,
1192 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1193 .elu.aggregate
1194 },
94407def 1195 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1196 })
1197 }
a2ed5053
JB
1198
1199 private redistributeQueuedTasks (workerNodeKey: number): void {
1200 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1201 let destinationWorkerNodeKey!: number
a2ed5053 1202 let minQueuedTasks = Infinity
a6b3272b 1203 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3 1204 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
58baffd3
JB
1205 if (workerNode.usage.tasks.queued === 0) {
1206 destinationWorkerNodeKey = workerNodeId
1207 break
1208 }
1209 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1210 minQueuedTasks = workerNode.usage.tasks.queued
1211 destinationWorkerNodeKey = workerNodeId
1212 }
a2ed5053
JB
1213 }
1214 }
0bc68267 1215 if (destinationWorkerNodeKey != null) {
033f1776 1216 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1217 const task = {
1218 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1219 workerId: destinationWorkerNode.info.id as number
0bc68267 1220 }
033f1776
JB
1221 if (
1222 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1223 destinationWorkerNode.usage.tasks.executing <
1224 (this.opts.tasksQueueOptions?.concurrency as number)
1225 ) {
0bc68267
JB
1226 this.executeTask(destinationWorkerNodeKey, task)
1227 } else {
1228 this.enqueueTask(destinationWorkerNodeKey, task)
1229 }
dd951876
JB
1230 }
1231 }
1232 }
1233
1234 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1235 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1236 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1237 const workerNodes = this.workerNodes
a6b3272b 1238 .slice()
dd951876
JB
1239 .sort(
1240 (workerNodeA, workerNodeB) =>
1241 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1242 )
dd951876 1243 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1244 if (sourceWorkerNode.usage.tasks.queued === 0) {
1245 break
1246 }
a6b3272b
JB
1247 if (
1248 sourceWorkerNode.info.ready &&
1249 sourceWorkerNode.info.id !== workerId &&
1250 sourceWorkerNode.usage.tasks.queued > 0
1251 ) {
1252 const task = {
1253 ...(sourceWorkerNode.popTask() as Task<Data>),
1254 workerId: destinationWorkerNode.info.id as number
1255 }
dd951876 1256 if (
2cb4ff45 1257 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
a6b3272b 1258 destinationWorkerNode.usage.tasks.executing <
033f1776 1259 (this.opts.tasksQueueOptions?.concurrency as number)
dd951876 1260 ) {
2cb4ff45
JB
1261 this.executeTask(destinationWorkerNodeKey, task)
1262 } else {
1263 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1264 }
1265 break
72695f86
JB
1266 }
1267 }
1268 }
1269
1270 private tasksStealingOnBackPressure (workerId: number): void {
1271 const sourceWorkerNode =
1272 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1273 const workerNodes = this.workerNodes
a6b3272b 1274 .slice()
72695f86
JB
1275 .sort(
1276 (workerNodeA, workerNodeB) =>
1277 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1278 )
1279 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1280 if (
0bc68267 1281 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1282 workerNode.info.ready &&
1283 workerNode.info.id !== workerId &&
0bc68267
JB
1284 workerNode.usage.tasks.queued <
1285 (this.opts.tasksQueueOptions?.size as number) - 1
72695f86 1286 ) {
dd951876
JB
1287 const task = {
1288 ...(sourceWorkerNode.popTask() as Task<Data>),
1289 workerId: workerNode.info.id as number
1290 }
4de3d785 1291 if (
033f1776 1292 this.tasksQueueSize(workerNodeKey) === 0 &&
4de3d785 1293 workerNode.usage.tasks.executing <
033f1776 1294 (this.opts.tasksQueueOptions?.concurrency as number)
4de3d785 1295 ) {
dd951876 1296 this.executeTask(workerNodeKey, task)
4de3d785 1297 } else {
dd951876 1298 this.enqueueTask(workerNodeKey, task)
4de3d785 1299 }
10ecf8fd 1300 }
a2ed5053
JB
1301 }
1302 }
1303
be0676b3 1304 /**
aa9eede8 1305 * This method is the listener registered for each worker message.
be0676b3 1306 *
bdacc2d2 1307 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1308 */
1309 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1310 return (message) => {
21f710aa 1311 this.checkMessageWorkerId(message)
a5d15204 1312 if (message.ready != null && message.taskFunctions != null) {
81c02522 1313 // Worker ready response received from worker
10e2aa7e 1314 this.handleWorkerReadyResponse(message)
7629bdf1 1315 } else if (message.taskId != null) {
81c02522 1316 // Task execution response received from worker
6b272951 1317 this.handleTaskExecutionResponse(message)
90d7d101
JB
1318 } else if (message.taskFunctions != null) {
1319 // Task functions message received from worker
94407def
JB
1320 (
1321 this.getWorkerInfo(
1322 this.getWorkerNodeKeyByWorkerId(message.workerId)
1323 ) as WorkerInfo
b558f6b5 1324 ).taskFunctions = message.taskFunctions
6b272951
JB
1325 }
1326 }
1327 }
1328
10e2aa7e 1329 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1330 if (message.ready === false) {
1331 throw new Error(`Worker ${message.workerId} failed to initialize`)
1332 }
a5d15204 1333 const workerInfo = this.getWorkerInfo(
aad6fb64 1334 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1335 ) as WorkerInfo
a5d15204
JB
1336 workerInfo.ready = message.ready as boolean
1337 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1338 if (this.emitter != null && this.ready) {
1339 this.emitter.emit(PoolEvents.ready, this.info)
1340 }
6b272951
JB
1341 }
1342
1343 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1344 const { taskId, taskError, data } = message
1345 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1346 if (promiseResponse != null) {
5441aea6
JB
1347 if (taskError != null) {
1348 this.emitter?.emit(PoolEvents.taskError, taskError)
1349 promiseResponse.reject(taskError.message)
6b272951 1350 } else {
5441aea6 1351 promiseResponse.resolve(data as Response)
6b272951 1352 }
501aea93
JB
1353 const workerNodeKey = promiseResponse.workerNodeKey
1354 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1355 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1356 if (
1357 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1358 this.tasksQueueSize(workerNodeKey) > 0 &&
1359 this.workerNodes[workerNodeKey].usage.tasks.executing <
1360 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1361 ) {
1362 this.executeTask(
1363 workerNodeKey,
1364 this.dequeueTask(workerNodeKey) as Task<Data>
1365 )
be0676b3 1366 }
6b272951 1367 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1368 }
be0676b3 1369 }
7c0ba920 1370
a1763c54 1371 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1372 if (this.busy) {
1373 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1374 }
1375 }
1376
1377 private checkAndEmitTaskQueuingEvents (): void {
1378 if (this.hasBackPressure()) {
1379 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1380 }
1381 }
1382
33e6bb4c
JB
1383 private checkAndEmitDynamicWorkerCreationEvents (): void {
1384 if (this.type === PoolTypes.dynamic) {
1385 if (this.full) {
1386 this.emitter?.emit(PoolEvents.full, this.info)
1387 }
1388 }
1389 }
1390
8a1260a3 1391 /**
aa9eede8 1392 * Gets the worker information given its worker node key.
8a1260a3
JB
1393 *
1394 * @param workerNodeKey - The worker node key.
3f09ed9f 1395 * @returns The worker information.
8a1260a3 1396 */
94407def
JB
1397 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1398 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1399 }
1400
a05c10de 1401 /**
b0a4db63 1402 * Adds the given worker in the pool worker nodes.
ea7a90d3 1403 *
38e795c1 1404 * @param worker - The worker.
aa9eede8
JB
1405 * @returns The added worker node key.
1406 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1407 */
b0a4db63 1408 private addWorkerNode (worker: Worker): number {
671d5154
JB
1409 const workerNode = new WorkerNode<Worker, Data>(
1410 worker,
1411 this.worker,
ff3f866a 1412 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1413 )
b97d82d8 1414 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1415 if (this.starting) {
1416 workerNode.info.ready = true
1417 }
aa9eede8 1418 this.workerNodes.push(workerNode)
aad6fb64 1419 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1420 if (workerNodeKey === -1) {
a5d15204 1421 throw new Error('Worker node added not found')
aa9eede8
JB
1422 }
1423 return workerNodeKey
ea7a90d3 1424 }
c923ce56 1425
51fe3d3c 1426 /**
f06e48d8 1427 * Removes the given worker from the pool worker nodes.
51fe3d3c 1428 *
f06e48d8 1429 * @param worker - The worker.
51fe3d3c 1430 */
416fd65c 1431 private removeWorkerNode (worker: Worker): void {
aad6fb64 1432 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1433 if (workerNodeKey !== -1) {
1434 this.workerNodes.splice(workerNodeKey, 1)
1435 this.workerChoiceStrategyContext.remove(workerNodeKey)
1436 }
51fe3d3c 1437 }
adc3c320 1438
e2b31e32
JB
1439 /** @inheritDoc */
1440 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1441 return (
e2b31e32
JB
1442 this.opts.enableTasksQueue === true &&
1443 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1444 )
1445 }
1446
1447 private hasBackPressure (): boolean {
1448 return (
1449 this.opts.enableTasksQueue === true &&
1450 this.workerNodes.findIndex(
1451 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1452 ) === -1
9e844245 1453 )
e2b31e32
JB
1454 }
1455
b0a4db63 1456 /**
aa9eede8 1457 * Executes the given task on the worker given its worker node key.
b0a4db63 1458 *
aa9eede8 1459 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1460 * @param task - The task to execute.
1461 */
2e81254d 1462 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1463 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1464 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1465 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1466 }
1467
f9f00b5f 1468 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1469 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1470 this.checkAndEmitTaskQueuingEvents()
1471 return tasksQueueSize
adc3c320
JB
1472 }
1473
416fd65c 1474 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1475 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1476 }
1477
416fd65c 1478 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1479 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1480 }
1481
81c02522 1482 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1483 while (this.tasksQueueSize(workerNodeKey) > 0) {
1484 this.executeTask(
1485 workerNodeKey,
1486 this.dequeueTask(workerNodeKey) as Task<Data>
1487 )
ff733df7 1488 }
4b628b48 1489 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1490 }
1491
ef41a6e6
JB
1492 private flushTasksQueues (): void {
1493 for (const [workerNodeKey] of this.workerNodes.entries()) {
1494 this.flushTasksQueue(workerNodeKey)
1495 }
1496 }
c97c7edb 1497}