fix: fix pool information median computation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
075e51d1 147 this.starting = true
e761c033 148 this.startPool()
075e51d1 149 this.starting = false
15b176e0 150 this.started = true
afe0d5bf
JB
151
152 this.startTimestamp = performance.now()
c97c7edb
S
153 }
154
a35560ba 155 private checkFilePath (filePath: string): void {
ffcbbad8
JB
156 if (
157 filePath == null ||
3d6dd312 158 typeof filePath !== 'string' ||
ffcbbad8
JB
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
c510fea7
APA
161 throw new Error('Please specify a file with a worker implementation')
162 }
3d6dd312
JB
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
c510fea7
APA
166 }
167
8d3782fa
JB
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
78cea37e 173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 174 throw new TypeError(
0d80593b 175 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
176 )
177 } else if (numberOfWorkers < 0) {
473c717a 178 throw new RangeError(
8d3782fa
JB
179 'Cannot instantiate a pool with a negative number of workers'
180 )
6b27d407 181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 187 if (this.type === PoolTypes.dynamic) {
a5ed75b7 188 if (max == null) {
e695d66f 189 throw new TypeError(
a5ed75b7
JB
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
079de991
JB
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
b97d82d8 200 } else if (max === 0) {
079de991 201 throw new RangeError(
d640b48b 202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
8d3782fa
JB
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
49be33fe
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 244 throw new Error(
aee46736 245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
246 )
247 }
7c0ba920
JB
248 }
249
0d80593b
JB
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
8990357d
JB
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
268 workerChoiceStrategyOptions.choiceRetries <= 0
269 ) {
270 throw new RangeError(
271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
272 )
273 }
49be33fe
JB
274 if (
275 workerChoiceStrategyOptions.weights != null &&
6b27d407 276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
f0d7f803
JB
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
0d80593b
JB
292 }
293
a20f0ba5 294 private checkValidTasksQueueOptions (
ff3f866a 295 tasksQueueOptions: Writable<TasksQueueOptions>
a20f0ba5 296 ): void {
0d80593b
JB
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
f0d7f803
JB
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
20c6f652 305 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
e695d66f 312 throw new RangeError(
20c6f652
JB
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
316 if (
317 tasksQueueOptions?.queueMaxSize != null &&
ff3f866a 318 tasksQueueOptions?.size != null
20c6f652 319 ) {
ff3f866a
JB
320 throw new Error(
321 'Invalid tasks queue options: cannot specify both queueMaxSize and size'
20c6f652
JB
322 )
323 }
ff3f866a
JB
324 if (tasksQueueOptions?.queueMaxSize != null) {
325 tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
326 }
20c6f652 327 if (
ff3f866a
JB
328 tasksQueueOptions?.size != null &&
329 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 330 ) {
ff3f866a
JB
331 throw new TypeError(
332 'Invalid worker node tasks queue max size: must be an integer'
333 )
334 }
335 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 336 throw new RangeError(
ff3f866a 337 `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
338 )
339 }
340 }
341
e761c033
JB
342 private startPool (): void {
343 while (
344 this.workerNodes.reduce(
345 (accumulator, workerNode) =>
346 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
347 0
348 ) < this.numberOfWorkers
349 ) {
aa9eede8 350 this.createAndSetupWorkerNode()
e761c033
JB
351 }
352 }
353
08f3f44c 354 /** @inheritDoc */
6b27d407
JB
355 public get info (): PoolInfo {
356 return {
23ccf9d7 357 version,
6b27d407 358 type: this.type,
184855e6 359 worker: this.worker,
2431bdb4
JB
360 ready: this.ready,
361 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
362 minSize: this.minSize,
363 maxSize: this.maxSize,
c05f0d50
JB
364 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .runTime.aggregate &&
1305e9a8
JB
366 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
367 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
368 workerNodes: this.workerNodes.length,
369 idleWorkerNodes: this.workerNodes.reduce(
370 (accumulator, workerNode) =>
f59e1027 371 workerNode.usage.tasks.executing === 0
a4e07f72
JB
372 ? accumulator + 1
373 : accumulator,
6b27d407
JB
374 0
375 ),
376 busyWorkerNodes: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
f59e1027 378 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
379 0
380 ),
a4e07f72 381 executedTasks: this.workerNodes.reduce(
6b27d407 382 (accumulator, workerNode) =>
f59e1027 383 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
384 0
385 ),
386 executingTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
f59e1027 388 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
389 0
390 ),
daf86646
JB
391 ...(this.opts.enableTasksQueue === true && {
392 queuedTasks: this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + workerNode.usage.tasks.queued,
395 0
396 )
397 }),
398 ...(this.opts.enableTasksQueue === true && {
399 maxQueuedTasks: this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
402 0
403 )
404 }),
a1763c54
JB
405 ...(this.opts.enableTasksQueue === true && {
406 backPressure: this.hasBackPressure()
407 }),
a4e07f72
JB
408 failedTasks: this.workerNodes.reduce(
409 (accumulator, workerNode) =>
f59e1027 410 accumulator + workerNode.usage.tasks.failed,
a4e07f72 411 0
1dcf8b7b
JB
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .runTime.aggregate && {
415 runTime: {
98e72cda
JB
416 minimum: round(
417 Math.min(
418 ...this.workerNodes.map(
8ebe6c30 419 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 420 )
1dcf8b7b
JB
421 )
422 ),
98e72cda
JB
423 maximum: round(
424 Math.max(
425 ...this.workerNodes.map(
8ebe6c30 426 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 427 )
1dcf8b7b 428 )
98e72cda 429 ),
3baa0837
JB
430 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
431 .runTime.average && {
432 average: round(
433 average(
434 this.workerNodes.reduce<number[]>(
435 (accumulator, workerNode) =>
436 accumulator.concat(workerNode.usage.runTime.history),
437 []
438 )
98e72cda 439 )
dc021bcc 440 )
3baa0837 441 }),
98e72cda
JB
442 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
443 .runTime.median && {
444 median: round(
445 median(
3baa0837
JB
446 this.workerNodes.reduce<number[]>(
447 (accumulator, workerNode) =>
448 accumulator.concat(workerNode.usage.runTime.history),
449 []
98e72cda
JB
450 )
451 )
452 )
453 })
1dcf8b7b
JB
454 }
455 }),
456 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
457 .waitTime.aggregate && {
458 waitTime: {
98e72cda
JB
459 minimum: round(
460 Math.min(
461 ...this.workerNodes.map(
8ebe6c30 462 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 463 )
1dcf8b7b
JB
464 )
465 ),
98e72cda
JB
466 maximum: round(
467 Math.max(
468 ...this.workerNodes.map(
8ebe6c30 469 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 470 )
1dcf8b7b 471 )
98e72cda 472 ),
3baa0837
JB
473 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
474 .waitTime.average && {
475 average: round(
476 average(
477 this.workerNodes.reduce<number[]>(
478 (accumulator, workerNode) =>
479 accumulator.concat(workerNode.usage.waitTime.history),
480 []
481 )
98e72cda 482 )
dc021bcc 483 )
3baa0837 484 }),
98e72cda
JB
485 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
486 .waitTime.median && {
487 median: round(
488 median(
3baa0837
JB
489 this.workerNodes.reduce<number[]>(
490 (accumulator, workerNode) =>
491 accumulator.concat(workerNode.usage.waitTime.history),
492 []
98e72cda
JB
493 )
494 )
495 )
496 })
1dcf8b7b
JB
497 }
498 })
6b27d407
JB
499 }
500 }
08f3f44c 501
aa9eede8
JB
502 /**
503 * The pool readiness boolean status.
504 */
2431bdb4
JB
505 private get ready (): boolean {
506 return (
b97d82d8
JB
507 this.workerNodes.reduce(
508 (accumulator, workerNode) =>
509 !workerNode.info.dynamic && workerNode.info.ready
510 ? accumulator + 1
511 : accumulator,
512 0
513 ) >= this.minSize
2431bdb4
JB
514 )
515 }
516
afe0d5bf 517 /**
aa9eede8 518 * The approximate pool utilization.
afe0d5bf
JB
519 *
520 * @returns The pool utilization.
521 */
522 private get utilization (): number {
8e5ca040 523 const poolTimeCapacity =
fe7d90db 524 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
525 const totalTasksRunTime = this.workerNodes.reduce(
526 (accumulator, workerNode) =>
71514351 527 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
528 0
529 )
530 const totalTasksWaitTime = this.workerNodes.reduce(
531 (accumulator, workerNode) =>
71514351 532 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
533 0
534 )
8e5ca040 535 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
536 }
537
8881ae32 538 /**
aa9eede8 539 * The pool type.
8881ae32
JB
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
184855e6 545 /**
aa9eede8 546 * The worker type.
184855e6
JB
547 */
548 protected abstract get worker (): WorkerType
549
c2ade475 550 /**
aa9eede8 551 * The pool minimum size.
c2ade475 552 */
8735b4e5
JB
553 protected get minSize (): number {
554 return this.numberOfWorkers
555 }
ff733df7
JB
556
557 /**
aa9eede8 558 * The pool maximum size.
ff733df7 559 */
8735b4e5
JB
560 protected get maxSize (): number {
561 return this.max ?? this.numberOfWorkers
562 }
a35560ba 563
6b813701
JB
564 /**
565 * Checks if the worker id sent in the received message from a worker is valid.
566 *
567 * @param message - The received message.
568 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
569 */
21f710aa 570 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
571 if (message.workerId == null) {
572 throw new Error('Worker message received without worker id')
573 } else if (
21f710aa 574 message.workerId != null &&
aad6fb64 575 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
576 ) {
577 throw new Error(
578 `Worker message received from unknown worker '${message.workerId}'`
579 )
580 }
581 }
582
ffcbbad8 583 /**
f06e48d8 584 * Gets the given worker its worker node key.
ffcbbad8
JB
585 *
586 * @param worker - The worker.
f59e1027 587 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 588 */
aad6fb64 589 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 590 return this.workerNodes.findIndex(
8ebe6c30 591 (workerNode) => workerNode.worker === worker
f06e48d8 592 )
bf9549ae
JB
593 }
594
aa9eede8
JB
595 /**
596 * Gets the worker node key given its worker id.
597 *
598 * @param workerId - The worker id.
aad6fb64 599 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 600 */
aad6fb64
JB
601 private getWorkerNodeKeyByWorkerId (workerId: number): number {
602 return this.workerNodes.findIndex(
8ebe6c30 603 (workerNode) => workerNode.info.id === workerId
aad6fb64 604 )
aa9eede8
JB
605 }
606
afc003b2 607 /** @inheritDoc */
a35560ba 608 public setWorkerChoiceStrategy (
59219cbb
JB
609 workerChoiceStrategy: WorkerChoiceStrategy,
610 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 611 ): void {
aee46736 612 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 613 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
614 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
615 this.opts.workerChoiceStrategy
616 )
617 if (workerChoiceStrategyOptions != null) {
618 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
619 }
aa9eede8 620 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 621 workerNode.resetUsage()
9edb9717 622 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 623 }
a20f0ba5
JB
624 }
625
626 /** @inheritDoc */
627 public setWorkerChoiceStrategyOptions (
628 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
629 ): void {
0d80593b 630 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
631 this.opts.workerChoiceStrategyOptions = {
632 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
633 ...workerChoiceStrategyOptions
634 }
a20f0ba5
JB
635 this.workerChoiceStrategyContext.setOptions(
636 this.opts.workerChoiceStrategyOptions
a35560ba
S
637 )
638 }
639
a20f0ba5 640 /** @inheritDoc */
8f52842f
JB
641 public enableTasksQueue (
642 enable: boolean,
643 tasksQueueOptions?: TasksQueueOptions
644 ): void {
a20f0ba5 645 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 646 this.flushTasksQueues()
a20f0ba5
JB
647 }
648 this.opts.enableTasksQueue = enable
8f52842f 649 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
650 }
651
652 /** @inheritDoc */
8f52842f 653 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 654 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
655 this.checkValidTasksQueueOptions(tasksQueueOptions)
656 this.opts.tasksQueueOptions =
657 this.buildTasksQueueOptions(tasksQueueOptions)
ff3f866a 658 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 659 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
660 delete this.opts.tasksQueueOptions
661 }
662 }
663
ff3f866a 664 private setTasksQueueMaxSize (size: number): void {
20c6f652 665 for (const workerNode of this.workerNodes) {
ff3f866a 666 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
667 }
668 }
669
a20f0ba5
JB
670 private buildTasksQueueOptions (
671 tasksQueueOptions: TasksQueueOptions
672 ): TasksQueueOptions {
673 return {
20c6f652 674 ...{
ff3f866a 675 size: Math.pow(this.maxSize, 2),
20c6f652
JB
676 concurrency: 1
677 },
678 ...tasksQueueOptions
a20f0ba5
JB
679 }
680 }
681
c319c66b
JB
682 /**
683 * Whether the pool is full or not.
684 *
685 * The pool filling boolean status.
686 */
dea903a8
JB
687 protected get full (): boolean {
688 return this.workerNodes.length >= this.maxSize
689 }
c2ade475 690
c319c66b
JB
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
7c0ba920 697
6c6afb84 698 /**
3d76750a 699 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
700 *
701 * @returns Worker nodes busyness boolean status.
702 */
c2ade475 703 protected internalBusy (): boolean {
3d76750a
JB
704 if (this.opts.enableTasksQueue === true) {
705 return (
706 this.workerNodes.findIndex(
8ebe6c30 707 (workerNode) =>
3d76750a
JB
708 workerNode.info.ready &&
709 workerNode.usage.tasks.executing <
710 (this.opts.tasksQueueOptions?.concurrency as number)
711 ) === -1
712 )
713 } else {
714 return (
715 this.workerNodes.findIndex(
8ebe6c30 716 (workerNode) =>
3d76750a
JB
717 workerNode.info.ready && workerNode.usage.tasks.executing === 0
718 ) === -1
719 )
720 }
cb70b19d
JB
721 }
722
90d7d101
JB
723 /** @inheritDoc */
724 public listTaskFunctions (): string[] {
f2dbbf95
JB
725 for (const workerNode of this.workerNodes) {
726 if (
727 Array.isArray(workerNode.info.taskFunctions) &&
728 workerNode.info.taskFunctions.length > 0
729 ) {
730 return workerNode.info.taskFunctions
731 }
90d7d101 732 }
f2dbbf95 733 return []
90d7d101
JB
734 }
735
afc003b2 736 /** @inheritDoc */
7d91a8cd
JB
737 public async execute (
738 data?: Data,
739 name?: string,
740 transferList?: TransferListItem[]
741 ): Promise<Response> {
52b71763 742 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
743 if (!this.started) {
744 reject(new Error('Cannot execute a task on destroyed pool'))
745 }
7d91a8cd
JB
746 if (name != null && typeof name !== 'string') {
747 reject(new TypeError('name argument must be a string'))
748 }
90d7d101
JB
749 if (
750 name != null &&
751 typeof name === 'string' &&
752 name.trim().length === 0
753 ) {
f58b60b9 754 reject(new TypeError('name argument must not be an empty string'))
90d7d101 755 }
b558f6b5
JB
756 if (transferList != null && !Array.isArray(transferList)) {
757 reject(new TypeError('transferList argument must be an array'))
758 }
759 const timestamp = performance.now()
760 const workerNodeKey = this.chooseWorkerNode()
94407def 761 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
cea399c8
JB
762 if (
763 name != null &&
a5d15204
JB
764 Array.isArray(workerInfo.taskFunctions) &&
765 !workerInfo.taskFunctions.includes(name)
cea399c8 766 ) {
90d7d101
JB
767 reject(
768 new Error(`Task function '${name}' is not registered in the pool`)
769 )
770 }
501aea93 771 const task: Task<Data> = {
52b71763
JB
772 name: name ?? DEFAULT_TASK_NAME,
773 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
774 data: data ?? ({} as Data),
7d91a8cd 775 transferList,
52b71763 776 timestamp,
a5d15204 777 workerId: workerInfo.id as number,
7629bdf1 778 taskId: randomUUID()
52b71763 779 }
7629bdf1 780 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
781 resolve,
782 reject,
501aea93 783 workerNodeKey
2e81254d 784 })
52b71763 785 if (
4e377863
JB
786 this.opts.enableTasksQueue === false ||
787 (this.opts.enableTasksQueue === true &&
788 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 789 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 790 ) {
501aea93 791 this.executeTask(workerNodeKey, task)
4e377863
JB
792 } else {
793 this.enqueueTask(workerNodeKey, task)
52b71763 794 }
2e81254d 795 })
280c2a77 796 }
c97c7edb 797
afc003b2 798 /** @inheritDoc */
c97c7edb 799 public async destroy (): Promise<void> {
1fbcaa7c 800 await Promise.all(
81c02522 801 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 802 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
803 })
804 )
33e6bb4c 805 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 806 this.started = false
c97c7edb
S
807 }
808
1e3214b6
JB
809 protected async sendKillMessageToWorker (
810 workerNodeKey: number,
811 workerId: number
812 ): Promise<void> {
9edb9717 813 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
814 this.registerWorkerMessageListener(workerNodeKey, (message) => {
815 if (message.kill === 'success') {
816 resolve()
817 } else if (message.kill === 'failure') {
e1af34e6 818 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
819 }
820 })
9edb9717 821 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 822 })
1e3214b6
JB
823 }
824
4a6952ff 825 /**
aa9eede8 826 * Terminates the worker node given its worker node key.
4a6952ff 827 *
aa9eede8 828 * @param workerNodeKey - The worker node key.
4a6952ff 829 */
81c02522 830 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 831
729c563d 832 /**
6677a3d3
JB
833 * Setup hook to execute code before worker nodes are created in the abstract constructor.
834 * Can be overridden.
afc003b2
JB
835 *
836 * @virtual
729c563d 837 */
280c2a77 838 protected setupHook (): void {
d99ba5a8 839 // Intentionally empty
280c2a77 840 }
c97c7edb 841
729c563d 842 /**
280c2a77
S
843 * Should return whether the worker is the main worker or not.
844 */
845 protected abstract isMain (): boolean
846
847 /**
2e81254d 848 * Hook executed before the worker task execution.
bf9549ae 849 * Can be overridden.
729c563d 850 *
f06e48d8 851 * @param workerNodeKey - The worker node key.
1c6fe997 852 * @param task - The task to execute.
729c563d 853 */
1c6fe997
JB
854 protected beforeTaskExecutionHook (
855 workerNodeKey: number,
856 task: Task<Data>
857 ): void {
94407def
JB
858 if (this.workerNodes[workerNodeKey]?.usage != null) {
859 const workerUsage = this.workerNodes[workerNodeKey].usage
860 ++workerUsage.tasks.executing
861 this.updateWaitTimeWorkerUsage(workerUsage, task)
862 }
863 if (
864 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
865 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
866 task.name as string
867 ) != null
868 ) {
db0e38ee 869 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 870 workerNodeKey
db0e38ee 871 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
872 ++taskFunctionWorkerUsage.tasks.executing
873 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 874 }
c97c7edb
S
875 }
876
c01733f1 877 /**
2e81254d 878 * Hook executed after the worker task execution.
bf9549ae 879 * Can be overridden.
c01733f1 880 *
501aea93 881 * @param workerNodeKey - The worker node key.
38e795c1 882 * @param message - The received message.
c01733f1 883 */
2e81254d 884 protected afterTaskExecutionHook (
501aea93 885 workerNodeKey: number,
2740a743 886 message: MessageValue<Response>
bf9549ae 887 ): void {
94407def
JB
888 if (this.workerNodes[workerNodeKey]?.usage != null) {
889 const workerUsage = this.workerNodes[workerNodeKey].usage
890 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
891 this.updateRunTimeWorkerUsage(workerUsage, message)
892 this.updateEluWorkerUsage(workerUsage, message)
893 }
894 if (
895 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
896 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 897 message.taskPerformance?.name as string
94407def
JB
898 ) != null
899 ) {
db0e38ee 900 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 901 workerNodeKey
db0e38ee 902 ].getTaskFunctionWorkerUsage(
0628755c 903 message.taskPerformance?.name as string
b558f6b5 904 ) as WorkerUsage
db0e38ee
JB
905 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
906 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
907 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
908 }
909 }
910
db0e38ee
JB
911 /**
912 * Whether the worker node shall update its task function worker usage or not.
913 *
914 * @param workerNodeKey - The worker node key.
915 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
916 */
917 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 918 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 919 return (
94407def 920 workerInfo != null &&
a5d15204 921 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 922 workerInfo.taskFunctions.length > 2
b558f6b5 923 )
f1c06930
JB
924 }
925
926 private updateTaskStatisticsWorkerUsage (
927 workerUsage: WorkerUsage,
928 message: MessageValue<Response>
929 ): void {
a4e07f72 930 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
931 if (
932 workerTaskStatistics.executing != null &&
933 workerTaskStatistics.executing > 0
934 ) {
935 --workerTaskStatistics.executing
5bb5be17 936 }
98e72cda
JB
937 if (message.taskError == null) {
938 ++workerTaskStatistics.executed
939 } else {
a4e07f72 940 ++workerTaskStatistics.failed
2740a743 941 }
f8eb0a2a
JB
942 }
943
a4e07f72
JB
944 private updateRunTimeWorkerUsage (
945 workerUsage: WorkerUsage,
f8eb0a2a
JB
946 message: MessageValue<Response>
947 ): void {
dc021bcc
JB
948 if (message.taskError != null) {
949 return
950 }
e4f20deb
JB
951 updateMeasurementStatistics(
952 workerUsage.runTime,
953 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 954 message.taskPerformance?.runTime ?? 0
e4f20deb 955 )
f8eb0a2a
JB
956 }
957
a4e07f72
JB
958 private updateWaitTimeWorkerUsage (
959 workerUsage: WorkerUsage,
1c6fe997 960 task: Task<Data>
f8eb0a2a 961 ): void {
1c6fe997
JB
962 const timestamp = performance.now()
963 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
964 updateMeasurementStatistics(
965 workerUsage.waitTime,
966 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 967 taskWaitTime
e4f20deb 968 )
c01733f1 969 }
970
a4e07f72 971 private updateEluWorkerUsage (
5df69fab 972 workerUsage: WorkerUsage,
62c15a68
JB
973 message: MessageValue<Response>
974 ): void {
dc021bcc
JB
975 if (message.taskError != null) {
976 return
977 }
008512c7
JB
978 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
979 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
980 updateMeasurementStatistics(
981 workerUsage.elu.active,
008512c7 982 eluTaskStatisticsRequirements,
dc021bcc 983 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
984 )
985 updateMeasurementStatistics(
986 workerUsage.elu.idle,
008512c7 987 eluTaskStatisticsRequirements,
dc021bcc 988 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 989 )
008512c7 990 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 991 if (message.taskPerformance?.elu != null) {
f7510105
JB
992 if (workerUsage.elu.utilization != null) {
993 workerUsage.elu.utilization =
994 (workerUsage.elu.utilization +
995 message.taskPerformance.elu.utilization) /
996 2
997 } else {
998 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
999 }
62c15a68
JB
1000 }
1001 }
1002 }
1003
280c2a77 1004 /**
f06e48d8 1005 * Chooses a worker node for the next task.
280c2a77 1006 *
6c6afb84 1007 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1008 *
aa9eede8 1009 * @returns The chosen worker node key
280c2a77 1010 */
6c6afb84 1011 private chooseWorkerNode (): number {
930dcf12 1012 if (this.shallCreateDynamicWorker()) {
aa9eede8 1013 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1014 if (
b1aae695 1015 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1016 ) {
aa9eede8 1017 return workerNodeKey
6c6afb84 1018 }
17393ac8 1019 }
930dcf12
JB
1020 return this.workerChoiceStrategyContext.execute()
1021 }
1022
6c6afb84
JB
1023 /**
1024 * Conditions for dynamic worker creation.
1025 *
1026 * @returns Whether to create a dynamic worker or not.
1027 */
1028 private shallCreateDynamicWorker (): boolean {
930dcf12 1029 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1030 }
1031
280c2a77 1032 /**
aa9eede8 1033 * Sends a message to worker given its worker node key.
280c2a77 1034 *
aa9eede8 1035 * @param workerNodeKey - The worker node key.
38e795c1 1036 * @param message - The message.
7d91a8cd 1037 * @param transferList - The optional array of transferable objects.
280c2a77
S
1038 */
1039 protected abstract sendToWorker (
aa9eede8 1040 workerNodeKey: number,
7d91a8cd
JB
1041 message: MessageValue<Data>,
1042 transferList?: TransferListItem[]
280c2a77
S
1043 ): void
1044
729c563d 1045 /**
41344292 1046 * Creates a new worker.
6c6afb84
JB
1047 *
1048 * @returns Newly created worker.
729c563d 1049 */
280c2a77 1050 protected abstract createWorker (): Worker
c97c7edb 1051
4a6952ff 1052 /**
aa9eede8 1053 * Creates a new, completely set up worker node.
4a6952ff 1054 *
aa9eede8 1055 * @returns New, completely set up worker node key.
4a6952ff 1056 */
aa9eede8 1057 protected createAndSetupWorkerNode (): number {
bdacc2d2 1058 const worker = this.createWorker()
280c2a77 1059
fd04474e 1060 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1061 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1062 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1063 worker.on('error', (error) => {
aad6fb64 1064 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1065 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1066 workerInfo.ready = false
0dc838e3 1067 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1068 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1069 if (
1070 this.opts.restartWorkerOnError === true &&
1071 !this.starting &&
1072 this.started
1073 ) {
9b106837 1074 if (workerInfo.dynamic) {
aa9eede8 1075 this.createAndSetupDynamicWorkerNode()
8a1260a3 1076 } else {
aa9eede8 1077 this.createAndSetupWorkerNode()
8a1260a3 1078 }
5baee0d7 1079 }
19dbc45b 1080 if (this.opts.enableTasksQueue === true) {
9b106837 1081 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1082 }
5baee0d7 1083 })
a35560ba 1084 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1085 worker.once('exit', () => {
f06e48d8 1086 this.removeWorkerNode(worker)
a974afa6 1087 })
280c2a77 1088
aa9eede8 1089 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1090
aa9eede8 1091 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1092
aa9eede8 1093 return workerNodeKey
c97c7edb 1094 }
be0676b3 1095
930dcf12 1096 /**
aa9eede8 1097 * Creates a new, completely set up dynamic worker node.
930dcf12 1098 *
aa9eede8 1099 * @returns New, completely set up dynamic worker node key.
930dcf12 1100 */
aa9eede8
JB
1101 protected createAndSetupDynamicWorkerNode (): number {
1102 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1103 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1104 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1105 message.workerId
aad6fb64 1106 )
aa9eede8 1107 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1108 // Kill message received from worker
930dcf12
JB
1109 if (
1110 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1111 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1112 ((this.opts.enableTasksQueue === false &&
aa9eede8 1113 workerUsage.tasks.executing === 0) ||
7b56f532 1114 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1115 workerUsage.tasks.executing === 0 &&
1116 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1117 ) {
5270d253
JB
1118 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1119 this.emitter?.emit(PoolEvents.error, error)
1120 })
930dcf12
JB
1121 }
1122 })
94407def 1123 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1124 this.sendToWorker(workerNodeKey, {
b0a4db63 1125 checkActive: true,
21f710aa
JB
1126 workerId: workerInfo.id as number
1127 })
b5e113f6 1128 workerInfo.dynamic = true
b1aae695
JB
1129 if (
1130 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1131 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1132 ) {
b5e113f6
JB
1133 workerInfo.ready = true
1134 }
33e6bb4c 1135 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1136 return workerNodeKey
930dcf12
JB
1137 }
1138
a2ed5053 1139 /**
aa9eede8 1140 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1141 *
aa9eede8 1142 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1143 * @param listener - The message listener callback.
1144 */
85aeb3f3
JB
1145 protected abstract registerWorkerMessageListener<
1146 Message extends Data | Response
aa9eede8
JB
1147 >(
1148 workerNodeKey: number,
1149 listener: (message: MessageValue<Message>) => void
1150 ): void
a2ed5053
JB
1151
1152 /**
aa9eede8 1153 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1154 * Can be overridden.
1155 *
aa9eede8 1156 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1157 */
aa9eede8 1158 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1159 // Listen to worker messages.
aa9eede8 1160 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1161 // Send the startup message to worker.
aa9eede8 1162 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1163 // Send the statistics message to worker.
1164 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1165 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1166 this.workerNodes[workerNodeKey].onEmptyQueue =
1167 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1168 this.workerNodes[workerNodeKey].onBackPressure =
1169 this.tasksStealingOnBackPressure.bind(this)
1170 }
d2c73f82
JB
1171 }
1172
85aeb3f3 1173 /**
aa9eede8
JB
1174 * Sends the startup message to worker given its worker node key.
1175 *
1176 * @param workerNodeKey - The worker node key.
1177 */
1178 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1179
1180 /**
9edb9717 1181 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1182 *
aa9eede8 1183 * @param workerNodeKey - The worker node key.
85aeb3f3 1184 */
9edb9717 1185 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1186 this.sendToWorker(workerNodeKey, {
1187 statistics: {
1188 runTime:
1189 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1190 .runTime.aggregate,
1191 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1192 .elu.aggregate
1193 },
94407def 1194 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1195 })
1196 }
a2ed5053
JB
1197
1198 private redistributeQueuedTasks (workerNodeKey: number): void {
1199 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1200 let destinationWorkerNodeKey!: number
a2ed5053 1201 let minQueuedTasks = Infinity
10ecf8fd 1202 let executeTask = false
a6b3272b 1203 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3
JB
1204 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
1205 if (
1206 workerNode.usage.tasks.executing <
a6b3272b 1207 (this.opts.tasksQueueOptions?.concurrency as number)
58baffd3
JB
1208 ) {
1209 executeTask = true
1210 }
1211 if (workerNode.usage.tasks.queued === 0) {
1212 destinationWorkerNodeKey = workerNodeId
1213 break
1214 }
1215 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1216 minQueuedTasks = workerNode.usage.tasks.queued
1217 destinationWorkerNodeKey = workerNodeId
1218 }
a2ed5053
JB
1219 }
1220 }
0bc68267
JB
1221 if (destinationWorkerNodeKey != null) {
1222 const task = {
1223 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1224 workerId: (this.getWorkerInfo(destinationWorkerNodeKey) as WorkerInfo)
1225 .id as number
1226 }
1227 if (executeTask) {
1228 this.executeTask(destinationWorkerNodeKey, task)
1229 } else {
1230 this.enqueueTask(destinationWorkerNodeKey, task)
1231 }
dd951876
JB
1232 }
1233 }
1234 }
1235
1236 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1237 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1238 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1239 const workerNodes = this.workerNodes
a6b3272b 1240 .slice()
dd951876
JB
1241 .sort(
1242 (workerNodeA, workerNodeB) =>
1243 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1244 )
dd951876 1245 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1246 if (sourceWorkerNode.usage.tasks.queued === 0) {
1247 break
1248 }
a6b3272b
JB
1249 if (
1250 sourceWorkerNode.info.ready &&
1251 sourceWorkerNode.info.id !== workerId &&
1252 sourceWorkerNode.usage.tasks.queued > 0
1253 ) {
1254 const task = {
1255 ...(sourceWorkerNode.popTask() as Task<Data>),
1256 workerId: destinationWorkerNode.info.id as number
1257 }
dd951876 1258 if (
a6b3272b 1259 destinationWorkerNode.usage.tasks.executing <
dd951876
JB
1260 (this.opts.tasksQueueOptions?.concurrency as number)
1261 ) {
dd951876
JB
1262 this.executeTask(destinationWorkerNodeKey, task)
1263 } else {
dd951876
JB
1264 this.enqueueTask(destinationWorkerNodeKey, task)
1265 }
1266 break
72695f86
JB
1267 }
1268 }
1269 }
1270
1271 private tasksStealingOnBackPressure (workerId: number): void {
1272 const sourceWorkerNode =
1273 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1274 const workerNodes = this.workerNodes
a6b3272b 1275 .slice()
72695f86
JB
1276 .sort(
1277 (workerNodeA, workerNodeB) =>
1278 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1279 )
1280 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1281 if (
0bc68267 1282 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1283 workerNode.info.ready &&
1284 workerNode.info.id !== workerId &&
0bc68267
JB
1285 workerNode.usage.tasks.queued <
1286 (this.opts.tasksQueueOptions?.size as number) - 1
72695f86 1287 ) {
dd951876
JB
1288 const task = {
1289 ...(sourceWorkerNode.popTask() as Task<Data>),
1290 workerId: workerNode.info.id as number
1291 }
4de3d785
JB
1292 if (
1293 workerNode.usage.tasks.executing <
72695f86 1294 (this.opts.tasksQueueOptions?.concurrency as number)
4de3d785 1295 ) {
dd951876 1296 this.executeTask(workerNodeKey, task)
4de3d785 1297 } else {
dd951876 1298 this.enqueueTask(workerNodeKey, task)
4de3d785 1299 }
10ecf8fd 1300 }
a2ed5053
JB
1301 }
1302 }
1303
be0676b3 1304 /**
aa9eede8 1305 * This method is the listener registered for each worker message.
be0676b3 1306 *
bdacc2d2 1307 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1308 */
1309 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1310 return (message) => {
21f710aa 1311 this.checkMessageWorkerId(message)
a5d15204 1312 if (message.ready != null && message.taskFunctions != null) {
81c02522 1313 // Worker ready response received from worker
10e2aa7e 1314 this.handleWorkerReadyResponse(message)
7629bdf1 1315 } else if (message.taskId != null) {
81c02522 1316 // Task execution response received from worker
6b272951 1317 this.handleTaskExecutionResponse(message)
90d7d101
JB
1318 } else if (message.taskFunctions != null) {
1319 // Task functions message received from worker
94407def
JB
1320 (
1321 this.getWorkerInfo(
1322 this.getWorkerNodeKeyByWorkerId(message.workerId)
1323 ) as WorkerInfo
b558f6b5 1324 ).taskFunctions = message.taskFunctions
6b272951
JB
1325 }
1326 }
1327 }
1328
10e2aa7e 1329 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1330 if (message.ready === false) {
1331 throw new Error(`Worker ${message.workerId} failed to initialize`)
1332 }
a5d15204 1333 const workerInfo = this.getWorkerInfo(
aad6fb64 1334 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1335 ) as WorkerInfo
a5d15204
JB
1336 workerInfo.ready = message.ready as boolean
1337 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1338 if (this.emitter != null && this.ready) {
1339 this.emitter.emit(PoolEvents.ready, this.info)
1340 }
6b272951
JB
1341 }
1342
1343 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1344 const { taskId, taskError, data } = message
1345 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1346 if (promiseResponse != null) {
5441aea6
JB
1347 if (taskError != null) {
1348 this.emitter?.emit(PoolEvents.taskError, taskError)
1349 promiseResponse.reject(taskError.message)
6b272951 1350 } else {
5441aea6 1351 promiseResponse.resolve(data as Response)
6b272951 1352 }
501aea93
JB
1353 const workerNodeKey = promiseResponse.workerNodeKey
1354 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1355 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1356 if (
1357 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1358 this.tasksQueueSize(workerNodeKey) > 0 &&
1359 this.workerNodes[workerNodeKey].usage.tasks.executing <
1360 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1361 ) {
1362 this.executeTask(
1363 workerNodeKey,
1364 this.dequeueTask(workerNodeKey) as Task<Data>
1365 )
be0676b3 1366 }
6b272951 1367 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1368 }
be0676b3 1369 }
7c0ba920 1370
a1763c54 1371 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1372 if (this.busy) {
1373 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1374 }
1375 }
1376
1377 private checkAndEmitTaskQueuingEvents (): void {
1378 if (this.hasBackPressure()) {
1379 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1380 }
1381 }
1382
33e6bb4c
JB
1383 private checkAndEmitDynamicWorkerCreationEvents (): void {
1384 if (this.type === PoolTypes.dynamic) {
1385 if (this.full) {
1386 this.emitter?.emit(PoolEvents.full, this.info)
1387 }
1388 }
1389 }
1390
8a1260a3 1391 /**
aa9eede8 1392 * Gets the worker information given its worker node key.
8a1260a3
JB
1393 *
1394 * @param workerNodeKey - The worker node key.
3f09ed9f 1395 * @returns The worker information.
8a1260a3 1396 */
94407def
JB
1397 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1398 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1399 }
1400
a05c10de 1401 /**
b0a4db63 1402 * Adds the given worker in the pool worker nodes.
ea7a90d3 1403 *
38e795c1 1404 * @param worker - The worker.
aa9eede8
JB
1405 * @returns The added worker node key.
1406 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1407 */
b0a4db63 1408 private addWorkerNode (worker: Worker): number {
671d5154
JB
1409 const workerNode = new WorkerNode<Worker, Data>(
1410 worker,
1411 this.worker,
ff3f866a 1412 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1413 )
b97d82d8 1414 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1415 if (this.starting) {
1416 workerNode.info.ready = true
1417 }
aa9eede8 1418 this.workerNodes.push(workerNode)
aad6fb64 1419 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1420 if (workerNodeKey === -1) {
a5d15204 1421 throw new Error('Worker node added not found')
aa9eede8
JB
1422 }
1423 return workerNodeKey
ea7a90d3 1424 }
c923ce56 1425
51fe3d3c 1426 /**
f06e48d8 1427 * Removes the given worker from the pool worker nodes.
51fe3d3c 1428 *
f06e48d8 1429 * @param worker - The worker.
51fe3d3c 1430 */
416fd65c 1431 private removeWorkerNode (worker: Worker): void {
aad6fb64 1432 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1433 if (workerNodeKey !== -1) {
1434 this.workerNodes.splice(workerNodeKey, 1)
1435 this.workerChoiceStrategyContext.remove(workerNodeKey)
1436 }
51fe3d3c 1437 }
adc3c320 1438
e2b31e32
JB
1439 /** @inheritDoc */
1440 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1441 return (
e2b31e32
JB
1442 this.opts.enableTasksQueue === true &&
1443 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1444 )
1445 }
1446
1447 private hasBackPressure (): boolean {
1448 return (
1449 this.opts.enableTasksQueue === true &&
1450 this.workerNodes.findIndex(
1451 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1452 ) === -1
9e844245 1453 )
e2b31e32
JB
1454 }
1455
b0a4db63 1456 /**
aa9eede8 1457 * Executes the given task on the worker given its worker node key.
b0a4db63 1458 *
aa9eede8 1459 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1460 * @param task - The task to execute.
1461 */
2e81254d 1462 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1463 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1464 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1465 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1466 }
1467
f9f00b5f 1468 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1469 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1470 this.checkAndEmitTaskQueuingEvents()
1471 return tasksQueueSize
adc3c320
JB
1472 }
1473
416fd65c 1474 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1475 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1476 }
1477
416fd65c 1478 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1479 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1480 }
1481
81c02522 1482 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1483 while (this.tasksQueueSize(workerNodeKey) > 0) {
1484 this.executeTask(
1485 workerNodeKey,
1486 this.dequeueTask(workerNodeKey) as Task<Data>
1487 )
ff733df7 1488 }
4b628b48 1489 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1490 }
1491
ef41a6e6
JB
1492 private flushTasksQueues (): void {
1493 for (const [workerNodeKey] of this.workerNodes.entries()) {
1494 this.flushTasksQueue(workerNodeKey)
1495 }
1496 }
c97c7edb 1497}