docs: refine README.md
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
76d91ea0 7 Task
5c4d16da 8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
dc021bcc 13 average,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
90d6701c 16 max,
afe0d5bf 17 median,
90d6701c 18 min,
bfc75cca 19 round
bbeadd16 20} from '../utils'
59317253 21import { KillBehaviors } from '../worker/worker-options'
6703b9f4 22import type { TaskFunction } from '../worker/task-functions'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
4d159167
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
bde6b5d7
JB
50import {
51 checkFilePath,
52 checkValidTasksQueueOptions,
bfc75cca
JB
53 checkValidWorkerChoiceStrategy,
54 updateMeasurementStatistics
bde6b5d7 55} from './utils'
23ccf9d7 56
729c563d 57/**
ea7a90d3 58 * Base class that implements some shared logic for all poolifier pools.
729c563d 59 *
38e795c1 60 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
61 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
62 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 63 */
c97c7edb 64export abstract class AbstractPool<
f06e48d8 65 Worker extends IWorker,
d3c8a1a8
S
66 Data = unknown,
67 Response = unknown
c4855468 68> implements IPool<Worker, Data, Response> {
afc003b2 69 /** @inheritDoc */
4b628b48 70 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 71
afc003b2 72 /** @inheritDoc */
7c0ba920
JB
73 public readonly emitter?: PoolEmitter
74
be0676b3 75 /**
419e8121 76 * The task execution response promise map:
2740a743 77 * - `key`: The message id of each submitted task.
a3445496 78 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 79 *
a3445496 80 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 81 */
501aea93
JB
82 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
83 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 84
a35560ba 85 /**
51fe3d3c 86 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
87 */
88 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
89 Worker,
90 Data,
91 Response
a35560ba
S
92 >
93
8735b4e5
JB
94 /**
95 * Dynamic pool maximum size property placeholder.
96 */
97 protected readonly max?: number
98
72ae84a2
JB
99 /**
100 * The task functions added at runtime map:
101 * - `key`: The task function name.
102 * - `value`: The task function itself.
103 */
104 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
105
15b176e0
JB
106 /**
107 * Whether the pool is started or not.
108 */
109 private started: boolean
47352846
JB
110 /**
111 * Whether the pool is starting or not.
112 */
113 private starting: boolean
afe0d5bf
JB
114 /**
115 * The start timestamp of the pool.
116 */
117 private readonly startTimestamp
118
729c563d
S
119 /**
120 * Constructs a new poolifier pool.
121 *
38e795c1 122 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 123 * @param filePath - Path to the worker file.
38e795c1 124 * @param opts - Options for the pool.
729c563d 125 */
c97c7edb 126 public constructor (
b4213b7f
JB
127 protected readonly numberOfWorkers: number,
128 protected readonly filePath: string,
129 protected readonly opts: PoolOptions<Worker>
c97c7edb 130 ) {
78cea37e 131 if (!this.isMain()) {
04f45163 132 throw new Error(
8c6d4acf 133 'Cannot start a pool from a worker with the same type as the pool'
04f45163 134 )
c97c7edb 135 }
bde6b5d7 136 checkFilePath(this.filePath)
8003c026 137 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 138 this.checkPoolOptions(this.opts)
1086026a 139
7254e419
JB
140 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
141 this.executeTask = this.executeTask.bind(this)
142 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 143
6bd72cd0 144 if (this.opts.enableEvents === true) {
7c0ba920
JB
145 this.emitter = new PoolEmitter()
146 }
d59df138
JB
147 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
148 Worker,
149 Data,
150 Response
da309861
JB
151 >(
152 this,
153 this.opts.workerChoiceStrategy,
154 this.opts.workerChoiceStrategyOptions
155 )
b6b32453
JB
156
157 this.setupHook()
158
72ae84a2
JB
159 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
160
47352846 161 this.started = false
075e51d1 162 this.starting = false
47352846
JB
163 if (this.opts.startWorkers === true) {
164 this.start()
165 }
afe0d5bf
JB
166
167 this.startTimestamp = performance.now()
c97c7edb
S
168 }
169
8d3782fa
JB
170 private checkNumberOfWorkers (numberOfWorkers: number): void {
171 if (numberOfWorkers == null) {
172 throw new Error(
173 'Cannot instantiate a pool without specifying the number of workers'
174 )
78cea37e 175 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 176 throw new TypeError(
0d80593b 177 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
178 )
179 } else if (numberOfWorkers < 0) {
473c717a 180 throw new RangeError(
8d3782fa
JB
181 'Cannot instantiate a pool with a negative number of workers'
182 )
6b27d407 183 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
184 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
185 }
186 }
187
7c0ba920 188 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 189 if (isPlainObject(opts)) {
47352846 190 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 191 checkValidWorkerChoiceStrategy(
2324f8c9
JB
192 opts.workerChoiceStrategy as WorkerChoiceStrategy
193 )
0d80593b
JB
194 this.opts.workerChoiceStrategy =
195 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
196 this.checkValidWorkerChoiceStrategyOptions(
197 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
198 )
8990357d
JB
199 this.opts.workerChoiceStrategyOptions = {
200 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
201 ...opts.workerChoiceStrategyOptions
202 }
1f68cede 203 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
204 this.opts.enableEvents = opts.enableEvents ?? true
205 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
206 if (this.opts.enableTasksQueue) {
bde6b5d7 207 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
208 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 }
212 } else {
213 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 214 }
aee46736
JB
215 }
216
0d80593b
JB
217 private checkValidWorkerChoiceStrategyOptions (
218 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
219 ): void {
2324f8c9
JB
220 if (
221 workerChoiceStrategyOptions != null &&
222 !isPlainObject(workerChoiceStrategyOptions)
223 ) {
0d80593b
JB
224 throw new TypeError(
225 'Invalid worker choice strategy options: must be a plain object'
226 )
227 }
8990357d 228 if (
2324f8c9 229 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 230 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
231 ) {
232 throw new TypeError(
8c0b113f 233 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
234 )
235 }
236 if (
2324f8c9 237 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 238 workerChoiceStrategyOptions.retries < 0
8990357d
JB
239 ) {
240 throw new RangeError(
8c0b113f 241 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
242 )
243 }
49be33fe 244 if (
2324f8c9 245 workerChoiceStrategyOptions?.weights != null &&
6b27d407 246 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
247 ) {
248 throw new Error(
249 'Invalid worker choice strategy options: must have a weight for each worker node'
250 )
251 }
f0d7f803 252 if (
2324f8c9 253 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
254 !Object.values(Measurements).includes(
255 workerChoiceStrategyOptions.measurement
256 )
257 ) {
258 throw new Error(
259 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
260 )
261 }
0d80593b
JB
262 }
263
08f3f44c 264 /** @inheritDoc */
6b27d407
JB
265 public get info (): PoolInfo {
266 return {
23ccf9d7 267 version,
6b27d407 268 type: this.type,
184855e6 269 worker: this.worker,
47352846 270 started: this.started,
2431bdb4
JB
271 ready: this.ready,
272 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
273 minSize: this.minSize,
274 maxSize: this.maxSize,
c05f0d50
JB
275 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
276 .runTime.aggregate &&
1305e9a8
JB
277 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
278 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
279 workerNodes: this.workerNodes.length,
280 idleWorkerNodes: this.workerNodes.reduce(
281 (accumulator, workerNode) =>
f59e1027 282 workerNode.usage.tasks.executing === 0
a4e07f72
JB
283 ? accumulator + 1
284 : accumulator,
6b27d407
JB
285 0
286 ),
287 busyWorkerNodes: this.workerNodes.reduce(
288 (accumulator, workerNode) =>
f59e1027 289 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
290 0
291 ),
a4e07f72 292 executedTasks: this.workerNodes.reduce(
6b27d407 293 (accumulator, workerNode) =>
f59e1027 294 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
295 0
296 ),
297 executingTasks: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
f59e1027 299 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
300 0
301 ),
daf86646
JB
302 ...(this.opts.enableTasksQueue === true && {
303 queuedTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
305 accumulator + workerNode.usage.tasks.queued,
306 0
307 )
308 }),
309 ...(this.opts.enableTasksQueue === true && {
310 maxQueuedTasks: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
312 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
313 0
314 )
315 }),
a1763c54
JB
316 ...(this.opts.enableTasksQueue === true && {
317 backPressure: this.hasBackPressure()
318 }),
68cbdc84
JB
319 ...(this.opts.enableTasksQueue === true && {
320 stolenTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
322 accumulator + workerNode.usage.tasks.stolen,
323 0
324 )
325 }),
a4e07f72
JB
326 failedTasks: this.workerNodes.reduce(
327 (accumulator, workerNode) =>
f59e1027 328 accumulator + workerNode.usage.tasks.failed,
a4e07f72 329 0
1dcf8b7b
JB
330 ),
331 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
332 .runTime.aggregate && {
333 runTime: {
98e72cda 334 minimum: round(
90d6701c 335 min(
98e72cda 336 ...this.workerNodes.map(
041dc05b 337 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 338 )
1dcf8b7b
JB
339 )
340 ),
98e72cda 341 maximum: round(
90d6701c 342 max(
98e72cda 343 ...this.workerNodes.map(
041dc05b 344 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 345 )
1dcf8b7b 346 )
98e72cda 347 ),
3baa0837
JB
348 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
349 .runTime.average && {
350 average: round(
351 average(
352 this.workerNodes.reduce<number[]>(
353 (accumulator, workerNode) =>
354 accumulator.concat(workerNode.usage.runTime.history),
355 []
356 )
98e72cda 357 )
dc021bcc 358 )
3baa0837 359 }),
98e72cda
JB
360 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .runTime.median && {
362 median: round(
363 median(
3baa0837
JB
364 this.workerNodes.reduce<number[]>(
365 (accumulator, workerNode) =>
366 accumulator.concat(workerNode.usage.runTime.history),
367 []
98e72cda
JB
368 )
369 )
370 )
371 })
1dcf8b7b
JB
372 }
373 }),
374 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
375 .waitTime.aggregate && {
376 waitTime: {
98e72cda 377 minimum: round(
90d6701c 378 min(
98e72cda 379 ...this.workerNodes.map(
041dc05b 380 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 381 )
1dcf8b7b
JB
382 )
383 ),
98e72cda 384 maximum: round(
90d6701c 385 max(
98e72cda 386 ...this.workerNodes.map(
041dc05b 387 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 388 )
1dcf8b7b 389 )
98e72cda 390 ),
3baa0837
JB
391 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
392 .waitTime.average && {
393 average: round(
394 average(
395 this.workerNodes.reduce<number[]>(
396 (accumulator, workerNode) =>
397 accumulator.concat(workerNode.usage.waitTime.history),
398 []
399 )
98e72cda 400 )
dc021bcc 401 )
3baa0837 402 }),
98e72cda
JB
403 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
404 .waitTime.median && {
405 median: round(
406 median(
3baa0837
JB
407 this.workerNodes.reduce<number[]>(
408 (accumulator, workerNode) =>
409 accumulator.concat(workerNode.usage.waitTime.history),
410 []
98e72cda
JB
411 )
412 )
413 )
414 })
1dcf8b7b
JB
415 }
416 })
6b27d407
JB
417 }
418 }
08f3f44c 419
aa9eede8
JB
420 /**
421 * The pool readiness boolean status.
422 */
2431bdb4
JB
423 private get ready (): boolean {
424 return (
b97d82d8
JB
425 this.workerNodes.reduce(
426 (accumulator, workerNode) =>
427 !workerNode.info.dynamic && workerNode.info.ready
428 ? accumulator + 1
429 : accumulator,
430 0
431 ) >= this.minSize
2431bdb4
JB
432 )
433 }
434
afe0d5bf 435 /**
aa9eede8 436 * The approximate pool utilization.
afe0d5bf
JB
437 *
438 * @returns The pool utilization.
439 */
440 private get utilization (): number {
8e5ca040 441 const poolTimeCapacity =
fe7d90db 442 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
443 const totalTasksRunTime = this.workerNodes.reduce(
444 (accumulator, workerNode) =>
71514351 445 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
446 0
447 )
448 const totalTasksWaitTime = this.workerNodes.reduce(
449 (accumulator, workerNode) =>
71514351 450 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
451 0
452 )
8e5ca040 453 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
454 }
455
8881ae32 456 /**
aa9eede8 457 * The pool type.
8881ae32
JB
458 *
459 * If it is `'dynamic'`, it provides the `max` property.
460 */
461 protected abstract get type (): PoolType
462
184855e6 463 /**
aa9eede8 464 * The worker type.
184855e6
JB
465 */
466 protected abstract get worker (): WorkerType
467
c2ade475 468 /**
aa9eede8 469 * The pool minimum size.
c2ade475 470 */
8735b4e5
JB
471 protected get minSize (): number {
472 return this.numberOfWorkers
473 }
ff733df7
JB
474
475 /**
aa9eede8 476 * The pool maximum size.
ff733df7 477 */
8735b4e5
JB
478 protected get maxSize (): number {
479 return this.max ?? this.numberOfWorkers
480 }
a35560ba 481
6b813701
JB
482 /**
483 * Checks if the worker id sent in the received message from a worker is valid.
484 *
485 * @param message - The received message.
486 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
487 */
4d159167 488 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
489 if (message.workerId == null) {
490 throw new Error('Worker message received without worker id')
491 } else if (
21f710aa 492 message.workerId != null &&
aad6fb64 493 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
494 ) {
495 throw new Error(
496 `Worker message received from unknown worker '${message.workerId}'`
497 )
498 }
499 }
500
ffcbbad8 501 /**
f06e48d8 502 * Gets the given worker its worker node key.
ffcbbad8
JB
503 *
504 * @param worker - The worker.
f59e1027 505 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 506 */
aad6fb64 507 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 508 return this.workerNodes.findIndex(
041dc05b 509 workerNode => workerNode.worker === worker
f06e48d8 510 )
bf9549ae
JB
511 }
512
aa9eede8
JB
513 /**
514 * Gets the worker node key given its worker id.
515 *
516 * @param workerId - The worker id.
aad6fb64 517 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 518 */
72ae84a2 519 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 520 return this.workerNodes.findIndex(
041dc05b 521 workerNode => workerNode.info.id === workerId
aad6fb64 522 )
aa9eede8
JB
523 }
524
afc003b2 525 /** @inheritDoc */
a35560ba 526 public setWorkerChoiceStrategy (
59219cbb
JB
527 workerChoiceStrategy: WorkerChoiceStrategy,
528 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 529 ): void {
bde6b5d7 530 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 531 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
532 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
533 this.opts.workerChoiceStrategy
534 )
535 if (workerChoiceStrategyOptions != null) {
536 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
537 }
aa9eede8 538 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 539 workerNode.resetUsage()
9edb9717 540 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 541 }
a20f0ba5
JB
542 }
543
544 /** @inheritDoc */
545 public setWorkerChoiceStrategyOptions (
546 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
547 ): void {
0d80593b 548 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
549 this.opts.workerChoiceStrategyOptions = {
550 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
551 ...workerChoiceStrategyOptions
552 }
a20f0ba5
JB
553 this.workerChoiceStrategyContext.setOptions(
554 this.opts.workerChoiceStrategyOptions
a35560ba
S
555 )
556 }
557
a20f0ba5 558 /** @inheritDoc */
8f52842f
JB
559 public enableTasksQueue (
560 enable: boolean,
561 tasksQueueOptions?: TasksQueueOptions
562 ): void {
a20f0ba5 563 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
564 this.unsetTaskStealing()
565 this.unsetTasksStealingOnBackPressure()
ef41a6e6 566 this.flushTasksQueues()
a20f0ba5
JB
567 }
568 this.opts.enableTasksQueue = enable
8f52842f 569 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
570 }
571
572 /** @inheritDoc */
8f52842f 573 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 574 if (this.opts.enableTasksQueue === true) {
bde6b5d7 575 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
576 this.opts.tasksQueueOptions =
577 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 578 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
579 if (this.opts.tasksQueueOptions.taskStealing === true) {
580 this.setTaskStealing()
581 } else {
582 this.unsetTaskStealing()
583 }
584 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
585 this.setTasksStealingOnBackPressure()
586 } else {
587 this.unsetTasksStealingOnBackPressure()
588 }
5baee0d7 589 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
590 delete this.opts.tasksQueueOptions
591 }
592 }
593
9b38ab2d
JB
594 private buildTasksQueueOptions (
595 tasksQueueOptions: TasksQueueOptions
596 ): TasksQueueOptions {
597 return {
598 ...{
599 size: Math.pow(this.maxSize, 2),
600 concurrency: 1,
601 taskStealing: true,
602 tasksStealingOnBackPressure: true
603 },
604 ...tasksQueueOptions
605 }
606 }
607
5b49e864 608 private setTasksQueueSize (size: number): void {
20c6f652 609 for (const workerNode of this.workerNodes) {
ff3f866a 610 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
611 }
612 }
613
d6ca1416
JB
614 private setTaskStealing (): void {
615 for (const [workerNodeKey] of this.workerNodes.entries()) {
616 this.workerNodes[workerNodeKey].onEmptyQueue =
617 this.taskStealingOnEmptyQueue.bind(this)
618 }
619 }
620
621 private unsetTaskStealing (): void {
622 for (const [workerNodeKey] of this.workerNodes.entries()) {
623 delete this.workerNodes[workerNodeKey].onEmptyQueue
624 }
625 }
626
627 private setTasksStealingOnBackPressure (): void {
628 for (const [workerNodeKey] of this.workerNodes.entries()) {
629 this.workerNodes[workerNodeKey].onBackPressure =
630 this.tasksStealingOnBackPressure.bind(this)
631 }
632 }
633
634 private unsetTasksStealingOnBackPressure (): void {
635 for (const [workerNodeKey] of this.workerNodes.entries()) {
636 delete this.workerNodes[workerNodeKey].onBackPressure
637 }
638 }
639
c319c66b
JB
640 /**
641 * Whether the pool is full or not.
642 *
643 * The pool filling boolean status.
644 */
dea903a8
JB
645 protected get full (): boolean {
646 return this.workerNodes.length >= this.maxSize
647 }
c2ade475 648
c319c66b
JB
649 /**
650 * Whether the pool is busy or not.
651 *
652 * The pool busyness boolean status.
653 */
654 protected abstract get busy (): boolean
7c0ba920 655
6c6afb84 656 /**
3d76750a 657 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
658 *
659 * @returns Worker nodes busyness boolean status.
660 */
c2ade475 661 protected internalBusy (): boolean {
3d76750a
JB
662 if (this.opts.enableTasksQueue === true) {
663 return (
664 this.workerNodes.findIndex(
041dc05b 665 workerNode =>
3d76750a
JB
666 workerNode.info.ready &&
667 workerNode.usage.tasks.executing <
668 (this.opts.tasksQueueOptions?.concurrency as number)
669 ) === -1
670 )
3d76750a 671 }
419e8121
JB
672 return (
673 this.workerNodes.findIndex(
674 workerNode =>
675 workerNode.info.ready && workerNode.usage.tasks.executing === 0
676 ) === -1
677 )
cb70b19d
JB
678 }
679
e81c38f2 680 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
681 workerNodeKey: number,
682 message: MessageValue<Data>
683 ): Promise<boolean> {
72ae84a2 684 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
685 const taskFunctionOperationListener = (
686 message: MessageValue<Response>
687 ): void => {
688 this.checkMessageWorkerId(message)
689 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 690 if (
ae036c3e
JB
691 message.taskFunctionOperationStatus != null &&
692 message.workerId === workerId
72ae84a2 693 ) {
ae036c3e
JB
694 if (message.taskFunctionOperationStatus) {
695 resolve(true)
696 } else if (!message.taskFunctionOperationStatus) {
697 reject(
698 new Error(
699 `Task function operation '${
700 message.taskFunctionOperation as string
701 }' failed on worker ${message.workerId} with error: '${
702 message.workerError?.message as string
703 }'`
704 )
72ae84a2 705 )
ae036c3e
JB
706 }
707 this.deregisterWorkerMessageListener(
708 this.getWorkerNodeKeyByWorkerId(message.workerId),
709 taskFunctionOperationListener
72ae84a2
JB
710 )
711 }
ae036c3e
JB
712 }
713 this.registerWorkerMessageListener(
714 workerNodeKey,
715 taskFunctionOperationListener
716 )
72ae84a2
JB
717 this.sendToWorker(workerNodeKey, message)
718 })
719 }
720
721 private async sendTaskFunctionOperationToWorkers (
adee6053 722 message: MessageValue<Data>
e81c38f2
JB
723 ): Promise<boolean> {
724 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
725 const responsesReceived = new Array<MessageValue<Response>>()
726 const taskFunctionOperationsListener = (
727 message: MessageValue<Response>
728 ): void => {
729 this.checkMessageWorkerId(message)
730 if (message.taskFunctionOperationStatus != null) {
731 responsesReceived.push(message)
732 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 733 if (
e81c38f2
JB
734 responsesReceived.every(
735 message => message.taskFunctionOperationStatus === true
736 )
737 ) {
738 resolve(true)
739 } else if (
e81c38f2
JB
740 responsesReceived.some(
741 message => message.taskFunctionOperationStatus === false
742 )
743 ) {
b0b55f57
JB
744 const errorResponse = responsesReceived.find(
745 response => response.taskFunctionOperationStatus === false
746 )
e81c38f2
JB
747 reject(
748 new Error(
b0b55f57 749 `Task function operation '${
e81c38f2 750 message.taskFunctionOperation as string
b0b55f57
JB
751 }' failed on worker ${
752 errorResponse?.workerId as number
753 } with error: '${
754 errorResponse?.workerError?.message as string
755 }'`
e81c38f2
JB
756 )
757 )
758 }
ae036c3e
JB
759 this.deregisterWorkerMessageListener(
760 this.getWorkerNodeKeyByWorkerId(message.workerId),
761 taskFunctionOperationsListener
762 )
e81c38f2 763 }
ae036c3e
JB
764 }
765 }
766 for (const [workerNodeKey] of this.workerNodes.entries()) {
767 this.registerWorkerMessageListener(
768 workerNodeKey,
769 taskFunctionOperationsListener
770 )
72ae84a2 771 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
772 }
773 })
6703b9f4
JB
774 }
775
776 /** @inheritDoc */
777 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
778 for (const workerNode of this.workerNodes) {
779 if (
780 Array.isArray(workerNode.info.taskFunctionNames) &&
781 workerNode.info.taskFunctionNames.includes(name)
782 ) {
783 return true
784 }
785 }
786 return false
6703b9f4
JB
787 }
788
789 /** @inheritDoc */
e81c38f2
JB
790 public async addTaskFunction (
791 name: string,
3feeab69 792 fn: TaskFunction<Data, Response>
e81c38f2 793 ): Promise<boolean> {
3feeab69
JB
794 if (typeof name !== 'string') {
795 throw new TypeError('name argument must be a string')
796 }
797 if (typeof name === 'string' && name.trim().length === 0) {
798 throw new TypeError('name argument must not be an empty string')
799 }
800 if (typeof fn !== 'function') {
801 throw new TypeError('fn argument must be a function')
802 }
adee6053 803 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
804 taskFunctionOperation: 'add',
805 taskFunctionName: name,
3feeab69 806 taskFunction: fn.toString()
6703b9f4 807 })
adee6053
JB
808 this.taskFunctions.set(name, fn)
809 return opResult
6703b9f4
JB
810 }
811
812 /** @inheritDoc */
e81c38f2 813 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
814 if (!this.taskFunctions.has(name)) {
815 throw new Error(
16248b23 816 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
817 )
818 }
adee6053 819 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
820 taskFunctionOperation: 'remove',
821 taskFunctionName: name
822 })
adee6053
JB
823 this.deleteTaskFunctionWorkerUsages(name)
824 this.taskFunctions.delete(name)
825 return opResult
6703b9f4
JB
826 }
827
90d7d101 828 /** @inheritDoc */
6703b9f4 829 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
830 for (const workerNode of this.workerNodes) {
831 if (
6703b9f4
JB
832 Array.isArray(workerNode.info.taskFunctionNames) &&
833 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 834 ) {
6703b9f4 835 return workerNode.info.taskFunctionNames
f2dbbf95 836 }
90d7d101 837 }
f2dbbf95 838 return []
90d7d101
JB
839 }
840
6703b9f4 841 /** @inheritDoc */
e81c38f2 842 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 843 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
844 taskFunctionOperation: 'default',
845 taskFunctionName: name
846 })
6703b9f4
JB
847 }
848
adee6053
JB
849 private deleteTaskFunctionWorkerUsages (name: string): void {
850 for (const workerNode of this.workerNodes) {
851 workerNode.deleteTaskFunctionWorkerUsage(name)
852 }
853 }
854
375f7504
JB
855 private shallExecuteTask (workerNodeKey: number): boolean {
856 return (
857 this.tasksQueueSize(workerNodeKey) === 0 &&
858 this.workerNodes[workerNodeKey].usage.tasks.executing <
859 (this.opts.tasksQueueOptions?.concurrency as number)
860 )
861 }
862
afc003b2 863 /** @inheritDoc */
7d91a8cd
JB
864 public async execute (
865 data?: Data,
866 name?: string,
867 transferList?: TransferListItem[]
868 ): Promise<Response> {
52b71763 869 return await new Promise<Response>((resolve, reject) => {
15b176e0 870 if (!this.started) {
47352846 871 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 872 return
15b176e0 873 }
7d91a8cd
JB
874 if (name != null && typeof name !== 'string') {
875 reject(new TypeError('name argument must be a string'))
9d2d0da1 876 return
7d91a8cd 877 }
90d7d101
JB
878 if (
879 name != null &&
880 typeof name === 'string' &&
881 name.trim().length === 0
882 ) {
f58b60b9 883 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 884 return
90d7d101 885 }
b558f6b5
JB
886 if (transferList != null && !Array.isArray(transferList)) {
887 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 888 return
b558f6b5
JB
889 }
890 const timestamp = performance.now()
891 const workerNodeKey = this.chooseWorkerNode()
501aea93 892 const task: Task<Data> = {
52b71763
JB
893 name: name ?? DEFAULT_TASK_NAME,
894 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
895 data: data ?? ({} as Data),
7d91a8cd 896 transferList,
52b71763 897 timestamp,
7629bdf1 898 taskId: randomUUID()
52b71763 899 }
7629bdf1 900 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
901 resolve,
902 reject,
501aea93 903 workerNodeKey
2e81254d 904 })
52b71763 905 if (
4e377863
JB
906 this.opts.enableTasksQueue === false ||
907 (this.opts.enableTasksQueue === true &&
375f7504 908 this.shallExecuteTask(workerNodeKey))
52b71763 909 ) {
501aea93 910 this.executeTask(workerNodeKey, task)
4e377863
JB
911 } else {
912 this.enqueueTask(workerNodeKey, task)
52b71763 913 }
2e81254d 914 })
280c2a77 915 }
c97c7edb 916
47352846
JB
917 /** @inheritdoc */
918 public start (): void {
919 this.starting = true
920 while (
921 this.workerNodes.reduce(
922 (accumulator, workerNode) =>
923 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
924 0
925 ) < this.numberOfWorkers
926 ) {
927 this.createAndSetupWorkerNode()
928 }
929 this.starting = false
930 this.started = true
931 }
932
afc003b2 933 /** @inheritDoc */
c97c7edb 934 public async destroy (): Promise<void> {
1fbcaa7c 935 await Promise.all(
81c02522 936 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 937 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
938 })
939 )
33e6bb4c 940 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 941 this.started = false
c97c7edb
S
942 }
943
1e3214b6 944 protected async sendKillMessageToWorker (
72ae84a2 945 workerNodeKey: number
1e3214b6 946 ): Promise<void> {
9edb9717 947 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
948 const killMessageListener = (message: MessageValue<Response>): void => {
949 this.checkMessageWorkerId(message)
1e3214b6
JB
950 if (message.kill === 'success') {
951 resolve()
952 } else if (message.kill === 'failure') {
72ae84a2
JB
953 reject(
954 new Error(
ae036c3e 955 `Kill message handling failed on worker ${
72ae84a2 956 message.workerId as number
ae036c3e 957 }`
72ae84a2
JB
958 )
959 )
1e3214b6 960 }
ae036c3e
JB
961 }
962 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 963 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 964 })
1e3214b6
JB
965 }
966
4a6952ff 967 /**
aa9eede8 968 * Terminates the worker node given its worker node key.
4a6952ff 969 *
aa9eede8 970 * @param workerNodeKey - The worker node key.
4a6952ff 971 */
81c02522 972 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 973
729c563d 974 /**
6677a3d3
JB
975 * Setup hook to execute code before worker nodes are created in the abstract constructor.
976 * Can be overridden.
afc003b2
JB
977 *
978 * @virtual
729c563d 979 */
280c2a77 980 protected setupHook (): void {
965df41c 981 /* Intentionally empty */
280c2a77 982 }
c97c7edb 983
729c563d 984 /**
280c2a77
S
985 * Should return whether the worker is the main worker or not.
986 */
987 protected abstract isMain (): boolean
988
989 /**
2e81254d 990 * Hook executed before the worker task execution.
bf9549ae 991 * Can be overridden.
729c563d 992 *
f06e48d8 993 * @param workerNodeKey - The worker node key.
1c6fe997 994 * @param task - The task to execute.
729c563d 995 */
1c6fe997
JB
996 protected beforeTaskExecutionHook (
997 workerNodeKey: number,
998 task: Task<Data>
999 ): void {
94407def
JB
1000 if (this.workerNodes[workerNodeKey]?.usage != null) {
1001 const workerUsage = this.workerNodes[workerNodeKey].usage
1002 ++workerUsage.tasks.executing
1003 this.updateWaitTimeWorkerUsage(workerUsage, task)
1004 }
1005 if (
1006 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1007 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1008 task.name as string
1009 ) != null
1010 ) {
db0e38ee 1011 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1012 workerNodeKey
db0e38ee 1013 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1014 ++taskFunctionWorkerUsage.tasks.executing
1015 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1016 }
c97c7edb
S
1017 }
1018
c01733f1 1019 /**
2e81254d 1020 * Hook executed after the worker task execution.
bf9549ae 1021 * Can be overridden.
c01733f1 1022 *
501aea93 1023 * @param workerNodeKey - The worker node key.
38e795c1 1024 * @param message - The received message.
c01733f1 1025 */
2e81254d 1026 protected afterTaskExecutionHook (
501aea93 1027 workerNodeKey: number,
2740a743 1028 message: MessageValue<Response>
bf9549ae 1029 ): void {
94407def
JB
1030 if (this.workerNodes[workerNodeKey]?.usage != null) {
1031 const workerUsage = this.workerNodes[workerNodeKey].usage
1032 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1033 this.updateRunTimeWorkerUsage(workerUsage, message)
1034 this.updateEluWorkerUsage(workerUsage, message)
1035 }
1036 if (
1037 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1038 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1039 message.taskPerformance?.name as string
94407def
JB
1040 ) != null
1041 ) {
db0e38ee 1042 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1043 workerNodeKey
db0e38ee 1044 ].getTaskFunctionWorkerUsage(
0628755c 1045 message.taskPerformance?.name as string
b558f6b5 1046 ) as WorkerUsage
db0e38ee
JB
1047 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1048 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1049 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1050 }
1051 }
1052
db0e38ee
JB
1053 /**
1054 * Whether the worker node shall update its task function worker usage or not.
1055 *
1056 * @param workerNodeKey - The worker node key.
1057 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1058 */
1059 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1060 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1061 return (
94407def 1062 workerInfo != null &&
6703b9f4
JB
1063 Array.isArray(workerInfo.taskFunctionNames) &&
1064 workerInfo.taskFunctionNames.length > 2
b558f6b5 1065 )
f1c06930
JB
1066 }
1067
1068 private updateTaskStatisticsWorkerUsage (
1069 workerUsage: WorkerUsage,
1070 message: MessageValue<Response>
1071 ): void {
a4e07f72 1072 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1073 if (
1074 workerTaskStatistics.executing != null &&
1075 workerTaskStatistics.executing > 0
1076 ) {
1077 --workerTaskStatistics.executing
5bb5be17 1078 }
6703b9f4 1079 if (message.workerError == null) {
98e72cda
JB
1080 ++workerTaskStatistics.executed
1081 } else {
a4e07f72 1082 ++workerTaskStatistics.failed
2740a743 1083 }
f8eb0a2a
JB
1084 }
1085
a4e07f72
JB
1086 private updateRunTimeWorkerUsage (
1087 workerUsage: WorkerUsage,
f8eb0a2a
JB
1088 message: MessageValue<Response>
1089 ): void {
6703b9f4 1090 if (message.workerError != null) {
dc021bcc
JB
1091 return
1092 }
e4f20deb
JB
1093 updateMeasurementStatistics(
1094 workerUsage.runTime,
1095 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1096 message.taskPerformance?.runTime ?? 0
e4f20deb 1097 )
f8eb0a2a
JB
1098 }
1099
a4e07f72
JB
1100 private updateWaitTimeWorkerUsage (
1101 workerUsage: WorkerUsage,
1c6fe997 1102 task: Task<Data>
f8eb0a2a 1103 ): void {
1c6fe997
JB
1104 const timestamp = performance.now()
1105 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1106 updateMeasurementStatistics(
1107 workerUsage.waitTime,
1108 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1109 taskWaitTime
e4f20deb 1110 )
c01733f1 1111 }
1112
a4e07f72 1113 private updateEluWorkerUsage (
5df69fab 1114 workerUsage: WorkerUsage,
62c15a68
JB
1115 message: MessageValue<Response>
1116 ): void {
6703b9f4 1117 if (message.workerError != null) {
dc021bcc
JB
1118 return
1119 }
008512c7
JB
1120 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1121 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1122 updateMeasurementStatistics(
1123 workerUsage.elu.active,
008512c7 1124 eluTaskStatisticsRequirements,
dc021bcc 1125 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1126 )
1127 updateMeasurementStatistics(
1128 workerUsage.elu.idle,
008512c7 1129 eluTaskStatisticsRequirements,
dc021bcc 1130 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1131 )
008512c7 1132 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1133 if (message.taskPerformance?.elu != null) {
f7510105
JB
1134 if (workerUsage.elu.utilization != null) {
1135 workerUsage.elu.utilization =
1136 (workerUsage.elu.utilization +
1137 message.taskPerformance.elu.utilization) /
1138 2
1139 } else {
1140 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1141 }
62c15a68
JB
1142 }
1143 }
1144 }
1145
280c2a77 1146 /**
f06e48d8 1147 * Chooses a worker node for the next task.
280c2a77 1148 *
6c6afb84 1149 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1150 *
aa9eede8 1151 * @returns The chosen worker node key
280c2a77 1152 */
6c6afb84 1153 private chooseWorkerNode (): number {
930dcf12 1154 if (this.shallCreateDynamicWorker()) {
aa9eede8 1155 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1156 if (
b1aae695 1157 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1158 ) {
aa9eede8 1159 return workerNodeKey
6c6afb84 1160 }
17393ac8 1161 }
930dcf12
JB
1162 return this.workerChoiceStrategyContext.execute()
1163 }
1164
6c6afb84
JB
1165 /**
1166 * Conditions for dynamic worker creation.
1167 *
1168 * @returns Whether to create a dynamic worker or not.
1169 */
1170 private shallCreateDynamicWorker (): boolean {
930dcf12 1171 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1172 }
1173
280c2a77 1174 /**
aa9eede8 1175 * Sends a message to worker given its worker node key.
280c2a77 1176 *
aa9eede8 1177 * @param workerNodeKey - The worker node key.
38e795c1 1178 * @param message - The message.
7d91a8cd 1179 * @param transferList - The optional array of transferable objects.
280c2a77
S
1180 */
1181 protected abstract sendToWorker (
aa9eede8 1182 workerNodeKey: number,
7d91a8cd
JB
1183 message: MessageValue<Data>,
1184 transferList?: TransferListItem[]
280c2a77
S
1185 ): void
1186
729c563d 1187 /**
41344292 1188 * Creates a new worker.
6c6afb84
JB
1189 *
1190 * @returns Newly created worker.
729c563d 1191 */
280c2a77 1192 protected abstract createWorker (): Worker
c97c7edb 1193
4a6952ff 1194 /**
aa9eede8 1195 * Creates a new, completely set up worker node.
4a6952ff 1196 *
aa9eede8 1197 * @returns New, completely set up worker node key.
4a6952ff 1198 */
aa9eede8 1199 protected createAndSetupWorkerNode (): number {
bdacc2d2 1200 const worker = this.createWorker()
280c2a77 1201
fd04474e 1202 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1203 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1204 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1205 worker.on('error', error => {
aad6fb64 1206 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1207 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1208 workerInfo.ready = false
0dc838e3 1209 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1210 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1211 if (
b6bfca01 1212 this.started &&
9b38ab2d
JB
1213 !this.starting &&
1214 this.opts.restartWorkerOnError === true
15b176e0 1215 ) {
9b106837 1216 if (workerInfo.dynamic) {
aa9eede8 1217 this.createAndSetupDynamicWorkerNode()
8a1260a3 1218 } else {
aa9eede8 1219 this.createAndSetupWorkerNode()
8a1260a3 1220 }
5baee0d7 1221 }
9b38ab2d 1222 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1223 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1224 }
5baee0d7 1225 })
a35560ba 1226 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1227 worker.once('exit', () => {
f06e48d8 1228 this.removeWorkerNode(worker)
a974afa6 1229 })
280c2a77 1230
aa9eede8 1231 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1232
aa9eede8 1233 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1234
aa9eede8 1235 return workerNodeKey
c97c7edb 1236 }
be0676b3 1237
930dcf12 1238 /**
aa9eede8 1239 * Creates a new, completely set up dynamic worker node.
930dcf12 1240 *
aa9eede8 1241 * @returns New, completely set up dynamic worker node key.
930dcf12 1242 */
aa9eede8
JB
1243 protected createAndSetupDynamicWorkerNode (): number {
1244 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1245 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1246 this.checkMessageWorkerId(message)
aa9eede8
JB
1247 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1248 message.workerId
aad6fb64 1249 )
aa9eede8 1250 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1251 // Kill message received from worker
930dcf12
JB
1252 if (
1253 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1254 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1255 ((this.opts.enableTasksQueue === false &&
aa9eede8 1256 workerUsage.tasks.executing === 0) ||
7b56f532 1257 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1258 workerUsage.tasks.executing === 0 &&
1259 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1260 ) {
041dc05b 1261 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1262 this.emitter?.emit(PoolEvents.error, error)
1263 })
930dcf12
JB
1264 }
1265 })
46b0bb09 1266 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1267 this.sendToWorker(workerNodeKey, {
e9dd5b66 1268 checkActive: true
21f710aa 1269 })
72ae84a2
JB
1270 if (this.taskFunctions.size > 0) {
1271 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1272 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1273 taskFunctionOperation: 'add',
1274 taskFunctionName,
1275 taskFunction: taskFunction.toString()
1276 }).catch(error => {
1277 this.emitter?.emit(PoolEvents.error, error)
1278 })
1279 }
1280 }
b5e113f6 1281 workerInfo.dynamic = true
b1aae695
JB
1282 if (
1283 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1284 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1285 ) {
b5e113f6
JB
1286 workerInfo.ready = true
1287 }
33e6bb4c 1288 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1289 return workerNodeKey
930dcf12
JB
1290 }
1291
a2ed5053 1292 /**
aa9eede8 1293 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1294 *
aa9eede8 1295 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1296 * @param listener - The message listener callback.
1297 */
85aeb3f3
JB
1298 protected abstract registerWorkerMessageListener<
1299 Message extends Data | Response
aa9eede8
JB
1300 >(
1301 workerNodeKey: number,
1302 listener: (message: MessageValue<Message>) => void
1303 ): void
a2ed5053 1304
ae036c3e
JB
1305 /**
1306 * Registers once a listener callback on the worker given its worker node key.
1307 *
1308 * @param workerNodeKey - The worker node key.
1309 * @param listener - The message listener callback.
1310 */
1311 protected abstract registerOnceWorkerMessageListener<
1312 Message extends Data | Response
1313 >(
1314 workerNodeKey: number,
1315 listener: (message: MessageValue<Message>) => void
1316 ): void
1317
1318 /**
1319 * Deregisters a listener callback on the worker given its worker node key.
1320 *
1321 * @param workerNodeKey - The worker node key.
1322 * @param listener - The message listener callback.
1323 */
1324 protected abstract deregisterWorkerMessageListener<
1325 Message extends Data | Response
1326 >(
1327 workerNodeKey: number,
1328 listener: (message: MessageValue<Message>) => void
1329 ): void
1330
a2ed5053 1331 /**
aa9eede8 1332 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1333 * Can be overridden.
1334 *
aa9eede8 1335 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1336 */
aa9eede8 1337 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1338 // Listen to worker messages.
aa9eede8 1339 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1340 // Send the startup message to worker.
aa9eede8 1341 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1342 // Send the statistics message to worker.
1343 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1344 if (this.opts.enableTasksQueue === true) {
dbd73092 1345 if (this.opts.tasksQueueOptions?.taskStealing === true) {
47352846
JB
1346 this.workerNodes[workerNodeKey].onEmptyQueue =
1347 this.taskStealingOnEmptyQueue.bind(this)
1348 }
1349 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1350 this.workerNodes[workerNodeKey].onBackPressure =
1351 this.tasksStealingOnBackPressure.bind(this)
1352 }
72695f86 1353 }
d2c73f82
JB
1354 }
1355
85aeb3f3 1356 /**
aa9eede8
JB
1357 * Sends the startup message to worker given its worker node key.
1358 *
1359 * @param workerNodeKey - The worker node key.
1360 */
1361 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1362
1363 /**
9edb9717 1364 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1365 *
aa9eede8 1366 * @param workerNodeKey - The worker node key.
85aeb3f3 1367 */
9edb9717 1368 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1369 this.sendToWorker(workerNodeKey, {
1370 statistics: {
1371 runTime:
1372 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1373 .runTime.aggregate,
1374 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1375 .elu.aggregate
72ae84a2 1376 }
aa9eede8
JB
1377 })
1378 }
a2ed5053
JB
1379
1380 private redistributeQueuedTasks (workerNodeKey: number): void {
1381 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1382 const destinationWorkerNodeKey = this.workerNodes.reduce(
1383 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1384 return workerNode.info.ready &&
1385 workerNode.usage.tasks.queued <
1386 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1387 ? workerNodeKey
1388 : minWorkerNodeKey
1389 },
1390 0
1391 )
72ae84a2 1392 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1393 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1394 this.executeTask(destinationWorkerNodeKey, task)
1395 } else {
1396 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1397 }
1398 }
1399 }
1400
b1838604
JB
1401 private updateTaskStolenStatisticsWorkerUsage (
1402 workerNodeKey: number,
b1838604
JB
1403 taskName: string
1404 ): void {
1a880eca 1405 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1406 if (workerNode?.usage != null) {
1407 ++workerNode.usage.tasks.stolen
1408 }
1409 if (
1410 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1411 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1412 ) {
1413 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1414 taskName
1415 ) as WorkerUsage
1416 ++taskFunctionWorkerUsage.tasks.stolen
1417 }
1418 }
1419
dd951876 1420 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b 1421 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1422 const workerNodes = this.workerNodes
a6b3272b 1423 .slice()
dd951876
JB
1424 .sort(
1425 (workerNodeA, workerNodeB) =>
1426 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1427 )
f201a0cd 1428 const sourceWorkerNode = workerNodes.find(
041dc05b 1429 workerNode =>
f201a0cd
JB
1430 workerNode.info.ready &&
1431 workerNode.info.id !== workerId &&
1432 workerNode.usage.tasks.queued > 0
1433 )
1434 if (sourceWorkerNode != null) {
72ae84a2 1435 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1436 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1437 this.executeTask(destinationWorkerNodeKey, task)
1438 } else {
1439 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1440 }
f201a0cd
JB
1441 this.updateTaskStolenStatisticsWorkerUsage(
1442 destinationWorkerNodeKey,
1443 task.name as string
1444 )
72695f86
JB
1445 }
1446 }
1447
1448 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1449 const sizeOffset = 1
1450 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1451 return
1452 }
72695f86
JB
1453 const sourceWorkerNode =
1454 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1455 const workerNodes = this.workerNodes
a6b3272b 1456 .slice()
72695f86
JB
1457 .sort(
1458 (workerNodeA, workerNodeB) =>
1459 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1460 )
1461 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1462 if (
0bc68267 1463 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1464 workerNode.info.ready &&
1465 workerNode.info.id !== workerId &&
0bc68267 1466 workerNode.usage.tasks.queued <
f778c355 1467 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1468 ) {
72ae84a2 1469 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1470 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1471 this.executeTask(workerNodeKey, task)
4de3d785 1472 } else {
dd951876 1473 this.enqueueTask(workerNodeKey, task)
4de3d785 1474 }
b1838604
JB
1475 this.updateTaskStolenStatisticsWorkerUsage(
1476 workerNodeKey,
b1838604
JB
1477 task.name as string
1478 )
10ecf8fd 1479 }
a2ed5053
JB
1480 }
1481 }
1482
be0676b3 1483 /**
aa9eede8 1484 * This method is the listener registered for each worker message.
be0676b3 1485 *
bdacc2d2 1486 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1487 */
1488 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1489 return message => {
21f710aa 1490 this.checkMessageWorkerId(message)
6703b9f4 1491 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1492 // Worker ready response received from worker
10e2aa7e 1493 this.handleWorkerReadyResponse(message)
7629bdf1 1494 } else if (message.taskId != null) {
81c02522 1495 // Task execution response received from worker
6b272951 1496 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1497 } else if (message.taskFunctionNames != null) {
1498 // Task function names message received from worker
46b0bb09
JB
1499 this.getWorkerInfo(
1500 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4 1501 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1502 }
1503 }
1504 }
1505
10e2aa7e 1506 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1507 if (message.ready === false) {
72ae84a2
JB
1508 throw new Error(
1509 `Worker ${message.workerId as number} failed to initialize`
1510 )
f05ed93c 1511 }
a5d15204 1512 const workerInfo = this.getWorkerInfo(
aad6fb64 1513 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1514 )
a5d15204 1515 workerInfo.ready = message.ready as boolean
6703b9f4 1516 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d
JB
1517 if (this.ready) {
1518 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1519 }
6b272951
JB
1520 }
1521
1522 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1523 const { taskId, workerError, data } = message
5441aea6 1524 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1525 if (promiseResponse != null) {
6703b9f4
JB
1526 if (workerError != null) {
1527 this.emitter?.emit(PoolEvents.taskError, workerError)
1528 promiseResponse.reject(workerError.message)
6b272951 1529 } else {
5441aea6 1530 promiseResponse.resolve(data as Response)
6b272951 1531 }
501aea93
JB
1532 const workerNodeKey = promiseResponse.workerNodeKey
1533 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1534 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1535 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1536 if (
1537 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1538 this.tasksQueueSize(workerNodeKey) > 0 &&
1539 this.workerNodes[workerNodeKey].usage.tasks.executing <
1540 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1541 ) {
1542 this.executeTask(
1543 workerNodeKey,
1544 this.dequeueTask(workerNodeKey) as Task<Data>
1545 )
be0676b3
APA
1546 }
1547 }
be0676b3 1548 }
7c0ba920 1549
a1763c54 1550 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1551 if (this.busy) {
1552 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1553 }
1554 }
1555
1556 private checkAndEmitTaskQueuingEvents (): void {
1557 if (this.hasBackPressure()) {
1558 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1559 }
1560 }
1561
33e6bb4c
JB
1562 private checkAndEmitDynamicWorkerCreationEvents (): void {
1563 if (this.type === PoolTypes.dynamic) {
1564 if (this.full) {
1565 this.emitter?.emit(PoolEvents.full, this.info)
1566 }
1567 }
1568 }
1569
8a1260a3 1570 /**
aa9eede8 1571 * Gets the worker information given its worker node key.
8a1260a3
JB
1572 *
1573 * @param workerNodeKey - The worker node key.
3f09ed9f 1574 * @returns The worker information.
8a1260a3 1575 */
46b0bb09
JB
1576 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1577 return this.workerNodes[workerNodeKey].info
e221309a
JB
1578 }
1579
a05c10de 1580 /**
b0a4db63 1581 * Adds the given worker in the pool worker nodes.
ea7a90d3 1582 *
38e795c1 1583 * @param worker - The worker.
aa9eede8
JB
1584 * @returns The added worker node key.
1585 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1586 */
b0a4db63 1587 private addWorkerNode (worker: Worker): number {
671d5154
JB
1588 const workerNode = new WorkerNode<Worker, Data>(
1589 worker,
ff3f866a 1590 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1591 )
b97d82d8 1592 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1593 if (this.starting) {
1594 workerNode.info.ready = true
1595 }
aa9eede8 1596 this.workerNodes.push(workerNode)
aad6fb64 1597 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1598 if (workerNodeKey === -1) {
86ed0598 1599 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1600 }
1601 return workerNodeKey
ea7a90d3 1602 }
c923ce56 1603
51fe3d3c 1604 /**
f06e48d8 1605 * Removes the given worker from the pool worker nodes.
51fe3d3c 1606 *
f06e48d8 1607 * @param worker - The worker.
51fe3d3c 1608 */
416fd65c 1609 private removeWorkerNode (worker: Worker): void {
aad6fb64 1610 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1611 if (workerNodeKey !== -1) {
1612 this.workerNodes.splice(workerNodeKey, 1)
1613 this.workerChoiceStrategyContext.remove(workerNodeKey)
1614 }
51fe3d3c 1615 }
adc3c320 1616
e2b31e32
JB
1617 /** @inheritDoc */
1618 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1619 return (
e2b31e32
JB
1620 this.opts.enableTasksQueue === true &&
1621 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1622 )
1623 }
1624
1625 private hasBackPressure (): boolean {
1626 return (
1627 this.opts.enableTasksQueue === true &&
1628 this.workerNodes.findIndex(
041dc05b 1629 workerNode => !workerNode.hasBackPressure()
a1763c54 1630 ) === -1
9e844245 1631 )
e2b31e32
JB
1632 }
1633
b0a4db63 1634 /**
aa9eede8 1635 * Executes the given task on the worker given its worker node key.
b0a4db63 1636 *
aa9eede8 1637 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1638 * @param task - The task to execute.
1639 */
2e81254d 1640 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1641 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1642 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1643 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1644 }
1645
f9f00b5f 1646 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1647 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1648 this.checkAndEmitTaskQueuingEvents()
1649 return tasksQueueSize
adc3c320
JB
1650 }
1651
416fd65c 1652 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1653 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1654 }
1655
416fd65c 1656 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1657 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1658 }
1659
81c02522 1660 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1661 while (this.tasksQueueSize(workerNodeKey) > 0) {
1662 this.executeTask(
1663 workerNodeKey,
1664 this.dequeueTask(workerNodeKey) as Task<Data>
1665 )
ff733df7 1666 }
4b628b48 1667 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1668 }
1669
ef41a6e6
JB
1670 private flushTasksQueues (): void {
1671 for (const [workerNodeKey] of this.workerNodes.entries()) {
1672 this.flushTasksQueue(workerNodeKey)
1673 }
1674 }
c97c7edb 1675}