refactor: code cleanups
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
075e51d1 102 /**
adc9cc64 103 * Whether the pool is starting or not.
075e51d1
JB
104 */
105 private readonly starting: boolean
15b176e0
JB
106 /**
107 * Whether the pool is started or not.
108 */
109 private started: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
729c563d
S
115 /**
116 * Constructs a new poolifier pool.
117 *
38e795c1 118 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 119 * @param filePath - Path to the worker file.
38e795c1 120 * @param opts - Options for the pool.
729c563d 121 */
c97c7edb 122 public constructor (
b4213b7f
JB
123 protected readonly numberOfWorkers: number,
124 protected readonly filePath: string,
125 protected readonly opts: PoolOptions<Worker>
c97c7edb 126 ) {
78cea37e 127 if (!this.isMain()) {
04f45163 128 throw new Error(
8c6d4acf 129 'Cannot start a pool from a worker with the same type as the pool'
04f45163 130 )
c97c7edb 131 }
8d3782fa 132 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 133 this.checkFilePath(this.filePath)
7c0ba920 134 this.checkPoolOptions(this.opts)
1086026a 135
7254e419
JB
136 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
137 this.executeTask = this.executeTask.bind(this)
138 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 139
6bd72cd0 140 if (this.opts.enableEvents === true) {
7c0ba920
JB
141 this.emitter = new PoolEmitter()
142 }
d59df138
JB
143 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
144 Worker,
145 Data,
146 Response
da309861
JB
147 >(
148 this,
149 this.opts.workerChoiceStrategy,
150 this.opts.workerChoiceStrategyOptions
151 )
b6b32453
JB
152
153 this.setupHook()
154
72ae84a2
JB
155 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
156
075e51d1 157 this.starting = true
e761c033 158 this.startPool()
075e51d1 159 this.starting = false
15b176e0 160 this.started = true
afe0d5bf
JB
161
162 this.startTimestamp = performance.now()
c97c7edb
S
163 }
164
a35560ba 165 private checkFilePath (filePath: string): void {
ffcbbad8
JB
166 if (
167 filePath == null ||
3d6dd312 168 typeof filePath !== 'string' ||
ffcbbad8
JB
169 (typeof filePath === 'string' && filePath.trim().length === 0)
170 ) {
c510fea7
APA
171 throw new Error('Please specify a file with a worker implementation')
172 }
3d6dd312
JB
173 if (!existsSync(filePath)) {
174 throw new Error(`Cannot find the worker file '${filePath}'`)
175 }
c510fea7
APA
176 }
177
8d3782fa
JB
178 private checkNumberOfWorkers (numberOfWorkers: number): void {
179 if (numberOfWorkers == null) {
180 throw new Error(
181 'Cannot instantiate a pool without specifying the number of workers'
182 )
78cea37e 183 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 184 throw new TypeError(
0d80593b 185 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
186 )
187 } else if (numberOfWorkers < 0) {
473c717a 188 throw new RangeError(
8d3782fa
JB
189 'Cannot instantiate a pool with a negative number of workers'
190 )
6b27d407 191 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
192 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
193 }
194 }
195
196 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 197 if (this.type === PoolTypes.dynamic) {
a5ed75b7 198 if (max == null) {
e695d66f 199 throw new TypeError(
a5ed75b7
JB
200 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
201 )
202 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
203 throw new TypeError(
204 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
205 )
206 } else if (min > max) {
079de991
JB
207 throw new RangeError(
208 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
209 )
b97d82d8 210 } else if (max === 0) {
079de991 211 throw new RangeError(
d640b48b 212 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
213 )
214 } else if (min === max) {
215 throw new RangeError(
216 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
217 )
218 }
8d3782fa
JB
219 }
220 }
221
7c0ba920 222 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
223 if (isPlainObject(opts)) {
224 this.opts.workerChoiceStrategy =
225 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
226 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
227 this.opts.workerChoiceStrategyOptions = {
228 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
229 ...opts.workerChoiceStrategyOptions
230 }
49be33fe
JB
231 this.checkValidWorkerChoiceStrategyOptions(
232 this.opts.workerChoiceStrategyOptions
233 )
1f68cede 234 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
235 this.opts.enableEvents = opts.enableEvents ?? true
236 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
237 if (this.opts.enableTasksQueue) {
238 this.checkValidTasksQueueOptions(
239 opts.tasksQueueOptions as TasksQueueOptions
240 )
241 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
242 opts.tasksQueueOptions as TasksQueueOptions
243 )
244 }
245 } else {
246 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 247 }
aee46736
JB
248 }
249
250 private checkValidWorkerChoiceStrategy (
251 workerChoiceStrategy: WorkerChoiceStrategy
252 ): void {
253 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 254 throw new Error(
aee46736 255 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
256 )
257 }
7c0ba920
JB
258 }
259
0d80593b
JB
260 private checkValidWorkerChoiceStrategyOptions (
261 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
262 ): void {
263 if (!isPlainObject(workerChoiceStrategyOptions)) {
264 throw new TypeError(
265 'Invalid worker choice strategy options: must be a plain object'
266 )
267 }
8990357d 268 if (
8c0b113f
JB
269 workerChoiceStrategyOptions.retries != null &&
270 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
271 ) {
272 throw new TypeError(
8c0b113f 273 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
274 )
275 }
276 if (
8c0b113f
JB
277 workerChoiceStrategyOptions.retries != null &&
278 workerChoiceStrategyOptions.retries < 0
8990357d
JB
279 ) {
280 throw new RangeError(
8c0b113f 281 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
282 )
283 }
49be33fe
JB
284 if (
285 workerChoiceStrategyOptions.weights != null &&
6b27d407 286 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
287 ) {
288 throw new Error(
289 'Invalid worker choice strategy options: must have a weight for each worker node'
290 )
291 }
f0d7f803
JB
292 if (
293 workerChoiceStrategyOptions.measurement != null &&
294 !Object.values(Measurements).includes(
295 workerChoiceStrategyOptions.measurement
296 )
297 ) {
298 throw new Error(
299 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
300 )
301 }
0d80593b
JB
302 }
303
a20f0ba5 304 private checkValidTasksQueueOptions (
76d91ea0 305 tasksQueueOptions: TasksQueueOptions
a20f0ba5 306 ): void {
0d80593b
JB
307 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
308 throw new TypeError('Invalid tasks queue options: must be a plain object')
309 }
f0d7f803 310 if (
b7d085c4
JB
311 tasksQueueOptions?.concurrency != null &&
312 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
313 ) {
314 throw new TypeError(
20c6f652 315 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
316 )
317 }
318 if (
b7d085c4
JB
319 tasksQueueOptions?.concurrency != null &&
320 tasksQueueOptions?.concurrency <= 0
f0d7f803 321 ) {
e695d66f 322 throw new RangeError(
b7d085c4 323 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
324 )
325 }
b7d085c4 326 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 327 throw new Error(
68dbcdc0 328 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
329 )
330 }
331 if (
b7d085c4
JB
332 tasksQueueOptions?.size != null &&
333 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 334 ) {
ff3f866a 335 throw new TypeError(
68dbcdc0 336 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
337 )
338 }
b7d085c4 339 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 340 throw new RangeError(
b7d085c4 341 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
342 )
343 }
344 }
345
e761c033
JB
346 private startPool (): void {
347 while (
348 this.workerNodes.reduce(
349 (accumulator, workerNode) =>
350 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
351 0
352 ) < this.numberOfWorkers
353 ) {
aa9eede8 354 this.createAndSetupWorkerNode()
e761c033
JB
355 }
356 }
357
08f3f44c 358 /** @inheritDoc */
6b27d407
JB
359 public get info (): PoolInfo {
360 return {
23ccf9d7 361 version,
6b27d407 362 type: this.type,
184855e6 363 worker: this.worker,
2431bdb4
JB
364 ready: this.ready,
365 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
366 minSize: this.minSize,
367 maxSize: this.maxSize,
c05f0d50
JB
368 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
369 .runTime.aggregate &&
1305e9a8
JB
370 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
371 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
372 workerNodes: this.workerNodes.length,
373 idleWorkerNodes: this.workerNodes.reduce(
374 (accumulator, workerNode) =>
f59e1027 375 workerNode.usage.tasks.executing === 0
a4e07f72
JB
376 ? accumulator + 1
377 : accumulator,
6b27d407
JB
378 0
379 ),
380 busyWorkerNodes: this.workerNodes.reduce(
381 (accumulator, workerNode) =>
f59e1027 382 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
383 0
384 ),
a4e07f72 385 executedTasks: this.workerNodes.reduce(
6b27d407 386 (accumulator, workerNode) =>
f59e1027 387 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
388 0
389 ),
390 executingTasks: this.workerNodes.reduce(
391 (accumulator, workerNode) =>
f59e1027 392 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
393 0
394 ),
daf86646
JB
395 ...(this.opts.enableTasksQueue === true && {
396 queuedTasks: this.workerNodes.reduce(
397 (accumulator, workerNode) =>
398 accumulator + workerNode.usage.tasks.queued,
399 0
400 )
401 }),
402 ...(this.opts.enableTasksQueue === true && {
403 maxQueuedTasks: this.workerNodes.reduce(
404 (accumulator, workerNode) =>
405 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
406 0
407 )
408 }),
a1763c54
JB
409 ...(this.opts.enableTasksQueue === true && {
410 backPressure: this.hasBackPressure()
411 }),
68cbdc84
JB
412 ...(this.opts.enableTasksQueue === true && {
413 stolenTasks: this.workerNodes.reduce(
414 (accumulator, workerNode) =>
415 accumulator + workerNode.usage.tasks.stolen,
416 0
417 )
418 }),
a4e07f72
JB
419 failedTasks: this.workerNodes.reduce(
420 (accumulator, workerNode) =>
f59e1027 421 accumulator + workerNode.usage.tasks.failed,
a4e07f72 422 0
1dcf8b7b
JB
423 ),
424 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
425 .runTime.aggregate && {
426 runTime: {
98e72cda 427 minimum: round(
90d6701c 428 min(
98e72cda 429 ...this.workerNodes.map(
041dc05b 430 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 431 )
1dcf8b7b
JB
432 )
433 ),
98e72cda 434 maximum: round(
90d6701c 435 max(
98e72cda 436 ...this.workerNodes.map(
041dc05b 437 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 438 )
1dcf8b7b 439 )
98e72cda 440 ),
3baa0837
JB
441 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
442 .runTime.average && {
443 average: round(
444 average(
445 this.workerNodes.reduce<number[]>(
446 (accumulator, workerNode) =>
447 accumulator.concat(workerNode.usage.runTime.history),
448 []
449 )
98e72cda 450 )
dc021bcc 451 )
3baa0837 452 }),
98e72cda
JB
453 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
454 .runTime.median && {
455 median: round(
456 median(
3baa0837
JB
457 this.workerNodes.reduce<number[]>(
458 (accumulator, workerNode) =>
459 accumulator.concat(workerNode.usage.runTime.history),
460 []
98e72cda
JB
461 )
462 )
463 )
464 })
1dcf8b7b
JB
465 }
466 }),
467 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
468 .waitTime.aggregate && {
469 waitTime: {
98e72cda 470 minimum: round(
90d6701c 471 min(
98e72cda 472 ...this.workerNodes.map(
041dc05b 473 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 474 )
1dcf8b7b
JB
475 )
476 ),
98e72cda 477 maximum: round(
90d6701c 478 max(
98e72cda 479 ...this.workerNodes.map(
041dc05b 480 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 481 )
1dcf8b7b 482 )
98e72cda 483 ),
3baa0837
JB
484 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
485 .waitTime.average && {
486 average: round(
487 average(
488 this.workerNodes.reduce<number[]>(
489 (accumulator, workerNode) =>
490 accumulator.concat(workerNode.usage.waitTime.history),
491 []
492 )
98e72cda 493 )
dc021bcc 494 )
3baa0837 495 }),
98e72cda
JB
496 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
497 .waitTime.median && {
498 median: round(
499 median(
3baa0837
JB
500 this.workerNodes.reduce<number[]>(
501 (accumulator, workerNode) =>
502 accumulator.concat(workerNode.usage.waitTime.history),
503 []
98e72cda
JB
504 )
505 )
506 )
507 })
1dcf8b7b
JB
508 }
509 })
6b27d407
JB
510 }
511 }
08f3f44c 512
aa9eede8
JB
513 /**
514 * The pool readiness boolean status.
515 */
2431bdb4
JB
516 private get ready (): boolean {
517 return (
b97d82d8
JB
518 this.workerNodes.reduce(
519 (accumulator, workerNode) =>
520 !workerNode.info.dynamic && workerNode.info.ready
521 ? accumulator + 1
522 : accumulator,
523 0
524 ) >= this.minSize
2431bdb4
JB
525 )
526 }
527
afe0d5bf 528 /**
aa9eede8 529 * The approximate pool utilization.
afe0d5bf
JB
530 *
531 * @returns The pool utilization.
532 */
533 private get utilization (): number {
8e5ca040 534 const poolTimeCapacity =
fe7d90db 535 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
536 const totalTasksRunTime = this.workerNodes.reduce(
537 (accumulator, workerNode) =>
71514351 538 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
539 0
540 )
541 const totalTasksWaitTime = this.workerNodes.reduce(
542 (accumulator, workerNode) =>
71514351 543 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
544 0
545 )
8e5ca040 546 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
547 }
548
8881ae32 549 /**
aa9eede8 550 * The pool type.
8881ae32
JB
551 *
552 * If it is `'dynamic'`, it provides the `max` property.
553 */
554 protected abstract get type (): PoolType
555
184855e6 556 /**
aa9eede8 557 * The worker type.
184855e6
JB
558 */
559 protected abstract get worker (): WorkerType
560
c2ade475 561 /**
aa9eede8 562 * The pool minimum size.
c2ade475 563 */
8735b4e5
JB
564 protected get minSize (): number {
565 return this.numberOfWorkers
566 }
ff733df7
JB
567
568 /**
aa9eede8 569 * The pool maximum size.
ff733df7 570 */
8735b4e5
JB
571 protected get maxSize (): number {
572 return this.max ?? this.numberOfWorkers
573 }
a35560ba 574
6b813701
JB
575 /**
576 * Checks if the worker id sent in the received message from a worker is valid.
577 *
578 * @param message - The received message.
579 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
580 */
21f710aa 581 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
582 if (message.workerId == null) {
583 throw new Error('Worker message received without worker id')
584 } else if (
21f710aa 585 message.workerId != null &&
aad6fb64 586 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
587 ) {
588 throw new Error(
589 `Worker message received from unknown worker '${message.workerId}'`
590 )
591 }
592 }
593
ffcbbad8 594 /**
f06e48d8 595 * Gets the given worker its worker node key.
ffcbbad8
JB
596 *
597 * @param worker - The worker.
f59e1027 598 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 599 */
aad6fb64 600 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 601 return this.workerNodes.findIndex(
041dc05b 602 workerNode => workerNode.worker === worker
f06e48d8 603 )
bf9549ae
JB
604 }
605
aa9eede8
JB
606 /**
607 * Gets the worker node key given its worker id.
608 *
609 * @param workerId - The worker id.
aad6fb64 610 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 611 */
72ae84a2 612 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 613 return this.workerNodes.findIndex(
041dc05b 614 workerNode => workerNode.info.id === workerId
aad6fb64 615 )
aa9eede8
JB
616 }
617
afc003b2 618 /** @inheritDoc */
a35560ba 619 public setWorkerChoiceStrategy (
59219cbb
JB
620 workerChoiceStrategy: WorkerChoiceStrategy,
621 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 622 ): void {
aee46736 623 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 624 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
625 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
626 this.opts.workerChoiceStrategy
627 )
628 if (workerChoiceStrategyOptions != null) {
629 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
630 }
aa9eede8 631 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 632 workerNode.resetUsage()
9edb9717 633 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 634 }
a20f0ba5
JB
635 }
636
637 /** @inheritDoc */
638 public setWorkerChoiceStrategyOptions (
639 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
640 ): void {
0d80593b 641 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
642 this.opts.workerChoiceStrategyOptions = {
643 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
644 ...workerChoiceStrategyOptions
645 }
a20f0ba5
JB
646 this.workerChoiceStrategyContext.setOptions(
647 this.opts.workerChoiceStrategyOptions
a35560ba
S
648 )
649 }
650
a20f0ba5 651 /** @inheritDoc */
8f52842f
JB
652 public enableTasksQueue (
653 enable: boolean,
654 tasksQueueOptions?: TasksQueueOptions
655 ): void {
a20f0ba5 656 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 657 this.flushTasksQueues()
a20f0ba5
JB
658 }
659 this.opts.enableTasksQueue = enable
8f52842f 660 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
661 }
662
663 /** @inheritDoc */
8f52842f 664 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 665 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
666 this.checkValidTasksQueueOptions(tasksQueueOptions)
667 this.opts.tasksQueueOptions =
668 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 669 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 670 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
671 delete this.opts.tasksQueueOptions
672 }
673 }
674
5b49e864 675 private setTasksQueueSize (size: number): void {
20c6f652 676 for (const workerNode of this.workerNodes) {
ff3f866a 677 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
678 }
679 }
680
a20f0ba5
JB
681 private buildTasksQueueOptions (
682 tasksQueueOptions: TasksQueueOptions
683 ): TasksQueueOptions {
684 return {
20c6f652 685 ...{
ff3f866a 686 size: Math.pow(this.maxSize, 2),
20c6f652
JB
687 concurrency: 1
688 },
689 ...tasksQueueOptions
a20f0ba5
JB
690 }
691 }
692
c319c66b
JB
693 /**
694 * Whether the pool is full or not.
695 *
696 * The pool filling boolean status.
697 */
dea903a8
JB
698 protected get full (): boolean {
699 return this.workerNodes.length >= this.maxSize
700 }
c2ade475 701
c319c66b
JB
702 /**
703 * Whether the pool is busy or not.
704 *
705 * The pool busyness boolean status.
706 */
707 protected abstract get busy (): boolean
7c0ba920 708
6c6afb84 709 /**
3d76750a 710 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
711 *
712 * @returns Worker nodes busyness boolean status.
713 */
c2ade475 714 protected internalBusy (): boolean {
3d76750a
JB
715 if (this.opts.enableTasksQueue === true) {
716 return (
717 this.workerNodes.findIndex(
041dc05b 718 workerNode =>
3d76750a
JB
719 workerNode.info.ready &&
720 workerNode.usage.tasks.executing <
721 (this.opts.tasksQueueOptions?.concurrency as number)
722 ) === -1
723 )
3d76750a 724 }
419e8121
JB
725 return (
726 this.workerNodes.findIndex(
727 workerNode =>
728 workerNode.info.ready && workerNode.usage.tasks.executing === 0
729 ) === -1
730 )
cb70b19d
JB
731 }
732
e81c38f2 733 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
734 workerNodeKey: number,
735 message: MessageValue<Data>
736 ): Promise<boolean> {
737 const workerId = this.getWorkerInfo(workerNodeKey).id as number
738 return await new Promise<boolean>((resolve, reject) => {
739 this.registerWorkerMessageListener(workerNodeKey, message => {
740 if (
741 message.workerId === workerId &&
742 message.taskFunctionOperationStatus === true
743 ) {
744 resolve(true)
745 } else if (
746 message.workerId === workerId &&
747 message.taskFunctionOperationStatus === false
748 ) {
749 reject(
750 new Error(
751 `Task function operation ${
752 message.taskFunctionOperation as string
753 } failed on worker ${message.workerId}`
754 )
755 )
756 }
757 })
758 this.sendToWorker(workerNodeKey, message)
759 })
760 }
761
762 private async sendTaskFunctionOperationToWorkers (
e81c38f2
JB
763 message: Omit<MessageValue<Data>, 'workerId'>
764 ): Promise<boolean> {
765 return await new Promise<boolean>((resolve, reject) => {
766 const responsesReceived = new Array<MessageValue<Data | Response>>()
767 for (const [workerNodeKey] of this.workerNodes.entries()) {
768 this.registerWorkerMessageListener(workerNodeKey, message => {
769 if (message.taskFunctionOperationStatus != null) {
770 responsesReceived.push(message)
771 if (
772 responsesReceived.length === this.workerNodes.length &&
773 responsesReceived.every(
774 message => message.taskFunctionOperationStatus === true
775 )
776 ) {
777 resolve(true)
778 } else if (
779 responsesReceived.length === this.workerNodes.length &&
780 responsesReceived.some(
781 message => message.taskFunctionOperationStatus === false
782 )
783 ) {
784 reject(
785 new Error(
786 `Task function operation ${
787 message.taskFunctionOperation as string
72ae84a2 788 } failed on worker ${message.workerId as number}`
e81c38f2
JB
789 )
790 )
791 }
792 }
793 })
72ae84a2 794 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
795 }
796 })
6703b9f4
JB
797 }
798
799 /** @inheritDoc */
800 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
801 for (const workerNode of this.workerNodes) {
802 if (
803 Array.isArray(workerNode.info.taskFunctionNames) &&
804 workerNode.info.taskFunctionNames.includes(name)
805 ) {
806 return true
807 }
808 }
809 return false
6703b9f4
JB
810 }
811
812 /** @inheritDoc */
e81c38f2
JB
813 public async addTaskFunction (
814 name: string,
72ae84a2 815 taskFunction: TaskFunction<Data, Response>
e81c38f2 816 ): Promise<boolean> {
72ae84a2
JB
817 this.taskFunctions.set(name, taskFunction)
818 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
819 taskFunctionOperation: 'add',
820 taskFunctionName: name,
821 taskFunction: taskFunction.toString()
822 })
6703b9f4
JB
823 }
824
825 /** @inheritDoc */
e81c38f2 826 public async removeTaskFunction (name: string): Promise<boolean> {
72ae84a2
JB
827 this.taskFunctions.delete(name)
828 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
829 taskFunctionOperation: 'remove',
830 taskFunctionName: name
831 })
6703b9f4
JB
832 }
833
90d7d101 834 /** @inheritDoc */
6703b9f4 835 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
836 for (const workerNode of this.workerNodes) {
837 if (
6703b9f4
JB
838 Array.isArray(workerNode.info.taskFunctionNames) &&
839 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 840 ) {
6703b9f4 841 return workerNode.info.taskFunctionNames
f2dbbf95 842 }
90d7d101 843 }
f2dbbf95 844 return []
90d7d101
JB
845 }
846
6703b9f4 847 /** @inheritDoc */
e81c38f2 848 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 849 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
850 taskFunctionOperation: 'default',
851 taskFunctionName: name
852 })
6703b9f4
JB
853 }
854
375f7504
JB
855 private shallExecuteTask (workerNodeKey: number): boolean {
856 return (
857 this.tasksQueueSize(workerNodeKey) === 0 &&
858 this.workerNodes[workerNodeKey].usage.tasks.executing <
859 (this.opts.tasksQueueOptions?.concurrency as number)
860 )
861 }
862
afc003b2 863 /** @inheritDoc */
7d91a8cd
JB
864 public async execute (
865 data?: Data,
866 name?: string,
867 transferList?: TransferListItem[]
868 ): Promise<Response> {
52b71763 869 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
870 if (!this.started) {
871 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 872 return
15b176e0 873 }
7d91a8cd
JB
874 if (name != null && typeof name !== 'string') {
875 reject(new TypeError('name argument must be a string'))
9d2d0da1 876 return
7d91a8cd 877 }
90d7d101
JB
878 if (
879 name != null &&
880 typeof name === 'string' &&
881 name.trim().length === 0
882 ) {
f58b60b9 883 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 884 return
90d7d101 885 }
b558f6b5
JB
886 if (transferList != null && !Array.isArray(transferList)) {
887 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 888 return
b558f6b5
JB
889 }
890 const timestamp = performance.now()
891 const workerNodeKey = this.chooseWorkerNode()
501aea93 892 const task: Task<Data> = {
52b71763
JB
893 name: name ?? DEFAULT_TASK_NAME,
894 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
895 data: data ?? ({} as Data),
7d91a8cd 896 transferList,
52b71763 897 timestamp,
7629bdf1 898 taskId: randomUUID()
52b71763 899 }
7629bdf1 900 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
901 resolve,
902 reject,
501aea93 903 workerNodeKey
2e81254d 904 })
52b71763 905 if (
4e377863
JB
906 this.opts.enableTasksQueue === false ||
907 (this.opts.enableTasksQueue === true &&
375f7504 908 this.shallExecuteTask(workerNodeKey))
52b71763 909 ) {
501aea93 910 this.executeTask(workerNodeKey, task)
4e377863
JB
911 } else {
912 this.enqueueTask(workerNodeKey, task)
52b71763 913 }
2e81254d 914 })
280c2a77 915 }
c97c7edb 916
afc003b2 917 /** @inheritDoc */
c97c7edb 918 public async destroy (): Promise<void> {
1fbcaa7c 919 await Promise.all(
81c02522 920 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 921 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
922 })
923 )
33e6bb4c 924 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 925 this.started = false
c97c7edb
S
926 }
927
1e3214b6 928 protected async sendKillMessageToWorker (
72ae84a2 929 workerNodeKey: number
1e3214b6 930 ): Promise<void> {
9edb9717 931 await new Promise<void>((resolve, reject) => {
041dc05b 932 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
933 if (message.kill === 'success') {
934 resolve()
935 } else if (message.kill === 'failure') {
72ae84a2
JB
936 reject(
937 new Error(
938 `Worker ${
939 message.workerId as number
940 } kill message handling failed`
941 )
942 )
1e3214b6
JB
943 }
944 })
72ae84a2 945 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 946 })
1e3214b6
JB
947 }
948
4a6952ff 949 /**
aa9eede8 950 * Terminates the worker node given its worker node key.
4a6952ff 951 *
aa9eede8 952 * @param workerNodeKey - The worker node key.
4a6952ff 953 */
81c02522 954 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 955
729c563d 956 /**
6677a3d3
JB
957 * Setup hook to execute code before worker nodes are created in the abstract constructor.
958 * Can be overridden.
afc003b2
JB
959 *
960 * @virtual
729c563d 961 */
280c2a77 962 protected setupHook (): void {
965df41c 963 /* Intentionally empty */
280c2a77 964 }
c97c7edb 965
729c563d 966 /**
280c2a77
S
967 * Should return whether the worker is the main worker or not.
968 */
969 protected abstract isMain (): boolean
970
971 /**
2e81254d 972 * Hook executed before the worker task execution.
bf9549ae 973 * Can be overridden.
729c563d 974 *
f06e48d8 975 * @param workerNodeKey - The worker node key.
1c6fe997 976 * @param task - The task to execute.
729c563d 977 */
1c6fe997
JB
978 protected beforeTaskExecutionHook (
979 workerNodeKey: number,
980 task: Task<Data>
981 ): void {
94407def
JB
982 if (this.workerNodes[workerNodeKey]?.usage != null) {
983 const workerUsage = this.workerNodes[workerNodeKey].usage
984 ++workerUsage.tasks.executing
985 this.updateWaitTimeWorkerUsage(workerUsage, task)
986 }
987 if (
988 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
989 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
990 task.name as string
991 ) != null
992 ) {
db0e38ee 993 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 994 workerNodeKey
db0e38ee 995 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
996 ++taskFunctionWorkerUsage.tasks.executing
997 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 998 }
c97c7edb
S
999 }
1000
c01733f1 1001 /**
2e81254d 1002 * Hook executed after the worker task execution.
bf9549ae 1003 * Can be overridden.
c01733f1 1004 *
501aea93 1005 * @param workerNodeKey - The worker node key.
38e795c1 1006 * @param message - The received message.
c01733f1 1007 */
2e81254d 1008 protected afterTaskExecutionHook (
501aea93 1009 workerNodeKey: number,
2740a743 1010 message: MessageValue<Response>
bf9549ae 1011 ): void {
94407def
JB
1012 if (this.workerNodes[workerNodeKey]?.usage != null) {
1013 const workerUsage = this.workerNodes[workerNodeKey].usage
1014 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1015 this.updateRunTimeWorkerUsage(workerUsage, message)
1016 this.updateEluWorkerUsage(workerUsage, message)
1017 }
1018 if (
1019 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1020 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1021 message.taskPerformance?.name as string
94407def
JB
1022 ) != null
1023 ) {
db0e38ee 1024 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1025 workerNodeKey
db0e38ee 1026 ].getTaskFunctionWorkerUsage(
0628755c 1027 message.taskPerformance?.name as string
b558f6b5 1028 ) as WorkerUsage
db0e38ee
JB
1029 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1030 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1031 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1032 }
1033 }
1034
db0e38ee
JB
1035 /**
1036 * Whether the worker node shall update its task function worker usage or not.
1037 *
1038 * @param workerNodeKey - The worker node key.
1039 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1040 */
1041 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1042 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1043 return (
94407def 1044 workerInfo != null &&
6703b9f4
JB
1045 Array.isArray(workerInfo.taskFunctionNames) &&
1046 workerInfo.taskFunctionNames.length > 2
b558f6b5 1047 )
f1c06930
JB
1048 }
1049
1050 private updateTaskStatisticsWorkerUsage (
1051 workerUsage: WorkerUsage,
1052 message: MessageValue<Response>
1053 ): void {
a4e07f72 1054 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1055 if (
1056 workerTaskStatistics.executing != null &&
1057 workerTaskStatistics.executing > 0
1058 ) {
1059 --workerTaskStatistics.executing
5bb5be17 1060 }
6703b9f4 1061 if (message.workerError == null) {
98e72cda
JB
1062 ++workerTaskStatistics.executed
1063 } else {
a4e07f72 1064 ++workerTaskStatistics.failed
2740a743 1065 }
f8eb0a2a
JB
1066 }
1067
a4e07f72
JB
1068 private updateRunTimeWorkerUsage (
1069 workerUsage: WorkerUsage,
f8eb0a2a
JB
1070 message: MessageValue<Response>
1071 ): void {
6703b9f4 1072 if (message.workerError != null) {
dc021bcc
JB
1073 return
1074 }
e4f20deb
JB
1075 updateMeasurementStatistics(
1076 workerUsage.runTime,
1077 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1078 message.taskPerformance?.runTime ?? 0
e4f20deb 1079 )
f8eb0a2a
JB
1080 }
1081
a4e07f72
JB
1082 private updateWaitTimeWorkerUsage (
1083 workerUsage: WorkerUsage,
1c6fe997 1084 task: Task<Data>
f8eb0a2a 1085 ): void {
1c6fe997
JB
1086 const timestamp = performance.now()
1087 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1088 updateMeasurementStatistics(
1089 workerUsage.waitTime,
1090 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1091 taskWaitTime
e4f20deb 1092 )
c01733f1 1093 }
1094
a4e07f72 1095 private updateEluWorkerUsage (
5df69fab 1096 workerUsage: WorkerUsage,
62c15a68
JB
1097 message: MessageValue<Response>
1098 ): void {
6703b9f4 1099 if (message.workerError != null) {
dc021bcc
JB
1100 return
1101 }
008512c7
JB
1102 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1103 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1104 updateMeasurementStatistics(
1105 workerUsage.elu.active,
008512c7 1106 eluTaskStatisticsRequirements,
dc021bcc 1107 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1108 )
1109 updateMeasurementStatistics(
1110 workerUsage.elu.idle,
008512c7 1111 eluTaskStatisticsRequirements,
dc021bcc 1112 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1113 )
008512c7 1114 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1115 if (message.taskPerformance?.elu != null) {
f7510105
JB
1116 if (workerUsage.elu.utilization != null) {
1117 workerUsage.elu.utilization =
1118 (workerUsage.elu.utilization +
1119 message.taskPerformance.elu.utilization) /
1120 2
1121 } else {
1122 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1123 }
62c15a68
JB
1124 }
1125 }
1126 }
1127
280c2a77 1128 /**
f06e48d8 1129 * Chooses a worker node for the next task.
280c2a77 1130 *
6c6afb84 1131 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1132 *
aa9eede8 1133 * @returns The chosen worker node key
280c2a77 1134 */
6c6afb84 1135 private chooseWorkerNode (): number {
930dcf12 1136 if (this.shallCreateDynamicWorker()) {
aa9eede8 1137 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1138 if (
b1aae695 1139 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1140 ) {
aa9eede8 1141 return workerNodeKey
6c6afb84 1142 }
17393ac8 1143 }
930dcf12
JB
1144 return this.workerChoiceStrategyContext.execute()
1145 }
1146
6c6afb84
JB
1147 /**
1148 * Conditions for dynamic worker creation.
1149 *
1150 * @returns Whether to create a dynamic worker or not.
1151 */
1152 private shallCreateDynamicWorker (): boolean {
930dcf12 1153 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1154 }
1155
280c2a77 1156 /**
aa9eede8 1157 * Sends a message to worker given its worker node key.
280c2a77 1158 *
aa9eede8 1159 * @param workerNodeKey - The worker node key.
38e795c1 1160 * @param message - The message.
7d91a8cd 1161 * @param transferList - The optional array of transferable objects.
280c2a77
S
1162 */
1163 protected abstract sendToWorker (
aa9eede8 1164 workerNodeKey: number,
7d91a8cd
JB
1165 message: MessageValue<Data>,
1166 transferList?: TransferListItem[]
280c2a77
S
1167 ): void
1168
729c563d 1169 /**
41344292 1170 * Creates a new worker.
6c6afb84
JB
1171 *
1172 * @returns Newly created worker.
729c563d 1173 */
280c2a77 1174 protected abstract createWorker (): Worker
c97c7edb 1175
4a6952ff 1176 /**
aa9eede8 1177 * Creates a new, completely set up worker node.
4a6952ff 1178 *
aa9eede8 1179 * @returns New, completely set up worker node key.
4a6952ff 1180 */
aa9eede8 1181 protected createAndSetupWorkerNode (): number {
bdacc2d2 1182 const worker = this.createWorker()
280c2a77 1183
fd04474e 1184 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1185 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1186 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1187 worker.on('error', error => {
aad6fb64 1188 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1189 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1190 workerInfo.ready = false
0dc838e3 1191 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1192 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1193 if (
1194 this.opts.restartWorkerOnError === true &&
b6bfca01
JB
1195 this.started &&
1196 !this.starting
15b176e0 1197 ) {
9b106837 1198 if (workerInfo.dynamic) {
aa9eede8 1199 this.createAndSetupDynamicWorkerNode()
8a1260a3 1200 } else {
aa9eede8 1201 this.createAndSetupWorkerNode()
8a1260a3 1202 }
5baee0d7 1203 }
19dbc45b 1204 if (this.opts.enableTasksQueue === true) {
9b106837 1205 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1206 }
5baee0d7 1207 })
a35560ba 1208 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1209 worker.once('exit', () => {
f06e48d8 1210 this.removeWorkerNode(worker)
a974afa6 1211 })
280c2a77 1212
aa9eede8 1213 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1214
aa9eede8 1215 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1216
aa9eede8 1217 return workerNodeKey
c97c7edb 1218 }
be0676b3 1219
930dcf12 1220 /**
aa9eede8 1221 * Creates a new, completely set up dynamic worker node.
930dcf12 1222 *
aa9eede8 1223 * @returns New, completely set up dynamic worker node key.
930dcf12 1224 */
aa9eede8
JB
1225 protected createAndSetupDynamicWorkerNode (): number {
1226 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1227 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1228 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1229 message.workerId
aad6fb64 1230 )
aa9eede8 1231 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1232 // Kill message received from worker
930dcf12
JB
1233 if (
1234 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1235 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1236 ((this.opts.enableTasksQueue === false &&
aa9eede8 1237 workerUsage.tasks.executing === 0) ||
7b56f532 1238 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1239 workerUsage.tasks.executing === 0 &&
1240 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1241 ) {
041dc05b 1242 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1243 this.emitter?.emit(PoolEvents.error, error)
1244 })
930dcf12
JB
1245 }
1246 })
46b0bb09 1247 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1248 this.sendToWorker(workerNodeKey, {
e9dd5b66 1249 checkActive: true
21f710aa 1250 })
72ae84a2
JB
1251 if (this.taskFunctions.size > 0) {
1252 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1253 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1254 taskFunctionOperation: 'add',
1255 taskFunctionName,
1256 taskFunction: taskFunction.toString()
1257 }).catch(error => {
1258 this.emitter?.emit(PoolEvents.error, error)
1259 })
1260 }
1261 }
b5e113f6 1262 workerInfo.dynamic = true
b1aae695
JB
1263 if (
1264 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1265 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1266 ) {
b5e113f6
JB
1267 workerInfo.ready = true
1268 }
33e6bb4c 1269 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1270 return workerNodeKey
930dcf12
JB
1271 }
1272
a2ed5053 1273 /**
aa9eede8 1274 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1275 *
aa9eede8 1276 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1277 * @param listener - The message listener callback.
1278 */
85aeb3f3
JB
1279 protected abstract registerWorkerMessageListener<
1280 Message extends Data | Response
aa9eede8
JB
1281 >(
1282 workerNodeKey: number,
1283 listener: (message: MessageValue<Message>) => void
1284 ): void
a2ed5053
JB
1285
1286 /**
aa9eede8 1287 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1288 * Can be overridden.
1289 *
aa9eede8 1290 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1291 */
aa9eede8 1292 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1293 // Listen to worker messages.
aa9eede8 1294 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1295 // Send the startup message to worker.
aa9eede8 1296 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1297 // Send the statistics message to worker.
1298 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1299 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1300 this.workerNodes[workerNodeKey].onEmptyQueue =
1301 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1302 this.workerNodes[workerNodeKey].onBackPressure =
1303 this.tasksStealingOnBackPressure.bind(this)
1304 }
d2c73f82
JB
1305 }
1306
85aeb3f3 1307 /**
aa9eede8
JB
1308 * Sends the startup message to worker given its worker node key.
1309 *
1310 * @param workerNodeKey - The worker node key.
1311 */
1312 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1313
1314 /**
9edb9717 1315 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1316 *
aa9eede8 1317 * @param workerNodeKey - The worker node key.
85aeb3f3 1318 */
9edb9717 1319 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1320 this.sendToWorker(workerNodeKey, {
1321 statistics: {
1322 runTime:
1323 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1324 .runTime.aggregate,
1325 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1326 .elu.aggregate
72ae84a2 1327 }
aa9eede8
JB
1328 })
1329 }
a2ed5053
JB
1330
1331 private redistributeQueuedTasks (workerNodeKey: number): void {
1332 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1333 const destinationWorkerNodeKey = this.workerNodes.reduce(
1334 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1335 return workerNode.info.ready &&
1336 workerNode.usage.tasks.queued <
1337 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1338 ? workerNodeKey
1339 : minWorkerNodeKey
1340 },
1341 0
1342 )
72ae84a2 1343 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1344 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1345 this.executeTask(destinationWorkerNodeKey, task)
1346 } else {
1347 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1348 }
1349 }
1350 }
1351
b1838604
JB
1352 private updateTaskStolenStatisticsWorkerUsage (
1353 workerNodeKey: number,
b1838604
JB
1354 taskName: string
1355 ): void {
1a880eca 1356 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1357 if (workerNode?.usage != null) {
1358 ++workerNode.usage.tasks.stolen
1359 }
1360 if (
1361 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1362 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1363 ) {
1364 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1365 taskName
1366 ) as WorkerUsage
1367 ++taskFunctionWorkerUsage.tasks.stolen
1368 }
1369 }
1370
dd951876 1371 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b 1372 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1373 const workerNodes = this.workerNodes
a6b3272b 1374 .slice()
dd951876
JB
1375 .sort(
1376 (workerNodeA, workerNodeB) =>
1377 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1378 )
f201a0cd 1379 const sourceWorkerNode = workerNodes.find(
041dc05b 1380 workerNode =>
f201a0cd
JB
1381 workerNode.info.ready &&
1382 workerNode.info.id !== workerId &&
1383 workerNode.usage.tasks.queued > 0
1384 )
1385 if (sourceWorkerNode != null) {
72ae84a2 1386 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1387 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1388 this.executeTask(destinationWorkerNodeKey, task)
1389 } else {
1390 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1391 }
f201a0cd
JB
1392 this.updateTaskStolenStatisticsWorkerUsage(
1393 destinationWorkerNodeKey,
1394 task.name as string
1395 )
72695f86
JB
1396 }
1397 }
1398
1399 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1400 const sizeOffset = 1
1401 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1402 return
1403 }
72695f86
JB
1404 const sourceWorkerNode =
1405 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1406 const workerNodes = this.workerNodes
a6b3272b 1407 .slice()
72695f86
JB
1408 .sort(
1409 (workerNodeA, workerNodeB) =>
1410 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1411 )
1412 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1413 if (
0bc68267 1414 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1415 workerNode.info.ready &&
1416 workerNode.info.id !== workerId &&
0bc68267 1417 workerNode.usage.tasks.queued <
f778c355 1418 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1419 ) {
72ae84a2 1420 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1421 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1422 this.executeTask(workerNodeKey, task)
4de3d785 1423 } else {
dd951876 1424 this.enqueueTask(workerNodeKey, task)
4de3d785 1425 }
b1838604
JB
1426 this.updateTaskStolenStatisticsWorkerUsage(
1427 workerNodeKey,
b1838604
JB
1428 task.name as string
1429 )
10ecf8fd 1430 }
a2ed5053
JB
1431 }
1432 }
1433
be0676b3 1434 /**
aa9eede8 1435 * This method is the listener registered for each worker message.
be0676b3 1436 *
bdacc2d2 1437 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1438 */
1439 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1440 return message => {
21f710aa 1441 this.checkMessageWorkerId(message)
6703b9f4 1442 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1443 // Worker ready response received from worker
10e2aa7e 1444 this.handleWorkerReadyResponse(message)
7629bdf1 1445 } else if (message.taskId != null) {
81c02522 1446 // Task execution response received from worker
6b272951 1447 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1448 } else if (message.taskFunctionNames != null) {
1449 // Task function names message received from worker
46b0bb09
JB
1450 this.getWorkerInfo(
1451 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4 1452 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1453 }
1454 }
1455 }
1456
10e2aa7e 1457 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1458 if (message.ready === false) {
72ae84a2
JB
1459 throw new Error(
1460 `Worker ${message.workerId as number} failed to initialize`
1461 )
f05ed93c 1462 }
a5d15204 1463 const workerInfo = this.getWorkerInfo(
aad6fb64 1464 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1465 )
a5d15204 1466 workerInfo.ready = message.ready as boolean
6703b9f4 1467 workerInfo.taskFunctionNames = message.taskFunctionNames
2431bdb4
JB
1468 if (this.emitter != null && this.ready) {
1469 this.emitter.emit(PoolEvents.ready, this.info)
1470 }
6b272951
JB
1471 }
1472
1473 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1474 const { taskId, workerError, data } = message
5441aea6 1475 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1476 if (promiseResponse != null) {
6703b9f4
JB
1477 if (workerError != null) {
1478 this.emitter?.emit(PoolEvents.taskError, workerError)
1479 promiseResponse.reject(workerError.message)
6b272951 1480 } else {
5441aea6 1481 promiseResponse.resolve(data as Response)
6b272951 1482 }
501aea93
JB
1483 const workerNodeKey = promiseResponse.workerNodeKey
1484 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1485 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1486 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1487 if (
1488 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1489 this.tasksQueueSize(workerNodeKey) > 0 &&
1490 this.workerNodes[workerNodeKey].usage.tasks.executing <
1491 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1492 ) {
1493 this.executeTask(
1494 workerNodeKey,
1495 this.dequeueTask(workerNodeKey) as Task<Data>
1496 )
be0676b3
APA
1497 }
1498 }
be0676b3 1499 }
7c0ba920 1500
a1763c54 1501 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1502 if (this.busy) {
1503 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1504 }
1505 }
1506
1507 private checkAndEmitTaskQueuingEvents (): void {
1508 if (this.hasBackPressure()) {
1509 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1510 }
1511 }
1512
33e6bb4c
JB
1513 private checkAndEmitDynamicWorkerCreationEvents (): void {
1514 if (this.type === PoolTypes.dynamic) {
1515 if (this.full) {
1516 this.emitter?.emit(PoolEvents.full, this.info)
1517 }
1518 }
1519 }
1520
8a1260a3 1521 /**
aa9eede8 1522 * Gets the worker information given its worker node key.
8a1260a3
JB
1523 *
1524 * @param workerNodeKey - The worker node key.
3f09ed9f 1525 * @returns The worker information.
8a1260a3 1526 */
46b0bb09
JB
1527 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1528 return this.workerNodes[workerNodeKey].info
e221309a
JB
1529 }
1530
a05c10de 1531 /**
b0a4db63 1532 * Adds the given worker in the pool worker nodes.
ea7a90d3 1533 *
38e795c1 1534 * @param worker - The worker.
aa9eede8
JB
1535 * @returns The added worker node key.
1536 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1537 */
b0a4db63 1538 private addWorkerNode (worker: Worker): number {
671d5154
JB
1539 const workerNode = new WorkerNode<Worker, Data>(
1540 worker,
ff3f866a 1541 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1542 )
b97d82d8 1543 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1544 if (this.starting) {
1545 workerNode.info.ready = true
1546 }
aa9eede8 1547 this.workerNodes.push(workerNode)
aad6fb64 1548 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1549 if (workerNodeKey === -1) {
86ed0598 1550 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1551 }
1552 return workerNodeKey
ea7a90d3 1553 }
c923ce56 1554
51fe3d3c 1555 /**
f06e48d8 1556 * Removes the given worker from the pool worker nodes.
51fe3d3c 1557 *
f06e48d8 1558 * @param worker - The worker.
51fe3d3c 1559 */
416fd65c 1560 private removeWorkerNode (worker: Worker): void {
aad6fb64 1561 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1562 if (workerNodeKey !== -1) {
1563 this.workerNodes.splice(workerNodeKey, 1)
1564 this.workerChoiceStrategyContext.remove(workerNodeKey)
1565 }
51fe3d3c 1566 }
adc3c320 1567
e2b31e32
JB
1568 /** @inheritDoc */
1569 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1570 return (
e2b31e32
JB
1571 this.opts.enableTasksQueue === true &&
1572 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1573 )
1574 }
1575
1576 private hasBackPressure (): boolean {
1577 return (
1578 this.opts.enableTasksQueue === true &&
1579 this.workerNodes.findIndex(
041dc05b 1580 workerNode => !workerNode.hasBackPressure()
a1763c54 1581 ) === -1
9e844245 1582 )
e2b31e32
JB
1583 }
1584
b0a4db63 1585 /**
aa9eede8 1586 * Executes the given task on the worker given its worker node key.
b0a4db63 1587 *
aa9eede8 1588 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1589 * @param task - The task to execute.
1590 */
2e81254d 1591 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1592 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1593 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1594 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1595 }
1596
f9f00b5f 1597 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1598 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1599 this.checkAndEmitTaskQueuingEvents()
1600 return tasksQueueSize
adc3c320
JB
1601 }
1602
416fd65c 1603 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1604 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1605 }
1606
416fd65c 1607 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1608 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1609 }
1610
81c02522 1611 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1612 while (this.tasksQueueSize(workerNodeKey) > 0) {
1613 this.executeTask(
1614 workerNodeKey,
1615 this.dequeueTask(workerNodeKey) as Task<Data>
1616 )
ff733df7 1617 }
4b628b48 1618 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1619 }
1620
ef41a6e6
JB
1621 private flushTasksQueues (): void {
1622 for (const [workerNodeKey] of this.workerNodes.entries()) {
1623 this.flushTasksQueue(workerNodeKey)
1624 }
1625 }
c97c7edb 1626}