feat: change pool event emitter to event emitter async resource
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
76d91ea0 7 Task
5c4d16da 8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
dc021bcc 13 average,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
90d6701c 16 max,
afe0d5bf 17 median,
90d6701c 18 min,
bfc75cca 19 round
bbeadd16 20} from '../utils'
59317253 21import { KillBehaviors } from '../worker/worker-options'
6703b9f4 22import type { TaskFunction } from '../worker/task-functions'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
4d159167
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
bde6b5d7
JB
50import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
bfc75cca
JB
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
bde6b5d7 55} from './utils'
23ccf9d7 56
729c563d 57/**
ea7a90d3 58 * Base class that implements some shared logic for all poolifier pools.
729c563d 59 *
38e795c1 60 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 63 */
c97c7edb 64export abstract class AbstractPool<
f06e48d8 65 Worker extends IWorker,
d3c8a1a8
S
66 Data = unknown,
67 Response = unknown
c4855468 68> implements IPool<Worker, Data, Response> {
afc003b2 69 /** @inheritDoc */
4b628b48 70 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 71
afc003b2 72 /** @inheritDoc */
b5604034 73 public emitter?: PoolEmitter
7c0ba920 74
be0676b3 75 /**
419e8121 76 * The task execution response promise map:
2740a743 77 * - `key`: The message id of each submitted task.
a3445496 78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 79 *
a3445496 80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 81 */
501aea93
JB
82 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
83 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 84
a35560ba 85 /**
51fe3d3c 86 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
87 */
88 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
89 Worker,
90 Data,
91 Response
a35560ba
S
92 >
93
8735b4e5
JB
94 /**
95 * Dynamic pool maximum size property placeholder.
96 */
97 protected readonly max?: number
98
72ae84a2
JB
99 /**
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
103 */
104 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
105
15b176e0
JB
106 /**
107 * Whether the pool is started or not.
108 */
109 private started: boolean
47352846
JB
110 /**
111 * Whether the pool is starting or not.
112 */
113 private starting: boolean
afe0d5bf
JB
114 /**
115 * The start timestamp of the pool.
116 */
117 private readonly startTimestamp
118
/**
 * Constructs a new poolifier pool.
 *
 * Validates the execution context, the worker file path, the number of workers and the pool options,
 * then sets up the optional event emitter and the worker choice strategy context, runs the setup hook
 * and starts the pool when `opts.startWorkers` is `true`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Validate every input before any state is built.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the scheduling methods so they keep the pool as receiver when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // The utilization getter measures the elapsed time from this timestamp.
  this.startTimestamp = performance.now()
}
169
/**
 * Checks that the given number of workers can be used to instantiate this pool.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Validates the given pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate first, then default to round robin.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: validate first, then overlay them on the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
216
/**
 * Validates the given worker choice strategy options.
 * `null` or `undefined` options pass every check.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  if (workerChoiceStrategyOptions?.retries != null) {
    if (!Number.isSafeInteger(workerChoiceStrategyOptions.retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (workerChoiceStrategyOptions.retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
      )
    }
  }
  if (workerChoiceStrategyOptions?.weights != null) {
    // One weight is expected per worker node the pool can hold.
    const weightsCount = Object.keys(workerChoiceStrategyOptions.weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (workerChoiceStrategyOptions?.measurement != null) {
    const knownMeasurement = Object.values(Measurements).includes(
      workerChoiceStrategyOptions.measurement
    )
    if (!knownMeasurement) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }
}
263
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new PoolEmitter({ name })
}
269
/**
 * Pool information snapshot, computed from the worker nodes usage on every read.
 *
 * - `utilization` is only reported when the strategy aggregates both run time and wait time.
 * - Tasks queue fields are only reported when the tasks queue is enabled.
 * - `runTime` and `waitTime` are only reported when the strategy aggregates that measurement;
 *   their `average` and `median` are only reported when the strategy requires them.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Resolve the strategy statistics requirements once instead of once per conditional field.
  // NOTE(review): assumes getTaskStatisticsRequirements() is side effect free and stable while this getter runs - confirm.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const runTimeRequirements = taskStatisticsRequirements.runTime
  const waitTimeRequirements = taskStatisticsRequirements.waitTime
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums one numeric value per worker node over the whole pool.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // flatMap gathers the histories in one pass; the accumulator is no longer copied per worker node.
        ...(runTimeRequirements.average && {
          average: round(
            average(
              this.workerNodes.flatMap(
                workerNode => workerNode.usage.runTime.history
              )
            )
          )
        }),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.flatMap(
                workerNode => workerNode.usage.runTime.history
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(waitTimeRequirements.average && {
          average: round(
            average(
              this.workerNodes.flatMap(
                workerNode => workerNode.usage.waitTime.history
              )
            )
          )
        }),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.flatMap(
                workerNode => workerNode.usage.waitTime.history
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 425
/**
 * The pool readiness boolean status: `true` once at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
440
/**
 * The approximate pool utilization: the aggregated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  // Both totals keep their own accumulator so the floating point summation order is unchanged.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
461
/**
 * The pool type, supplied by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
468
/**
 * The worker type, supplied by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
473
/**
 * The pool minimum size: the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
480
/**
 * The pool maximum size: `max` when it is defined, the number of workers given at construction otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
a35560ba 487
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  if (message.workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // From here on the worker id is known to be defined.
  if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
506
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: nodeWorker }) => nodeWorker === worker
  )
}
518
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  return this.workerNodes.findIndex(({ info }) => info.id === workerId)
}
530
/**
 * Sets the worker choice strategy (and optionally its options), then resets the usage of every
 * worker node and sends it the statistics message.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
549
/**
 * Validates the given worker choice strategy options, overlays them on the defaults,
 * stores the result in the pool options and passes it to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
563
/**
 * Enables or disables the tasks queue, then applies the given tasks queue options.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Turning an enabled queue off: remove the stealing callbacks and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
577
/**
 * Applies the given tasks queue options when the tasks queue is enabled,
 * otherwise removes any stored tasks queue options.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: the stored options, if any, are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
599
/**
 * Builds the effective tasks queue options by overlaying the given options on the defaults:
 * a queue size of the squared pool maximum size, a concurrency of 1 and both task stealing modes enabled.
 *
 * @param tasksQueueOptions - The tasks queue options to overlay on the defaults.
 * @returns The effective tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
613
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
619
/**
 * Sets the `onEmptyQueue` callback of every worker node to the task stealing handler.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
626
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
632
/**
 * Sets the `onBackPressure` callback of every worker node to the tasks stealing on back pressure handler.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
639
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
645
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 654
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, supplied by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 661
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it executes fewer tasks
 * than the configured concurrency; without it, while it executes no task at all.
 *
 * @returns Worker nodes busyness boolean status: `true` when no ready worker node has spare capacity.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    return !this.workerNodes.some(
      workerNode =>
        workerNode.info.ready && workerNode.usage.tasks.executing < concurrency
    )
  }
  return !this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
}
685
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success, rejected when it reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const workerId = this.getWorkerInfo(workerNodeKey).id as number
      // Only a task function operation status coming from the targeted worker is handled.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== workerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${response.workerId} with error: '${
              response.workerError?.message as string
            }'`
          )
        )
      }
      // The operation is settled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
726
/**
 * Sends a task function operation message to every worker node and waits for all status responses.
 *
 * The same listener is registered on every worker node. Once one response per worker node has been
 * received, the promise is settled and the listener is deregistered from every worker node, not only
 * from the worker node that answered last.
 *
 * NOTE(review): with no worker node in the pool, no response ever arrives and the promise stays pending - confirm the expected behavior.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports success, rejected when at least one reports failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      // Keep collecting until every worker node has answered.
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (errorResponse != null) {
          reject(
            new Error(
              `Task function operation '${
                response.taskFunctionOperation as string
              }' failed on worker ${
                errorResponse.workerId as number
              } with error: '${
                errorResponse.workerError?.message as string
              }'`
            )
          )
        }
      }
      // Remove the listener from all worker nodes; leaving it on the others leaks one listener per operation.
      // NOTE(review): assumes deregistering a listener that is not registered on a worker node is a no-op - confirm in the pool implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
781
/**
 * Whether at least one worker node lists the given task function name.
 *
 * @inheritDoc
 */
public hasTaskFunction (name: string): boolean {
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
794
/**
 * Validates the arguments, asks every worker to add the task function (sent as its source text)
 * and, once the operation has completed, records the task function on the pool side.
 *
 * @inheritDoc
 */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return operationStatus
}
817
/**
 * Asks every worker to remove a task function previously recorded on the pool side, then deletes
 * its worker usages and its pool side record.
 *
 * @inheritDoc
 */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationStatus = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return operationStatus
}
833
/**
 * Returns the task function names of the first worker node that lists at least one,
 * or an empty array when no worker node does.
 *
 * @inheritDoc
 */
