chore: use `biome` instead of `rome`
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
52b71763 71 * The task execution response promise map.
be0676b3 72 *
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
075e51d1 95 /**
adc9cc64 96 * Whether the pool is starting or not.
075e51d1
JB
97 */
98 private readonly starting: boolean
15b176e0
JB
99 /**
100 * Whether the pool is started or not.
101 */
102 private started: boolean
afe0d5bf
JB
103 /**
104 * The start timestamp of the pool.
105 */
106 private readonly startTimestamp
107
729c563d
S
108 /**
109 * Constructs a new poolifier pool.
110 *
38e795c1 111 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 112 * @param filePath - Path to the worker file.
38e795c1 113 * @param opts - Options for the pool.
729c563d 114 */
c97c7edb 115 public constructor (
b4213b7f
JB
116 protected readonly numberOfWorkers: number,
117 protected readonly filePath: string,
118 protected readonly opts: PoolOptions<Worker>
c97c7edb 119 ) {
78cea37e 120 if (!this.isMain()) {
04f45163 121 throw new Error(
8c6d4acf 122 'Cannot start a pool from a worker with the same type as the pool'
04f45163 123 )
c97c7edb 124 }
8d3782fa 125 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 126 this.checkFilePath(this.filePath)
7c0ba920 127 this.checkPoolOptions(this.opts)
1086026a 128
7254e419
JB
129 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
130 this.executeTask = this.executeTask.bind(this)
131 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 132
6bd72cd0 133 if (this.opts.enableEvents === true) {
7c0ba920
JB
134 this.emitter = new PoolEmitter()
135 }
d59df138
JB
136 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
137 Worker,
138 Data,
139 Response
da309861
JB
140 >(
141 this,
142 this.opts.workerChoiceStrategy,
143 this.opts.workerChoiceStrategyOptions
144 )
b6b32453
JB
145
146 this.setupHook()
147
075e51d1 148 this.starting = true
e761c033 149 this.startPool()
075e51d1 150 this.starting = false
15b176e0 151 this.started = true
afe0d5bf
JB
152
153 this.startTimestamp = performance.now()
c97c7edb
S
154 }
155
a35560ba 156 private checkFilePath (filePath: string): void {
ffcbbad8
JB
157 if (
158 filePath == null ||
3d6dd312 159 typeof filePath !== 'string' ||
ffcbbad8
JB
160 (typeof filePath === 'string' && filePath.trim().length === 0)
161 ) {
c510fea7
APA
162 throw new Error('Please specify a file with a worker implementation')
163 }
3d6dd312
JB
164 if (!existsSync(filePath)) {
165 throw new Error(`Cannot find the worker file '${filePath}'`)
166 }
c510fea7
APA
167 }
168
8d3782fa
JB
169 private checkNumberOfWorkers (numberOfWorkers: number): void {
170 if (numberOfWorkers == null) {
171 throw new Error(
172 'Cannot instantiate a pool without specifying the number of workers'
173 )
78cea37e 174 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 175 throw new TypeError(
0d80593b 176 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
177 )
178 } else if (numberOfWorkers < 0) {
473c717a 179 throw new RangeError(
8d3782fa
JB
180 'Cannot instantiate a pool with a negative number of workers'
181 )
6b27d407 182 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
183 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
184 }
185 }
186
187 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 188 if (this.type === PoolTypes.dynamic) {
a5ed75b7 189 if (max == null) {
e695d66f 190 throw new TypeError(
a5ed75b7
JB
191 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
192 )
193 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
194 throw new TypeError(
195 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
196 )
197 } else if (min > max) {
079de991
JB
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
200 )
b97d82d8 201 } else if (max === 0) {
079de991 202 throw new RangeError(
d640b48b 203 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
204 )
205 } else if (min === max) {
206 throw new RangeError(
207 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
208 )
209 }
8d3782fa
JB
210 }
211 }
212
7c0ba920 213 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
214 if (isPlainObject(opts)) {
215 this.opts.workerChoiceStrategy =
216 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
217 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
218 this.opts.workerChoiceStrategyOptions = {
219 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
220 ...opts.workerChoiceStrategyOptions
221 }
49be33fe
JB
222 this.checkValidWorkerChoiceStrategyOptions(
223 this.opts.workerChoiceStrategyOptions
224 )
1f68cede 225 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
226 this.opts.enableEvents = opts.enableEvents ?? true
227 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
228 if (this.opts.enableTasksQueue) {
229 this.checkValidTasksQueueOptions(
230 opts.tasksQueueOptions as TasksQueueOptions
231 )
232 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
233 opts.tasksQueueOptions as TasksQueueOptions
234 )
235 }
236 } else {
237 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 238 }
aee46736
JB
239 }
240
241 private checkValidWorkerChoiceStrategy (
242 workerChoiceStrategy: WorkerChoiceStrategy
243 ): void {
244 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 245 throw new Error(
aee46736 246 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
247 )
248 }
7c0ba920
JB
249 }
250
0d80593b
JB
251 private checkValidWorkerChoiceStrategyOptions (
252 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
253 ): void {
254 if (!isPlainObject(workerChoiceStrategyOptions)) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: must be a plain object'
257 )
258 }
8990357d 259 if (
8c0b113f
JB
260 workerChoiceStrategyOptions.retries != null &&
261 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
262 ) {
263 throw new TypeError(
8c0b113f 264 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
265 )
266 }
267 if (
8c0b113f
JB
268 workerChoiceStrategyOptions.retries != null &&
269 workerChoiceStrategyOptions.retries < 0
8990357d
JB
270 ) {
271 throw new RangeError(
8c0b113f 272 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
273 )
274 }
49be33fe
JB
275 if (
276 workerChoiceStrategyOptions.weights != null &&
6b27d407 277 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
278 ) {
279 throw new Error(
280 'Invalid worker choice strategy options: must have a weight for each worker node'
281 )
282 }
f0d7f803
JB
283 if (
284 workerChoiceStrategyOptions.measurement != null &&
285 !Object.values(Measurements).includes(
286 workerChoiceStrategyOptions.measurement
287 )
288 ) {
289 throw new Error(
290 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
291 )
292 }
0d80593b
JB
293 }
294
a20f0ba5 295 private checkValidTasksQueueOptions (
76d91ea0 296 tasksQueueOptions: TasksQueueOptions
a20f0ba5 297 ): void {
0d80593b
JB
298 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
299 throw new TypeError('Invalid tasks queue options: must be a plain object')
300 }
f0d7f803 301 if (
b7d085c4
JB
302 tasksQueueOptions?.concurrency != null &&
303 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
304 ) {
305 throw new TypeError(
20c6f652 306 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
307 )
308 }
309 if (
b7d085c4
JB
310 tasksQueueOptions?.concurrency != null &&
311 tasksQueueOptions?.concurrency <= 0
f0d7f803 312 ) {
e695d66f 313 throw new RangeError(
b7d085c4 314 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
315 )
316 }
b7d085c4 317 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 318 throw new Error(
68dbcdc0 319 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
320 )
321 }
322 if (
b7d085c4
JB
323 tasksQueueOptions?.size != null &&
324 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 325 ) {
ff3f866a 326 throw new TypeError(
68dbcdc0 327 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
328 )
329 }
b7d085c4 330 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 331 throw new RangeError(
b7d085c4 332 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
333 )
334 }
335 }
336
e761c033
JB
337 private startPool (): void {
338 while (
339 this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
342 0
343 ) < this.numberOfWorkers
344 ) {
aa9eede8 345 this.createAndSetupWorkerNode()
e761c033
JB
346 }
347 }
348
08f3f44c 349 /** @inheritDoc */
6b27d407
JB
350 public get info (): PoolInfo {
351 return {
23ccf9d7 352 version,
6b27d407 353 type: this.type,
184855e6 354 worker: this.worker,
2431bdb4
JB
355 ready: this.ready,
356 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
357 minSize: this.minSize,
358 maxSize: this.maxSize,
c05f0d50
JB
359 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
360 .runTime.aggregate &&
1305e9a8
JB
361 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
362 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
363 workerNodes: this.workerNodes.length,
364 idleWorkerNodes: this.workerNodes.reduce(
365 (accumulator, workerNode) =>
f59e1027 366 workerNode.usage.tasks.executing === 0
a4e07f72
JB
367 ? accumulator + 1
368 : accumulator,
6b27d407
JB
369 0
370 ),
371 busyWorkerNodes: this.workerNodes.reduce(
372 (accumulator, workerNode) =>
f59e1027 373 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
374 0
375 ),
a4e07f72 376 executedTasks: this.workerNodes.reduce(
6b27d407 377 (accumulator, workerNode) =>
f59e1027 378 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
379 0
380 ),
381 executingTasks: this.workerNodes.reduce(
382 (accumulator, workerNode) =>
f59e1027 383 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
384 0
385 ),
daf86646
JB
386 ...(this.opts.enableTasksQueue === true && {
387 queuedTasks: this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + workerNode.usage.tasks.queued,
390 0
391 )
392 }),
393 ...(this.opts.enableTasksQueue === true && {
394 maxQueuedTasks: this.workerNodes.reduce(
395 (accumulator, workerNode) =>
396 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
397 0
398 )
399 }),
a1763c54
JB
400 ...(this.opts.enableTasksQueue === true && {
401 backPressure: this.hasBackPressure()
402 }),
68cbdc84
JB
403 ...(this.opts.enableTasksQueue === true && {
404 stolenTasks: this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + workerNode.usage.tasks.stolen,
407 0
408 )
409 }),
a4e07f72
JB
410 failedTasks: this.workerNodes.reduce(
411 (accumulator, workerNode) =>
f59e1027 412 accumulator + workerNode.usage.tasks.failed,
a4e07f72 413 0
1dcf8b7b
JB
414 ),
415 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 .runTime.aggregate && {
417 runTime: {
98e72cda 418 minimum: round(
90d6701c 419 min(
98e72cda 420 ...this.workerNodes.map(
8ebe6c30 421 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 422 )
1dcf8b7b
JB
423 )
424 ),
98e72cda 425 maximum: round(
90d6701c 426 max(
98e72cda 427 ...this.workerNodes.map(
8ebe6c30 428 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 429 )
1dcf8b7b 430 )
98e72cda 431 ),
3baa0837
JB
432 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
433 .runTime.average && {
434 average: round(
435 average(
436 this.workerNodes.reduce<number[]>(
437 (accumulator, workerNode) =>
438 accumulator.concat(workerNode.usage.runTime.history),
439 []
440 )
98e72cda 441 )
dc021bcc 442 )
3baa0837 443 }),
98e72cda
JB
444 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
445 .runTime.median && {
446 median: round(
447 median(
3baa0837
JB
448 this.workerNodes.reduce<number[]>(
449 (accumulator, workerNode) =>
450 accumulator.concat(workerNode.usage.runTime.history),
451 []
98e72cda
JB
452 )
453 )
454 )
455 })
1dcf8b7b
JB
456 }
457 }),
458 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
459 .waitTime.aggregate && {
460 waitTime: {
98e72cda 461 minimum: round(
90d6701c 462 min(
98e72cda 463 ...this.workerNodes.map(
8ebe6c30 464 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 465 )
1dcf8b7b
JB
466 )
467 ),
98e72cda 468 maximum: round(
90d6701c 469 max(
98e72cda 470 ...this.workerNodes.map(
8ebe6c30 471 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 472 )
1dcf8b7b 473 )
98e72cda 474 ),
3baa0837
JB
475 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
476 .waitTime.average && {
477 average: round(
478 average(
479 this.workerNodes.reduce<number[]>(
480 (accumulator, workerNode) =>
481 accumulator.concat(workerNode.usage.waitTime.history),
482 []
483 )
98e72cda 484 )
dc021bcc 485 )
3baa0837 486 }),
98e72cda
JB
487 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
488 .waitTime.median && {
489 median: round(
490 median(
3baa0837
JB
491 this.workerNodes.reduce<number[]>(
492 (accumulator, workerNode) =>
493 accumulator.concat(workerNode.usage.waitTime.history),
494 []
98e72cda
JB
495 )
496 )
497 )
498 })
1dcf8b7b
JB
499 }
500 })
6b27d407
JB
501 }
502 }
08f3f44c 503
aa9eede8
JB
504 /**
505 * The pool readiness boolean status.
506 */
2431bdb4
JB
507 private get ready (): boolean {
508 return (
b97d82d8
JB
509 this.workerNodes.reduce(
510 (accumulator, workerNode) =>
511 !workerNode.info.dynamic && workerNode.info.ready
512 ? accumulator + 1
513 : accumulator,
514 0
515 ) >= this.minSize
2431bdb4
JB
516 )
517 }
518
afe0d5bf 519 /**
aa9eede8 520 * The approximate pool utilization.
afe0d5bf
JB
521 *
522 * @returns The pool utilization.
523 */
524 private get utilization (): number {
8e5ca040 525 const poolTimeCapacity =
fe7d90db 526 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
527 const totalTasksRunTime = this.workerNodes.reduce(
528 (accumulator, workerNode) =>
71514351 529 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
530 0
531 )
532 const totalTasksWaitTime = this.workerNodes.reduce(
533 (accumulator, workerNode) =>
71514351 534 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
535 0
536 )
8e5ca040 537 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
538 }
539
8881ae32 540 /**
aa9eede8 541 * The pool type.
8881ae32
JB
542 *
543 * If it is `'dynamic'`, it provides the `max` property.
544 */
545 protected abstract get type (): PoolType
546
184855e6 547 /**
aa9eede8 548 * The worker type.
184855e6
JB
549 */
550 protected abstract get worker (): WorkerType
551
c2ade475 552 /**
aa9eede8 553 * The pool minimum size.
c2ade475 554 */
8735b4e5
JB
555 protected get minSize (): number {
556 return this.numberOfWorkers
557 }
ff733df7
JB
558
559 /**
aa9eede8 560 * The pool maximum size.
ff733df7 561 */
8735b4e5
JB
562 protected get maxSize (): number {
563 return this.max ?? this.numberOfWorkers
564 }
a35560ba 565
6b813701
JB
566 /**
567 * Checks if the worker id sent in the received message from a worker is valid.
568 *
569 * @param message - The received message.
570 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
571 */
21f710aa 572 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
573 if (message.workerId == null) {
574 throw new Error('Worker message received without worker id')
575 } else if (
21f710aa 576 message.workerId != null &&
aad6fb64 577 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
578 ) {
579 throw new Error(
580 `Worker message received from unknown worker '${message.workerId}'`
581 )
582 }
583 }
584
ffcbbad8 585 /**
f06e48d8 586 * Gets the given worker its worker node key.
ffcbbad8
JB
587 *
588 * @param worker - The worker.
f59e1027 589 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 590 */
aad6fb64 591 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 592 return this.workerNodes.findIndex(
8ebe6c30 593 (workerNode) => workerNode.worker === worker
f06e48d8 594 )
bf9549ae
JB
595 }
596
aa9eede8
JB
597 /**
598 * Gets the worker node key given its worker id.
599 *
600 * @param workerId - The worker id.
aad6fb64 601 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 602 */
aad6fb64
JB
603 private getWorkerNodeKeyByWorkerId (workerId: number): number {
604 return this.workerNodes.findIndex(
8ebe6c30 605 (workerNode) => workerNode.info.id === workerId
aad6fb64 606 )
aa9eede8
JB
607 }
608
afc003b2 609 /** @inheritDoc */
a35560ba 610 public setWorkerChoiceStrategy (
59219cbb
JB
611 workerChoiceStrategy: WorkerChoiceStrategy,
612 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 613 ): void {
aee46736 614 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 615 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
616 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
617 this.opts.workerChoiceStrategy
618 )
619 if (workerChoiceStrategyOptions != null) {
620 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
621 }
aa9eede8 622 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 623 workerNode.resetUsage()
9edb9717 624 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 625 }
a20f0ba5
JB
626 }
627
628 /** @inheritDoc */
629 public setWorkerChoiceStrategyOptions (
630 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
631 ): void {
0d80593b 632 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
633 this.opts.workerChoiceStrategyOptions = {
634 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
635 ...workerChoiceStrategyOptions
636 }
a20f0ba5
JB
637 this.workerChoiceStrategyContext.setOptions(
638 this.opts.workerChoiceStrategyOptions
a35560ba
S
639 )
640 }
641
a20f0ba5 642 /** @inheritDoc */
8f52842f
JB
643 public enableTasksQueue (
644 enable: boolean,
645 tasksQueueOptions?: TasksQueueOptions
646 ): void {
a20f0ba5 647 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 648 this.flushTasksQueues()
a20f0ba5
JB
649 }
650 this.opts.enableTasksQueue = enable
8f52842f 651 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
652 }
653
654 /** @inheritDoc */
8f52842f 655 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 656 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
657 this.checkValidTasksQueueOptions(tasksQueueOptions)
658 this.opts.tasksQueueOptions =
659 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 660 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 661 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
662 delete this.opts.tasksQueueOptions
663 }
664 }
665
5b49e864 666 private setTasksQueueSize (size: number): void {
20c6f652 667 for (const workerNode of this.workerNodes) {
ff3f866a 668 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
669 }
670 }
671
a20f0ba5
JB
672 private buildTasksQueueOptions (
673 tasksQueueOptions: TasksQueueOptions
674 ): TasksQueueOptions {
675 return {
20c6f652 676 ...{
ff3f866a 677 size: Math.pow(this.maxSize, 2),
20c6f652
JB
678 concurrency: 1
679 },
680 ...tasksQueueOptions
a20f0ba5
JB
681 }
682 }
683
c319c66b
JB
684 /**
685 * Whether the pool is full or not.
686 *
687 * The pool filling boolean status.
688 */
dea903a8
JB
689 protected get full (): boolean {
690 return this.workerNodes.length >= this.maxSize
691 }
c2ade475 692
c319c66b
JB
693 /**
694 * Whether the pool is busy or not.
695 *
696 * The pool busyness boolean status.
697 */
698 protected abstract get busy (): boolean
7c0ba920 699
6c6afb84 700 /**
3d76750a 701 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
702 *
703 * @returns Worker nodes busyness boolean status.
704 */
c2ade475 705 protected internalBusy (): boolean {
3d76750a
JB
706 if (this.opts.enableTasksQueue === true) {
707 return (
708 this.workerNodes.findIndex(
8ebe6c30 709 (workerNode) =>
3d76750a
JB
710 workerNode.info.ready &&
711 workerNode.usage.tasks.executing <
712 (this.opts.tasksQueueOptions?.concurrency as number)
713 ) === -1
714 )
715 } else {
716 return (
717 this.workerNodes.findIndex(
8ebe6c30 718 (workerNode) =>
3d76750a
JB
719 workerNode.info.ready && workerNode.usage.tasks.executing === 0
720 ) === -1
721 )
722 }
cb70b19d
JB
723 }
724
90d7d101
JB
725 /** @inheritDoc */
726 public listTaskFunctions (): string[] {
f2dbbf95
JB
727 for (const workerNode of this.workerNodes) {
728 if (
729 Array.isArray(workerNode.info.taskFunctions) &&
730 workerNode.info.taskFunctions.length > 0
731 ) {
732 return workerNode.info.taskFunctions
733 }
90d7d101 734 }
f2dbbf95 735 return []
90d7d101
JB
736 }
737
375f7504
JB
738 private shallExecuteTask (workerNodeKey: number): boolean {
739 return (
740 this.tasksQueueSize(workerNodeKey) === 0 &&
741 this.workerNodes[workerNodeKey].usage.tasks.executing <
742 (this.opts.tasksQueueOptions?.concurrency as number)
743 )
744 }
745
afc003b2 746 /** @inheritDoc */
7d91a8cd
JB
747 public async execute (
748 data?: Data,
749 name?: string,
750 transferList?: TransferListItem[]
751 ): Promise<Response> {
52b71763 752 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
753 if (!this.started) {
754 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 755 return
15b176e0 756 }
7d91a8cd
JB
757 if (name != null && typeof name !== 'string') {
758 reject(new TypeError('name argument must be a string'))
9d2d0da1 759 return
7d91a8cd 760 }
90d7d101
JB
761 if (
762 name != null &&
763 typeof name === 'string' &&
764 name.trim().length === 0
765 ) {
f58b60b9 766 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 767 return
90d7d101 768 }
b558f6b5
JB
769 if (transferList != null && !Array.isArray(transferList)) {
770 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 771 return
b558f6b5
JB
772 }
773 const timestamp = performance.now()
774 const workerNodeKey = this.chooseWorkerNode()
46b0bb09 775 const workerInfo = this.getWorkerInfo(workerNodeKey)
501aea93 776 const task: Task<Data> = {
52b71763
JB
777 name: name ?? DEFAULT_TASK_NAME,
778 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
779 data: data ?? ({} as Data),
7d91a8cd 780 transferList,
52b71763 781 timestamp,
a5d15204 782 workerId: workerInfo.id as number,
7629bdf1 783 taskId: randomUUID()
52b71763 784 }
7629bdf1 785 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
786 resolve,
787 reject,
501aea93 788 workerNodeKey
2e81254d 789 })
52b71763 790 if (
4e377863
JB
791 this.opts.enableTasksQueue === false ||
792 (this.opts.enableTasksQueue === true &&
375f7504 793 this.shallExecuteTask(workerNodeKey))
52b71763 794 ) {
501aea93 795 this.executeTask(workerNodeKey, task)
4e377863
JB
796 } else {
797 this.enqueueTask(workerNodeKey, task)
52b71763 798 }
2e81254d 799 })
280c2a77 800 }
c97c7edb 801
afc003b2 802 /** @inheritDoc */
c97c7edb 803 public async destroy (): Promise<void> {
1fbcaa7c 804 await Promise.all(
81c02522 805 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 806 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
807 })
808 )
33e6bb4c 809 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 810 this.started = false
c97c7edb
S
811 }
812
1e3214b6
JB
813 protected async sendKillMessageToWorker (
814 workerNodeKey: number,
815 workerId: number
816 ): Promise<void> {
9edb9717 817 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
818 this.registerWorkerMessageListener(workerNodeKey, (message) => {
819 if (message.kill === 'success') {
820 resolve()
821 } else if (message.kill === 'failure') {
e1af34e6 822 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
823 }
824 })
9edb9717 825 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 826 })
1e3214b6
JB
827 }
828
4a6952ff 829 /**
aa9eede8 830 * Terminates the worker node given its worker node key.
4a6952ff 831 *
aa9eede8 832 * @param workerNodeKey - The worker node key.
4a6952ff 833 */
81c02522 834 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 835
729c563d 836 /**
6677a3d3
JB
837 * Setup hook to execute code before worker nodes are created in the abstract constructor.
838 * Can be overridden.
afc003b2
JB
839 *
840 * @virtual
729c563d 841 */
280c2a77 842 protected setupHook (): void {
965df41c 843 /* Intentionally empty */
280c2a77 844 }
c97c7edb 845
729c563d 846 /**
280c2a77
S
847 * Should return whether the worker is the main worker or not.
848 */
849 protected abstract isMain (): boolean
850
851 /**
2e81254d 852 * Hook executed before the worker task execution.
bf9549ae 853 * Can be overridden.
729c563d 854 *
f06e48d8 855 * @param workerNodeKey - The worker node key.
1c6fe997 856 * @param task - The task to execute.
729c563d 857 */
1c6fe997
JB
858 protected beforeTaskExecutionHook (
859 workerNodeKey: number,
860 task: Task<Data>
861 ): void {
94407def
JB
862 if (this.workerNodes[workerNodeKey]?.usage != null) {
863 const workerUsage = this.workerNodes[workerNodeKey].usage
864 ++workerUsage.tasks.executing
865 this.updateWaitTimeWorkerUsage(workerUsage, task)
866 }
867 if (
868 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
869 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
870 task.name as string
871 ) != null
872 ) {
db0e38ee 873 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 874 workerNodeKey
db0e38ee 875 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
876 ++taskFunctionWorkerUsage.tasks.executing
877 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 878 }
c97c7edb
S
879 }
880
c01733f1 881 /**
2e81254d 882 * Hook executed after the worker task execution.
bf9549ae 883 * Can be overridden.
c01733f1 884 *
501aea93 885 * @param workerNodeKey - The worker node key.
38e795c1 886 * @param message - The received message.
c01733f1 887 */
2e81254d 888 protected afterTaskExecutionHook (
501aea93 889 workerNodeKey: number,
2740a743 890 message: MessageValue<Response>
bf9549ae 891 ): void {
94407def
JB
892 if (this.workerNodes[workerNodeKey]?.usage != null) {
893 const workerUsage = this.workerNodes[workerNodeKey].usage
894 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
895 this.updateRunTimeWorkerUsage(workerUsage, message)
896 this.updateEluWorkerUsage(workerUsage, message)
897 }
898 if (
899 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
900 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 901 message.taskPerformance?.name as string
94407def
JB
902 ) != null
903 ) {
db0e38ee 904 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 905 workerNodeKey
db0e38ee 906 ].getTaskFunctionWorkerUsage(
0628755c 907 message.taskPerformance?.name as string
b558f6b5 908 ) as WorkerUsage
db0e38ee
JB
909 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
910 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
911 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
912 }
913 }
914
db0e38ee
JB
915 /**
916 * Whether the worker node shall update its task function worker usage or not.
917 *
918 * @param workerNodeKey - The worker node key.
919 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
920 */
921 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 922 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 923 return (
94407def 924 workerInfo != null &&
a5d15204 925 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 926 workerInfo.taskFunctions.length > 2
b558f6b5 927 )
f1c06930
JB
928 }
929
930 private updateTaskStatisticsWorkerUsage (
931 workerUsage: WorkerUsage,
932 message: MessageValue<Response>
933 ): void {
a4e07f72 934 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
935 if (
936 workerTaskStatistics.executing != null &&
937 workerTaskStatistics.executing > 0
938 ) {
939 --workerTaskStatistics.executing
5bb5be17 940 }
98e72cda
JB
941 if (message.taskError == null) {
942 ++workerTaskStatistics.executed
943 } else {
a4e07f72 944 ++workerTaskStatistics.failed
2740a743 945 }
f8eb0a2a
JB
946 }
947
a4e07f72
JB
948 private updateRunTimeWorkerUsage (
949 workerUsage: WorkerUsage,
f8eb0a2a
JB
950 message: MessageValue<Response>
951 ): void {
dc021bcc
JB
952 if (message.taskError != null) {
953 return
954 }
e4f20deb
JB
955 updateMeasurementStatistics(
956 workerUsage.runTime,
957 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 958 message.taskPerformance?.runTime ?? 0
e4f20deb 959 )
f8eb0a2a
JB
960 }
961
a4e07f72
JB
962 private updateWaitTimeWorkerUsage (
963 workerUsage: WorkerUsage,
1c6fe997 964 task: Task<Data>
f8eb0a2a 965 ): void {
1c6fe997
JB
966 const timestamp = performance.now()
967 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
968 updateMeasurementStatistics(
969 workerUsage.waitTime,
970 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 971 taskWaitTime
e4f20deb 972 )
c01733f1 973 }
974
a4e07f72 975 private updateEluWorkerUsage (
5df69fab 976 workerUsage: WorkerUsage,
62c15a68
JB
977 message: MessageValue<Response>
978 ): void {
dc021bcc
JB
979 if (message.taskError != null) {
980 return
981 }
008512c7
JB
982 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
983 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
984 updateMeasurementStatistics(
985 workerUsage.elu.active,
008512c7 986 eluTaskStatisticsRequirements,
dc021bcc 987 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
988 )
989 updateMeasurementStatistics(
990 workerUsage.elu.idle,
008512c7 991 eluTaskStatisticsRequirements,
dc021bcc 992 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 993 )
008512c7 994 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 995 if (message.taskPerformance?.elu != null) {
f7510105
JB
996 if (workerUsage.elu.utilization != null) {
997 workerUsage.elu.utilization =
998 (workerUsage.elu.utilization +
999 message.taskPerformance.elu.utilization) /
1000 2
1001 } else {
1002 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1003 }
62c15a68
JB
1004 }
1005 }
1006 }
1007
280c2a77 1008 /**
f06e48d8 1009 * Chooses a worker node for the next task.
280c2a77 1010 *
6c6afb84 1011 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1012 *
aa9eede8 1013 * @returns The chosen worker node key
280c2a77 1014 */
6c6afb84 1015 private chooseWorkerNode (): number {
930dcf12 1016 if (this.shallCreateDynamicWorker()) {
aa9eede8 1017 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1018 if (
b1aae695 1019 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1020 ) {
aa9eede8 1021 return workerNodeKey
6c6afb84 1022 }
17393ac8 1023 }
930dcf12
JB
1024 return this.workerChoiceStrategyContext.execute()
1025 }
1026
6c6afb84
JB
1027 /**
1028 * Conditions for dynamic worker creation.
1029 *
1030 * @returns Whether to create a dynamic worker or not.
1031 */
1032 private shallCreateDynamicWorker (): boolean {
930dcf12 1033 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1034 }
1035
280c2a77 1036 /**
aa9eede8 1037 * Sends a message to worker given its worker node key.
280c2a77 1038 *
aa9eede8 1039 * @param workerNodeKey - The worker node key.
38e795c1 1040 * @param message - The message.
7d91a8cd 1041 * @param transferList - The optional array of transferable objects.
280c2a77
S
1042 */
1043 protected abstract sendToWorker (
aa9eede8 1044 workerNodeKey: number,
7d91a8cd
JB
1045 message: MessageValue<Data>,
1046 transferList?: TransferListItem[]
280c2a77
S
1047 ): void
1048
729c563d 1049 /**
41344292 1050 * Creates a new worker.
6c6afb84
JB
1051 *
1052 * @returns Newly created worker.
729c563d 1053 */
280c2a77 1054 protected abstract createWorker (): Worker
c97c7edb 1055
4a6952ff 1056 /**
aa9eede8 1057 * Creates a new, completely set up worker node.
4a6952ff 1058 *
aa9eede8 1059 * @returns New, completely set up worker node key.
4a6952ff 1060 */
aa9eede8 1061 protected createAndSetupWorkerNode (): number {
bdacc2d2 1062 const worker = this.createWorker()
280c2a77 1063
fd04474e 1064 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1065 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1066 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1067 worker.on('error', (error) => {
aad6fb64 1068 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1069 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1070 workerInfo.ready = false
0dc838e3 1071 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1072 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1073 if (
1074 this.opts.restartWorkerOnError === true &&
1075 !this.starting &&
1076 this.started
1077 ) {
9b106837 1078 if (workerInfo.dynamic) {
aa9eede8 1079 this.createAndSetupDynamicWorkerNode()
8a1260a3 1080 } else {
aa9eede8 1081 this.createAndSetupWorkerNode()
8a1260a3 1082 }
5baee0d7 1083 }
19dbc45b 1084 if (this.opts.enableTasksQueue === true) {
9b106837 1085 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1086 }
5baee0d7 1087 })
a35560ba 1088 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1089 worker.once('exit', () => {
f06e48d8 1090 this.removeWorkerNode(worker)
a974afa6 1091 })
280c2a77 1092
aa9eede8 1093 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1094
aa9eede8 1095 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1096
aa9eede8 1097 return workerNodeKey
c97c7edb 1098 }
be0676b3 1099
930dcf12 1100 /**
aa9eede8 1101 * Creates a new, completely set up dynamic worker node.
930dcf12 1102 *
aa9eede8 1103 * @returns New, completely set up dynamic worker node key.
930dcf12 1104 */
aa9eede8
JB
1105 protected createAndSetupDynamicWorkerNode (): number {
1106 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1107 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1108 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1109 message.workerId
aad6fb64 1110 )
aa9eede8 1111 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1112 // Kill message received from worker
930dcf12
JB
1113 if (
1114 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1115 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1116 ((this.opts.enableTasksQueue === false &&
aa9eede8 1117 workerUsage.tasks.executing === 0) ||
7b56f532 1118 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1119 workerUsage.tasks.executing === 0 &&
1120 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1121 ) {
5270d253
JB
1122 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1123 this.emitter?.emit(PoolEvents.error, error)
1124 })
930dcf12
JB
1125 }
1126 })
46b0bb09 1127 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1128 this.sendToWorker(workerNodeKey, {
b0a4db63 1129 checkActive: true,
21f710aa
JB
1130 workerId: workerInfo.id as number
1131 })
b5e113f6 1132 workerInfo.dynamic = true
b1aae695
JB
1133 if (
1134 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1135 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1136 ) {
b5e113f6
JB
1137 workerInfo.ready = true
1138 }
33e6bb4c 1139 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1140 return workerNodeKey
930dcf12
JB
1141 }
1142
a2ed5053 1143 /**
aa9eede8 1144 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1145 *
aa9eede8 1146 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1147 * @param listener - The message listener callback.
1148 */
85aeb3f3
JB
1149 protected abstract registerWorkerMessageListener<
1150 Message extends Data | Response
aa9eede8
JB
1151 >(
1152 workerNodeKey: number,
1153 listener: (message: MessageValue<Message>) => void
1154 ): void
a2ed5053
JB
1155
1156 /**
aa9eede8 1157 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1158 * Can be overridden.
1159 *
aa9eede8 1160 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1161 */
aa9eede8 1162 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1163 // Listen to worker messages.
aa9eede8 1164 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1165 // Send the startup message to worker.
aa9eede8 1166 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1167 // Send the statistics message to worker.
1168 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1169 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1170 this.workerNodes[workerNodeKey].onEmptyQueue =
1171 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1172 this.workerNodes[workerNodeKey].onBackPressure =
1173 this.tasksStealingOnBackPressure.bind(this)
1174 }
d2c73f82
JB
1175 }
1176
85aeb3f3 1177 /**
aa9eede8
JB
1178 * Sends the startup message to worker given its worker node key.
1179 *
1180 * @param workerNodeKey - The worker node key.
1181 */
1182 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1183
1184 /**
9edb9717 1185 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1186 *
aa9eede8 1187 * @param workerNodeKey - The worker node key.
85aeb3f3 1188 */
9edb9717 1189 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1190 this.sendToWorker(workerNodeKey, {
1191 statistics: {
1192 runTime:
1193 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1194 .runTime.aggregate,
1195 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1196 .elu.aggregate
1197 },
46b0bb09 1198 workerId: this.getWorkerInfo(workerNodeKey).id as number
aa9eede8
JB
1199 })
1200 }
a2ed5053
JB
1201
1202 private redistributeQueuedTasks (workerNodeKey: number): void {
1203 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1204 const destinationWorkerNodeKey = this.workerNodes.reduce(
1205 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1206 return workerNode.info.ready &&
1207 workerNode.usage.tasks.queued <
1208 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1209 ? workerNodeKey
1210 : minWorkerNodeKey
1211 },
1212 0
1213 )
3f690f25
JB
1214 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1215 const task = {
1216 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1217 workerId: destinationWorkerNode.info.id as number
1218 }
1219 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1220 this.executeTask(destinationWorkerNodeKey, task)
1221 } else {
1222 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1223 }
1224 }
1225 }
1226
b1838604
JB
1227 private updateTaskStolenStatisticsWorkerUsage (
1228 workerNodeKey: number,
b1838604
JB
1229 taskName: string
1230 ): void {
1a880eca 1231 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1232 if (workerNode?.usage != null) {
1233 ++workerNode.usage.tasks.stolen
1234 }
1235 if (
1236 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1237 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1238 ) {
1239 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1240 taskName
1241 ) as WorkerUsage
1242 ++taskFunctionWorkerUsage.tasks.stolen
1243 }
1244 }
1245
dd951876 1246 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1247 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1248 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1249 const workerNodes = this.workerNodes
a6b3272b 1250 .slice()
dd951876
JB
1251 .sort(
1252 (workerNodeA, workerNodeB) =>
1253 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1254 )
f201a0cd
JB
1255 const sourceWorkerNode = workerNodes.find(
1256 (workerNode) =>
1257 workerNode.info.ready &&
1258 workerNode.info.id !== workerId &&
1259 workerNode.usage.tasks.queued > 0
1260 )
1261 if (sourceWorkerNode != null) {
1262 const task = {
1263 ...(sourceWorkerNode.popTask() as Task<Data>),
1264 workerId: destinationWorkerNode.info.id as number
0bc68267 1265 }
f201a0cd
JB
1266 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1267 this.executeTask(destinationWorkerNodeKey, task)
1268 } else {
1269 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1270 }
f201a0cd
JB
1271 this.updateTaskStolenStatisticsWorkerUsage(
1272 destinationWorkerNodeKey,
1273 task.name as string
1274 )
72695f86
JB
1275 }
1276 }
1277
1278 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1279 const sizeOffset = 1
1280 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1281 return
1282 }
72695f86
JB
1283 const sourceWorkerNode =
1284 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1285 const workerNodes = this.workerNodes
a6b3272b 1286 .slice()
72695f86
JB
1287 .sort(
1288 (workerNodeA, workerNodeB) =>
1289 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1290 )
1291 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1292 if (
0bc68267 1293 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1294 workerNode.info.ready &&
1295 workerNode.info.id !== workerId &&
0bc68267 1296 workerNode.usage.tasks.queued <
f778c355 1297 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1298 ) {
dd951876
JB
1299 const task = {
1300 ...(sourceWorkerNode.popTask() as Task<Data>),
1301 workerId: workerNode.info.id as number
1302 }
375f7504 1303 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1304 this.executeTask(workerNodeKey, task)
4de3d785 1305 } else {
dd951876 1306 this.enqueueTask(workerNodeKey, task)
4de3d785 1307 }
b1838604
JB
1308 this.updateTaskStolenStatisticsWorkerUsage(
1309 workerNodeKey,
b1838604
JB
1310 task.name as string
1311 )
10ecf8fd 1312 }
a2ed5053
JB
1313 }
1314 }
1315
be0676b3 1316 /**
aa9eede8 1317 * This method is the listener registered for each worker message.
be0676b3 1318 *
bdacc2d2 1319 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1320 */
1321 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1322 return (message) => {
21f710aa 1323 this.checkMessageWorkerId(message)
a5d15204 1324 if (message.ready != null && message.taskFunctions != null) {
81c02522 1325 // Worker ready response received from worker
10e2aa7e 1326 this.handleWorkerReadyResponse(message)
7629bdf1 1327 } else if (message.taskId != null) {
81c02522 1328 // Task execution response received from worker
6b272951 1329 this.handleTaskExecutionResponse(message)
90d7d101
JB
1330 } else if (message.taskFunctions != null) {
1331 // Task functions message received from worker
46b0bb09
JB
1332 this.getWorkerInfo(
1333 this.getWorkerNodeKeyByWorkerId(message.workerId)
b558f6b5 1334 ).taskFunctions = message.taskFunctions
6b272951
JB
1335 }
1336 }
1337 }
1338
10e2aa7e 1339 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1340 if (message.ready === false) {
1341 throw new Error(`Worker ${message.workerId} failed to initialize`)
1342 }
a5d15204 1343 const workerInfo = this.getWorkerInfo(
aad6fb64 1344 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1345 )
a5d15204
JB
1346 workerInfo.ready = message.ready as boolean
1347 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1348 if (this.emitter != null && this.ready) {
1349 this.emitter.emit(PoolEvents.ready, this.info)
1350 }
6b272951
JB
1351 }
1352
1353 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1354 const { taskId, taskError, data } = message
1355 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1356 if (promiseResponse != null) {
5441aea6
JB
1357 if (taskError != null) {
1358 this.emitter?.emit(PoolEvents.taskError, taskError)
1359 promiseResponse.reject(taskError.message)
6b272951 1360 } else {
5441aea6 1361 promiseResponse.resolve(data as Response)
6b272951 1362 }
501aea93
JB
1363 const workerNodeKey = promiseResponse.workerNodeKey
1364 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1365 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1366 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1367 if (
1368 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1369 this.tasksQueueSize(workerNodeKey) > 0 &&
1370 this.workerNodes[workerNodeKey].usage.tasks.executing <
1371 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1372 ) {
1373 this.executeTask(
1374 workerNodeKey,
1375 this.dequeueTask(workerNodeKey) as Task<Data>
1376 )
be0676b3
APA
1377 }
1378 }
be0676b3 1379 }
7c0ba920 1380
a1763c54 1381 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1382 if (this.busy) {
1383 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1384 }
1385 }
1386
1387 private checkAndEmitTaskQueuingEvents (): void {
1388 if (this.hasBackPressure()) {
1389 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1390 }
1391 }
1392
33e6bb4c
JB
1393 private checkAndEmitDynamicWorkerCreationEvents (): void {
1394 if (this.type === PoolTypes.dynamic) {
1395 if (this.full) {
1396 this.emitter?.emit(PoolEvents.full, this.info)
1397 }
1398 }
1399 }
1400
8a1260a3 1401 /**
aa9eede8 1402 * Gets the worker information given its worker node key.
8a1260a3
JB
1403 *
1404 * @param workerNodeKey - The worker node key.
3f09ed9f 1405 * @returns The worker information.
8a1260a3 1406 */
46b0bb09
JB
1407 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1408 return this.workerNodes[workerNodeKey].info
e221309a
JB
1409 }
1410
a05c10de 1411 /**
b0a4db63 1412 * Adds the given worker in the pool worker nodes.
ea7a90d3 1413 *
38e795c1 1414 * @param worker - The worker.
aa9eede8
JB
1415 * @returns The added worker node key.
1416 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1417 */
b0a4db63 1418 private addWorkerNode (worker: Worker): number {
671d5154
JB
1419 const workerNode = new WorkerNode<Worker, Data>(
1420 worker,
ff3f866a 1421 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1422 )
b97d82d8 1423 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1424 if (this.starting) {
1425 workerNode.info.ready = true
1426 }
aa9eede8 1427 this.workerNodes.push(workerNode)
aad6fb64 1428 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1429 if (workerNodeKey === -1) {
86ed0598 1430 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1431 }
1432 return workerNodeKey
ea7a90d3 1433 }
c923ce56 1434
51fe3d3c 1435 /**
f06e48d8 1436 * Removes the given worker from the pool worker nodes.
51fe3d3c 1437 *
f06e48d8 1438 * @param worker - The worker.
51fe3d3c 1439 */
416fd65c 1440 private removeWorkerNode (worker: Worker): void {
aad6fb64 1441 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1442 if (workerNodeKey !== -1) {
1443 this.workerNodes.splice(workerNodeKey, 1)
1444 this.workerChoiceStrategyContext.remove(workerNodeKey)
1445 }
51fe3d3c 1446 }
adc3c320 1447
e2b31e32
JB
1448 /** @inheritDoc */
1449 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1450 return (
e2b31e32
JB
1451 this.opts.enableTasksQueue === true &&
1452 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1453 )
1454 }
1455
1456 private hasBackPressure (): boolean {
1457 return (
1458 this.opts.enableTasksQueue === true &&
1459 this.workerNodes.findIndex(
1460 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1461 ) === -1
9e844245 1462 )
e2b31e32
JB
1463 }
1464
b0a4db63 1465 /**
aa9eede8 1466 * Executes the given task on the worker given its worker node key.
b0a4db63 1467 *
aa9eede8 1468 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1469 * @param task - The task to execute.
1470 */
2e81254d 1471 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1472 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1473 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1474 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1475 }
1476
f9f00b5f 1477 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1478 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1479 this.checkAndEmitTaskQueuingEvents()
1480 return tasksQueueSize
adc3c320
JB
1481 }
1482
416fd65c 1483 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1484 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1485 }
1486
416fd65c 1487 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1488 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1489 }
1490
81c02522 1491 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1492 while (this.tasksQueueSize(workerNodeKey) > 0) {
1493 this.executeTask(
1494 workerNodeKey,
1495 this.dequeueTask(workerNodeKey) as Task<Data>
1496 )
ff733df7 1497 }
4b628b48 1498 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1499 }
1500
ef41a6e6
JB
1501 private flushTasksQueues (): void {
1502 for (const [workerNodeKey] of this.workerNodes.entries()) {
1503 this.flushTasksQueue(workerNodeKey)
1504 }
1505 }
c97c7edb 1506}