Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
afe0d5bf 17 median,
e4f20deb
JB
18 round,
19 updateMeasurementStatistics
bbeadd16 20} from '../utils'
59317253 21import { KillBehaviors } from '../worker/worker-options'
c4855468 22import {
65d7a1c9 23 type IPool,
7c5a1080 24 PoolEmitter,
c4855468 25 PoolEvents,
6b27d407 26 type PoolInfo,
c4855468 27 type PoolOptions,
6b27d407
JB
28 type PoolType,
29 PoolTypes,
4b628b48 30 type TasksQueueOptions
c4855468 31} from './pool'
bbfa38a2
JB
32import type {
33 IWorker,
34 IWorkerNode,
35 WorkerInfo,
36 WorkerType,
37 WorkerUsage
e102732c 38} from './worker'
a35560ba 39import {
008512c7 40 type MeasurementStatisticsRequirements,
f0d7f803 41 Measurements,
a35560ba 42 WorkerChoiceStrategies,
a20f0ba5
JB
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
bdaf31cd
JB
45} from './selection-strategies/selection-strategies-types'
46import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 47import { version } from './version'
4b628b48 48import { WorkerNode } from './worker-node'
23ccf9d7 49
729c563d 50/**
ea7a90d3 51 * Base class that implements some shared logic for all poolifier pools.
729c563d 52 *
38e795c1 53 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 56 */
c97c7edb 57export abstract class AbstractPool<
f06e48d8 58 Worker extends IWorker,
d3c8a1a8
S
59 Data = unknown,
60 Response = unknown
c4855468 61> implements IPool<Worker, Data, Response> {
afc003b2 62 /** @inheritDoc */
4b628b48 63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 64
afc003b2 65 /** @inheritDoc */
7c0ba920
JB
66 public readonly emitter?: PoolEmitter
67
be0676b3 68 /**
52b71763 69 * The task execution response promise map.
be0676b3 70 *
2740a743 71 * - `key`: The message id of each submitted task.
a3445496 72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 73 *
a3445496 74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 75 */
501aea93
JB
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
8735b4e5
JB
88 /**
89 * Dynamic pool maximum size property placeholder.
90 */
91 protected readonly max?: number
92
075e51d1 93 /**
adc9cc64 94 * Whether the pool is starting or not.
075e51d1
JB
95 */
96 private readonly starting: boolean
15b176e0
JB
97 /**
98 * Whether the pool is started or not.
99 */
100 private started: boolean
afe0d5bf
JB
101 /**
102 * The start timestamp of the pool.
103 */
104 private readonly startTimestamp
105
729c563d
S
106 /**
107 * Constructs a new poolifier pool.
108 *
38e795c1 109 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 110 * @param filePath - Path to the worker file.
38e795c1 111 * @param opts - Options for the pool.
729c563d 112 */
c97c7edb 113 public constructor (
b4213b7f
JB
114 protected readonly numberOfWorkers: number,
115 protected readonly filePath: string,
116 protected readonly opts: PoolOptions<Worker>
c97c7edb 117 ) {
78cea37e 118 if (!this.isMain()) {
04f45163 119 throw new Error(
8c6d4acf 120 'Cannot start a pool from a worker with the same type as the pool'
04f45163 121 )
c97c7edb 122 }
8d3782fa 123 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 124 this.checkFilePath(this.filePath)
7c0ba920 125 this.checkPoolOptions(this.opts)
1086026a 126
7254e419
JB
127 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
128 this.executeTask = this.executeTask.bind(this)
129 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 130
6bd72cd0 131 if (this.opts.enableEvents === true) {
7c0ba920
JB
132 this.emitter = new PoolEmitter()
133 }
d59df138
JB
134 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
135 Worker,
136 Data,
137 Response
da309861
JB
138 >(
139 this,
140 this.opts.workerChoiceStrategy,
141 this.opts.workerChoiceStrategyOptions
142 )
b6b32453
JB
143
144 this.setupHook()
145
075e51d1 146 this.starting = true
e761c033 147 this.startPool()
075e51d1 148 this.starting = false
15b176e0 149 this.started = true
afe0d5bf
JB
150
151 this.startTimestamp = performance.now()
c97c7edb
S
152 }
153
a35560ba 154 private checkFilePath (filePath: string): void {
ffcbbad8
JB
155 if (
156 filePath == null ||
3d6dd312 157 typeof filePath !== 'string' ||
ffcbbad8
JB
158 (typeof filePath === 'string' && filePath.trim().length === 0)
159 ) {
c510fea7
APA
160 throw new Error('Please specify a file with a worker implementation')
161 }
3d6dd312
JB
162 if (!existsSync(filePath)) {
163 throw new Error(`Cannot find the worker file '${filePath}'`)
164 }
c510fea7
APA
165 }
166
8d3782fa
JB
167 private checkNumberOfWorkers (numberOfWorkers: number): void {
168 if (numberOfWorkers == null) {
169 throw new Error(
170 'Cannot instantiate a pool without specifying the number of workers'
171 )
78cea37e 172 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 173 throw new TypeError(
0d80593b 174 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
175 )
176 } else if (numberOfWorkers < 0) {
473c717a 177 throw new RangeError(
8d3782fa
JB
178 'Cannot instantiate a pool with a negative number of workers'
179 )
6b27d407 180 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
181 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
182 }
183 }
184
185 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 186 if (this.type === PoolTypes.dynamic) {
a5ed75b7 187 if (max == null) {
e695d66f 188 throw new TypeError(
a5ed75b7
JB
189 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
190 )
191 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
192 throw new TypeError(
193 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
194 )
195 } else if (min > max) {
079de991
JB
196 throw new RangeError(
197 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
198 )
b97d82d8 199 } else if (max === 0) {
079de991 200 throw new RangeError(
d640b48b 201 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
202 )
203 } else if (min === max) {
204 throw new RangeError(
205 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
206 )
207 }
8d3782fa
JB
208 }
209 }
210
7c0ba920 211 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
212 if (isPlainObject(opts)) {
213 this.opts.workerChoiceStrategy =
214 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
215 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
216 this.opts.workerChoiceStrategyOptions = {
217 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
218 ...opts.workerChoiceStrategyOptions
219 }
49be33fe
JB
220 this.checkValidWorkerChoiceStrategyOptions(
221 this.opts.workerChoiceStrategyOptions
222 )
1f68cede 223 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
224 this.opts.enableEvents = opts.enableEvents ?? true
225 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
226 if (this.opts.enableTasksQueue) {
227 this.checkValidTasksQueueOptions(
228 opts.tasksQueueOptions as TasksQueueOptions
229 )
230 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
231 opts.tasksQueueOptions as TasksQueueOptions
232 )
233 }
234 } else {
235 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 236 }
aee46736
JB
237 }
238
239 private checkValidWorkerChoiceStrategy (
240 workerChoiceStrategy: WorkerChoiceStrategy
241 ): void {
242 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 243 throw new Error(
aee46736 244 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
245 )
246 }
7c0ba920
JB
247 }
248
0d80593b
JB
249 private checkValidWorkerChoiceStrategyOptions (
250 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
251 ): void {
252 if (!isPlainObject(workerChoiceStrategyOptions)) {
253 throw new TypeError(
254 'Invalid worker choice strategy options: must be a plain object'
255 )
256 }
8990357d 257 if (
8c0b113f
JB
258 workerChoiceStrategyOptions.retries != null &&
259 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
260 ) {
261 throw new TypeError(
8c0b113f 262 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
263 )
264 }
265 if (
8c0b113f
JB
266 workerChoiceStrategyOptions.retries != null &&
267 workerChoiceStrategyOptions.retries < 0
8990357d
JB
268 ) {
269 throw new RangeError(
8c0b113f 270 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
271 )
272 }
49be33fe
JB
273 if (
274 workerChoiceStrategyOptions.weights != null &&
6b27d407 275 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
276 ) {
277 throw new Error(
278 'Invalid worker choice strategy options: must have a weight for each worker node'
279 )
280 }
f0d7f803
JB
281 if (
282 workerChoiceStrategyOptions.measurement != null &&
283 !Object.values(Measurements).includes(
284 workerChoiceStrategyOptions.measurement
285 )
286 ) {
287 throw new Error(
288 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
289 )
290 }
0d80593b
JB
291 }
292
a20f0ba5 293 private checkValidTasksQueueOptions (
76d91ea0 294 tasksQueueOptions: TasksQueueOptions
a20f0ba5 295 ): void {
0d80593b
JB
296 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
297 throw new TypeError('Invalid tasks queue options: must be a plain object')
298 }
f0d7f803
JB
299 if (
300 tasksQueueOptions?.concurrency != null &&
301 !Number.isSafeInteger(tasksQueueOptions.concurrency)
302 ) {
303 throw new TypeError(
20c6f652 304 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
305 )
306 }
307 if (
308 tasksQueueOptions?.concurrency != null &&
309 tasksQueueOptions.concurrency <= 0
310 ) {
e695d66f 311 throw new RangeError(
20c6f652
JB
312 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
313 )
314 }
68dbcdc0 315 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 316 throw new Error(
68dbcdc0 317 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
318 )
319 }
320 if (
ff3f866a
JB
321 tasksQueueOptions?.size != null &&
322 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 323 ) {
ff3f866a 324 throw new TypeError(
68dbcdc0 325 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
326 )
327 }
328 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 329 throw new RangeError(
68dbcdc0 330 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
331 )
332 }
333 }
334
e761c033
JB
335 private startPool (): void {
336 while (
337 this.workerNodes.reduce(
338 (accumulator, workerNode) =>
339 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
340 0
341 ) < this.numberOfWorkers
342 ) {
aa9eede8 343 this.createAndSetupWorkerNode()
e761c033
JB
344 }
345 }
346
08f3f44c 347 /** @inheritDoc */
6b27d407
JB
348 public get info (): PoolInfo {
349 return {
23ccf9d7 350 version,
6b27d407 351 type: this.type,
184855e6 352 worker: this.worker,
2431bdb4
JB
353 ready: this.ready,
354 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
355 minSize: this.minSize,
356 maxSize: this.maxSize,
c05f0d50
JB
357 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
358 .runTime.aggregate &&
1305e9a8
JB
359 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
360 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
361 workerNodes: this.workerNodes.length,
362 idleWorkerNodes: this.workerNodes.reduce(
363 (accumulator, workerNode) =>
f59e1027 364 workerNode.usage.tasks.executing === 0
a4e07f72
JB
365 ? accumulator + 1
366 : accumulator,
6b27d407
JB
367 0
368 ),
369 busyWorkerNodes: this.workerNodes.reduce(
370 (accumulator, workerNode) =>
f59e1027 371 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
372 0
373 ),
a4e07f72 374 executedTasks: this.workerNodes.reduce(
6b27d407 375 (accumulator, workerNode) =>
f59e1027 376 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
377 0
378 ),
379 executingTasks: this.workerNodes.reduce(
380 (accumulator, workerNode) =>
f59e1027 381 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
382 0
383 ),
daf86646
JB
384 ...(this.opts.enableTasksQueue === true && {
385 queuedTasks: this.workerNodes.reduce(
386 (accumulator, workerNode) =>
387 accumulator + workerNode.usage.tasks.queued,
388 0
389 )
390 }),
391 ...(this.opts.enableTasksQueue === true && {
392 maxQueuedTasks: this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
395 0
396 )
397 }),
a1763c54
JB
398 ...(this.opts.enableTasksQueue === true && {
399 backPressure: this.hasBackPressure()
400 }),
68cbdc84
JB
401 ...(this.opts.enableTasksQueue === true && {
402 stolenTasks: this.workerNodes.reduce(
403 (accumulator, workerNode) =>
404 accumulator + workerNode.usage.tasks.stolen,
405 0
406 )
407 }),
a4e07f72
JB
408 failedTasks: this.workerNodes.reduce(
409 (accumulator, workerNode) =>
f59e1027 410 accumulator + workerNode.usage.tasks.failed,
a4e07f72 411 0
1dcf8b7b
JB
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .runTime.aggregate && {
415 runTime: {
98e72cda
JB
416 minimum: round(
417 Math.min(
418 ...this.workerNodes.map(
8ebe6c30 419 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 420 )
1dcf8b7b
JB
421 )
422 ),
98e72cda
JB
423 maximum: round(
424 Math.max(
425 ...this.workerNodes.map(
8ebe6c30 426 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 427 )
1dcf8b7b 428 )
98e72cda 429 ),
3baa0837
JB
430 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
431 .runTime.average && {
432 average: round(
433 average(
434 this.workerNodes.reduce<number[]>(
435 (accumulator, workerNode) =>
436 accumulator.concat(workerNode.usage.runTime.history),
437 []
438 )
98e72cda 439 )
dc021bcc 440 )
3baa0837 441 }),
98e72cda
JB
442 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
443 .runTime.median && {
444 median: round(
445 median(
3baa0837
JB
446 this.workerNodes.reduce<number[]>(
447 (accumulator, workerNode) =>
448 accumulator.concat(workerNode.usage.runTime.history),
449 []
98e72cda
JB
450 )
451 )
452 )
453 })
1dcf8b7b
JB
454 }
455 }),
456 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
457 .waitTime.aggregate && {
458 waitTime: {
98e72cda
JB
459 minimum: round(
460 Math.min(
461 ...this.workerNodes.map(
8ebe6c30 462 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 463 )
1dcf8b7b
JB
464 )
465 ),
98e72cda
JB
466 maximum: round(
467 Math.max(
468 ...this.workerNodes.map(
8ebe6c30 469 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 470 )
1dcf8b7b 471 )
98e72cda 472 ),
3baa0837
JB
473 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
474 .waitTime.average && {
475 average: round(
476 average(
477 this.workerNodes.reduce<number[]>(
478 (accumulator, workerNode) =>
479 accumulator.concat(workerNode.usage.waitTime.history),
480 []
481 )
98e72cda 482 )
dc021bcc 483 )
3baa0837 484 }),
98e72cda
JB
485 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
486 .waitTime.median && {
487 median: round(
488 median(
3baa0837
JB
489 this.workerNodes.reduce<number[]>(
490 (accumulator, workerNode) =>
491 accumulator.concat(workerNode.usage.waitTime.history),
492 []
98e72cda
JB
493 )
494 )
495 )
496 })
1dcf8b7b
JB
497 }
498 })
6b27d407
JB
499 }
500 }
08f3f44c 501
aa9eede8
JB
502 /**
503 * The pool readiness boolean status.
504 */
2431bdb4
JB
505 private get ready (): boolean {
506 return (
b97d82d8
JB
507 this.workerNodes.reduce(
508 (accumulator, workerNode) =>
509 !workerNode.info.dynamic && workerNode.info.ready
510 ? accumulator + 1
511 : accumulator,
512 0
513 ) >= this.minSize
2431bdb4
JB
514 )
515 }
516
afe0d5bf 517 /**
aa9eede8 518 * The approximate pool utilization.
afe0d5bf
JB
519 *
520 * @returns The pool utilization.
521 */
522 private get utilization (): number {
8e5ca040 523 const poolTimeCapacity =
fe7d90db 524 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
525 const totalTasksRunTime = this.workerNodes.reduce(
526 (accumulator, workerNode) =>
71514351 527 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
528 0
529 )
530 const totalTasksWaitTime = this.workerNodes.reduce(
531 (accumulator, workerNode) =>
71514351 532 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
533 0
534 )
8e5ca040 535 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
536 }
537
8881ae32 538 /**
aa9eede8 539 * The pool type.
8881ae32
JB
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
184855e6 545 /**
aa9eede8 546 * The worker type.
184855e6
JB
547 */
548 protected abstract get worker (): WorkerType
549
c2ade475 550 /**
aa9eede8 551 * The pool minimum size.
c2ade475 552 */
8735b4e5
JB
553 protected get minSize (): number {
554 return this.numberOfWorkers
555 }
ff733df7
JB
556
557 /**
aa9eede8 558 * The pool maximum size.
ff733df7 559 */
8735b4e5
JB
560 protected get maxSize (): number {
561 return this.max ?? this.numberOfWorkers
562 }
a35560ba 563
6b813701
JB
564 /**
565 * Checks if the worker id sent in the received message from a worker is valid.
566 *
567 * @param message - The received message.
568 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
569 */
21f710aa 570 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
571 if (message.workerId == null) {
572 throw new Error('Worker message received without worker id')
573 } else if (
21f710aa 574 message.workerId != null &&
aad6fb64 575 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
576 ) {
577 throw new Error(
578 `Worker message received from unknown worker '${message.workerId}'`
579 )
580 }
581 }
582
ffcbbad8 583 /**
f06e48d8 584 * Gets the given worker its worker node key.
ffcbbad8
JB
585 *
586 * @param worker - The worker.
f59e1027 587 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 588 */
aad6fb64 589 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 590 return this.workerNodes.findIndex(
8ebe6c30 591 (workerNode) => workerNode.worker === worker
f06e48d8 592 )
bf9549ae
JB
593 }
594
aa9eede8
JB
595 /**
596 * Gets the worker node key given its worker id.
597 *
598 * @param workerId - The worker id.
aad6fb64 599 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 600 */
aad6fb64
JB
601 private getWorkerNodeKeyByWorkerId (workerId: number): number {
602 return this.workerNodes.findIndex(
8ebe6c30 603 (workerNode) => workerNode.info.id === workerId
aad6fb64 604 )
aa9eede8
JB
605 }
606
afc003b2 607 /** @inheritDoc */
a35560ba 608 public setWorkerChoiceStrategy (
59219cbb
JB
609 workerChoiceStrategy: WorkerChoiceStrategy,
610 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 611 ): void {
aee46736 612 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 613 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
614 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
615 this.opts.workerChoiceStrategy
616 )
617 if (workerChoiceStrategyOptions != null) {
618 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
619 }
aa9eede8 620 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 621 workerNode.resetUsage()
9edb9717 622 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 623 }
a20f0ba5
JB
624 }
625
626 /** @inheritDoc */
627 public setWorkerChoiceStrategyOptions (
628 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
629 ): void {
0d80593b 630 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
631 this.opts.workerChoiceStrategyOptions = {
632 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
633 ...workerChoiceStrategyOptions
634 }
a20f0ba5
JB
635 this.workerChoiceStrategyContext.setOptions(
636 this.opts.workerChoiceStrategyOptions
a35560ba
S
637 )
638 }
639
a20f0ba5 640 /** @inheritDoc */
8f52842f
JB
641 public enableTasksQueue (
642 enable: boolean,
643 tasksQueueOptions?: TasksQueueOptions
644 ): void {
a20f0ba5 645 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 646 this.flushTasksQueues()
a20f0ba5
JB
647 }
648 this.opts.enableTasksQueue = enable
8f52842f 649 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
650 }
651
652 /** @inheritDoc */
8f52842f 653 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 654 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
655 this.checkValidTasksQueueOptions(tasksQueueOptions)
656 this.opts.tasksQueueOptions =
657 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 658 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 659 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
660 delete this.opts.tasksQueueOptions
661 }
662 }
663
5b49e864 664 private setTasksQueueSize (size: number): void {
20c6f652 665 for (const workerNode of this.workerNodes) {
ff3f866a 666 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
667 }
668 }
669
a20f0ba5
JB
670 private buildTasksQueueOptions (
671 tasksQueueOptions: TasksQueueOptions
672 ): TasksQueueOptions {
673 return {
20c6f652 674 ...{
ff3f866a 675 size: Math.pow(this.maxSize, 2),
20c6f652
JB
676 concurrency: 1
677 },
678 ...tasksQueueOptions
a20f0ba5
JB
679 }
680 }
681
c319c66b
JB
682 /**
683 * Whether the pool is full or not.
684 *
685 * The pool filling boolean status.
686 */
dea903a8
JB
687 protected get full (): boolean {
688 return this.workerNodes.length >= this.maxSize
689 }
c2ade475 690
c319c66b
JB
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
7c0ba920 697
6c6afb84 698 /**
3d76750a 699 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
700 *
701 * @returns Worker nodes busyness boolean status.
702 */
c2ade475 703 protected internalBusy (): boolean {
3d76750a
JB
704 if (this.opts.enableTasksQueue === true) {
705 return (
706 this.workerNodes.findIndex(
8ebe6c30 707 (workerNode) =>
3d76750a
JB
708 workerNode.info.ready &&
709 workerNode.usage.tasks.executing <
710 (this.opts.tasksQueueOptions?.concurrency as number)
711 ) === -1
712 )
713 } else {
714 return (
715 this.workerNodes.findIndex(
8ebe6c30 716 (workerNode) =>
3d76750a
JB
717 workerNode.info.ready && workerNode.usage.tasks.executing === 0
718 ) === -1
719 )
720 }
cb70b19d
JB
721 }
722
90d7d101
JB
723 /** @inheritDoc */
724 public listTaskFunctions (): string[] {
f2dbbf95
JB
725 for (const workerNode of this.workerNodes) {
726 if (
727 Array.isArray(workerNode.info.taskFunctions) &&
728 workerNode.info.taskFunctions.length > 0
729 ) {
730 return workerNode.info.taskFunctions
731 }
90d7d101 732 }
f2dbbf95 733 return []
90d7d101
JB
734 }
735
375f7504
JB
736 private shallExecuteTask (workerNodeKey: number): boolean {
737 return (
738 this.tasksQueueSize(workerNodeKey) === 0 &&
739 this.workerNodes[workerNodeKey].usage.tasks.executing <
740 (this.opts.tasksQueueOptions?.concurrency as number)
741 )
742 }
743
afc003b2 744 /** @inheritDoc */
7d91a8cd
JB
745 public async execute (
746 data?: Data,
747 name?: string,
748 transferList?: TransferListItem[]
749 ): Promise<Response> {
52b71763 750 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
751 if (!this.started) {
752 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 753 return
15b176e0 754 }
7d91a8cd
JB
755 if (name != null && typeof name !== 'string') {
756 reject(new TypeError('name argument must be a string'))
9d2d0da1 757 return
7d91a8cd 758 }
90d7d101
JB
759 if (
760 name != null &&
761 typeof name === 'string' &&
762 name.trim().length === 0
763 ) {
f58b60b9 764 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 765 return
90d7d101 766 }
b558f6b5
JB
767 if (transferList != null && !Array.isArray(transferList)) {
768 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 769 return
b558f6b5
JB
770 }
771 const timestamp = performance.now()
772 const workerNodeKey = this.chooseWorkerNode()
94407def 773 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
501aea93 774 const task: Task<Data> = {
52b71763
JB
775 name: name ?? DEFAULT_TASK_NAME,
776 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
777 data: data ?? ({} as Data),
7d91a8cd 778 transferList,
52b71763 779 timestamp,
a5d15204 780 workerId: workerInfo.id as number,
7629bdf1 781 taskId: randomUUID()
52b71763 782 }
7629bdf1 783 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
784 resolve,
785 reject,
501aea93 786 workerNodeKey
2e81254d 787 })
52b71763 788 if (
4e377863
JB
789 this.opts.enableTasksQueue === false ||
790 (this.opts.enableTasksQueue === true &&
375f7504 791 this.shallExecuteTask(workerNodeKey))
52b71763 792 ) {
501aea93 793 this.executeTask(workerNodeKey, task)
4e377863
JB
794 } else {
795 this.enqueueTask(workerNodeKey, task)
52b71763 796 }
2e81254d 797 })
280c2a77 798 }
c97c7edb 799
afc003b2 800 /** @inheritDoc */
c97c7edb 801 public async destroy (): Promise<void> {
1fbcaa7c 802 await Promise.all(
81c02522 803 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 804 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
805 })
806 )
33e6bb4c 807 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 808 this.started = false
c97c7edb
S
809 }
810
1e3214b6
JB
811 protected async sendKillMessageToWorker (
812 workerNodeKey: number,
813 workerId: number
814 ): Promise<void> {
9edb9717 815 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
816 this.registerWorkerMessageListener(workerNodeKey, (message) => {
817 if (message.kill === 'success') {
818 resolve()
819 } else if (message.kill === 'failure') {
e1af34e6 820 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
821 }
822 })
9edb9717 823 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 824 })
1e3214b6
JB
825 }
826
4a6952ff 827 /**
aa9eede8 828 * Terminates the worker node given its worker node key.
4a6952ff 829 *
aa9eede8 830 * @param workerNodeKey - The worker node key.
4a6952ff 831 */
81c02522 832 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 833
729c563d 834 /**
6677a3d3
JB
835 * Setup hook to execute code before worker nodes are created in the abstract constructor.
836 * Can be overridden.
afc003b2
JB
837 *
838 * @virtual
729c563d 839 */
280c2a77 840 protected setupHook (): void {
965df41c 841 /* Intentionally empty */
280c2a77 842 }
c97c7edb 843
729c563d 844 /**
280c2a77
S
845 * Should return whether the worker is the main worker or not.
846 */
847 protected abstract isMain (): boolean
848
849 /**
2e81254d 850 * Hook executed before the worker task execution.
bf9549ae 851 * Can be overridden.
729c563d 852 *
f06e48d8 853 * @param workerNodeKey - The worker node key.
1c6fe997 854 * @param task - The task to execute.
729c563d 855 */
1c6fe997
JB
856 protected beforeTaskExecutionHook (
857 workerNodeKey: number,
858 task: Task<Data>
859 ): void {
94407def
JB
860 if (this.workerNodes[workerNodeKey]?.usage != null) {
861 const workerUsage = this.workerNodes[workerNodeKey].usage
862 ++workerUsage.tasks.executing
863 this.updateWaitTimeWorkerUsage(workerUsage, task)
864 }
865 if (
866 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
867 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
868 task.name as string
869 ) != null
870 ) {
db0e38ee 871 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 872 workerNodeKey
db0e38ee 873 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
874 ++taskFunctionWorkerUsage.tasks.executing
875 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 876 }
c97c7edb
S
877 }
878
c01733f1 879 /**
2e81254d 880 * Hook executed after the worker task execution.
bf9549ae 881 * Can be overridden.
c01733f1 882 *
501aea93 883 * @param workerNodeKey - The worker node key.
38e795c1 884 * @param message - The received message.
c01733f1 885 */
2e81254d 886 protected afterTaskExecutionHook (
501aea93 887 workerNodeKey: number,
2740a743 888 message: MessageValue<Response>
bf9549ae 889 ): void {
94407def
JB
890 if (this.workerNodes[workerNodeKey]?.usage != null) {
891 const workerUsage = this.workerNodes[workerNodeKey].usage
892 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
893 this.updateRunTimeWorkerUsage(workerUsage, message)
894 this.updateEluWorkerUsage(workerUsage, message)
895 }
896 if (
897 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
898 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 899 message.taskPerformance?.name as string
94407def
JB
900 ) != null
901 ) {
db0e38ee 902 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 903 workerNodeKey
db0e38ee 904 ].getTaskFunctionWorkerUsage(
0628755c 905 message.taskPerformance?.name as string
b558f6b5 906 ) as WorkerUsage
db0e38ee
JB
907 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
908 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
909 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
910 }
911 }
912
db0e38ee
JB
913 /**
914 * Whether the worker node shall update its task function worker usage or not.
915 *
916 * @param workerNodeKey - The worker node key.
917 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
918 */
919 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 920 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 921 return (
94407def 922 workerInfo != null &&
a5d15204 923 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 924 workerInfo.taskFunctions.length > 2
b558f6b5 925 )
f1c06930
JB
926 }
927
928 private updateTaskStatisticsWorkerUsage (
929 workerUsage: WorkerUsage,
930 message: MessageValue<Response>
931 ): void {
a4e07f72 932 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
933 if (
934 workerTaskStatistics.executing != null &&
935 workerTaskStatistics.executing > 0
936 ) {
937 --workerTaskStatistics.executing
5bb5be17 938 }
98e72cda
JB
939 if (message.taskError == null) {
940 ++workerTaskStatistics.executed
941 } else {
a4e07f72 942 ++workerTaskStatistics.failed
2740a743 943 }
f8eb0a2a
JB
944 }
945
a4e07f72
JB
946 private updateRunTimeWorkerUsage (
947 workerUsage: WorkerUsage,
f8eb0a2a
JB
948 message: MessageValue<Response>
949 ): void {
dc021bcc
JB
950 if (message.taskError != null) {
951 return
952 }
e4f20deb
JB
953 updateMeasurementStatistics(
954 workerUsage.runTime,
955 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 956 message.taskPerformance?.runTime ?? 0
e4f20deb 957 )
f8eb0a2a
JB
958 }
959
a4e07f72
JB
960 private updateWaitTimeWorkerUsage (
961 workerUsage: WorkerUsage,
1c6fe997 962 task: Task<Data>
f8eb0a2a 963 ): void {
1c6fe997
JB
964 const timestamp = performance.now()
965 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
966 updateMeasurementStatistics(
967 workerUsage.waitTime,
968 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 969 taskWaitTime
e4f20deb 970 )
c01733f1 971 }
972
a4e07f72 973 private updateEluWorkerUsage (
5df69fab 974 workerUsage: WorkerUsage,
62c15a68
JB
975 message: MessageValue<Response>
976 ): void {
dc021bcc
JB
977 if (message.taskError != null) {
978 return
979 }
008512c7
JB
980 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
981 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
982 updateMeasurementStatistics(
983 workerUsage.elu.active,
008512c7 984 eluTaskStatisticsRequirements,
dc021bcc 985 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
986 )
987 updateMeasurementStatistics(
988 workerUsage.elu.idle,
008512c7 989 eluTaskStatisticsRequirements,
dc021bcc 990 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 991 )
008512c7 992 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 993 if (message.taskPerformance?.elu != null) {
f7510105
JB
994 if (workerUsage.elu.utilization != null) {
995 workerUsage.elu.utilization =
996 (workerUsage.elu.utilization +
997 message.taskPerformance.elu.utilization) /
998 2
999 } else {
1000 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1001 }
62c15a68
JB
1002 }
1003 }
1004 }
1005
280c2a77 1006 /**
f06e48d8 1007 * Chooses a worker node for the next task.
280c2a77 1008 *
6c6afb84 1009 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1010 *
aa9eede8 1011 * @returns The chosen worker node key
280c2a77 1012 */
6c6afb84 1013 private chooseWorkerNode (): number {
930dcf12 1014 if (this.shallCreateDynamicWorker()) {
aa9eede8 1015 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1016 if (
b1aae695 1017 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1018 ) {
aa9eede8 1019 return workerNodeKey
6c6afb84 1020 }
17393ac8 1021 }
930dcf12
JB
1022 return this.workerChoiceStrategyContext.execute()
1023 }
1024
6c6afb84
JB
1025 /**
1026 * Conditions for dynamic worker creation.
1027 *
1028 * @returns Whether to create a dynamic worker or not.
1029 */
1030 private shallCreateDynamicWorker (): boolean {
930dcf12 1031 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1032 }
1033
280c2a77 1034 /**
aa9eede8 1035 * Sends a message to worker given its worker node key.
280c2a77 1036 *
aa9eede8 1037 * @param workerNodeKey - The worker node key.
38e795c1 1038 * @param message - The message.
7d91a8cd 1039 * @param transferList - The optional array of transferable objects.
280c2a77
S
1040 */
1041 protected abstract sendToWorker (
aa9eede8 1042 workerNodeKey: number,
7d91a8cd
JB
1043 message: MessageValue<Data>,
1044 transferList?: TransferListItem[]
280c2a77
S
1045 ): void
1046
729c563d 1047 /**
41344292 1048 * Creates a new worker.
6c6afb84
JB
1049 *
1050 * @returns Newly created worker.
729c563d 1051 */
280c2a77 1052 protected abstract createWorker (): Worker
c97c7edb 1053
4a6952ff 1054 /**
aa9eede8 1055 * Creates a new, completely set up worker node.
4a6952ff 1056 *
aa9eede8 1057 * @returns New, completely set up worker node key.
4a6952ff 1058 */
aa9eede8 1059 protected createAndSetupWorkerNode (): number {
bdacc2d2 1060 const worker = this.createWorker()
280c2a77 1061
fd04474e 1062 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1063 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1064 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1065 worker.on('error', (error) => {
aad6fb64 1066 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1067 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1068 workerInfo.ready = false
0dc838e3 1069 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1070 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1071 if (
1072 this.opts.restartWorkerOnError === true &&
1073 !this.starting &&
1074 this.started
1075 ) {
9b106837 1076 if (workerInfo.dynamic) {
aa9eede8 1077 this.createAndSetupDynamicWorkerNode()
8a1260a3 1078 } else {
aa9eede8 1079 this.createAndSetupWorkerNode()
8a1260a3 1080 }
5baee0d7 1081 }
19dbc45b 1082 if (this.opts.enableTasksQueue === true) {
9b106837 1083 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1084 }
5baee0d7 1085 })
a35560ba 1086 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1087 worker.once('exit', () => {
f06e48d8 1088 this.removeWorkerNode(worker)
a974afa6 1089 })
280c2a77 1090
aa9eede8 1091 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1092
aa9eede8 1093 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1094
aa9eede8 1095 return workerNodeKey
c97c7edb 1096 }
be0676b3 1097
930dcf12 1098 /**
aa9eede8 1099 * Creates a new, completely set up dynamic worker node.
930dcf12 1100 *
aa9eede8 1101 * @returns New, completely set up dynamic worker node key.
930dcf12 1102 */
aa9eede8
JB
1103 protected createAndSetupDynamicWorkerNode (): number {
1104 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1105 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1106 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1107 message.workerId
aad6fb64 1108 )
aa9eede8 1109 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1110 // Kill message received from worker
930dcf12
JB
1111 if (
1112 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1113 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1114 ((this.opts.enableTasksQueue === false &&
aa9eede8 1115 workerUsage.tasks.executing === 0) ||
7b56f532 1116 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1117 workerUsage.tasks.executing === 0 &&
1118 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1119 ) {
5270d253
JB
1120 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1121 this.emitter?.emit(PoolEvents.error, error)
1122 })
930dcf12
JB
1123 }
1124 })
94407def 1125 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1126 this.sendToWorker(workerNodeKey, {
b0a4db63 1127 checkActive: true,
21f710aa
JB
1128 workerId: workerInfo.id as number
1129 })
b5e113f6 1130 workerInfo.dynamic = true
b1aae695
JB
1131 if (
1132 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1133 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1134 ) {
b5e113f6
JB
1135 workerInfo.ready = true
1136 }
33e6bb4c 1137 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1138 return workerNodeKey
930dcf12
JB
1139 }
1140
a2ed5053 1141 /**
aa9eede8 1142 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1143 *
aa9eede8 1144 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1145 * @param listener - The message listener callback.
1146 */
85aeb3f3
JB
1147 protected abstract registerWorkerMessageListener<
1148 Message extends Data | Response
aa9eede8
JB
1149 >(
1150 workerNodeKey: number,
1151 listener: (message: MessageValue<Message>) => void
1152 ): void
a2ed5053
JB
1153
1154 /**
aa9eede8 1155 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1156 * Can be overridden.
1157 *
aa9eede8 1158 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1159 */
aa9eede8 1160 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1161 // Listen to worker messages.
aa9eede8 1162 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1163 // Send the startup message to worker.
aa9eede8 1164 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1165 // Send the statistics message to worker.
1166 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1167 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1168 this.workerNodes[workerNodeKey].onEmptyQueue =
1169 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1170 this.workerNodes[workerNodeKey].onBackPressure =
1171 this.tasksStealingOnBackPressure.bind(this)
1172 }
d2c73f82
JB
1173 }
1174
85aeb3f3 1175 /**
aa9eede8
JB
1176 * Sends the startup message to worker given its worker node key.
1177 *
1178 * @param workerNodeKey - The worker node key.
1179 */
1180 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1181
1182 /**
9edb9717 1183 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1184 *
aa9eede8 1185 * @param workerNodeKey - The worker node key.
85aeb3f3 1186 */
9edb9717 1187 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1188 this.sendToWorker(workerNodeKey, {
1189 statistics: {
1190 runTime:
1191 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1192 .runTime.aggregate,
1193 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1194 .elu.aggregate
1195 },
94407def 1196 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1197 })
1198 }
a2ed5053
JB
1199
1200 private redistributeQueuedTasks (workerNodeKey: number): void {
1201 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1202 let destinationWorkerNodeKey!: number
a2ed5053 1203 let minQueuedTasks = Infinity
a6b3272b 1204 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3 1205 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
58baffd3
JB
1206 if (workerNode.usage.tasks.queued === 0) {
1207 destinationWorkerNodeKey = workerNodeId
1208 break
1209 }
1210 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1211 minQueuedTasks = workerNode.usage.tasks.queued
1212 destinationWorkerNodeKey = workerNodeId
1213 }
a2ed5053
JB
1214 }
1215 }
0bc68267 1216 if (destinationWorkerNodeKey != null) {
033f1776 1217 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1218 const task = {
1219 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1220 workerId: destinationWorkerNode.info.id as number
0bc68267 1221 }
375f7504 1222 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
0bc68267
JB
1223 this.executeTask(destinationWorkerNodeKey, task)
1224 } else {
1225 this.enqueueTask(destinationWorkerNodeKey, task)
1226 }
dd951876
JB
1227 }
1228 }
1229 }
1230
b1838604
JB
1231 private updateTaskStolenStatisticsWorkerUsage (
1232 workerNodeKey: number,
b1838604
JB
1233 taskName: string
1234 ): void {
1a880eca 1235 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1236 if (workerNode?.usage != null) {
1237 ++workerNode.usage.tasks.stolen
1238 }
1239 if (
1240 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1241 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1242 ) {
1243 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1244 taskName
1245 ) as WorkerUsage
1246 ++taskFunctionWorkerUsage.tasks.stolen
1247 }
1248 }
1249
dd951876 1250 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1251 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1252 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1253 const workerNodes = this.workerNodes
a6b3272b 1254 .slice()
dd951876
JB
1255 .sort(
1256 (workerNodeA, workerNodeB) =>
1257 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1258 )
dd951876 1259 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1260 if (sourceWorkerNode.usage.tasks.queued === 0) {
1261 break
1262 }
a6b3272b
JB
1263 if (
1264 sourceWorkerNode.info.ready &&
1265 sourceWorkerNode.info.id !== workerId &&
1266 sourceWorkerNode.usage.tasks.queued > 0
1267 ) {
1268 const task = {
1269 ...(sourceWorkerNode.popTask() as Task<Data>),
1270 workerId: destinationWorkerNode.info.id as number
1271 }
375f7504 1272 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
2cb4ff45
JB
1273 this.executeTask(destinationWorkerNodeKey, task)
1274 } else {
1275 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876 1276 }
b1838604
JB
1277 this.updateTaskStolenStatisticsWorkerUsage(
1278 destinationWorkerNodeKey,
b1838604
JB
1279 task.name as string
1280 )
dd951876 1281 break
72695f86
JB
1282 }
1283 }
1284 }
1285
1286 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1287 const sizeOffset = 1
1288 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1289 return
1290 }
72695f86
JB
1291 const sourceWorkerNode =
1292 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1293 const workerNodes = this.workerNodes
a6b3272b 1294 .slice()
72695f86
JB
1295 .sort(
1296 (workerNodeA, workerNodeB) =>
1297 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1298 )
1299 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1300 if (
0bc68267 1301 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1302 workerNode.info.ready &&
1303 workerNode.info.id !== workerId &&
0bc68267 1304 workerNode.usage.tasks.queued <
f778c355 1305 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1306 ) {
dd951876
JB
1307 const task = {
1308 ...(sourceWorkerNode.popTask() as Task<Data>),
1309 workerId: workerNode.info.id as number
1310 }
375f7504 1311 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1312 this.executeTask(workerNodeKey, task)
4de3d785 1313 } else {
dd951876 1314 this.enqueueTask(workerNodeKey, task)
4de3d785 1315 }
b1838604
JB
1316 this.updateTaskStolenStatisticsWorkerUsage(
1317 workerNodeKey,
b1838604
JB
1318 task.name as string
1319 )
10ecf8fd 1320 }
a2ed5053
JB
1321 }
1322 }
1323
be0676b3 1324 /**
aa9eede8 1325 * This method is the listener registered for each worker message.
be0676b3 1326 *
bdacc2d2 1327 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1328 */
1329 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1330 return (message) => {
21f710aa 1331 this.checkMessageWorkerId(message)
a5d15204 1332 if (message.ready != null && message.taskFunctions != null) {
81c02522 1333 // Worker ready response received from worker
10e2aa7e 1334 this.handleWorkerReadyResponse(message)
7629bdf1 1335 } else if (message.taskId != null) {
81c02522 1336 // Task execution response received from worker
6b272951 1337 this.handleTaskExecutionResponse(message)
90d7d101
JB
1338 } else if (message.taskFunctions != null) {
1339 // Task functions message received from worker
94407def
JB
1340 (
1341 this.getWorkerInfo(
1342 this.getWorkerNodeKeyByWorkerId(message.workerId)
1343 ) as WorkerInfo
b558f6b5 1344 ).taskFunctions = message.taskFunctions
6b272951
JB
1345 }
1346 }
1347 }
1348
10e2aa7e 1349 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1350 if (message.ready === false) {
1351 throw new Error(`Worker ${message.workerId} failed to initialize`)
1352 }
a5d15204 1353 const workerInfo = this.getWorkerInfo(
aad6fb64 1354 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1355 ) as WorkerInfo
a5d15204
JB
1356 workerInfo.ready = message.ready as boolean
1357 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1358 if (this.emitter != null && this.ready) {
1359 this.emitter.emit(PoolEvents.ready, this.info)
1360 }
6b272951
JB
1361 }
1362
1363 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1364 const { taskId, taskError, data } = message
1365 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1366 if (promiseResponse != null) {
5441aea6
JB
1367 if (taskError != null) {
1368 this.emitter?.emit(PoolEvents.taskError, taskError)
1369 promiseResponse.reject(taskError.message)
6b272951 1370 } else {
5441aea6 1371 promiseResponse.resolve(data as Response)
6b272951 1372 }
501aea93
JB
1373 const workerNodeKey = promiseResponse.workerNodeKey
1374 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1375 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1376 if (
1377 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1378 this.tasksQueueSize(workerNodeKey) > 0 &&
1379 this.workerNodes[workerNodeKey].usage.tasks.executing <
1380 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1381 ) {
1382 this.executeTask(
1383 workerNodeKey,
1384 this.dequeueTask(workerNodeKey) as Task<Data>
1385 )
be0676b3 1386 }
6b272951 1387 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1388 }
be0676b3 1389 }
7c0ba920 1390
a1763c54 1391 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1392 if (this.busy) {
1393 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1394 }
1395 }
1396
1397 private checkAndEmitTaskQueuingEvents (): void {
1398 if (this.hasBackPressure()) {
1399 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1400 }
1401 }
1402
33e6bb4c
JB
1403 private checkAndEmitDynamicWorkerCreationEvents (): void {
1404 if (this.type === PoolTypes.dynamic) {
1405 if (this.full) {
1406 this.emitter?.emit(PoolEvents.full, this.info)
1407 }
1408 }
1409 }
1410
8a1260a3 1411 /**
aa9eede8 1412 * Gets the worker information given its worker node key.
8a1260a3
JB
1413 *
1414 * @param workerNodeKey - The worker node key.
3f09ed9f 1415 * @returns The worker information.
8a1260a3 1416 */
94407def
JB
1417 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1418 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1419 }
1420
a05c10de 1421 /**
b0a4db63 1422 * Adds the given worker in the pool worker nodes.
ea7a90d3 1423 *
38e795c1 1424 * @param worker - The worker.
aa9eede8
JB
1425 * @returns The added worker node key.
1426 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1427 */
b0a4db63 1428 private addWorkerNode (worker: Worker): number {
671d5154
JB
1429 const workerNode = new WorkerNode<Worker, Data>(
1430 worker,
ff3f866a 1431 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1432 )
b97d82d8 1433 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1434 if (this.starting) {
1435 workerNode.info.ready = true
1436 }
aa9eede8 1437 this.workerNodes.push(workerNode)
aad6fb64 1438 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1439 if (workerNodeKey === -1) {
86ed0598 1440 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1441 }
1442 return workerNodeKey
ea7a90d3 1443 }
c923ce56 1444
51fe3d3c 1445 /**
f06e48d8 1446 * Removes the given worker from the pool worker nodes.
51fe3d3c 1447 *
f06e48d8 1448 * @param worker - The worker.
51fe3d3c 1449 */
416fd65c 1450 private removeWorkerNode (worker: Worker): void {
aad6fb64 1451 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1452 if (workerNodeKey !== -1) {
1453 this.workerNodes.splice(workerNodeKey, 1)
1454 this.workerChoiceStrategyContext.remove(workerNodeKey)
1455 }
51fe3d3c 1456 }
adc3c320 1457
e2b31e32
JB
1458 /** @inheritDoc */
1459 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1460 return (
e2b31e32
JB
1461 this.opts.enableTasksQueue === true &&
1462 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1463 )
1464 }
1465
1466 private hasBackPressure (): boolean {
1467 return (
1468 this.opts.enableTasksQueue === true &&
1469 this.workerNodes.findIndex(
1470 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1471 ) === -1
9e844245 1472 )
e2b31e32
JB
1473 }
1474
b0a4db63 1475 /**
aa9eede8 1476 * Executes the given task on the worker given its worker node key.
b0a4db63 1477 *
aa9eede8 1478 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1479 * @param task - The task to execute.
1480 */
2e81254d 1481 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1482 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1483 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1484 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1485 }
1486
f9f00b5f 1487 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1488 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1489 this.checkAndEmitTaskQueuingEvents()
1490 return tasksQueueSize
adc3c320
JB
1491 }
1492
416fd65c 1493 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1494 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1495 }
1496
416fd65c 1497 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1498 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1499 }
1500
81c02522 1501 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1502 while (this.tasksQueueSize(workerNodeKey) > 0) {
1503 this.executeTask(
1504 workerNodeKey,
1505 this.dequeueTask(workerNodeKey) as Task<Data>
1506 )
ff733df7 1507 }
4b628b48 1508 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1509 }
1510
ef41a6e6
JB
1511 private flushTasksQueues (): void {
1512 for (const [workerNodeKey] of this.workerNodes.entries()) {
1513 this.flushTasksQueue(workerNodeKey)
1514 }
1515 }
c97c7edb 1516}