refactor: reorder pool options validation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
192566ec 71 * The task execution response promise map:
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
15b176e0
JB
94 /**
95 * Whether the pool is started or not.
96 */
97 private started: boolean
47352846
JB
98 /**
99 * Whether the pool is starting or not.
100 */
101 private starting: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
47352846 147 this.started = false
075e51d1 148 this.starting = false
47352846
JB
149 if (this.opts.startWorkers === true) {
150 this.start()
151 }
afe0d5bf
JB
152
153 this.startTimestamp = performance.now()
c97c7edb
S
154 }
155
a35560ba 156 private checkFilePath (filePath: string): void {
ffcbbad8
JB
157 if (
158 filePath == null ||
3d6dd312 159 typeof filePath !== 'string' ||
ffcbbad8
JB
160 (typeof filePath === 'string' && filePath.trim().length === 0)
161 ) {
c510fea7
APA
162 throw new Error('Please specify a file with a worker implementation')
163 }
3d6dd312
JB
164 if (!existsSync(filePath)) {
165 throw new Error(`Cannot find the worker file '${filePath}'`)
166 }
c510fea7
APA
167 }
168
8d3782fa
JB
169 private checkNumberOfWorkers (numberOfWorkers: number): void {
170 if (numberOfWorkers == null) {
171 throw new Error(
172 'Cannot instantiate a pool without specifying the number of workers'
173 )
78cea37e 174 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 175 throw new TypeError(
0d80593b 176 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
177 )
178 } else if (numberOfWorkers < 0) {
473c717a 179 throw new RangeError(
8d3782fa
JB
180 'Cannot instantiate a pool with a negative number of workers'
181 )
6b27d407 182 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
183 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
184 }
185 }
186
187 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 188 if (this.type === PoolTypes.dynamic) {
a5ed75b7 189 if (max == null) {
e695d66f 190 throw new TypeError(
a5ed75b7
JB
191 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
192 )
193 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
194 throw new TypeError(
195 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
196 )
197 } else if (min > max) {
079de991
JB
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
200 )
b97d82d8 201 } else if (max === 0) {
079de991 202 throw new RangeError(
d640b48b 203 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
204 )
205 } else if (min === max) {
206 throw new RangeError(
207 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
208 )
209 }
8d3782fa
JB
210 }
211 }
212
7c0ba920 213 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 214 if (isPlainObject(opts)) {
47352846 215 this.opts.startWorkers = opts.startWorkers ?? true
2324f8c9
JB
216 this.checkValidWorkerChoiceStrategy(
217 opts.workerChoiceStrategy as WorkerChoiceStrategy
218 )
0d80593b
JB
219 this.opts.workerChoiceStrategy =
220 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
223 )
8990357d
JB
224 this.opts.workerChoiceStrategyOptions = {
225 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
226 ...opts.workerChoiceStrategyOptions
227 }
1f68cede 228 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
229 this.opts.enableEvents = opts.enableEvents ?? true
230 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
231 if (this.opts.enableTasksQueue) {
232 this.checkValidTasksQueueOptions(
233 opts.tasksQueueOptions as TasksQueueOptions
234 )
235 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
236 opts.tasksQueueOptions as TasksQueueOptions
237 )
238 }
239 } else {
240 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 241 }
aee46736
JB
242 }
243
244 private checkValidWorkerChoiceStrategy (
245 workerChoiceStrategy: WorkerChoiceStrategy
246 ): void {
2324f8c9
JB
247 if (
248 workerChoiceStrategy != null &&
249 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
250 ) {
b529c323 251 throw new Error(
aee46736 252 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
253 )
254 }
7c0ba920
JB
255 }
256
0d80593b
JB
257 private checkValidWorkerChoiceStrategyOptions (
258 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
259 ): void {
2324f8c9
JB
260 if (
261 workerChoiceStrategyOptions != null &&
262 !isPlainObject(workerChoiceStrategyOptions)
263 ) {
0d80593b
JB
264 throw new TypeError(
265 'Invalid worker choice strategy options: must be a plain object'
266 )
267 }
8990357d 268 if (
2324f8c9 269 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 270 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
271 ) {
272 throw new TypeError(
8c0b113f 273 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
274 )
275 }
276 if (
2324f8c9 277 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 278 workerChoiceStrategyOptions.retries < 0
8990357d
JB
279 ) {
280 throw new RangeError(
8c0b113f 281 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
282 )
283 }
49be33fe 284 if (
2324f8c9 285 workerChoiceStrategyOptions?.weights != null &&
6b27d407 286 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
287 ) {
288 throw new Error(
289 'Invalid worker choice strategy options: must have a weight for each worker node'
290 )
291 }
f0d7f803 292 if (
2324f8c9 293 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
294 !Object.values(Measurements).includes(
295 workerChoiceStrategyOptions.measurement
296 )
297 ) {
298 throw new Error(
299 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
300 )
301 }
0d80593b
JB
302 }
303
a20f0ba5 304 private checkValidTasksQueueOptions (
76d91ea0 305 tasksQueueOptions: TasksQueueOptions
a20f0ba5 306 ): void {
0d80593b
JB
307 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
308 throw new TypeError('Invalid tasks queue options: must be a plain object')
309 }
f0d7f803 310 if (
b7d085c4 311 tasksQueueOptions?.concurrency != null &&
2324f8c9 312 !Number.isSafeInteger(tasksQueueOptions.concurrency)
f0d7f803
JB
313 ) {
314 throw new TypeError(
20c6f652 315 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
316 )
317 }
318 if (
b7d085c4 319 tasksQueueOptions?.concurrency != null &&
2324f8c9 320 tasksQueueOptions.concurrency <= 0
f0d7f803 321 ) {
e695d66f 322 throw new RangeError(
2324f8c9 323 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
20c6f652
JB
324 )
325 }
20c6f652 326 if (
b7d085c4 327 tasksQueueOptions?.size != null &&
2324f8c9 328 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 329 ) {
ff3f866a 330 throw new TypeError(
68dbcdc0 331 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
332 )
333 }
2324f8c9 334 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 335 throw new RangeError(
2324f8c9 336 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
337 )
338 }
339 }
340
08f3f44c 341 /** @inheritDoc */
6b27d407
JB
342 public get info (): PoolInfo {
343 return {
23ccf9d7 344 version,
6b27d407 345 type: this.type,
184855e6 346 worker: this.worker,
47352846 347 started: this.started,
2431bdb4
JB
348 ready: this.ready,
349 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
350 minSize: this.minSize,
351 maxSize: this.maxSize,
c05f0d50
JB
352 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
353 .runTime.aggregate &&
1305e9a8
JB
354 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
355 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
356 workerNodes: this.workerNodes.length,
357 idleWorkerNodes: this.workerNodes.reduce(
358 (accumulator, workerNode) =>
f59e1027 359 workerNode.usage.tasks.executing === 0
a4e07f72
JB
360 ? accumulator + 1
361 : accumulator,
6b27d407
JB
362 0
363 ),
364 busyWorkerNodes: this.workerNodes.reduce(
365 (accumulator, workerNode) =>
f59e1027 366 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
367 0
368 ),
a4e07f72 369 executedTasks: this.workerNodes.reduce(
6b27d407 370 (accumulator, workerNode) =>
f59e1027 371 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
372 0
373 ),
374 executingTasks: this.workerNodes.reduce(
375 (accumulator, workerNode) =>
f59e1027 376 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
377 0
378 ),
daf86646
JB
379 ...(this.opts.enableTasksQueue === true && {
380 queuedTasks: this.workerNodes.reduce(
381 (accumulator, workerNode) =>
382 accumulator + workerNode.usage.tasks.queued,
383 0
384 )
385 }),
386 ...(this.opts.enableTasksQueue === true && {
387 maxQueuedTasks: this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
390 0
391 )
392 }),
a1763c54
JB
393 ...(this.opts.enableTasksQueue === true && {
394 backPressure: this.hasBackPressure()
395 }),
68cbdc84
JB
396 ...(this.opts.enableTasksQueue === true && {
397 stolenTasks: this.workerNodes.reduce(
398 (accumulator, workerNode) =>
399 accumulator + workerNode.usage.tasks.stolen,
400 0
401 )
402 }),
a4e07f72
JB
403 failedTasks: this.workerNodes.reduce(
404 (accumulator, workerNode) =>
f59e1027 405 accumulator + workerNode.usage.tasks.failed,
a4e07f72 406 0
1dcf8b7b
JB
407 ),
408 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
409 .runTime.aggregate && {
410 runTime: {
98e72cda 411 minimum: round(
90d6701c 412 min(
98e72cda 413 ...this.workerNodes.map(
041dc05b 414 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 415 )
1dcf8b7b
JB
416 )
417 ),
98e72cda 418 maximum: round(
90d6701c 419 max(
98e72cda 420 ...this.workerNodes.map(
041dc05b 421 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 422 )
1dcf8b7b 423 )
98e72cda 424 ),
3baa0837
JB
425 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
426 .runTime.average && {
427 average: round(
428 average(
429 this.workerNodes.reduce<number[]>(
430 (accumulator, workerNode) =>
431 accumulator.concat(workerNode.usage.runTime.history),
432 []
433 )
98e72cda 434 )
dc021bcc 435 )
3baa0837 436 }),
98e72cda
JB
437 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
438 .runTime.median && {
439 median: round(
440 median(
3baa0837
JB
441 this.workerNodes.reduce<number[]>(
442 (accumulator, workerNode) =>
443 accumulator.concat(workerNode.usage.runTime.history),
444 []
98e72cda
JB
445 )
446 )
447 )
448 })
1dcf8b7b
JB
449 }
450 }),
451 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
452 .waitTime.aggregate && {
453 waitTime: {
98e72cda 454 minimum: round(
90d6701c 455 min(
98e72cda 456 ...this.workerNodes.map(
041dc05b 457 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 458 )
1dcf8b7b
JB
459 )
460 ),
98e72cda 461 maximum: round(
90d6701c 462 max(
98e72cda 463 ...this.workerNodes.map(
041dc05b 464 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 465 )
1dcf8b7b 466 )
98e72cda 467 ),
3baa0837
JB
468 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
469 .waitTime.average && {
470 average: round(
471 average(
472 this.workerNodes.reduce<number[]>(
473 (accumulator, workerNode) =>
474 accumulator.concat(workerNode.usage.waitTime.history),
475 []
476 )
98e72cda 477 )
dc021bcc 478 )
3baa0837 479 }),
98e72cda
JB
480 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
481 .waitTime.median && {
482 median: round(
483 median(
3baa0837
JB
484 this.workerNodes.reduce<number[]>(
485 (accumulator, workerNode) =>
486 accumulator.concat(workerNode.usage.waitTime.history),
487 []
98e72cda
JB
488 )
489 )
490 )
491 })
1dcf8b7b
JB
492 }
493 })
6b27d407
JB
494 }
495 }
08f3f44c 496
aa9eede8
JB
497 /**
498 * The pool readiness boolean status.
499 */
2431bdb4
JB
500 private get ready (): boolean {
501 return (
b97d82d8
JB
502 this.workerNodes.reduce(
503 (accumulator, workerNode) =>
504 !workerNode.info.dynamic && workerNode.info.ready
505 ? accumulator + 1
506 : accumulator,
507 0
508 ) >= this.minSize
2431bdb4
JB
509 )
510 }
511
afe0d5bf 512 /**
aa9eede8 513 * The approximate pool utilization.
afe0d5bf
JB
514 *
515 * @returns The pool utilization.
516 */
517 private get utilization (): number {
8e5ca040 518 const poolTimeCapacity =
fe7d90db 519 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
520 const totalTasksRunTime = this.workerNodes.reduce(
521 (accumulator, workerNode) =>
71514351 522 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
523 0
524 )
525 const totalTasksWaitTime = this.workerNodes.reduce(
526 (accumulator, workerNode) =>
71514351 527 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
528 0
529 )
8e5ca040 530 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
531 }
532
8881ae32 533 /**
aa9eede8 534 * The pool type.
8881ae32
JB
535 *
536 * If it is `'dynamic'`, it provides the `max` property.
537 */
538 protected abstract get type (): PoolType
539
184855e6 540 /**
aa9eede8 541 * The worker type.
184855e6
JB
542 */
543 protected abstract get worker (): WorkerType
544
c2ade475 545 /**
aa9eede8 546 * The pool minimum size.
c2ade475 547 */
8735b4e5
JB
548 protected get minSize (): number {
549 return this.numberOfWorkers
550 }
ff733df7
JB
551
552 /**
aa9eede8 553 * The pool maximum size.
ff733df7 554 */
8735b4e5
JB
555 protected get maxSize (): number {
556 return this.max ?? this.numberOfWorkers
557 }
a35560ba 558
6b813701
JB
559 /**
560 * Checks if the worker id sent in the received message from a worker is valid.
561 *
562 * @param message - The received message.
563 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
564 */
21f710aa 565 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
566 if (message.workerId == null) {
567 throw new Error('Worker message received without worker id')
568 } else if (
21f710aa 569 message.workerId != null &&
aad6fb64 570 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
571 ) {
572 throw new Error(
573 `Worker message received from unknown worker '${message.workerId}'`
574 )
575 }
576 }
577
ffcbbad8 578 /**
f06e48d8 579 * Gets the given worker its worker node key.
ffcbbad8
JB
580 *
581 * @param worker - The worker.
f59e1027 582 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 583 */
aad6fb64 584 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 585 return this.workerNodes.findIndex(
041dc05b 586 workerNode => workerNode.worker === worker
f06e48d8 587 )
bf9549ae
JB
588 }
589
aa9eede8
JB
590 /**
591 * Gets the worker node key given its worker id.
592 *
593 * @param workerId - The worker id.
aad6fb64 594 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 595 */
aad6fb64
JB
596 private getWorkerNodeKeyByWorkerId (workerId: number): number {
597 return this.workerNodes.findIndex(
041dc05b 598 workerNode => workerNode.info.id === workerId
aad6fb64 599 )
aa9eede8
JB
600 }
601
afc003b2 602 /** @inheritDoc */
a35560ba 603 public setWorkerChoiceStrategy (
59219cbb
JB
604 workerChoiceStrategy: WorkerChoiceStrategy,
605 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 606 ): void {
aee46736 607 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 608 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
609 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
610 this.opts.workerChoiceStrategy
611 )
612 if (workerChoiceStrategyOptions != null) {
613 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
614 }
aa9eede8 615 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 616 workerNode.resetUsage()
9edb9717 617 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 618 }
a20f0ba5
JB
619 }
620
621 /** @inheritDoc */
622 public setWorkerChoiceStrategyOptions (
623 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
624 ): void {
0d80593b 625 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
626 this.opts.workerChoiceStrategyOptions = {
627 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
628 ...workerChoiceStrategyOptions
629 }
a20f0ba5
JB
630 this.workerChoiceStrategyContext.setOptions(
631 this.opts.workerChoiceStrategyOptions
a35560ba
S
632 )
633 }
634
a20f0ba5 635 /** @inheritDoc */
8f52842f
JB
636 public enableTasksQueue (
637 enable: boolean,
638 tasksQueueOptions?: TasksQueueOptions
639 ): void {
a20f0ba5 640 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
641 this.unsetTaskStealing()
642 this.unsetTasksStealingOnBackPressure()
ef41a6e6 643 this.flushTasksQueues()
a20f0ba5
JB
644 }
645 this.opts.enableTasksQueue = enable
8f52842f 646 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
647 }
648
649 /** @inheritDoc */
8f52842f 650 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 651 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
652 this.checkValidTasksQueueOptions(tasksQueueOptions)
653 this.opts.tasksQueueOptions =
654 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 655 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
656 if (this.opts.tasksQueueOptions.taskStealing === true) {
657 this.setTaskStealing()
658 } else {
659 this.unsetTaskStealing()
660 }
661 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
662 this.setTasksStealingOnBackPressure()
663 } else {
664 this.unsetTasksStealingOnBackPressure()
665 }
5baee0d7 666 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
667 delete this.opts.tasksQueueOptions
668 }
669 }
670
5b49e864 671 private setTasksQueueSize (size: number): void {
20c6f652 672 for (const workerNode of this.workerNodes) {
ff3f866a 673 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
674 }
675 }
676
d6ca1416
JB
677 private setTaskStealing (): void {
678 for (const [workerNodeKey] of this.workerNodes.entries()) {
679 this.workerNodes[workerNodeKey].onEmptyQueue =
680 this.taskStealingOnEmptyQueue.bind(this)
681 }
682 }
683
684 private unsetTaskStealing (): void {
685 for (const [workerNodeKey] of this.workerNodes.entries()) {
686 delete this.workerNodes[workerNodeKey].onEmptyQueue
687 }
688 }
689
690 private setTasksStealingOnBackPressure (): void {
691 for (const [workerNodeKey] of this.workerNodes.entries()) {
692 this.workerNodes[workerNodeKey].onBackPressure =
693 this.tasksStealingOnBackPressure.bind(this)
694 }
695 }
696
697 private unsetTasksStealingOnBackPressure (): void {
698 for (const [workerNodeKey] of this.workerNodes.entries()) {
699 delete this.workerNodes[workerNodeKey].onBackPressure
700 }
701 }
702
a20f0ba5
JB
703 private buildTasksQueueOptions (
704 tasksQueueOptions: TasksQueueOptions
705 ): TasksQueueOptions {
706 return {
20c6f652 707 ...{
ff3f866a 708 size: Math.pow(this.maxSize, 2),
47352846 709 concurrency: 1,
dbd73092 710 taskStealing: true,
47352846 711 tasksStealingOnBackPressure: true
20c6f652
JB
712 },
713 ...tasksQueueOptions
a20f0ba5
JB
714 }
715 }
716
c319c66b
JB
717 /**
718 * Whether the pool is full or not.
719 *
720 * The pool filling boolean status.
721 */
dea903a8
JB
722 protected get full (): boolean {
723 return this.workerNodes.length >= this.maxSize
724 }
c2ade475 725
c319c66b
JB
726 /**
727 * Whether the pool is busy or not.
728 *
729 * The pool busyness boolean status.
730 */
731 protected abstract get busy (): boolean
7c0ba920 732
6c6afb84 733 /**
3d76750a 734 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
735 *
736 * @returns Worker nodes busyness boolean status.
737 */
c2ade475 738 protected internalBusy (): boolean {
3d76750a
JB
739 if (this.opts.enableTasksQueue === true) {
740 return (
741 this.workerNodes.findIndex(
041dc05b 742 workerNode =>
3d76750a
JB
743 workerNode.info.ready &&
744 workerNode.usage.tasks.executing <
745 (this.opts.tasksQueueOptions?.concurrency as number)
746 ) === -1
747 )
3d76750a 748 }
9d21ee7f
JB
749 return (
750 this.workerNodes.findIndex(
751 workerNode =>
752 workerNode.info.ready && workerNode.usage.tasks.executing === 0
753 ) === -1
754 )
cb70b19d
JB
755 }
756
90d7d101
JB
757 /** @inheritDoc */
758 public listTaskFunctions (): string[] {
f2dbbf95
JB
759 for (const workerNode of this.workerNodes) {
760 if (
761 Array.isArray(workerNode.info.taskFunctions) &&
762 workerNode.info.taskFunctions.length > 0
763 ) {
764 return workerNode.info.taskFunctions
765 }
90d7d101 766 }
f2dbbf95 767 return []
90d7d101
JB
768 }
769
375f7504
JB
770 private shallExecuteTask (workerNodeKey: number): boolean {
771 return (
772 this.tasksQueueSize(workerNodeKey) === 0 &&
773 this.workerNodes[workerNodeKey].usage.tasks.executing <
774 (this.opts.tasksQueueOptions?.concurrency as number)
775 )
776 }
777
afc003b2 778 /** @inheritDoc */
7d91a8cd
JB
779 public async execute (
780 data?: Data,
781 name?: string,
782 transferList?: TransferListItem[]
783 ): Promise<Response> {
52b71763 784 return await new Promise<Response>((resolve, reject) => {
15b176e0 785 if (!this.started) {
47352846 786 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 787 return
15b176e0 788 }
7d91a8cd
JB
789 if (name != null && typeof name !== 'string') {
790 reject(new TypeError('name argument must be a string'))
9d2d0da1 791 return
7d91a8cd 792 }
90d7d101
JB
793 if (
794 name != null &&
795 typeof name === 'string' &&
796 name.trim().length === 0
797 ) {
f58b60b9 798 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 799 return
90d7d101 800 }
b558f6b5
JB
801 if (transferList != null && !Array.isArray(transferList)) {
802 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 803 return
b558f6b5
JB
804 }
805 const timestamp = performance.now()
806 const workerNodeKey = this.chooseWorkerNode()
501aea93 807 const task: Task<Data> = {
52b71763
JB
808 name: name ?? DEFAULT_TASK_NAME,
809 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
810 data: data ?? ({} as Data),
7d91a8cd 811 transferList,
52b71763 812 timestamp,
1a28f967 813 workerId: this.getWorkerInfo(workerNodeKey).id as number,
7629bdf1 814 taskId: randomUUID()
52b71763 815 }
7629bdf1 816 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
817 resolve,
818 reject,
501aea93 819 workerNodeKey
2e81254d 820 })
52b71763 821 if (
4e377863
JB
822 this.opts.enableTasksQueue === false ||
823 (this.opts.enableTasksQueue === true &&
375f7504 824 this.shallExecuteTask(workerNodeKey))
52b71763 825 ) {
501aea93 826 this.executeTask(workerNodeKey, task)
4e377863
JB
827 } else {
828 this.enqueueTask(workerNodeKey, task)
52b71763 829 }
2e81254d 830 })
280c2a77 831 }
c97c7edb 832
47352846
JB
833 /** @inheritdoc */
834 public start (): void {
835 this.starting = true
836 while (
837 this.workerNodes.reduce(
838 (accumulator, workerNode) =>
839 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
840 0
841 ) < this.numberOfWorkers
842 ) {
843 this.createAndSetupWorkerNode()
844 }
845 this.starting = false
846 this.started = true
847 }
848
afc003b2 849 /** @inheritDoc */
c97c7edb 850 public async destroy (): Promise<void> {
1fbcaa7c 851 await Promise.all(
81c02522 852 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 853 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
854 })
855 )
33e6bb4c 856 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 857 this.started = false
c97c7edb
S
858 }
859
1e3214b6
JB
860 protected async sendKillMessageToWorker (
861 workerNodeKey: number,
862 workerId: number
863 ): Promise<void> {
9edb9717 864 await new Promise<void>((resolve, reject) => {
041dc05b 865 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
866 if (message.kill === 'success') {
867 resolve()
868 } else if (message.kill === 'failure') {
e1af34e6 869 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
870 }
871 })
9edb9717 872 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 873 })
1e3214b6
JB
874 }
875
4a6952ff 876 /**
aa9eede8 877 * Terminates the worker node given its worker node key.
4a6952ff 878 *
aa9eede8 879 * @param workerNodeKey - The worker node key.
4a6952ff 880 */
81c02522 881 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 882
729c563d 883 /**
6677a3d3
JB
884 * Setup hook to execute code before worker nodes are created in the abstract constructor.
885 * Can be overridden.
afc003b2
JB
886 *
887 * @virtual
729c563d 888 */
280c2a77 889 protected setupHook (): void {
965df41c 890 /* Intentionally empty */
280c2a77 891 }
c97c7edb 892
729c563d 893 /**
280c2a77
S
894 * Should return whether the worker is the main worker or not.
895 */
896 protected abstract isMain (): boolean
897
898 /**
2e81254d 899 * Hook executed before the worker task execution.
bf9549ae 900 * Can be overridden.
729c563d 901 *
f06e48d8 902 * @param workerNodeKey - The worker node key.
1c6fe997 903 * @param task - The task to execute.
729c563d 904 */
1c6fe997
JB
905 protected beforeTaskExecutionHook (
906 workerNodeKey: number,
907 task: Task<Data>
908 ): void {
94407def
JB
909 if (this.workerNodes[workerNodeKey]?.usage != null) {
910 const workerUsage = this.workerNodes[workerNodeKey].usage
911 ++workerUsage.tasks.executing
912 this.updateWaitTimeWorkerUsage(workerUsage, task)
913 }
914 if (
915 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
916 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
917 task.name as string
918 ) != null
919 ) {
db0e38ee 920 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 921 workerNodeKey
db0e38ee 922 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
923 ++taskFunctionWorkerUsage.tasks.executing
924 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 925 }
c97c7edb
S
926 }
927
c01733f1 928 /**
2e81254d 929 * Hook executed after the worker task execution.
bf9549ae 930 * Can be overridden.
c01733f1 931 *
501aea93 932 * @param workerNodeKey - The worker node key.
38e795c1 933 * @param message - The received message.
c01733f1 934 */
2e81254d 935 protected afterTaskExecutionHook (
501aea93 936 workerNodeKey: number,
2740a743 937 message: MessageValue<Response>
bf9549ae 938 ): void {
94407def
JB
939 if (this.workerNodes[workerNodeKey]?.usage != null) {
940 const workerUsage = this.workerNodes[workerNodeKey].usage
941 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
942 this.updateRunTimeWorkerUsage(workerUsage, message)
943 this.updateEluWorkerUsage(workerUsage, message)
944 }
945 if (
946 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
947 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 948 message.taskPerformance?.name as string
94407def
JB
949 ) != null
950 ) {
db0e38ee 951 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 952 workerNodeKey
db0e38ee 953 ].getTaskFunctionWorkerUsage(
0628755c 954 message.taskPerformance?.name as string
b558f6b5 955 ) as WorkerUsage
db0e38ee
JB
956 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
957 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
958 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
959 }
960 }
961
db0e38ee
JB
962 /**
963 * Whether the worker node shall update its task function worker usage or not.
964 *
965 * @param workerNodeKey - The worker node key.
966 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
967 */
968 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 969 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 970 return (
94407def 971 workerInfo != null &&
a5d15204 972 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 973 workerInfo.taskFunctions.length > 2
b558f6b5 974 )
f1c06930
JB
975 }
976
977 private updateTaskStatisticsWorkerUsage (
978 workerUsage: WorkerUsage,
979 message: MessageValue<Response>
980 ): void {
a4e07f72 981 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
982 if (
983 workerTaskStatistics.executing != null &&
984 workerTaskStatistics.executing > 0
985 ) {
986 --workerTaskStatistics.executing
5bb5be17 987 }
98e72cda
JB
988 if (message.taskError == null) {
989 ++workerTaskStatistics.executed
990 } else {
a4e07f72 991 ++workerTaskStatistics.failed
2740a743 992 }
f8eb0a2a
JB
993 }
994
a4e07f72
JB
995 private updateRunTimeWorkerUsage (
996 workerUsage: WorkerUsage,
f8eb0a2a
JB
997 message: MessageValue<Response>
998 ): void {
dc021bcc
JB
999 if (message.taskError != null) {
1000 return
1001 }
e4f20deb
JB
1002 updateMeasurementStatistics(
1003 workerUsage.runTime,
1004 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1005 message.taskPerformance?.runTime ?? 0
e4f20deb 1006 )
f8eb0a2a
JB
1007 }
1008
a4e07f72
JB
1009 private updateWaitTimeWorkerUsage (
1010 workerUsage: WorkerUsage,
1c6fe997 1011 task: Task<Data>
f8eb0a2a 1012 ): void {
1c6fe997
JB
1013 const timestamp = performance.now()
1014 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1015 updateMeasurementStatistics(
1016 workerUsage.waitTime,
1017 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1018 taskWaitTime
e4f20deb 1019 )
c01733f1 1020 }
1021
a4e07f72 1022 private updateEluWorkerUsage (
5df69fab 1023 workerUsage: WorkerUsage,
62c15a68
JB
1024 message: MessageValue<Response>
1025 ): void {
dc021bcc
JB
1026 if (message.taskError != null) {
1027 return
1028 }
008512c7
JB
1029 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1030 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1031 updateMeasurementStatistics(
1032 workerUsage.elu.active,
008512c7 1033 eluTaskStatisticsRequirements,
dc021bcc 1034 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1035 )
1036 updateMeasurementStatistics(
1037 workerUsage.elu.idle,
008512c7 1038 eluTaskStatisticsRequirements,
dc021bcc 1039 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1040 )
008512c7 1041 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1042 if (message.taskPerformance?.elu != null) {
f7510105
JB
1043 if (workerUsage.elu.utilization != null) {
1044 workerUsage.elu.utilization =
1045 (workerUsage.elu.utilization +
1046 message.taskPerformance.elu.utilization) /
1047 2
1048 } else {
1049 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1050 }
62c15a68
JB
1051 }
1052 }
1053 }
1054
280c2a77 1055 /**
f06e48d8 1056 * Chooses a worker node for the next task.
280c2a77 1057 *
6c6afb84 1058 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1059 *
aa9eede8 1060 * @returns The chosen worker node key
280c2a77 1061 */
6c6afb84 1062 private chooseWorkerNode (): number {
930dcf12 1063 if (this.shallCreateDynamicWorker()) {
aa9eede8 1064 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1065 if (
b1aae695 1066 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1067 ) {
aa9eede8 1068 return workerNodeKey
6c6afb84 1069 }
17393ac8 1070 }
930dcf12
JB
1071 return this.workerChoiceStrategyContext.execute()
1072 }
1073
6c6afb84
JB
1074 /**
1075 * Conditions for dynamic worker creation.
1076 *
1077 * @returns Whether to create a dynamic worker or not.
1078 */
1079 private shallCreateDynamicWorker (): boolean {
930dcf12 1080 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1081 }
1082
280c2a77 1083 /**
aa9eede8 1084 * Sends a message to worker given its worker node key.
280c2a77 1085 *
aa9eede8 1086 * @param workerNodeKey - The worker node key.
38e795c1 1087 * @param message - The message.
7d91a8cd 1088 * @param transferList - The optional array of transferable objects.
280c2a77
S
1089 */
1090 protected abstract sendToWorker (
aa9eede8 1091 workerNodeKey: number,
7d91a8cd
JB
1092 message: MessageValue<Data>,
1093 transferList?: TransferListItem[]
280c2a77
S
1094 ): void
1095
729c563d 1096 /**
41344292 1097 * Creates a new worker.
6c6afb84
JB
1098 *
1099 * @returns Newly created worker.
729c563d 1100 */
280c2a77 1101 protected abstract createWorker (): Worker
c97c7edb 1102
4a6952ff 1103 /**
aa9eede8 1104 * Creates a new, completely set up worker node.
4a6952ff 1105 *
aa9eede8 1106 * @returns New, completely set up worker node key.
4a6952ff 1107 */
aa9eede8 1108 protected createAndSetupWorkerNode (): number {
bdacc2d2 1109 const worker = this.createWorker()
280c2a77 1110
fd04474e 1111 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1112 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1113 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1114 worker.on('error', error => {
aad6fb64 1115 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1116 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1117 workerInfo.ready = false
0dc838e3 1118 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1119 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1120 if (
1121 this.opts.restartWorkerOnError === true &&
b6bfca01
JB
1122 this.started &&
1123 !this.starting
15b176e0 1124 ) {
9b106837 1125 if (workerInfo.dynamic) {
aa9eede8 1126 this.createAndSetupDynamicWorkerNode()
8a1260a3 1127 } else {
aa9eede8 1128 this.createAndSetupWorkerNode()
8a1260a3 1129 }
5baee0d7 1130 }
19dbc45b 1131 if (this.opts.enableTasksQueue === true) {
9b106837 1132 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1133 }
5baee0d7 1134 })
a35560ba 1135 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1136 worker.once('exit', () => {
f06e48d8 1137 this.removeWorkerNode(worker)
a974afa6 1138 })
280c2a77 1139
aa9eede8 1140 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1141
aa9eede8 1142 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1143
aa9eede8 1144 return workerNodeKey
c97c7edb 1145 }
be0676b3 1146
930dcf12 1147 /**
aa9eede8 1148 * Creates a new, completely set up dynamic worker node.
930dcf12 1149 *
aa9eede8 1150 * @returns New, completely set up dynamic worker node key.
930dcf12 1151 */
aa9eede8
JB
1152 protected createAndSetupDynamicWorkerNode (): number {
1153 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1154 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1155 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1156 message.workerId
aad6fb64 1157 )
aa9eede8 1158 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1159 // Kill message received from worker
930dcf12
JB
1160 if (
1161 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1162 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1163 ((this.opts.enableTasksQueue === false &&
aa9eede8 1164 workerUsage.tasks.executing === 0) ||
7b56f532 1165 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1166 workerUsage.tasks.executing === 0 &&
1167 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1168 ) {
041dc05b 1169 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1170 this.emitter?.emit(PoolEvents.error, error)
1171 })
930dcf12
JB
1172 }
1173 })
46b0bb09 1174 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1175 this.sendToWorker(workerNodeKey, {
b0a4db63 1176 checkActive: true,
21f710aa
JB
1177 workerId: workerInfo.id as number
1178 })
b5e113f6 1179 workerInfo.dynamic = true
b1aae695
JB
1180 if (
1181 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1182 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1183 ) {
b5e113f6
JB
1184 workerInfo.ready = true
1185 }
33e6bb4c 1186 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1187 return workerNodeKey
930dcf12
JB
1188 }
1189
a2ed5053 1190 /**
aa9eede8 1191 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1192 *
aa9eede8 1193 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1194 * @param listener - The message listener callback.
1195 */
85aeb3f3
JB
1196 protected abstract registerWorkerMessageListener<
1197 Message extends Data | Response
aa9eede8
JB
1198 >(
1199 workerNodeKey: number,
1200 listener: (message: MessageValue<Message>) => void
1201 ): void
a2ed5053
JB
1202
1203 /**
aa9eede8 1204 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1205 * Can be overridden.
1206 *
aa9eede8 1207 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1208 */
aa9eede8 1209 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1210 // Listen to worker messages.
aa9eede8 1211 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1212 // Send the startup message to worker.
aa9eede8 1213 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1214 // Send the statistics message to worker.
1215 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1216 if (this.opts.enableTasksQueue === true) {
dbd73092 1217 if (this.opts.tasksQueueOptions?.taskStealing === true) {
47352846
JB
1218 this.workerNodes[workerNodeKey].onEmptyQueue =
1219 this.taskStealingOnEmptyQueue.bind(this)
1220 }
1221 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1222 this.workerNodes[workerNodeKey].onBackPressure =
1223 this.tasksStealingOnBackPressure.bind(this)
1224 }
72695f86 1225 }
d2c73f82
JB
1226 }
1227
85aeb3f3 1228 /**
aa9eede8
JB
1229 * Sends the startup message to worker given its worker node key.
1230 *
1231 * @param workerNodeKey - The worker node key.
1232 */
1233 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1234
1235 /**
9edb9717 1236 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1237 *
aa9eede8 1238 * @param workerNodeKey - The worker node key.
85aeb3f3 1239 */
9edb9717 1240 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1241 this.sendToWorker(workerNodeKey, {
1242 statistics: {
1243 runTime:
1244 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1245 .runTime.aggregate,
1246 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1247 .elu.aggregate
1248 },
46b0bb09 1249 workerId: this.getWorkerInfo(workerNodeKey).id as number
aa9eede8
JB
1250 })
1251 }
a2ed5053
JB
1252
1253 private redistributeQueuedTasks (workerNodeKey: number): void {
1254 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1255 const destinationWorkerNodeKey = this.workerNodes.reduce(
1256 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1257 return workerNode.info.ready &&
1258 workerNode.usage.tasks.queued <
1259 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1260 ? workerNodeKey
1261 : minWorkerNodeKey
1262 },
1263 0
1264 )
3f690f25
JB
1265 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
1266 const task = {
1267 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
1268 workerId: destinationWorkerNode.info.id as number
1269 }
1270 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1271 this.executeTask(destinationWorkerNodeKey, task)
1272 } else {
1273 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1274 }
1275 }
1276 }
1277
b1838604
JB
1278 private updateTaskStolenStatisticsWorkerUsage (
1279 workerNodeKey: number,
b1838604
JB
1280 taskName: string
1281 ): void {
1a880eca 1282 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1283 if (workerNode?.usage != null) {
1284 ++workerNode.usage.tasks.stolen
1285 }
1286 if (
1287 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1288 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1289 ) {
1290 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1291 taskName
1292 ) as WorkerUsage
1293 ++taskFunctionWorkerUsage.tasks.stolen
1294 }
1295 }
1296
dd951876 1297 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1298 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1299 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1300 const workerNodes = this.workerNodes
a6b3272b 1301 .slice()
dd951876
JB
1302 .sort(
1303 (workerNodeA, workerNodeB) =>
1304 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1305 )
f201a0cd 1306 const sourceWorkerNode = workerNodes.find(
041dc05b 1307 workerNode =>
f201a0cd
JB
1308 workerNode.info.ready &&
1309 workerNode.info.id !== workerId &&
1310 workerNode.usage.tasks.queued > 0
1311 )
1312 if (sourceWorkerNode != null) {
1313 const task = {
1314 ...(sourceWorkerNode.popTask() as Task<Data>),
1315 workerId: destinationWorkerNode.info.id as number
0bc68267 1316 }
f201a0cd
JB
1317 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1318 this.executeTask(destinationWorkerNodeKey, task)
1319 } else {
1320 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1321 }
f201a0cd
JB
1322 this.updateTaskStolenStatisticsWorkerUsage(
1323 destinationWorkerNodeKey,
1324 task.name as string
1325 )
72695f86
JB
1326 }
1327 }
1328
1329 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1330 const sizeOffset = 1
1331 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1332 return
1333 }
72695f86
JB
1334 const sourceWorkerNode =
1335 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1336 const workerNodes = this.workerNodes
a6b3272b 1337 .slice()
72695f86
JB
1338 .sort(
1339 (workerNodeA, workerNodeB) =>
1340 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1341 )
1342 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1343 if (
0bc68267 1344 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1345 workerNode.info.ready &&
1346 workerNode.info.id !== workerId &&
0bc68267 1347 workerNode.usage.tasks.queued <
f778c355 1348 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1349 ) {
dd951876
JB
1350 const task = {
1351 ...(sourceWorkerNode.popTask() as Task<Data>),
1352 workerId: workerNode.info.id as number
1353 }
375f7504 1354 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1355 this.executeTask(workerNodeKey, task)
4de3d785 1356 } else {
dd951876 1357 this.enqueueTask(workerNodeKey, task)
4de3d785 1358 }
b1838604
JB
1359 this.updateTaskStolenStatisticsWorkerUsage(
1360 workerNodeKey,
b1838604
JB
1361 task.name as string
1362 )
10ecf8fd 1363 }
a2ed5053
JB
1364 }
1365 }
1366
be0676b3 1367 /**
aa9eede8 1368 * This method is the listener registered for each worker message.
be0676b3 1369 *
bdacc2d2 1370 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1371 */
1372 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1373 return message => {
21f710aa 1374 this.checkMessageWorkerId(message)
a5d15204 1375 if (message.ready != null && message.taskFunctions != null) {
81c02522 1376 // Worker ready response received from worker
10e2aa7e 1377 this.handleWorkerReadyResponse(message)
7629bdf1 1378 } else if (message.taskId != null) {
81c02522 1379 // Task execution response received from worker
6b272951 1380 this.handleTaskExecutionResponse(message)
90d7d101
JB
1381 } else if (message.taskFunctions != null) {
1382 // Task functions message received from worker
46b0bb09
JB
1383 this.getWorkerInfo(
1384 this.getWorkerNodeKeyByWorkerId(message.workerId)
b558f6b5 1385 ).taskFunctions = message.taskFunctions
6b272951
JB
1386 }
1387 }
1388 }
1389
10e2aa7e 1390 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1391 if (message.ready === false) {
1392 throw new Error(`Worker ${message.workerId} failed to initialize`)
1393 }
a5d15204 1394 const workerInfo = this.getWorkerInfo(
aad6fb64 1395 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1396 )
a5d15204
JB
1397 workerInfo.ready = message.ready as boolean
1398 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1399 if (this.emitter != null && this.ready) {
1400 this.emitter.emit(PoolEvents.ready, this.info)
1401 }
6b272951
JB
1402 }
1403
1404 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1405 const { taskId, taskError, data } = message
1406 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1407 if (promiseResponse != null) {
5441aea6
JB
1408 if (taskError != null) {
1409 this.emitter?.emit(PoolEvents.taskError, taskError)
1410 promiseResponse.reject(taskError.message)
6b272951 1411 } else {
5441aea6 1412 promiseResponse.resolve(data as Response)
6b272951 1413 }
501aea93
JB
1414 const workerNodeKey = promiseResponse.workerNodeKey
1415 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1416 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1417 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1418 if (
1419 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1420 this.tasksQueueSize(workerNodeKey) > 0 &&
1421 this.workerNodes[workerNodeKey].usage.tasks.executing <
1422 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1423 ) {
1424 this.executeTask(
1425 workerNodeKey,
1426 this.dequeueTask(workerNodeKey) as Task<Data>
1427 )
be0676b3
APA
1428 }
1429 }
be0676b3 1430 }
7c0ba920 1431
a1763c54 1432 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1433 if (this.busy) {
1434 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1435 }
1436 }
1437
1438 private checkAndEmitTaskQueuingEvents (): void {
1439 if (this.hasBackPressure()) {
1440 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1441 }
1442 }
1443
33e6bb4c
JB
1444 private checkAndEmitDynamicWorkerCreationEvents (): void {
1445 if (this.type === PoolTypes.dynamic) {
1446 if (this.full) {
1447 this.emitter?.emit(PoolEvents.full, this.info)
1448 }
1449 }
1450 }
1451
8a1260a3 1452 /**
aa9eede8 1453 * Gets the worker information given its worker node key.
8a1260a3
JB
1454 *
1455 * @param workerNodeKey - The worker node key.
3f09ed9f 1456 * @returns The worker information.
8a1260a3 1457 */
46b0bb09
JB
1458 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1459 return this.workerNodes[workerNodeKey].info
e221309a
JB
1460 }
1461
a05c10de 1462 /**
b0a4db63 1463 * Adds the given worker in the pool worker nodes.
ea7a90d3 1464 *
38e795c1 1465 * @param worker - The worker.
aa9eede8
JB
1466 * @returns The added worker node key.
1467 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1468 */
b0a4db63 1469 private addWorkerNode (worker: Worker): number {
671d5154
JB
1470 const workerNode = new WorkerNode<Worker, Data>(
1471 worker,
ff3f866a 1472 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1473 )
b97d82d8 1474 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1475 if (this.starting) {
1476 workerNode.info.ready = true
1477 }
aa9eede8 1478 this.workerNodes.push(workerNode)
aad6fb64 1479 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1480 if (workerNodeKey === -1) {
86ed0598 1481 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1482 }
1483 return workerNodeKey
ea7a90d3 1484 }
c923ce56 1485
51fe3d3c 1486 /**
f06e48d8 1487 * Removes the given worker from the pool worker nodes.
51fe3d3c 1488 *
f06e48d8 1489 * @param worker - The worker.
51fe3d3c 1490 */
416fd65c 1491 private removeWorkerNode (worker: Worker): void {
aad6fb64 1492 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1493 if (workerNodeKey !== -1) {
1494 this.workerNodes.splice(workerNodeKey, 1)
1495 this.workerChoiceStrategyContext.remove(workerNodeKey)
1496 }
51fe3d3c 1497 }
adc3c320 1498
e2b31e32
JB
1499 /** @inheritDoc */
1500 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1501 return (
e2b31e32
JB
1502 this.opts.enableTasksQueue === true &&
1503 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1504 )
1505 }
1506
1507 private hasBackPressure (): boolean {
1508 return (
1509 this.opts.enableTasksQueue === true &&
1510 this.workerNodes.findIndex(
041dc05b 1511 workerNode => !workerNode.hasBackPressure()
a1763c54 1512 ) === -1
9e844245 1513 )
e2b31e32
JB
1514 }
1515
b0a4db63 1516 /**
aa9eede8 1517 * Executes the given task on the worker given its worker node key.
b0a4db63 1518 *
aa9eede8 1519 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1520 * @param task - The task to execute.
1521 */
2e81254d 1522 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1523 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1524 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1525 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1526 }
1527
f9f00b5f 1528 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1529 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1530 this.checkAndEmitTaskQueuingEvents()
1531 return tasksQueueSize
adc3c320
JB
1532 }
1533
416fd65c 1534 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1535 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1536 }
1537
416fd65c 1538 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1539 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1540 }
1541
81c02522 1542 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1543 while (this.tasksQueueSize(workerNodeKey) > 0) {
1544 this.executeTask(
1545 workerNodeKey,
1546 this.dequeueTask(workerNodeKey) as Task<Data>
1547 )
ff733df7 1548 }
4b628b48 1549 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1550 }
1551
ef41a6e6
JB
1552 private flushTasksQueues (): void {
1553 for (const [workerNodeKey] of this.workerNodes.entries()) {
1554 this.flushTasksQueue(workerNodeKey)
1555 }
1556 }
c97c7edb 1557}