public listTaskFunctionNames (): string[] {
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
846
/**
 * Asks every worker to use the given task function as its default one.
 *
 * @inheritDoc
 */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultOperationMessage
  )
}
854
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
860
/**
 * Whether a task shall be executed right away on the given worker node: its tasks queue must be
 * empty and it must execute fewer tasks than the configured tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed right away, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
868
/**
 * Validates the arguments, chooses a worker node, registers the response promise callbacks under
 * a fresh task id, then executes the task right away or enqueues it.
 *
 * @inheritDoc
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // The first failed precondition rejects the promise; the checks keep their original order.
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // The callbacks are looked up by task id when the worker response is received.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const { enableTasksQueue } = this.opts
    const executeNow =
      enableTasksQueue === false ||
      (enableTasksQueue === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 922
/**
 * Creates and sets up worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes,
 * then marks the pool as started.
 *
 * @inheritDoc
 */
public start (): void {
  this.starting = true
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
938
/**
 * Destroys every worker node, emits the pool destroy event, calls `emitDestroy()` on the emitter
 * and marks the pool as not started.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.started = false
}
950
/**
 * Sends a kill message to the given worker node and waits for its kill status response.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `'success'`, rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      // Any other message leaves the promise pending.
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
973
/**
 * Terminates the worker node given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled when the termination has been handled.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 980
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 990
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
995
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and records the task wait time, on the
 * worker usage and, when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const workerUsage = workerNode?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Per task function statistics are only maintained when the worker node qualifies for them.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskName = task.name as string
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
}
1025
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics, on the worker usage
 * and, when applicable, on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const workerUsage = workerNode?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Per task function statistics are only maintained when the worker node qualifies for them.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (
    workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    message.taskPerformance?.name as string
  ) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
  this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
  this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
}
1059
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The answer is `true` only when the worker node exists and its information
 * lists more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // Read the information through optional chaining: getWorkerInfo() dereferences the
  // worker node unconditionally, so with a key that no longer maps to a worker node it
  // would throw before the null check below could ever apply.
  const workerInfo = this.workerNodes[workerNodeKey]?.info
  return (
    workerInfo != null &&
    Array.isArray(workerInfo.taskFunctionNames) &&
    workerInfo.taskFunctionNames.length > 2
  )
}
1074
/**
 * Updates the tasks counters of the given worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  const taskFailed = message.workerError != null
  if (taskFailed) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1092
a4e07f72
JB
/**
 * Updates the run time statistics of the given worker usage after a task execution.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    const runTimeStatisticsRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is accounted as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeStatisticsRequirements,
      taskRunTime
    )
  }
}
1106
a4e07f72
JB
/**
 * Updates the wait time statistics of the given worker usage before a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without a timestamp is accounted with a zero wait time.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeStatisticsRequirements,
    taskWaitTime
  )
}
1119
/**
 * Updates the ELU statistics of the given worker usage after a task execution.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received task execution message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are accounted as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is the mean of its previous value and the task one,
  // or the task one when no previous value exists.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
1152
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker node shall be created, it is created first and returned
 * if the strategy policy allows dynamic worker usage. In every other case the
 * choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerUsage) {
    return dynamicWorkerNodeKey
  }
  return this.workerChoiceStrategyContext.execute()
}
1171
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1180
/**
 * Sends a message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1193
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1200
/**
 * Creates a new, completely set up worker node.
 *
 * The worker is created, its event handlers are registered (the handlers from the
 * pool options first, then the pool internal ones), it is added to the pool worker
 * nodes and the post setup hook is run.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Handlers from the pool options, falling back to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the worker node as not ready, close its channel,
  // emit the error and optionally restart a worker and redistribute queued tasks.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Only replace the worker on a started pool that is not in its startup phase.
    if (
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    ) {
      // The replacement keeps the dynamic or non-dynamic nature of the failed worker node.
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Remove the worker node from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 1244
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a listener handling the kill messages
 * sent by the worker is registered, the worker is asked to check its activity,
 * the pool task functions are sent to it and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // Resolve the key again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker
    // HARD: destroy unconditionally. SOFT: destroy only when no task is executing
    // and, with the tasks queue enabled, when no task is queued either.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        ((this.opts.enableTasksQueue === false &&
          workerUsage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    ) {
      // Destruction failures are reported through the pool error event.
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function registered on the pool to the new worker.
  if (this.taskFunctions.size > 0) {
    for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
      this.sendTaskFunctionOperationToWorker(workerNodeKey, {
        taskFunctionOperation: 'add',
        taskFunctionName,
        taskFunction: taskFunction.toString()
      }).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  }
  workerInfo.dynamic = true
  // The strategy policy may require the dynamic worker node to be flagged as ready right away.
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1298
/**
 * Registers a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - The type of the data carried by the listened messages.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1311
ae036c3e
JB
/**
 * Registers once a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - The type of the data carried by the listened messages.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1324
/**
 * Deregisters a listener callback on the worker given its worker node key.
 *
 * @typeParam Message - The type of the data carried by the listened messages.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1337
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, with the tasks queue enabled, wires the tasks stealing
 * callbacks selected in the tasks queue options.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Tasks stealing only makes sense with the tasks queue enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1362
/**
 * Sends the startup message to worker given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1369
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
a2ed5053
JB
1386
/**
 * Moves the tasks queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the ready worker node with the fewest
 * queued tasks (worker node key 0 is the fallback of the selection). The task is
 * executed right away on the destination when allowed, enqueued on it otherwise.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  // With at most one worker node there is nowhere to move the tasks to.
  if (this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, currentWorkerNodeKey, workerNodes) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.queued <
          workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? currentWorkerNodeKey
          : minWorkerNodeKey,
      0
    )
    // The selection can land on the source worker node itself. Dequeuing a task
    // and enqueuing it again on the same worker node never shrinks its queue,
    // which would make this loop spin forever: stop redistributing instead.
    if (destinationWorkerNodeKey === workerNodeKey) {
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1407
b1838604
JB
/**
 * Increments the stolen tasks counter on the worker usage and, when applicable,
 * on the task function worker usage.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const workerUsage = workerNode?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.stolen
  }
  // Per task function statistics are only maintained when the worker node qualifies for them.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.stolen
}
1426
/**
 * Steals one task for the worker whose tasks queue is empty.
 *
 * The task is taken from the ready worker node, other than the given one, that
 * has the most queued tasks, if any has queued tasks.
 *
 * @param workerId - The id of the worker with an empty tasks queue.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy so the pool worker nodes order is left untouched.
  const workerNodesByQueuedTasksDescending = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = workerNodesByQueuedTasksDescending.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1454
/**
 * Hands tasks of the worker under back pressure over to less loaded worker nodes.
 *
 * Worker nodes are visited from the least to the most loaded one. Each ready
 * worker node, other than the source, whose queued tasks count is below the tasks
 * queue size minus one receives one task popped from the source, for as long as
 * the source still has queued tasks.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  // With such a small tasks queue size no worker node can ever qualify as a destination.
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy so the pool worker nodes order is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the key
      // from the pool worker nodes so the task and its statistics reach the
      // worker node that was actually selected.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1489
/**
 * This method is the listener registered for each worker message.
 *
 * The returned listener dispatches on the message content: worker ready response,
 * task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1512
/**
 * Handles the ready response received from a worker.
 *
 * @param message - The received ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  // Let listeners know once the pool as a whole is ready.
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1528
/**
 * Handles the task execution response received from a worker.
 *
 * Settles the promise registered for the task, runs the after task execution
 * bookkeeping and, with the tasks queue enabled, starts the next queued task of
 * the worker node if its concurrency allows it.
 *
 * @param message - The received task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  // Responses without a registered promise are ignored.
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
}
7c0ba920 1556
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1562
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1568
33e6bb4c
JB
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1576
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1586
/**
 * Adds the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without a configured tasks queue size, the square of the pool maximum size is used.
  const workerNodeSizeArgument =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    workerNodeSizeArgument
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1610
/**
 * Removes the given worker from the pool worker nodes.
 * Nothing happens if the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  // Keep the worker choice strategy context in sync with the removal.
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1623
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without the tasks queue there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1631
/**
 * Whether the pool has back pressure or not.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1640
/**
 * Executes the given task on the worker given its worker node key.
 *
 * Runs the before task execution hook, sends the task to the worker and emits
 * the task execution related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1652
/**
 * Enqueues the given task on the worker node given its key and emits the task
 * queuing related pool events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1658
/**
 * Dequeues a task from the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1662
/**
 * Gets the tasks queue size of the worker node given its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1666
/**
 * Executes every task queued on the worker node given its key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let queuedTasks = this.tasksQueueSize(workerNodeKey);
    queuedTasks > 0;
    queuedTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1676
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1682}