docs: add changelog entry
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
075e51d1 147 this.starting = true
e761c033 148 this.startPool()
075e51d1 149 this.starting = false
15b176e0 150 this.started = true
afe0d5bf
JB
151
152 this.startTimestamp = performance.now()
c97c7edb
S
153 }
154
a35560ba 155 private checkFilePath (filePath: string): void {
ffcbbad8
JB
156 if (
157 filePath == null ||
3d6dd312 158 typeof filePath !== 'string' ||
ffcbbad8
JB
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
c510fea7
APA
161 throw new Error('Please specify a file with a worker implementation')
162 }
3d6dd312
JB
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
c510fea7
APA
166 }
167
8d3782fa
JB
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
78cea37e 173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 174 throw new TypeError(
0d80593b 175 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
176 )
177 } else if (numberOfWorkers < 0) {
473c717a 178 throw new RangeError(
8d3782fa
JB
179 'Cannot instantiate a pool with a negative number of workers'
180 )
6b27d407 181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 187 if (this.type === PoolTypes.dynamic) {
a5ed75b7 188 if (max == null) {
e695d66f 189 throw new TypeError(
a5ed75b7
JB
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
079de991
JB
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
b97d82d8 200 } else if (max === 0) {
079de991 201 throw new RangeError(
d640b48b 202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
8d3782fa
JB
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
49be33fe
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 244 throw new Error(
aee46736 245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
246 )
247 }
7c0ba920
JB
248 }
249
0d80593b
JB
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
8990357d
JB
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
268 workerChoiceStrategyOptions.choiceRetries <= 0
269 ) {
270 throw new RangeError(
271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
272 )
273 }
49be33fe
JB
274 if (
275 workerChoiceStrategyOptions.weights != null &&
6b27d407 276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
f0d7f803
JB
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
0d80593b
JB
292 }
293
a20f0ba5 294 private checkValidTasksQueueOptions (
ff3f866a 295 tasksQueueOptions: Writable<TasksQueueOptions>
a20f0ba5 296 ): void {
0d80593b
JB
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
f0d7f803
JB
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
20c6f652 305 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
e695d66f 312 throw new RangeError(
20c6f652
JB
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
68dbcdc0 316 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 317 throw new Error(
68dbcdc0 318 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
319 )
320 }
321 if (
ff3f866a
JB
322 tasksQueueOptions?.size != null &&
323 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 324 ) {
ff3f866a 325 throw new TypeError(
68dbcdc0 326 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
327 )
328 }
329 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 330 throw new RangeError(
68dbcdc0 331 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
332 )
333 }
334 }
335
e761c033
JB
336 private startPool (): void {
337 while (
338 this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
341 0
342 ) < this.numberOfWorkers
343 ) {
aa9eede8 344 this.createAndSetupWorkerNode()
e761c033
JB
345 }
346 }
347
08f3f44c 348 /** @inheritDoc */
6b27d407
JB
349 public get info (): PoolInfo {
350 return {
23ccf9d7 351 version,
6b27d407 352 type: this.type,
184855e6 353 worker: this.worker,
2431bdb4
JB
354 ready: this.ready,
355 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
356 minSize: this.minSize,
357 maxSize: this.maxSize,
c05f0d50
JB
358 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
359 .runTime.aggregate &&
1305e9a8
JB
360 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
362 workerNodes: this.workerNodes.length,
363 idleWorkerNodes: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
f59e1027 365 workerNode.usage.tasks.executing === 0
a4e07f72
JB
366 ? accumulator + 1
367 : accumulator,
6b27d407
JB
368 0
369 ),
370 busyWorkerNodes: this.workerNodes.reduce(
371 (accumulator, workerNode) =>
f59e1027 372 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
373 0
374 ),
a4e07f72 375 executedTasks: this.workerNodes.reduce(
6b27d407 376 (accumulator, workerNode) =>
f59e1027 377 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
378 0
379 ),
380 executingTasks: this.workerNodes.reduce(
381 (accumulator, workerNode) =>
f59e1027 382 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
383 0
384 ),
daf86646
JB
385 ...(this.opts.enableTasksQueue === true && {
386 queuedTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + workerNode.usage.tasks.queued,
389 0
390 )
391 }),
392 ...(this.opts.enableTasksQueue === true && {
393 maxQueuedTasks: this.workerNodes.reduce(
394 (accumulator, workerNode) =>
395 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
396 0
397 )
398 }),
a1763c54
JB
399 ...(this.opts.enableTasksQueue === true && {
400 backPressure: this.hasBackPressure()
401 }),
68cbdc84
JB
402 ...(this.opts.enableTasksQueue === true && {
403 stolenTasks: this.workerNodes.reduce(
404 (accumulator, workerNode) =>
405 accumulator + workerNode.usage.tasks.stolen,
406 0
407 )
408 }),
a4e07f72
JB
409 failedTasks: this.workerNodes.reduce(
410 (accumulator, workerNode) =>
f59e1027 411 accumulator + workerNode.usage.tasks.failed,
a4e07f72 412 0
1dcf8b7b
JB
413 ),
414 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
415 .runTime.aggregate && {
416 runTime: {
98e72cda
JB
417 minimum: round(
418 Math.min(
419 ...this.workerNodes.map(
8ebe6c30 420 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 421 )
1dcf8b7b
JB
422 )
423 ),
98e72cda
JB
424 maximum: round(
425 Math.max(
426 ...this.workerNodes.map(
8ebe6c30 427 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 428 )
1dcf8b7b 429 )
98e72cda 430 ),
3baa0837
JB
431 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
432 .runTime.average && {
433 average: round(
434 average(
435 this.workerNodes.reduce<number[]>(
436 (accumulator, workerNode) =>
437 accumulator.concat(workerNode.usage.runTime.history),
438 []
439 )
98e72cda 440 )
dc021bcc 441 )
3baa0837 442 }),
98e72cda
JB
443 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
444 .runTime.median && {
445 median: round(
446 median(
3baa0837
JB
447 this.workerNodes.reduce<number[]>(
448 (accumulator, workerNode) =>
449 accumulator.concat(workerNode.usage.runTime.history),
450 []
98e72cda
JB
451 )
452 )
453 )
454 })
1dcf8b7b
JB
455 }
456 }),
457 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
458 .waitTime.aggregate && {
459 waitTime: {
98e72cda
JB
460 minimum: round(
461 Math.min(
462 ...this.workerNodes.map(
8ebe6c30 463 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 464 )
1dcf8b7b
JB
465 )
466 ),
98e72cda
JB
467 maximum: round(
468 Math.max(
469 ...this.workerNodes.map(
8ebe6c30 470 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 471 )
1dcf8b7b 472 )
98e72cda 473 ),
3baa0837
JB
474 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
475 .waitTime.average && {
476 average: round(
477 average(
478 this.workerNodes.reduce<number[]>(
479 (accumulator, workerNode) =>
480 accumulator.concat(workerNode.usage.waitTime.history),
481 []
482 )
98e72cda 483 )
dc021bcc 484 )
3baa0837 485 }),
98e72cda
JB
486 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 .waitTime.median && {
488 median: round(
489 median(
3baa0837
JB
490 this.workerNodes.reduce<number[]>(
491 (accumulator, workerNode) =>
492 accumulator.concat(workerNode.usage.waitTime.history),
493 []
98e72cda
JB
494 )
495 )
496 )
497 })
1dcf8b7b
JB
498 }
499 })
6b27d407
JB
500 }
501 }
08f3f44c 502
aa9eede8
JB
503 /**
504 * The pool readiness boolean status.
505 */
2431bdb4
JB
506 private get ready (): boolean {
507 return (
b97d82d8
JB
508 this.workerNodes.reduce(
509 (accumulator, workerNode) =>
510 !workerNode.info.dynamic && workerNode.info.ready
511 ? accumulator + 1
512 : accumulator,
513 0
514 ) >= this.minSize
2431bdb4
JB
515 )
516 }
517
afe0d5bf 518 /**
aa9eede8 519 * The approximate pool utilization.
afe0d5bf
JB
520 *
521 * @returns The pool utilization.
522 */
523 private get utilization (): number {
8e5ca040 524 const poolTimeCapacity =
fe7d90db 525 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
526 const totalTasksRunTime = this.workerNodes.reduce(
527 (accumulator, workerNode) =>
71514351 528 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
529 0
530 )
531 const totalTasksWaitTime = this.workerNodes.reduce(
532 (accumulator, workerNode) =>
71514351 533 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
534 0
535 )
8e5ca040 536 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
537 }
538
8881ae32 539 /**
aa9eede8 540 * The pool type.
8881ae32
JB
541 *
542 * If it is `'dynamic'`, it provides the `max` property.
543 */
544 protected abstract get type (): PoolType
545
184855e6 546 /**
aa9eede8 547 * The worker type.
184855e6
JB
548 */
549 protected abstract get worker (): WorkerType
550
c2ade475 551 /**
aa9eede8 552 * The pool minimum size.
c2ade475 553 */
8735b4e5
JB
554 protected get minSize (): number {
555 return this.numberOfWorkers
556 }
ff733df7
JB
557
558 /**
aa9eede8 559 * The pool maximum size.
ff733df7 560 */
8735b4e5
JB
561 protected get maxSize (): number {
562 return this.max ?? this.numberOfWorkers
563 }
a35560ba 564
6b813701
JB
565 /**
566 * Checks if the worker id sent in the received message from a worker is valid.
567 *
568 * @param message - The received message.
569 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
570 */
21f710aa 571 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
572 if (message.workerId == null) {
573 throw new Error('Worker message received without worker id')
574 } else if (
21f710aa 575 message.workerId != null &&
aad6fb64 576 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
577 ) {
578 throw new Error(
579 `Worker message received from unknown worker '${message.workerId}'`
580 )
581 }
582 }
583
ffcbbad8 584 /**
f06e48d8 585 * Gets the given worker its worker node key.
ffcbbad8
JB
586 *
587 * @param worker - The worker.
f59e1027 588 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 589 */
aad6fb64 590 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 591 return this.workerNodes.findIndex(
8ebe6c30 592 (workerNode) => workerNode.worker === worker
f06e48d8 593 )
bf9549ae
JB
594 }
595
aa9eede8
JB
596 /**
597 * Gets the worker node key given its worker id.
598 *
599 * @param workerId - The worker id.
aad6fb64 600 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 601 */
aad6fb64
JB
602 private getWorkerNodeKeyByWorkerId (workerId: number): number {
603 return this.workerNodes.findIndex(
8ebe6c30 604 (workerNode) => workerNode.info.id === workerId
aad6fb64 605 )
aa9eede8
JB
606 }
607
afc003b2 608 /** @inheritDoc */
a35560ba 609 public setWorkerChoiceStrategy (
59219cbb
JB
610 workerChoiceStrategy: WorkerChoiceStrategy,
611 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 612 ): void {
aee46736 613 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 614 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
615 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
616 this.opts.workerChoiceStrategy
617 )
618 if (workerChoiceStrategyOptions != null) {
619 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
620 }
aa9eede8 621 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 622 workerNode.resetUsage()
9edb9717 623 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 624 }
a20f0ba5
JB
625 }
626
627 /** @inheritDoc */
628 public setWorkerChoiceStrategyOptions (
629 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
630 ): void {
0d80593b 631 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
632 this.opts.workerChoiceStrategyOptions = {
633 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
634 ...workerChoiceStrategyOptions
635 }
a20f0ba5
JB
636 this.workerChoiceStrategyContext.setOptions(
637 this.opts.workerChoiceStrategyOptions
a35560ba
S
638 )
639 }
640
a20f0ba5 641 /** @inheritDoc */
8f52842f
JB
642 public enableTasksQueue (
643 enable: boolean,
644 tasksQueueOptions?: TasksQueueOptions
645 ): void {
a20f0ba5 646 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 647 this.flushTasksQueues()
a20f0ba5
JB
648 }
649 this.opts.enableTasksQueue = enable
8f52842f 650 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
651 }
652
653 /** @inheritDoc */
8f52842f 654 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 655 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
656 this.checkValidTasksQueueOptions(tasksQueueOptions)
657 this.opts.tasksQueueOptions =
658 this.buildTasksQueueOptions(tasksQueueOptions)
ff3f866a 659 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 660 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
661 delete this.opts.tasksQueueOptions
662 }
663 }
664
ff3f866a 665 private setTasksQueueMaxSize (size: number): void {
20c6f652 666 for (const workerNode of this.workerNodes) {
ff3f866a 667 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
668 }
669 }
670
a20f0ba5
JB
671 private buildTasksQueueOptions (
672 tasksQueueOptions: TasksQueueOptions
673 ): TasksQueueOptions {
674 return {
20c6f652 675 ...{
ff3f866a 676 size: Math.pow(this.maxSize, 2),
20c6f652
JB
677 concurrency: 1
678 },
679 ...tasksQueueOptions
a20f0ba5
JB
680 }
681 }
682
c319c66b
JB
683 /**
684 * Whether the pool is full or not.
685 *
686 * The pool filling boolean status.
687 */
dea903a8
JB
688 protected get full (): boolean {
689 return this.workerNodes.length >= this.maxSize
690 }
c2ade475 691
c319c66b
JB
692 /**
693 * Whether the pool is busy or not.
694 *
695 * The pool busyness boolean status.
696 */
697 protected abstract get busy (): boolean
7c0ba920 698
6c6afb84 699 /**
3d76750a 700 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
701 *
702 * @returns Worker nodes busyness boolean status.
703 */
c2ade475 704 protected internalBusy (): boolean {
3d76750a
JB
705 if (this.opts.enableTasksQueue === true) {
706 return (
707 this.workerNodes.findIndex(
8ebe6c30 708 (workerNode) =>
3d76750a
JB
709 workerNode.info.ready &&
710 workerNode.usage.tasks.executing <
711 (this.opts.tasksQueueOptions?.concurrency as number)
712 ) === -1
713 )
714 } else {
715 return (
716 this.workerNodes.findIndex(
8ebe6c30 717 (workerNode) =>
3d76750a
JB
718 workerNode.info.ready && workerNode.usage.tasks.executing === 0
719 ) === -1
720 )
721 }
cb70b19d
JB
722 }
723
90d7d101
JB
724 /** @inheritDoc */
725 public listTaskFunctions (): string[] {
f2dbbf95
JB
726 for (const workerNode of this.workerNodes) {
727 if (
728 Array.isArray(workerNode.info.taskFunctions) &&
729 workerNode.info.taskFunctions.length > 0
730 ) {
731 return workerNode.info.taskFunctions
732 }
90d7d101 733 }
f2dbbf95 734 return []
90d7d101
JB
735 }
736
afc003b2 737 /** @inheritDoc */
7d91a8cd
JB
738 public async execute (
739 data?: Data,
740 name?: string,
741 transferList?: TransferListItem[]
742 ): Promise<Response> {
52b71763 743 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
744 if (!this.started) {
745 reject(new Error('Cannot execute a task on destroyed pool'))
746 }
7d91a8cd
JB
747 if (name != null && typeof name !== 'string') {
748 reject(new TypeError('name argument must be a string'))
749 }
90d7d101
JB
750 if (
751 name != null &&
752 typeof name === 'string' &&
753 name.trim().length === 0
754 ) {
f58b60b9 755 reject(new TypeError('name argument must not be an empty string'))
90d7d101 756 }
b558f6b5
JB
757 if (transferList != null && !Array.isArray(transferList)) {
758 reject(new TypeError('transferList argument must be an array'))
759 }
760 const timestamp = performance.now()
761 const workerNodeKey = this.chooseWorkerNode()
94407def 762 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
cea399c8
JB
763 if (
764 name != null &&
a5d15204
JB
765 Array.isArray(workerInfo.taskFunctions) &&
766 !workerInfo.taskFunctions.includes(name)
cea399c8 767 ) {
90d7d101
JB
768 reject(
769 new Error(`Task function '${name}' is not registered in the pool`)
770 )
771 }
501aea93 772 const task: Task<Data> = {
52b71763
JB
773 name: name ?? DEFAULT_TASK_NAME,
774 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
775 data: data ?? ({} as Data),
7d91a8cd 776 transferList,
52b71763 777 timestamp,
a5d15204 778 workerId: workerInfo.id as number,
7629bdf1 779 taskId: randomUUID()
52b71763 780 }
7629bdf1 781 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
782 resolve,
783 reject,
501aea93 784 workerNodeKey
2e81254d 785 })
52b71763 786 if (
4e377863
JB
787 this.opts.enableTasksQueue === false ||
788 (this.opts.enableTasksQueue === true &&
033f1776 789 this.tasksQueueSize(workerNodeKey) === 0 &&
4e377863 790 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 791 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 792 ) {
501aea93 793 this.executeTask(workerNodeKey, task)
4e377863
JB
794 } else {
795 this.enqueueTask(workerNodeKey, task)
52b71763 796 }
2e81254d 797 })
280c2a77 798 }
c97c7edb 799
afc003b2 800 /** @inheritDoc */
c97c7edb 801 public async destroy (): Promise<void> {
1fbcaa7c 802 await Promise.all(
81c02522 803 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 804 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
805 })
806 )
33e6bb4c 807 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 808 this.started = false
c97c7edb
S
809 }
810
1e3214b6
JB
811 protected async sendKillMessageToWorker (
812 workerNodeKey: number,
813 workerId: number
814 ): Promise<void> {
9edb9717 815 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
816 this.registerWorkerMessageListener(workerNodeKey, (message) => {
817 if (message.kill === 'success') {
818 resolve()
819 } else if (message.kill === 'failure') {
e1af34e6 820 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
821 }
822 })
9edb9717 823 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 824 })
1e3214b6
JB
825 }
826
4a6952ff 827 /**
aa9eede8 828 * Terminates the worker node given its worker node key.
4a6952ff 829 *
aa9eede8 830 * @param workerNodeKey - The worker node key.
4a6952ff 831 */
81c02522 832 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 833
729c563d 834 /**
6677a3d3
JB
835 * Setup hook to execute code before worker nodes are created in the abstract constructor.
836 * Can be overridden.
afc003b2
JB
837 *
838 * @virtual
729c563d 839 */
280c2a77 840 protected setupHook (): void {
033f1776 841 /** Intentionally empty */
280c2a77 842 }
c97c7edb 843
729c563d 844 /**
280c2a77
S
845 * Should return whether the worker is the main worker or not.
846 */
847 protected abstract isMain (): boolean
848
849 /**
2e81254d 850 * Hook executed before the worker task execution.
bf9549ae 851 * Can be overridden.
729c563d 852 *
f06e48d8 853 * @param workerNodeKey - The worker node key.
1c6fe997 854 * @param task - The task to execute.
729c563d 855 */
1c6fe997
JB
856 protected beforeTaskExecutionHook (
857 workerNodeKey: number,
858 task: Task<Data>
859 ): void {
94407def
JB
860 if (this.workerNodes[workerNodeKey]?.usage != null) {
861 const workerUsage = this.workerNodes[workerNodeKey].usage
862 ++workerUsage.tasks.executing
863 this.updateWaitTimeWorkerUsage(workerUsage, task)
864 }
865 if (
866 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
867 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
868 task.name as string
869 ) != null
870 ) {
db0e38ee 871 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 872 workerNodeKey
db0e38ee 873 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
874 ++taskFunctionWorkerUsage.tasks.executing
875 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 876 }
c97c7edb
S
877 }
878
c01733f1 879 /**
2e81254d 880 * Hook executed after the worker task execution.
bf9549ae 881 * Can be overridden.
c01733f1 882 *
501aea93 883 * @param workerNodeKey - The worker node key.
38e795c1 884 * @param message - The received message.
c01733f1 885 */
2e81254d 886 protected afterTaskExecutionHook (
501aea93 887 workerNodeKey: number,
2740a743 888 message: MessageValue<Response>
bf9549ae 889 ): void {
94407def
JB
890 if (this.workerNodes[workerNodeKey]?.usage != null) {
891 const workerUsage = this.workerNodes[workerNodeKey].usage
892 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
893 this.updateRunTimeWorkerUsage(workerUsage, message)
894 this.updateEluWorkerUsage(workerUsage, message)
895 }
896 if (
897 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
898 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 899 message.taskPerformance?.name as string
94407def
JB
900 ) != null
901 ) {
db0e38ee 902 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 903 workerNodeKey
db0e38ee 904 ].getTaskFunctionWorkerUsage(
0628755c 905 message.taskPerformance?.name as string
b558f6b5 906 ) as WorkerUsage
db0e38ee
JB
907 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
908 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
909 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
910 }
911 }
912
db0e38ee
JB
913 /**
914 * Whether the worker node shall update its task function worker usage or not.
915 *
916 * @param workerNodeKey - The worker node key.
917 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
918 */
919 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 920 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 921 return (
94407def 922 workerInfo != null &&
a5d15204 923 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 924 workerInfo.taskFunctions.length > 2
b558f6b5 925 )
f1c06930
JB
926 }
927
928 private updateTaskStatisticsWorkerUsage (
929 workerUsage: WorkerUsage,
930 message: MessageValue<Response>
931 ): void {
a4e07f72 932 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
933 if (
934 workerTaskStatistics.executing != null &&
935 workerTaskStatistics.executing > 0
936 ) {
937 --workerTaskStatistics.executing
5bb5be17 938 }
98e72cda
JB
939 if (message.taskError == null) {
940 ++workerTaskStatistics.executed
941 } else {
a4e07f72 942 ++workerTaskStatistics.failed
2740a743 943 }
f8eb0a2a
JB
944 }
945
a4e07f72
JB
946 private updateRunTimeWorkerUsage (
947 workerUsage: WorkerUsage,
f8eb0a2a
JB
948 message: MessageValue<Response>
949 ): void {
dc021bcc
JB
950 if (message.taskError != null) {
951 return
952 }
e4f20deb
JB
953 updateMeasurementStatistics(
954 workerUsage.runTime,
955 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 956 message.taskPerformance?.runTime ?? 0
e4f20deb 957 )
f8eb0a2a
JB
958 }
959
a4e07f72
JB
960 private updateWaitTimeWorkerUsage (
961 workerUsage: WorkerUsage,
1c6fe997 962 task: Task<Data>
f8eb0a2a 963 ): void {
1c6fe997
JB
964 const timestamp = performance.now()
965 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
966 updateMeasurementStatistics(
967 workerUsage.waitTime,
968 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 969 taskWaitTime
e4f20deb 970 )
c01733f1 971 }
972
a4e07f72 973 private updateEluWorkerUsage (
5df69fab 974 workerUsage: WorkerUsage,
62c15a68
JB
975 message: MessageValue<Response>
976 ): void {
dc021bcc
JB
977 if (message.taskError != null) {
978 return
979 }
008512c7
JB
980 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
981 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
982 updateMeasurementStatistics(
983 workerUsage.elu.active,
008512c7 984 eluTaskStatisticsRequirements,
dc021bcc 985 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
986 )
987 updateMeasurementStatistics(
988 workerUsage.elu.idle,
008512c7 989 eluTaskStatisticsRequirements,
dc021bcc 990 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 991 )
008512c7 992 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 993 if (message.taskPerformance?.elu != null) {
f7510105
JB
994 if (workerUsage.elu.utilization != null) {
995 workerUsage.elu.utilization =
996 (workerUsage.elu.utilization +
997 message.taskPerformance.elu.utilization) /
998 2
999 } else {
1000 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1001 }
62c15a68
JB
1002 }
1003 }
1004 }
1005
280c2a77 1006 /**
f06e48d8 1007 * Chooses a worker node for the next task.
280c2a77 1008 *
6c6afb84 1009 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1010 *
aa9eede8 1011 * @returns The chosen worker node key
280c2a77 1012 */
6c6afb84 1013 private chooseWorkerNode (): number {
930dcf12 1014 if (this.shallCreateDynamicWorker()) {
aa9eede8 1015 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1016 if (
b1aae695 1017 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1018 ) {
aa9eede8 1019 return workerNodeKey
6c6afb84 1020 }
17393ac8 1021 }
930dcf12
JB
1022 return this.workerChoiceStrategyContext.execute()
1023 }
1024
6c6afb84
JB
1025 /**
1026 * Conditions for dynamic worker creation.
1027 *
1028 * @returns Whether to create a dynamic worker or not.
1029 */
1030 private shallCreateDynamicWorker (): boolean {
930dcf12 1031 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1032 }
1033
280c2a77 1034 /**
aa9eede8 1035 * Sends a message to worker given its worker node key.
280c2a77 1036 *
aa9eede8 1037 * @param workerNodeKey - The worker node key.
38e795c1 1038 * @param message - The message.
7d91a8cd 1039 * @param transferList - The optional array of transferable objects.
280c2a77
S
1040 */
1041 protected abstract sendToWorker (
aa9eede8 1042 workerNodeKey: number,
7d91a8cd
JB
1043 message: MessageValue<Data>,
1044 transferList?: TransferListItem[]
280c2a77
S
1045 ): void
1046
729c563d 1047 /**
41344292 1048 * Creates a new worker.
6c6afb84
JB
1049 *
1050 * @returns Newly created worker.
729c563d 1051 */
280c2a77 1052 protected abstract createWorker (): Worker
c97c7edb 1053
4a6952ff 1054 /**
aa9eede8 1055 * Creates a new, completely set up worker node.
4a6952ff 1056 *
aa9eede8 1057 * @returns New, completely set up worker node key.
4a6952ff 1058 */
aa9eede8 1059 protected createAndSetupWorkerNode (): number {
bdacc2d2 1060 const worker = this.createWorker()
280c2a77 1061
fd04474e 1062 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1063 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1064 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1065 worker.on('error', (error) => {
aad6fb64 1066 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1067 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1068 workerInfo.ready = false
0dc838e3 1069 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1070 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1071 if (
1072 this.opts.restartWorkerOnError === true &&
1073 !this.starting &&
1074 this.started
1075 ) {
9b106837 1076 if (workerInfo.dynamic) {
aa9eede8 1077 this.createAndSetupDynamicWorkerNode()
8a1260a3 1078 } else {
aa9eede8 1079 this.createAndSetupWorkerNode()
8a1260a3 1080 }
5baee0d7 1081 }
19dbc45b 1082 if (this.opts.enableTasksQueue === true) {
9b106837 1083 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1084 }
5baee0d7 1085 })
a35560ba 1086 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1087 worker.once('exit', () => {
f06e48d8 1088 this.removeWorkerNode(worker)
a974afa6 1089 })
280c2a77 1090
aa9eede8 1091 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1092
aa9eede8 1093 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1094
aa9eede8 1095 return workerNodeKey
c97c7edb 1096 }
be0676b3 1097
930dcf12 1098 /**
aa9eede8 1099 * Creates a new, completely set up dynamic worker node.
930dcf12 1100 *
aa9eede8 1101 * @returns New, completely set up dynamic worker node key.
930dcf12 1102 */
aa9eede8
JB
1103 protected createAndSetupDynamicWorkerNode (): number {
1104 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1105 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1106 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1107 message.workerId
aad6fb64 1108 )
aa9eede8 1109 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1110 // Kill message received from worker
930dcf12
JB
1111 if (
1112 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1113 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1114 ((this.opts.enableTasksQueue === false &&
aa9eede8 1115 workerUsage.tasks.executing === 0) ||
7b56f532 1116 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1117 workerUsage.tasks.executing === 0 &&
1118 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1119 ) {
5270d253
JB
1120 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1121 this.emitter?.emit(PoolEvents.error, error)
1122 })
930dcf12
JB
1123 }
1124 })
94407def 1125 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1126 this.sendToWorker(workerNodeKey, {
b0a4db63 1127 checkActive: true,
21f710aa
JB
1128 workerId: workerInfo.id as number
1129 })
b5e113f6 1130 workerInfo.dynamic = true
b1aae695
JB
1131 if (
1132 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1133 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1134 ) {
b5e113f6
JB
1135 workerInfo.ready = true
1136 }
33e6bb4c 1137 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1138 return workerNodeKey
930dcf12
JB
1139 }
1140
a2ed5053 1141 /**
aa9eede8 1142 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1143 *
aa9eede8 1144 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1145 * @param listener - The message listener callback.
1146 */
85aeb3f3
JB
1147 protected abstract registerWorkerMessageListener<
1148 Message extends Data | Response
aa9eede8
JB
1149 >(
1150 workerNodeKey: number,
1151 listener: (message: MessageValue<Message>) => void
1152 ): void
a2ed5053
JB
1153
1154 /**
aa9eede8 1155 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1156 * Can be overridden.
1157 *
aa9eede8 1158 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1159 */
aa9eede8 1160 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1161 // Listen to worker messages.
aa9eede8 1162 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1163 // Send the startup message to worker.
aa9eede8 1164 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1165 // Send the statistics message to worker.
1166 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1167 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1168 this.workerNodes[workerNodeKey].onEmptyQueue =
1169 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1170 this.workerNodes[workerNodeKey].onBackPressure =
1171 this.tasksStealingOnBackPressure.bind(this)
1172 }
d2c73f82
JB
1173 }
1174
85aeb3f3 1175 /**
aa9eede8
JB
1176 * Sends the startup message to worker given its worker node key.
1177 *
1178 * @param workerNodeKey - The worker node key.
1179 */
1180 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1181
1182 /**
9edb9717 1183 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1184 *
aa9eede8 1185 * @param workerNodeKey - The worker node key.
85aeb3f3 1186 */
9edb9717 1187 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1188 this.sendToWorker(workerNodeKey, {
1189 statistics: {
1190 runTime:
1191 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1192 .runTime.aggregate,
1193 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1194 .elu.aggregate
1195 },
94407def 1196 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1197 })
1198 }
a2ed5053
JB
1199
1200 private redistributeQueuedTasks (workerNodeKey: number): void {
1201 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1202 let destinationWorkerNodeKey!: number
a2ed5053 1203 let minQueuedTasks = Infinity
a6b3272b 1204 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3 1205 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
58baffd3
JB
1206 if (workerNode.usage.tasks.queued === 0) {
1207 destinationWorkerNodeKey = workerNodeId
1208 break
1209 }
1210 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1211 minQueuedTasks = workerNode.usage.tasks.queued
1212 destinationWorkerNodeKey = workerNodeId
1213 }
a2ed5053
JB
1214 }
1215 }
0bc68267 1216 if (destinationWorkerNodeKey != null) {
033f1776 1217 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1218 const task = {
1219 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1220 workerId: destinationWorkerNode.info.id as number
0bc68267 1221 }
033f1776
JB
1222 if (
1223 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1224 destinationWorkerNode.usage.tasks.executing <
1225 (this.opts.tasksQueueOptions?.concurrency as number)
1226 ) {
0bc68267
JB
1227 this.executeTask(destinationWorkerNodeKey, task)
1228 } else {
1229 this.enqueueTask(destinationWorkerNodeKey, task)
1230 }
dd951876
JB
1231 }
1232 }
1233 }
1234
1235 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1236 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1237 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1238 const workerNodes = this.workerNodes
a6b3272b 1239 .slice()
dd951876
JB
1240 .sort(
1241 (workerNodeA, workerNodeB) =>
1242 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1243 )
dd951876 1244 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1245 if (sourceWorkerNode.usage.tasks.queued === 0) {
1246 break
1247 }
a6b3272b
JB
1248 if (
1249 sourceWorkerNode.info.ready &&
1250 sourceWorkerNode.info.id !== workerId &&
1251 sourceWorkerNode.usage.tasks.queued > 0
1252 ) {
1253 const task = {
1254 ...(sourceWorkerNode.popTask() as Task<Data>),
1255 workerId: destinationWorkerNode.info.id as number
1256 }
dd951876 1257 if (
2cb4ff45 1258 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
a6b3272b 1259 destinationWorkerNode.usage.tasks.executing <
033f1776 1260 (this.opts.tasksQueueOptions?.concurrency as number)
dd951876 1261 ) {
2cb4ff45
JB
1262 this.executeTask(destinationWorkerNodeKey, task)
1263 } else {
1264 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876 1265 }
b2559a74
JB
1266 if (destinationWorkerNode?.usage != null) {
1267 ++destinationWorkerNode.usage.tasks.stolen
1268 }
1269 if (
1270 this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
1271 destinationWorkerNode.getTaskFunctionWorkerUsage(
1272 task.name as string
1273 ) != null
1274 ) {
68cbdc84
JB
1275 const taskFunctionWorkerUsage =
1276 destinationWorkerNode.getTaskFunctionWorkerUsage(
1277 task.name as string
1278 ) as WorkerUsage
1279 ++taskFunctionWorkerUsage.tasks.stolen
1280 }
dd951876 1281 break
72695f86
JB
1282 }
1283 }
1284 }
1285
1286 private tasksStealingOnBackPressure (workerId: number): void {
68dbcdc0
JB
1287 if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
1288 return
1289 }
72695f86
JB
1290 const sourceWorkerNode =
1291 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1292 const workerNodes = this.workerNodes
a6b3272b 1293 .slice()
72695f86
JB
1294 .sort(
1295 (workerNodeA, workerNodeB) =>
1296 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1297 )
1298 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1299 if (
0bc68267 1300 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1301 workerNode.info.ready &&
1302 workerNode.info.id !== workerId &&
0bc68267
JB
1303 workerNode.usage.tasks.queued <
1304 (this.opts.tasksQueueOptions?.size as number) - 1
72695f86 1305 ) {
dd951876
JB
1306 const task = {
1307 ...(sourceWorkerNode.popTask() as Task<Data>),
1308 workerId: workerNode.info.id as number
1309 }
4de3d785 1310 if (
033f1776 1311 this.tasksQueueSize(workerNodeKey) === 0 &&
4de3d785 1312 workerNode.usage.tasks.executing <
033f1776 1313 (this.opts.tasksQueueOptions?.concurrency as number)
4de3d785 1314 ) {
dd951876 1315 this.executeTask(workerNodeKey, task)
4de3d785 1316 } else {
dd951876 1317 this.enqueueTask(workerNodeKey, task)
4de3d785 1318 }
b2559a74
JB
1319 if (workerNode?.usage != null) {
1320 ++workerNode.usage.tasks.stolen
1321 }
1322 if (
1323 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1324 workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
1325 ) {
68cbdc84
JB
1326 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1327 task.name as string
1328 ) as WorkerUsage
1329 ++taskFunctionWorkerUsage.tasks.stolen
1330 }
10ecf8fd 1331 }
a2ed5053
JB
1332 }
1333 }
1334
be0676b3 1335 /**
aa9eede8 1336 * This method is the listener registered for each worker message.
be0676b3 1337 *
bdacc2d2 1338 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1339 */
1340 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1341 return (message) => {
21f710aa 1342 this.checkMessageWorkerId(message)
a5d15204 1343 if (message.ready != null && message.taskFunctions != null) {
81c02522 1344 // Worker ready response received from worker
10e2aa7e 1345 this.handleWorkerReadyResponse(message)
7629bdf1 1346 } else if (message.taskId != null) {
81c02522 1347 // Task execution response received from worker
6b272951 1348 this.handleTaskExecutionResponse(message)
90d7d101
JB
1349 } else if (message.taskFunctions != null) {
1350 // Task functions message received from worker
94407def
JB
1351 (
1352 this.getWorkerInfo(
1353 this.getWorkerNodeKeyByWorkerId(message.workerId)
1354 ) as WorkerInfo
b558f6b5 1355 ).taskFunctions = message.taskFunctions
6b272951
JB
1356 }
1357 }
1358 }
1359
10e2aa7e 1360 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1361 if (message.ready === false) {
1362 throw new Error(`Worker ${message.workerId} failed to initialize`)
1363 }
a5d15204 1364 const workerInfo = this.getWorkerInfo(
aad6fb64 1365 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1366 ) as WorkerInfo
a5d15204
JB
1367 workerInfo.ready = message.ready as boolean
1368 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1369 if (this.emitter != null && this.ready) {
1370 this.emitter.emit(PoolEvents.ready, this.info)
1371 }
6b272951
JB
1372 }
1373
1374 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1375 const { taskId, taskError, data } = message
1376 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1377 if (promiseResponse != null) {
5441aea6
JB
1378 if (taskError != null) {
1379 this.emitter?.emit(PoolEvents.taskError, taskError)
1380 promiseResponse.reject(taskError.message)
6b272951 1381 } else {
5441aea6 1382 promiseResponse.resolve(data as Response)
6b272951 1383 }
501aea93
JB
1384 const workerNodeKey = promiseResponse.workerNodeKey
1385 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1386 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1387 if (
1388 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1389 this.tasksQueueSize(workerNodeKey) > 0 &&
1390 this.workerNodes[workerNodeKey].usage.tasks.executing <
1391 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1392 ) {
1393 this.executeTask(
1394 workerNodeKey,
1395 this.dequeueTask(workerNodeKey) as Task<Data>
1396 )
be0676b3 1397 }
6b272951 1398 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1399 }
be0676b3 1400 }
7c0ba920 1401
a1763c54 1402 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1403 if (this.busy) {
1404 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1405 }
1406 }
1407
1408 private checkAndEmitTaskQueuingEvents (): void {
1409 if (this.hasBackPressure()) {
1410 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1411 }
1412 }
1413
33e6bb4c
JB
1414 private checkAndEmitDynamicWorkerCreationEvents (): void {
1415 if (this.type === PoolTypes.dynamic) {
1416 if (this.full) {
1417 this.emitter?.emit(PoolEvents.full, this.info)
1418 }
1419 }
1420 }
1421
8a1260a3 1422 /**
aa9eede8 1423 * Gets the worker information given its worker node key.
8a1260a3
JB
1424 *
1425 * @param workerNodeKey - The worker node key.
3f09ed9f 1426 * @returns The worker information.
8a1260a3 1427 */
94407def
JB
1428 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1429 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1430 }
1431
a05c10de 1432 /**
b0a4db63 1433 * Adds the given worker in the pool worker nodes.
ea7a90d3 1434 *
38e795c1 1435 * @param worker - The worker.
aa9eede8
JB
1436 * @returns The added worker node key.
1437 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1438 */
b0a4db63 1439 private addWorkerNode (worker: Worker): number {
671d5154
JB
1440 const workerNode = new WorkerNode<Worker, Data>(
1441 worker,
1442 this.worker,
ff3f866a 1443 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1444 )
b97d82d8 1445 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1446 if (this.starting) {
1447 workerNode.info.ready = true
1448 }
aa9eede8 1449 this.workerNodes.push(workerNode)
aad6fb64 1450 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1451 if (workerNodeKey === -1) {
a5d15204 1452 throw new Error('Worker node added not found')
aa9eede8
JB
1453 }
1454 return workerNodeKey
ea7a90d3 1455 }
c923ce56 1456
51fe3d3c 1457 /**
f06e48d8 1458 * Removes the given worker from the pool worker nodes.
51fe3d3c 1459 *
f06e48d8 1460 * @param worker - The worker.
51fe3d3c 1461 */
416fd65c 1462 private removeWorkerNode (worker: Worker): void {
aad6fb64 1463 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1464 if (workerNodeKey !== -1) {
1465 this.workerNodes.splice(workerNodeKey, 1)
1466 this.workerChoiceStrategyContext.remove(workerNodeKey)
1467 }
51fe3d3c 1468 }
adc3c320 1469
e2b31e32
JB
1470 /** @inheritDoc */
1471 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1472 return (
e2b31e32
JB
1473 this.opts.enableTasksQueue === true &&
1474 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1475 )
1476 }
1477
1478 private hasBackPressure (): boolean {
1479 return (
1480 this.opts.enableTasksQueue === true &&
1481 this.workerNodes.findIndex(
1482 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1483 ) === -1
9e844245 1484 )
e2b31e32
JB
1485 }
1486
b0a4db63 1487 /**
aa9eede8 1488 * Executes the given task on the worker given its worker node key.
b0a4db63 1489 *
aa9eede8 1490 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1491 * @param task - The task to execute.
1492 */
2e81254d 1493 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1494 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1495 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1496 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1497 }
1498
f9f00b5f 1499 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1500 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1501 this.checkAndEmitTaskQueuingEvents()
1502 return tasksQueueSize
adc3c320
JB
1503 }
1504
416fd65c 1505 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1506 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1507 }
1508
416fd65c 1509 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1510 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1511 }
1512
81c02522 1513 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1514 while (this.tasksQueueSize(workerNodeKey) > 0) {
1515 this.executeTask(
1516 workerNodeKey,
1517 this.dequeueTask(workerNodeKey) as Task<Data>
1518 )
ff733df7 1519 }
4b628b48 1520 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1521 }
1522
ef41a6e6
JB
1523 private flushTasksQueues (): void {
1524 for (const [workerNodeKey] of this.workerNodes.entries()) {
1525 this.flushTasksQueue(workerNodeKey)
1526 }
1527 }
c97c7edb 1528}