test: use sinon stub to simplify code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
5b7d00b4 4import { type EventEmitter, EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
bfc75cca 20 round
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
6703b9f4 23import type { TaskFunction } from '../worker/task-functions'
c4855468 24import {
65d7a1c9 25 type IPool,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
4d159167
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
bde6b5d7
JB
50import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
bfc75cca
JB
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
bde6b5d7 55} from './utils'
23ccf9d7 56
729c563d 57/**
ea7a90d3 58 * Base class that implements some shared logic for all poolifier pools.
729c563d 59 *
38e795c1 60 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 63 */
c97c7edb 64export abstract class AbstractPool<
f06e48d8 65 Worker extends IWorker,
d3c8a1a8
S
66 Data = unknown,
67 Response = unknown
c4855468 68> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitter | EventEmitterAsyncResource

/**
 * The task execution response promise map:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker node key, the execution response promise resolve and reject callbacks.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Dynamic pool maximum size property placeholder.
 */
protected readonly max?: number

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function itself.
 */
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>

/**
 * Whether the pool is started or not.
 */
private started: boolean

/**
 * Whether the pool is starting or not.
 */
private starting: boolean

/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 * Explicitly typed so the field never degrades to an implicit `any`.
 */
private readonly startTimestamp: number
118
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates its arguments, binds the task scheduling methods, wires the
 * optional event emitter and the worker choice strategy context, runs the
 * setup hook and finally starts the workers when `opts.startWorkers` is `true`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const runningOnMain = this.isMain()
  if (!runningOnMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills in the option defaults.
  checkFilePath(this.filePath)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
169
8d3782fa
JB
/**
 * Validates the number of workers given at pool construction.
 *
 * @param numberOfWorkers - The number of workers to validate.
 * @throws Error If the number of workers is `null` or `undefined`.
 * @throws TypeError If the number of workers is not a safe integer.
 * @throws RangeError If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws TypeError If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
216
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 * `null` or `undefined` options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError If the options are not a plain object, or `retries` is not a safe integer.
 * @throws RangeError If `retries` is negative.
 * @throws Error If `weights` does not have one entry per worker node, or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  const optionsProvided = workerChoiceStrategyOptions != null
  if (optionsProvided && !isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const retries = workerChoiceStrategyOptions?.retries
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }

  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
263
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
269
/** @inheritDoc */
public get info (): PoolInfo {
  type TimeMeasurement = 'runTime' | 'waitTime'
  const statisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Sums one per worker node task counter over the whole pool.
  const sumTasks = (
    counter: 'executed' | 'executing' | 'queued' | 'stolen' | 'failed'
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + workerNode.usage.tasks[counter],
      0
    )
  // Collects one measurement bound per worker node, with a neutral fallback.
  const boundValues = (
    measurement: TimeMeasurement,
    bound: 'minimum' | 'maximum',
    fallback: number
  ): number[] =>
    this.workerNodes.map(
      workerNode => workerNode.usage[measurement]?.[bound] ?? fallback
    )
  // Concatenates the measurement history of every worker node.
  const mergedHistory = (measurement: TimeMeasurement): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run and wait time are aggregated.
    ...(statisticsRequirements.runTime.aggregate &&
      statisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sumTasks('executed'),
    executingTasks: sumTasks('executing'),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumTasks('queued'),
      maxQueuedTasks: this.workerNodes.reduce(
        (total, workerNode) =>
          total + (workerNode.usage.tasks?.maxQueued ?? 0),
        0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumTasks('stolen')
    }),
    failedTasks: sumTasks('failed'),
    ...(statisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(min(...boundValues('runTime', 'minimum', Infinity))),
        maximum: round(max(...boundValues('runTime', 'maximum', -Infinity))),
        ...(statisticsRequirements.runTime.average && {
          average: round(average(mergedHistory('runTime')))
        }),
        ...(statisticsRequirements.runTime.median && {
          median: round(median(mergedHistory('runTime')))
        })
      }
    }),
    ...(statisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(min(...boundValues('waitTime', 'minimum', Infinity))),
        maximum: round(max(...boundValues('waitTime', 'maximum', -Infinity))),
        ...(statisticsRequirements.waitTime.average && {
          average: round(average(mergedHistory('waitTime')))
        }),
        ...(statisticsRequirements.waitTime.median && {
          median: round(median(mergedHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 425
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
440
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
461
/**
 * The type of the pool, supplied by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
468
/**
 * The type of worker used by the pool, supplied by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
473
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
480
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
a35560ba 487
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
506
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
518
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
530
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
549
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
563
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingTasksQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingTasksQueue) {
    // Remove the stealing callbacks and flush the queues before disabling.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
577
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
  if (builtOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
599
9b38ab2d
JB
/**
 * Builds the tasks queue options by overriding the defaults with the given options.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
613
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
619
d6ca1416
JB
/**
 * Sets the `onEmptyQueue` callback of every worker node to the task stealing handler.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
626
/**
 * Removes the `onEmptyQueue` callback from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
632
/**
 * Sets the `onBackPressure` callback of every worker node to the tasks stealing handler.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
639
/**
 * Removes the `onBackPressure` callback from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
645
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 654
c319c66b
JB
/**
 * Whether the pool is busy or not, supplied by the concrete pool implementation.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 661
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the configured concurrency,
 * otherwise a ready worker node counts as available only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
685
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success, rejected with an error on failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey).id as number
      // Ignore messages that are not the status response of the targeted worker.
      if (
        message.taskFunctionOperationStatus == null ||
        message.workerId !== expectedWorkerId
      ) {
        return
      }
      if (message.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${message.workerId} with error: '${
              message.workerError?.message as string
            }'`
          )
        )
      }
      // The response has been handled: the listener is not needed anymore.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(message.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
726
/**
 * Sends a task function operation message to every worker node and waits
 * until each of them has answered with its operation status.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` once every worker node reported success, rejected with an error describing the first failed response otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without worker nodes no response would ever arrive and the promise
    // would stay pending forever: nothing to do is reported as a success.
    // NOTE(review): presumably worker nodes created later get the task functions on setup - confirm.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // The same listener was registered on every worker node: remove it
      // from all of them, not only from the worker that answered last.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        receivedResponse =>
          receivedResponse.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
        return
      }
      reject(
        new Error(
          `Task function operation '${
            message.taskFunctionOperation as string
          }' failed on worker ${
            errorResponse.workerId as number
          } with error: '${errorResponse.workerError?.message as string}'`
        )
      )
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
781
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
794
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const nameIsBlank = name.trim().length === 0
  if (nameIsBlank) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent as source text to the workers.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  // Only recorded on the pool side once the workers operation has settled successfully.
  this.taskFunctions.set(name, fn)
  return opResult
}
817
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Pool side cleanup: per task function usages first, then the function itself.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
833
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty task function names list wins.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
846
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultOperationMessage
  )
}
854
adee6053
JB
/**
 * Deletes the usage bound to the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
860
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue must be empty and it must execute fewer tasks than the configured concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  return executingTasks < (this.opts.tasksQueueOptions?.concurrency as number)
}
868
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects.
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle it later.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 922
47352846
JB
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Dynamic worker nodes do not count towards the configured number of workers.
  const staticWorkerNodesCount = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (staticWorkerNodesCount() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
938
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodesDestruction = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestruction)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  if (this.emitter instanceof EventEmitterAsyncResource) {
    this.emitter.emitDestroy()
  }
  this.started = false
}
952
/**
 * Sends a kill message to the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise resolved when the worker answers `'success'`, rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${
                message.workerId as number
              }`
            )
          )
          break
      }
    }
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
975
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns A promise settled once the implementation has finished terminating the worker node.
 */
protected abstract destroyWorkerNode (
  workerNodeKey: number
): Promise<void>
c97c7edb 982
/**
 * Setup hook meant to run code before the worker nodes are created in the abstract constructor.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default.
}
c97c7edb 992
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
997
/**
 * Hook run right before a task is sent to a worker.
 * Increments the executing tasks counter and records the task wait time, on the
 * worker node usage and, when applicable, on the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Both usage objects get the same bookkeeping.
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  if (workerNode?.usage != null) {
    recordTaskStart(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      task.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      recordTaskStart(taskFunctionWorkerUsage)
    }
  }
}
1027
/**
 * Hook run once a task execution response has been received from a worker.
 * Updates the task counters, run time and ELU statistics, on the worker node
 * usage and, when applicable, on the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Both usage objects get the same bookkeeping.
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  if (workerNode?.usage != null) {
    recordTaskEnd(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    )
    if (taskFunctionWorkerUsage != null) {
      recordTaskEnd(taskFunctionWorkerUsage)
    }
  }
}
1061
/**
 * Tells whether the task function worker usage of the given worker node shall be updated.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker info holds an array of more than two task function names, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1076
/**
 * Updates the task counters of a worker usage after a task execution response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.workerError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
1094
/**
 * Updates the run time statistics of a worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1108
/**
 * Updates the wait time statistics of a worker usage.
 * The wait time is the delay between the task timestamp and now; it is zero
 * when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    waitTime
  )
}
1121
/**
 * Updates the ELU (active, idle and utilization) statistics of a worker usage.
 * Nothing is recorded when the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (eluRequirements.aggregate && taskElu != null) {
    // The first sample is stored as is, later ones are averaged with the stored value.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1154
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions for it are met; it
 * is returned directly when the strategy policy allows dynamic worker usage.
 * Otherwise the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1173
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1182
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1195
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1202
/**
 * Creates a new worker, wires its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Internal error handling: flag the node as not ready, close its channel,
  // optionally restart a worker and redistribute its queued tasks.
  const handleWorkerError = (error: Error): void => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    if (
      this.started &&
      !this.starting &&
      this.opts.restartWorkerOnError === true
    ) {
      // Replace the failed worker by one of the same kind.
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.started && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  }

  // User supplied handlers are registered before the internal ones.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1246
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, it registers a listener destroying the worker
 * node on kill messages, asks the worker to check its activity, forwards the
 * pool task functions and flags the worker node as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // The key is looked up again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: a hard kill always applies, a soft one
    // only once nothing is executing (nor queued when the tasks queue is enabled).
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        workerUsage.tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(localWorkerNodeKey) === 0)))
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Forward the task functions registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1300
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
> (
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1313
/**
 * Registers once a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
> (
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1326
/**
 * Deregisters a message listener callback from the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
> (
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1339
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics
 * messages, and installs the task stealing callbacks when they are enabled.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // The task stealing callbacks only apply when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].onEmptyQueue =
      this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].onBackPressure =
      this.tasksStealingOnBackPressure.bind(this)
  }
}
1364
/**
 * Sends the startup message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (
  workerNodeKey: number
): void
1371
/**
 * Sends the statistics message to the worker identified by the given worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
a2ed5053
JB
1388
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the ready worker node, other than the source, with the
 * fewest queued tasks. It is executed there right away when possible, queued
 * otherwise.
 *
 * The source worker node is never picked as destination: re-enqueueing on the
 * queue being drained would keep the loop running forever.
 * When no other ready worker node exists the redistribution stops and the
 * remaining tasks stay queued on the source worker node.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      if (
        candidateWorkerNodeKey === workerNodeKey ||
        !candidateWorkerNode.info.ready
      ) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: stop instead of looping on the source queue.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1409
/**
 * Increments the stolen tasks counter on the worker node usage and, when
 * applicable, on the usage of the given task function.
 *
 * @param workerNodeKey - The key of the worker node that received the stolen task.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage =
      workerNode.getTaskFunctionWorkerUsage(taskName)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.stolen
    }
  }
}
1428
/**
 * Callback stealing one task for a worker whose tasks queue is empty.
 * The task is popped from the ready worker node, other than the requesting
 * one, holding the most queued tasks.
 *
 * @param workerId - The id of the worker with an empty tasks queue.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sorted copy, most loaded worker nodes first.
  const workerNodesByQueuedDesc = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = workerNodesByQueuedDesc.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const stolenTask = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, stolenTask)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, stolenTask)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    stolenTask.name as string
  )
}
1456
/**
 * Callback offloading tasks from a worker node under back pressure.
 *
 * Candidates are visited from the least to the most loaded. Each ready
 * candidate, other than the source, whose queue holds fewer tasks than the
 * configured tasks queue size minus one receives one task popped from the
 * source, as long as the source still has queued tasks.
 *
 * Nothing is done when the configured tasks queue size is lower than or equal to one.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // The position in the sorted copy is not the worker node key:
      // resolve the key against the pool worker nodes array.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1491
/**
 * Builds the listener registered for each worker message.
 * The listener dispatches on the message content: worker ready response,
 * task execution response or task function names update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames, workerId } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(workerId)
      )
      workerInfo.taskFunctionNames = taskFunctionNames
    }
  }
}
1514
/**
 * Handles the ready response of a worker: stores its ready flag and task
 * function names, then emits the pool `ready` event once the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready === false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctionNames, workerId } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1530
/**
 * Handles a task execution response: settles the promise registered for the
 * task id, updates the statistics and the worker choice strategy, then runs
 * the next queued task of the worker node when the concurrency allows it.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
7c0ba920 1558
/**
 * Emits the pool `busy` event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1564
/**
 * Emits the pool `backPressure` event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1570
/**
 * Emits the pool `full` event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1578
/**
 * Gets the information of the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1588
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Falls back to the squared pool maximum size when no tasks queue size is configured.
  const tasksQueueSizeArgument =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    tasksQueueSizeArgument
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1612
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1625
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1633
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  return (
    this.opts.enableTasksQueue === true &&
    this.workerNodes.every(workerNode => workerNode.hasBackPressure())
  )
}
1642
/**
 * Executes the given task on the worker identified by the given worker node key:
 * runs the before execution hook, sends the task to the worker with its
 * transfer list, then emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1654
/**
 * Enqueues the given task on the worker node identified by the given key,
 * then emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1660
/**
 * Dequeues a task from the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1664
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1668
/**
 * Executes every task queued on the worker node identified by the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remainingTasks = this.tasksQueueSize(workerNodeKey);
    remainingTasks > 0;
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1678
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1684}