refactor: remove unused queue code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The task execution response promise map.
 *
 * - `key`: The task id of each submitted task.
 * - `value`: An object that contains the execution response promise resolve and reject callbacks and the key of the worker node the task was submitted to.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to that id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 */
protected readonly max?: number

/**
 * Whether the pool is starting or not.
 * Only `true` while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean

/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
100
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, wires the worker choice strategy context, runs the
 * setup hook and then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is instantiated from a worker of the same type as the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are handed over as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is raised only for the duration of the initial worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
147
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or does not exist on the file system.
 */
private checkFilePath (filePath: string): void {
  const isUnspecified =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isUnspecified) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
160
8d3782fa
JB
/**
 * Checks that the number of workers is a non negative safe integer,
 * and strictly positive for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
178
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, inferior or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
204
/**
 * Validates the pool options and stores their defaulted values on `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, worker restart on error
 * enabled, events enabled, tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy and its options.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
232
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries are not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
286
a20f0ba5
JB
/**
 * Checks the tasks queue options. Missing options (`null` or `undefined`) are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or the concurrency or the queue maximum size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency or the queue maximum size is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const queueMaxSize = tasksQueueOptions?.queueMaxSize
  if (queueMaxSize != null) {
    if (!Number.isSafeInteger(queueMaxSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (queueMaxSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${queueMaxSize} is a negative integer or zero`
      )
    }
  }
}
326
e761c033
JB
/**
 * Creates and sets up worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
338
/** @inheritDoc */
public get info (): PoolInfo {
  // The task statistics requirements decide which optional sections are reported.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Adds up one numeric value per worker node over the whole pool.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Divisor of the run time and wait time averages.
  const countExecutedTasks = (): number =>
    sumOverWorkerNodes((workerNode) => workerNode.usage.tasks?.executed ?? 0)
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) / countExecutedTasks()
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) / countExecutedTasks()
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 482
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of non dynamic worker nodes flagged as
 * ready reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
497
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all the worker
 * nodes, divided by the time elapsed since the pool start multiplied by the
 * pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
518
8881ae32 519 /**
aa9eede8 520 * The pool type.
8881ae32
JB
521 *
522 * If it is `'dynamic'`, it provides the `max` property.
523 */
524 protected abstract get type (): PoolType
525
184855e6 526 /**
aa9eede8 527 * The worker type.
184855e6
JB
528 */
529 protected abstract get worker (): WorkerType
530
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
537
/**
 * The pool maximum size: the `max` property when it is defined,
 * the number of workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
a35560ba 544
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
563
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (candidate: IWorkerNode<Worker, Data>): boolean =>
    candidate.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
575
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (candidate: IWorkerNode<Worker, Data>): boolean =>
    candidate.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
587
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node restarts from a clean usage and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
606
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
620
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the queue off: flush the tasks queues first.
  const isDisablingQueue = this.opts.enableTasksQueue === true && !enable
  if (isDisablingQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
632
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  this.setTasksQueueMaxSize(
    this.opts.tasksQueueOptions.queueMaxSize as number
  )
}
646
20c6f652
JB
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param queueMaxSize - The tasks queue maximum size.
 */
private setTasksQueueMaxSize (queueMaxSize: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = queueMaxSize
  })
}
652
a20f0ba5
JB
/**
 * Builds the tasks queue options: the given options on top of the defaults
 * (queue maximum size equal to the squared pool maximum size, concurrency of one).
 *
 * @param tasksQueueOptions - The given tasks queue options.
 * @returns The tasks queue options with defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    queueMaxSize: Math.pow(this.maxSize, 2),
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
664
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 673
c319c66b
JB
674 /**
675 * Whether the pool is busy or not.
676 *
677 * The pool busyness boolean status.
678 */
679 protected abstract get busy (): boolean
7c0ba920 680
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the configured concurrency; otherwise it is
 * available only when it executes no task.
 *
 * @returns `true` if no ready worker node is available, `false` otherwise.
 */
