Merge branch 'master' of github.com:jerome-benoit/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
52b71763 71 * The task execution response promise map.
be0676b3 72 *
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
075e51d1 95 /**
adc9cc64 96 * Whether the pool is starting or not.
075e51d1
JB
97 */
98 private readonly starting: boolean
15b176e0
JB
99 /**
100 * Whether the pool is started or not.
101 */
102 private started: boolean
afe0d5bf
JB
103 /**
104 * The start timestamp of the pool.
105 */
106 private readonly startTimestamp
107
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, creates the optional
 * event emitter and the worker choice strategy context, runs the setup hook
 * and finally starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const calledFromMain = this.isMain()
  if (!calledFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Each check throws when its argument is invalid.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Keep `this` attached when these methods are handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The `starting` flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
155
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileFound = existsSync(filePath)
  if (!fileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
168
8d3782fa
JB
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is not specified.
 * @throws TypeError if it is not a safe integer.
 * @throws RangeError if it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
186
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if `max` is not specified or is not a safe integer.
 * @throws RangeError if `max` is lower than `min`, equal to zero, or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
212
/**
 * Validates the pool options and writes the defaulted values back into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options: given values override the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
240
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
250
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `retries` is not a safe integer.
 * @throws RangeError if `retries` is negative.
 * @throws Error if the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
294
/**
 * Validates the tasks queue options. A nullish value is accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or `concurrency`/`size` is not a safe integer.
 * @throws RangeError if `concurrency` or `size` is negative or zero.
 * @throws Error if the deprecated `queueMaxSize` option is set.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }

  if (tasksQueueOptions?.queueMaxSize != null) {
    throw new Error(
      'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
    )
  }

  const queueSize = tasksQueueOptions?.size
  if (queueSize != null) {
    if (!Number.isSafeInteger(queueSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (queueSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${queueSize} is a negative integer or zero`
      )
    }
  }
}
336
e761c033
JB
/**
 * Creates worker nodes until the count of non dynamic worker nodes reaches
 * `numberOfWorkers`.
 */
private startPool (): void {
  // Recounted on every iteration since each creation may change the worker nodes.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
348
/** @inheritDoc */
public get info (): PoolInfo {
  // Adds up one numeric value per worker node, starting from zero.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of this.workerNodes) {
      total += pick(workerNode)
    }
    return total
  }
  // Concatenates the measurement histories of all worker nodes, in worker node order.
  const mergeHistories = (measurement: 'runTime' | 'waitTime'): number[] => {
    let merged: number[] = []
    for (const workerNode of this.workerNodes) {
      merged = merged.concat(workerNode.usage[measurement].history)
    }
    return merged
  }
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          average: round(average(mergeHistories('runTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(median(mergeHistories('runTime')))
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(average(mergeHistories('waitTime')))
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(median(mergeHistories('waitTime')))
        })
      }
    })
  }
}
08f3f44c 503
aa9eede8
JB
/**
 * Whether the pool is ready: the number of ready, non dynamic worker nodes
 * has reached the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
518
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedSinceStart = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedSinceStart * this.maxSize
  // The two totals are accumulated separately and only added together at the end.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
539
8881ae32 540 /**
aa9eede8 541 * The pool type.
8881ae32
JB
542 *
543 * If it is `'dynamic'`, it provides the `max` property.
544 */
545 protected abstract get type (): PoolType
546
184855e6 547 /**
aa9eede8 548 * The worker type.
184855e6
JB
549 */
550 protected abstract get worker (): WorkerType
551
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
558
/**
 * The pool maximum size: the `max` property when it is set, the number of
 * workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max != null ? max : numberOfWorkers
}
a35560ba 565
6b813701
JB
/**
 * Ensures that a message received from a worker carries the id of a worker
 * known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
584
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding the worker, `-1` if there is none.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
596
aa9eede8
JB
/**
 * Looks up the worker node key of the worker with the given id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose info id matches, `-1` if there is none.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
608
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
627
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given values override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
641
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when the queue goes from enabled to disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
653
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // The queue is not enabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
}
665
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The size to set.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
671
a20f0ba5
JB
/**
 * Builds the effective tasks queue options: given values override the defaults,
 * which are a size equal to the square of the pool maximum size and a concurrency of 1.
 *
 * @param tasksQueueOptions - The given tasks queue options.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
683
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 692
c319c66b
JB
693 /**
694 * Whether the pool is busy or not.
695 *
696 * The pool busyness boolean status.
697 */
698 protected abstract get busy (): boolean
7c0ba920 699
/**
 * Whether no ready worker node can take one more task right away.
 *
 * With the tasks queue enabled, a worker node has room while it executes fewer
 * tasks than the configured concurrency; otherwise it has room only while it
 * executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasRoom = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    tasksQueueEnabled
      ? workerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
      : workerNode.usage.tasks.executing === 0

  return !this.workerNodes.some(
    workerNode => workerNode.info.ready && hasRoom(workerNode)
  )
}
724
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const advertisingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
737
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue is empty and it executes fewer tasks than the configured concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return (
    executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
  )
}
745
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and argument validation: the first failing check rejects the promise.
    let failure: Error | undefined
    if (!this.started) {
      failure = new Error('Cannot execute a task on destroyed pool')
    } else if (name != null && typeof name !== 'string') {
      failure = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      failure = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      failure = new TypeError('transferList argument must be an array')
    }
    if (failure != null) {
      reject(failure)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Register the promise callbacks under the task id before the task is dispatched.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 800
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
811
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
827
4a6952ff 828 /**
aa9eede8 829 * Terminates the worker node given its worker node key.
4a6952ff 830 *
aa9eede8 831 * @param workerNodeKey - The worker node key.
4a6952ff 832 */
81c02522 833 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 834
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 844
729c563d 845 /**
280c2a77
S
846 * Should return whether the worker is the main worker or not.
847 */
848 protected abstract isMain (): boolean
849
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and updates the wait time, on the worker node usage and, when
 * applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const nodeUsage = this.workerNodes[workerNodeKey].usage
    nodeUsage.tasks.executing += 1
    this.updateWaitTimeWorkerUsage(nodeUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.executing += 1
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
879
/**
 * Hook executed after the worker task execution: updates the task statistics,
 * run time and ELU, on the worker node usage and, when applicable, on the task
 * function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three usage updates to one worker usage object.
  const applyUpdates = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }

  if (this.workerNodes[workerNodeKey]?.usage != null) {
    applyUpdates(this.workerNodes[workerNodeKey].usage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    applyUpdates(taskFunctionWorkerUsage)
  }
}
913
db0e38ee
JB
/**
 * Whether the task function worker usage of the given worker node shall be
 * updated: its worker info lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
928
/**
 * Updates the task counters of the given worker usage after a task response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskCounters = workerUsage.tasks
  // Only decrement a defined, strictly positive executing counter.
  if (taskCounters.executing != null && taskCounters.executing > 0) {
    taskCounters.executing -= 1
  }
  if (message.taskError != null) {
    taskCounters.failed += 1
    return
  }
  taskCounters.executed += 1
}
946
a4e07f72
JB
/**
 * Updates the run time measurement statistics of the given worker usage.
 * Nothing is recorded when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
960
a4e07f72
JB
/**
 * Updates the wait time measurement statistics of the given worker usage.
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp yields a wait time of zero.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceTaskTimestamp = now - (task.timestamp ?? now)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    elapsedSinceTaskTimestamp
  )
}
973
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is recorded when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    message.taskPerformance?.elu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    message.taskPerformance?.elu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || message.taskPerformance?.elu == null) {
    return
  }
  const taskUtilization = message.taskPerformance.elu.utilization
  // The first sample is stored as is, later ones are averaged pairwise with
  // the stored value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskUtilization) / 2
      : taskUtilization
}
1006
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned if the strategy policy allows dynamic worker usage. Otherwise the
 * choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1025
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1034
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1047
/**
 * Creates a new worker.
 * Must be implemented by concrete pools.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1054
/**
 * Creates a new, completely set up worker node.
 *
 * Registers the user supplied handlers and the pool internal 'error' and
 * 'exit' handlers on the new worker, adds it to the pool worker nodes and
 * runs the post setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestartWorker) {
      // Replace the errored worker with one of the same kind.
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1098
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroys the worker node
 * when a matching kill message is received, a `checkActive` message is sent
 * to the worker and the worker info is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored when nothing is executing and, with the
    // tasks queue enabled, nothing is queued either.
    const isIdle = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        workerUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        workerUsage.tasks.executing === 0 &&
        this.tasksQueueSize(localWorkerNodeKey) === 0)
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyAllowsImmediateReadiness =
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  if (strategyAllowsImmediateReadiness) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1141
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1154
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, wires the task stealing
 * callbacks on the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  this.workerNodes[workerNodeKey].onEmptyQueue =
    this.taskStealingOnEmptyQueue.bind(this)
  this.workerNodes[workerNodeKey].onBackPressure =
    this.tasksStealingOnBackPressure.bind(this)
}
1175
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1182
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: aggregateRunTime,
      elu: aggregateElu
    },
    workerId
  })
}
a2ed5053
JB
1200
/**
 * Moves every queued task of the given worker node to other worker nodes.
 *
 * For each task, the destination is the ready worker node with strictly fewer
 * queued tasks than the current candidate, starting from worker node key 0.
 *
 * NOTE(review): nothing excludes the source worker node from being selected
 * as destination (e.g. when it sits at key 0 and no ready node has fewer
 * queued tasks) - confirm this cannot loop or re-target the source node.
 *
 * @param workerNodeKey - The key of the worker node whose queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-address the task to the destination worker.
    const redistributedTask = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, redistributedTask)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, redistributedTask)
    }
  }
}
1225
b1838604
JB
/**
 * Increments the stolen tasks counters of the given worker node: on the
 * worker node usage and, when applicable, on the usage returned for the given
 * task function name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) != null) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    taskFunctionWorkerUsage.tasks.stolen += 1
  }
}
1244
/**
 * Steals one task for the worker with the given id.
 * Bound to the worker nodes `onEmptyQueue` callback when the tasks queue is
 * enabled.
 *
 * The source is the ready worker node, other than the destination, with the
 * most queued tasks; nothing happens if no such node has queued tasks.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Work on a copy ordered by decreasing number of queued tasks.
  const busiestFirst = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  const sourceWorkerNode = busiestFirst.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1276
/**
 * Moves tasks away from the worker with the given id towards other ready
 * worker nodes.
 * Bound to the worker nodes `onBackPressure` callback when the tasks queue is
 * enabled.
 *
 * Candidates are visited by increasing number of queued tasks; each eligible
 * candidate receives one task popped from the source worker node for as long
 * as the source still has queued tasks.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const configuredQueueSize = this.opts.tasksQueueOptions?.size as number
  if (configuredQueueSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sorted copy: positions in this array are NOT worker node keys.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < configuredQueueSize - sizeOffset
    ) {
      // Resolve the real key in the pool worker nodes: using the position in
      // the sorted copy would target an unrelated worker node.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        // The worker node is no longer part of the pool; leave the task queued.
        continue
      }
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1314
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task functions
 * update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
      this.getWorkerInfo(workerNodeKey).taskFunctions = message.taskFunctions
    }
  }
}
1337
/**
 * Handles the ready response of a worker: stores its readiness and task
 * functions in the worker info, then emits the pool `ready` event if an
 * emitter exists and the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctions = message.taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1351
/**
 * Handles a task execution response: settles the pending promise registered
 * for the task id, updates statistics and the worker choice strategy, then
 * executes the next queued task of the worker node when the tasks queue is
 * enabled and the concurrency limit allows it.
 *
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
7c0ba920 1379
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1385
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1391
33e6bb4c
JB
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1399
/**
 * Gets the worker information given its worker node key.
 * The worker node is dereferenced without any guard, so the key must refer to
 * an existing worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1409
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueSizeArgument =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeArgument
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1433
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1446
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1454
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1463
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task to the worker with its
 * transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const transferables = task.transferList
  this.sendToWorker(workerNodeKey, task, transferables)
  this.checkAndEmitTaskExecutionEvents()
}
1475
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1481
/**
 * Dequeues a task from the worker node at the given key by delegating to the
 * worker node `dequeueTask()`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1485
/**
 * Gets the tasks queue size of the worker node at the given key by delegating
 * to the worker node `tasksQueueSize()`.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1489
/**
 * Executes every queued task of the worker node at the given key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1499
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1505}