protected internalBusy (): boolean {
  const isAvailable =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(isAvailable)
}
705
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const advertisingWorkerNode = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
718
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Rejecting does not interrupt the executor: every failed validation must
    // return, otherwise the invalid task would still be registered and submitted.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Taken before the worker node choice, used later to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Register the promise callbacks under the task id before submitting the task.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the queue is disabled or the worker node still
    // has concurrency left, enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 777
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently, then emit the destroy event.
  const workerNodesDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
}
787
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers with a kill `'success'`, rejected when it answers with a kill `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before sending the kill message.
    const onWorkerMessage = (message: MessageValue<Response>): void => {
      if (message.kill === 'success') {
        resolve()
      } else if (message.kill === 'failure') {
        reject(new Error(`Worker ${workerId} kill message handling failed`))
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, onWorkerMessage)
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
803
4a6952ff 804 /**
aa9eede8 805 * Terminates the worker node given its worker node key.
4a6952ff 806 *
aa9eede8 807 * @param workerNodeKey - The worker node key.
4a6952ff 808 */
81c02522 809 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 810
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 820
729c563d 821 /**
280c2a77
S
822 * Should return whether the worker is the main worker or not.
823 */
824 protected abstract isMain (): boolean
825
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics
 * of the worker node usage and, when task function usage is tracked, of the
 * task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Look the usage up once and narrow on it instead of re-reading it.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the narrowing makes the type assertion unnecessary.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
855
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage and,
 * when task function usage is tracked, of the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Look the usage up once and narrow on it instead of re-reading it.
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup: the narrowing makes the type assertion unnecessary.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
889
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * True only when the worker info exists and lists more than two task functions.
 * NOTE(review): the threshold of two presumably accounts for names that are always listed - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null || !Array.isArray(workerInfo.taskFunctions)) {
    return false
  }
  return workerInfo.taskFunctions.length > 2
}
904
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one executing task less, one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  if (tasks.executing != null) {
    if (tasks.executing > 0) {
      --tasks.executing
    } else if (tasks.executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A task error in the message counts the task as failed.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
929
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the task run time
 * carried by the received message (zero when missing).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
941
a4e07f72
JB
/**
 * Feeds the time a task has waited into the wait time statistics of a worker usage.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp is accounted with a wait time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp,
    workerUsage.tasks.executed
  )
}
955
/**
 * Feeds the event loop utilization (ELU) reported by a worker into a worker usage.
 *
 * Active and idle measurements are always forwarded (missing values count as `0`).
 * The utilization is only updated when ELU aggregation is required and the
 * message carries ELU data: it becomes the mean of the previous and the
 * reported utilization, or the reported one when there is no previous value.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
987
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy asks for dynamic worker usage.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1006
6c6afb84
JB
/**
 * Checks the conditions for dynamic worker creation: the pool is dynamic,
 * not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1015
280c2a77 1016 /**
aa9eede8 1017 * Sends a message to worker given its worker node key.
280c2a77 1018 *
aa9eede8 1019 * @param workerNodeKey - The worker node key.
38e795c1 1020 * @param message - The message.
7d91a8cd 1021 * @param transferList - The optional array of transferable objects.
280c2a77
S
1022 */
1023 protected abstract sendToWorker (
aa9eede8 1024 workerNodeKey: number,
7d91a8cd
JB
1025 message: MessageValue<Data>,
1026 transferList?: TransferListItem[]
280c2a77
S
1027 ): void
1028
729c563d 1029 /**
41344292 1030 * Creates a new worker.
6c6afb84
JB
1031 *
1032 * @returns Newly created worker.
729c563d 1033 */
280c2a77 1034 protected abstract createWorker (): Worker
c97c7edb 1035
/**
 * Creates a new worker, wires its event handlers and registers it as a worker node.
 *
 * User supplied handlers from the pool options are attached before the pool
 * internal ones. On worker error the worker node is flagged as not ready, its
 * channel is closed, the error is emitted, a replacement worker node is created
 * when restart on error is enabled and the pool is not starting, and queued
 * tasks are redistributed when the tasks queue is enabled. On worker exit the
 * worker node is removed from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(createdWorkerNodeKey)
  return createdWorkerNodeKey
}
be0676b3 1075
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when it receives a hard kill message, or a soft kill message while the
 * worker node executes no task (and, with the tasks queue enabled, has no
 * queued task). The worker is then asked to check its activity and the worker
 * node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[senderWorkerNodeKey].usage
    // The queue size is only consulted when the tasks queue is enabled.
    const isIdle = (): boolean =>
      usage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0))
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    if (!shallDestroy) {
      return
    }
    this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    dynamicWorkerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1118
a2ed5053 1119 /**
aa9eede8 1120 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1121 *
aa9eede8 1122 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1123 * @param listener - The message listener callback.
1124 */
85aeb3f3
JB
1125 protected abstract registerWorkerMessageListener<
1126 Message extends Data | Response
aa9eede8
JB
1127 >(
1128 workerNodeKey: number,
1129 listener: (message: MessageValue<Message>) => void
1130 ): void
a2ed5053
JB
1131
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages to the worker and, when the tasks queue is enabled, installs the
 * tasks stealing callback as the worker node back pressure handler.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, messageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  this.workerNodes[workerNodeKey].onBackPressure =
    this.tasksStealingOnBackPressure.bind(this)
}
1150
85aeb3f3 1151 /**
aa9eede8
JB
1152 * Sends the startup message to worker given its worker node key.
1153 *
1154 * @param workerNodeKey - The worker node key.
1155 */
1156 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1157
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the run time and ELU aggregation flags taken from the
 * task statistics requirements, together with the worker id.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
a2ed5053
JB
1175
/**
 * Redistributes the tasks queued on a worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node
 * without queued tasks or, failing that, the other ready worker node with the
 * fewest queued tasks. The task is executed right away when the destination
 * worker node executes fewer tasks than the tasks queue concurrency, and is
 * enqueued on the destination otherwise.
 *
 * Redistribution stops when no other ready worker node exists: moving a task
 * back onto the source worker node would never empty its queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey || !workerNode.info.ready) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: bail out instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // The decision is taken on the destination's own load, not on the load
    // of any worker node of the pool.
    if (
      this.workerNodes[destinationWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1218
/**
 * Moves queued tasks away from a worker node under back pressure.
 *
 * The other worker nodes are visited by increasing number of queued tasks.
 * Each one that is ready and not under back pressure takes one task popped
 * from the source worker node: the task is executed when the worker node
 * executes fewer tasks than the tasks queue concurrency, and enqueued when it
 * executes at least that many.
 *
 * @param workerId - The id of the worker whose worker node has back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // filter() returns a new array, so sorting does not reorder this.workerNodes.
  const candidateWorkerNodes = this.workerNodes
    .filter((workerNode) => workerNode.info.id !== workerId)
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  for (const workerNode of candidateWorkerNodes) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to steal.
      break
    }
    if (!workerNode.info.ready || workerNode.hasBackPressure()) {
      continue
    }
    // Positions in the filtered and sorted copy are not worker node keys:
    // resolve the key against the pool worker nodes.
    const workerNodeKey = this.workerNodes.indexOf(workerNode)
    if (workerNodeKey === -1) {
      continue
    }
    if (workerNode.usage.tasks.executing < concurrency) {
      this.executeTask(
        workerNodeKey,
        sourceWorkerNode.popTask() as Task<Data>
      )
    } else if (workerNode.usage.tasks.executing >= concurrency) {
      this.enqueueTask(
        workerNodeKey,
        sourceWorkerNode.popTask() as Task<Data>
      )
    }
  }
}
1254
/**
 * Builds the listener registered for each worker message.
 *
 * After checking the message worker id, the listener dispatches on the
 * message content: worker ready response, task execution response or task
 * functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskFunctions, taskId } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1279
/**
 * Handles the ready response sent by a worker.
 *
 * Stores the ready flag and the task functions on the worker information,
 * then emits the pool ready event if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1293
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task (rejecting with the task error
 * message, or resolving with the response data), runs the after task
 * execution hook, forgets the promise, executes the next queued task of the
 * worker node when the tasks queue is enabled and the worker node is below
 * the concurrency limit, and finally updates the worker choice strategy.
 * Responses for unknown task ids are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1321
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1327
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1333
33e6bb4c
JB
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1341
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` when no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1351
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * The tasks queue maximum size comes from the tasks queue options and
 * defaults to the square of the pool maximum size.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
c923ce56 1376
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy context.
 * Does nothing when the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1389
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1397
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1406
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task (with its transfer
 * list) to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1418
/**
 * Enqueues the given task on the worker node given its key, then checks and
 * emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1424
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1428
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1432
/**
 * Executes every task queued on the worker node given its key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1442
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1448}