Merge branch 'master' into feature/task-functions
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
15b176e0
JB
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
47352846
JB
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
729c563d
S
115 /**
116 * Constructs a new poolifier pool.
117 *
38e795c1 118 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 119 * @param filePath - Path to the worker file.
38e795c1 120 * @param opts - Options for the pool.
729c563d 121 */
c97c7edb 122 public constructor (
b4213b7f
JB
123 protected readonly numberOfWorkers: number,
124 protected readonly filePath: string,
125 protected readonly opts: PoolOptions<Worker>
c97c7edb 126 ) {
78cea37e 127 if (!this.isMain()) {
04f45163 128 throw new Error(
8c6d4acf 129 'Cannot start a pool from a worker with the same type as the pool'
04f45163 130 )
c97c7edb 131 }
8d3782fa 132 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 133 this.checkFilePath(this.filePath)
7c0ba920 134 this.checkPoolOptions(this.opts)
1086026a 135
7254e419
JB
136 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
137 this.executeTask = this.executeTask.bind(this)
138 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 139
6bd72cd0 140 if (this.opts.enableEvents === true) {
7c0ba920
JB
141 this.emitter = new PoolEmitter()
142 }
d59df138
JB
143 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
144 Worker,
145 Data,
146 Response
da309861
JB
147 >(
148 this,
149 this.opts.workerChoiceStrategy,
150 this.opts.workerChoiceStrategyOptions
151 )
b6b32453
JB
152
153 this.setupHook()
154
72ae84a2
JB
155 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
156
47352846 157 this.started = false
075e51d1 158 this.starting = false
47352846
JB
159 if (this.opts.startWorkers === true) {
160 this.start()
161 }
afe0d5bf
JB
162
163 this.startTimestamp = performance.now()
c97c7edb
S
164 }
165
a35560ba 166 private checkFilePath (filePath: string): void {
ffcbbad8
JB
167 if (
168 filePath == null ||
3d6dd312 169 typeof filePath !== 'string' ||
ffcbbad8
JB
170 (typeof filePath === 'string' && filePath.trim().length === 0)
171 ) {
c510fea7
APA
172 throw new Error('Please specify a file with a worker implementation')
173 }
3d6dd312
JB
174 if (!existsSync(filePath)) {
175 throw new Error(`Cannot find the worker file '${filePath}'`)
176 }
c510fea7
APA
177 }
178
8d3782fa
JB
179 private checkNumberOfWorkers (numberOfWorkers: number): void {
180 if (numberOfWorkers == null) {
181 throw new Error(
182 'Cannot instantiate a pool without specifying the number of workers'
183 )
78cea37e 184 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 185 throw new TypeError(
0d80593b 186 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
187 )
188 } else if (numberOfWorkers < 0) {
473c717a 189 throw new RangeError(
8d3782fa
JB
190 'Cannot instantiate a pool with a negative number of workers'
191 )
6b27d407 192 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
193 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
194 }
195 }
196
197 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 198 if (this.type === PoolTypes.dynamic) {
a5ed75b7 199 if (max == null) {
e695d66f 200 throw new TypeError(
a5ed75b7
JB
201 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
202 )
203 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
204 throw new TypeError(
205 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
206 )
207 } else if (min > max) {
079de991
JB
208 throw new RangeError(
209 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
210 )
b97d82d8 211 } else if (max === 0) {
079de991 212 throw new RangeError(
d640b48b 213 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
214 )
215 } else if (min === max) {
216 throw new RangeError(
217 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
218 )
219 }
8d3782fa
JB
220 }
221 }
222
7c0ba920 223 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 224 if (isPlainObject(opts)) {
47352846 225 this.opts.startWorkers = opts.startWorkers ?? true
2324f8c9
JB
226 this.checkValidWorkerChoiceStrategy(
227 opts.workerChoiceStrategy as WorkerChoiceStrategy
228 )
0d80593b
JB
229 this.opts.workerChoiceStrategy =
230 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
231 this.checkValidWorkerChoiceStrategyOptions(
232 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
233 )
8990357d
JB
234 this.opts.workerChoiceStrategyOptions = {
235 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
236 ...opts.workerChoiceStrategyOptions
237 }
1f68cede 238 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
239 this.opts.enableEvents = opts.enableEvents ?? true
240 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
241 if (this.opts.enableTasksQueue) {
242 this.checkValidTasksQueueOptions(
243 opts.tasksQueueOptions as TasksQueueOptions
244 )
245 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
246 opts.tasksQueueOptions as TasksQueueOptions
247 )
248 }
249 } else {
250 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 251 }
aee46736
JB
252 }
253
254 private checkValidWorkerChoiceStrategy (
255 workerChoiceStrategy: WorkerChoiceStrategy
256 ): void {
2324f8c9
JB
257 if (
258 workerChoiceStrategy != null &&
259 !Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)
260 ) {
b529c323 261 throw new Error(
aee46736 262 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
263 )
264 }
7c0ba920
JB
265 }
266
0d80593b
JB
267 private checkValidWorkerChoiceStrategyOptions (
268 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
269 ): void {
2324f8c9
JB
270 if (
271 workerChoiceStrategyOptions != null &&
272 !isPlainObject(workerChoiceStrategyOptions)
273 ) {
0d80593b
JB
274 throw new TypeError(
275 'Invalid worker choice strategy options: must be a plain object'
276 )
277 }
8990357d 278 if (
2324f8c9 279 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 280 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
281 ) {
282 throw new TypeError(
8c0b113f 283 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
284 )
285 }
286 if (
2324f8c9 287 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 288 workerChoiceStrategyOptions.retries < 0
8990357d
JB
289 ) {
290 throw new RangeError(
8c0b113f 291 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
292 )
293 }
49be33fe 294 if (
2324f8c9 295 workerChoiceStrategyOptions?.weights != null &&
6b27d407 296 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
297 ) {
298 throw new Error(
299 'Invalid worker choice strategy options: must have a weight for each worker node'
300 )
301 }
f0d7f803 302 if (
2324f8c9 303 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
304 !Object.values(Measurements).includes(
305 workerChoiceStrategyOptions.measurement
306 )
307 ) {
308 throw new Error(
309 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
310 )
311 }
0d80593b
JB
312 }
313
a20f0ba5 314 private checkValidTasksQueueOptions (
76d91ea0 315 tasksQueueOptions: TasksQueueOptions
a20f0ba5 316 ): void {
0d80593b
JB
317 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
318 throw new TypeError('Invalid tasks queue options: must be a plain object')
319 }
f0d7f803 320 if (
b7d085c4 321 tasksQueueOptions?.concurrency != null &&
2324f8c9 322 !Number.isSafeInteger(tasksQueueOptions.concurrency)
f0d7f803
JB
323 ) {
324 throw new TypeError(
20c6f652 325 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
326 )
327 }
328 if (
b7d085c4 329 tasksQueueOptions?.concurrency != null &&
2324f8c9 330 tasksQueueOptions.concurrency <= 0
f0d7f803 331 ) {
e695d66f 332 throw new RangeError(
2324f8c9 333 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
20c6f652
JB
334 )
335 }
20c6f652 336 if (
b7d085c4 337 tasksQueueOptions?.size != null &&
2324f8c9 338 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 339 ) {
ff3f866a 340 throw new TypeError(
68dbcdc0 341 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
342 )
343 }
2324f8c9 344 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 345 throw new RangeError(
2324f8c9 346 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
347 )
348 }
349 }
350
08f3f44c 351 /** @inheritDoc */
6b27d407
JB
352 public get info (): PoolInfo {
353 return {
23ccf9d7 354 version,
6b27d407 355 type: this.type,
184855e6 356 worker: this.worker,
47352846 357 started: this.started,
2431bdb4
JB
358 ready: this.ready,
359 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
360 minSize: this.minSize,
361 maxSize: this.maxSize,
c05f0d50
JB
362 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
363 .runTime.aggregate &&
1305e9a8
JB
364 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
366 workerNodes: this.workerNodes.length,
367 idleWorkerNodes: this.workerNodes.reduce(
368 (accumulator, workerNode) =>
f59e1027 369 workerNode.usage.tasks.executing === 0
a4e07f72
JB
370 ? accumulator + 1
371 : accumulator,
6b27d407
JB
372 0
373 ),
374 busyWorkerNodes: this.workerNodes.reduce(
375 (accumulator, workerNode) =>
f59e1027 376 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
377 0
378 ),
a4e07f72 379 executedTasks: this.workerNodes.reduce(
6b27d407 380 (accumulator, workerNode) =>
f59e1027 381 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
382 0
383 ),
384 executingTasks: this.workerNodes.reduce(
385 (accumulator, workerNode) =>
f59e1027 386 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
387 0
388 ),
daf86646
JB
389 ...(this.opts.enableTasksQueue === true && {
390 queuedTasks: this.workerNodes.reduce(
391 (accumulator, workerNode) =>
392 accumulator + workerNode.usage.tasks.queued,
393 0
394 )
395 }),
396 ...(this.opts.enableTasksQueue === true && {
397 maxQueuedTasks: this.workerNodes.reduce(
398 (accumulator, workerNode) =>
399 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
400 0
401 )
402 }),
a1763c54
JB
403 ...(this.opts.enableTasksQueue === true && {
404 backPressure: this.hasBackPressure()
405 }),
68cbdc84
JB
406 ...(this.opts.enableTasksQueue === true && {
407 stolenTasks: this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + workerNode.usage.tasks.stolen,
410 0
411 )
412 }),
a4e07f72
JB
413 failedTasks: this.workerNodes.reduce(
414 (accumulator, workerNode) =>
f59e1027 415 accumulator + workerNode.usage.tasks.failed,
a4e07f72 416 0
1dcf8b7b
JB
417 ),
418 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
419 .runTime.aggregate && {
420 runTime: {
98e72cda 421 minimum: round(
90d6701c 422 min(
98e72cda 423 ...this.workerNodes.map(
041dc05b 424 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 425 )
1dcf8b7b
JB
426 )
427 ),
98e72cda 428 maximum: round(
90d6701c 429 max(
98e72cda 430 ...this.workerNodes.map(
041dc05b 431 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 432 )
1dcf8b7b 433 )
98e72cda 434 ),
3baa0837
JB
435 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
436 .runTime.average && {
437 average: round(
438 average(
439 this.workerNodes.reduce<number[]>(
440 (accumulator, workerNode) =>
441 accumulator.concat(workerNode.usage.runTime.history),
442 []
443 )
98e72cda 444 )
dc021bcc 445 )
3baa0837 446 }),
98e72cda
JB
447 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
448 .runTime.median && {
449 median: round(
450 median(
3baa0837
JB
451 this.workerNodes.reduce<number[]>(
452 (accumulator, workerNode) =>
453 accumulator.concat(workerNode.usage.runTime.history),
454 []
98e72cda
JB
455 )
456 )
457 )
458 })
1dcf8b7b
JB
459 }
460 }),
461 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
462 .waitTime.aggregate && {
463 waitTime: {
98e72cda 464 minimum: round(
90d6701c 465 min(
98e72cda 466 ...this.workerNodes.map(
041dc05b 467 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 468 )
1dcf8b7b
JB
469 )
470 ),
98e72cda 471 maximum: round(
90d6701c 472 max(
98e72cda 473 ...this.workerNodes.map(
041dc05b 474 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 475 )
1dcf8b7b 476 )
98e72cda 477 ),
3baa0837
JB
478 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
479 .waitTime.average && {
480 average: round(
481 average(
482 this.workerNodes.reduce<number[]>(
483 (accumulator, workerNode) =>
484 accumulator.concat(workerNode.usage.waitTime.history),
485 []
486 )
98e72cda 487 )
dc021bcc 488 )
3baa0837 489 }),
98e72cda
JB
490 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
491 .waitTime.median && {
492 median: round(
493 median(
3baa0837
JB
494 this.workerNodes.reduce<number[]>(
495 (accumulator, workerNode) =>
496 accumulator.concat(workerNode.usage.waitTime.history),
497 []
98e72cda
JB
498 )
499 )
500 )
501 })
1dcf8b7b
JB
502 }
503 })
6b27d407
JB
504 }
505 }
08f3f44c 506
aa9eede8
JB
507 /**
508 * The pool readiness boolean status.
509 */
2431bdb4
JB
510 private get ready (): boolean {
511 return (
b97d82d8
JB
512 this.workerNodes.reduce(
513 (accumulator, workerNode) =>
514 !workerNode.info.dynamic && workerNode.info.ready
515 ? accumulator + 1
516 : accumulator,
517 0
518 ) >= this.minSize
2431bdb4
JB
519 )
520 }
521
afe0d5bf 522 /**
aa9eede8 523 * The approximate pool utilization.
afe0d5bf
JB
524 *
525 * @returns The pool utilization.
526 */
527 private get utilization (): number {
8e5ca040 528 const poolTimeCapacity =
fe7d90db 529 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
530 const totalTasksRunTime = this.workerNodes.reduce(
531 (accumulator, workerNode) =>
71514351 532 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
533 0
534 )
535 const totalTasksWaitTime = this.workerNodes.reduce(
536 (accumulator, workerNode) =>
71514351 537 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
538 0
539 )
8e5ca040 540 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
541 }
542
8881ae32 543 /**
aa9eede8 544 * The pool type.
8881ae32
JB
545 *
546 * If it is `'dynamic'`, it provides the `max` property.
547 */
548 protected abstract get type (): PoolType
549
184855e6 550 /**
aa9eede8 551 * The worker type.
184855e6
JB
552 */
553 protected abstract get worker (): WorkerType
554
c2ade475 555 /**
aa9eede8 556 * The pool minimum size.
c2ade475 557 */
8735b4e5
JB
558 protected get minSize (): number {
559 return this.numberOfWorkers
560 }
ff733df7
JB
561
562 /**
aa9eede8 563 * The pool maximum size.
ff733df7 564 */
8735b4e5
JB
565 protected get maxSize (): number {
566 return this.max ?? this.numberOfWorkers
567 }
a35560ba 568
6b813701
JB
569 /**
570 * Checks if the worker id sent in the received message from a worker is valid.
571 *
572 * @param message - The received message.
573 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
574 */
21f710aa 575 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
576 if (message.workerId == null) {
577 throw new Error('Worker message received without worker id')
578 } else if (
21f710aa 579 message.workerId != null &&
aad6fb64 580 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
581 ) {
582 throw new Error(
583 `Worker message received from unknown worker '${message.workerId}'`
584 )
585 }
586 }
587
ffcbbad8 588 /**
f06e48d8 589 * Gets the given worker its worker node key.
ffcbbad8
JB
590 *
591 * @param worker - The worker.
f59e1027 592 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 593 */
aad6fb64 594 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 595 return this.workerNodes.findIndex(
041dc05b 596 workerNode => workerNode.worker === worker
f06e48d8 597 )
bf9549ae
JB
598 }
599
aa9eede8
JB
600 /**
601 * Gets the worker node key given its worker id.
602 *
603 * @param workerId - The worker id.
aad6fb64 604 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 605 */
72ae84a2 606 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 607 return this.workerNodes.findIndex(
041dc05b 608 workerNode => workerNode.info.id === workerId
aad6fb64 609 )
aa9eede8
JB
610 }
611
afc003b2 612 /** @inheritDoc */
a35560ba 613 public setWorkerChoiceStrategy (
59219cbb
JB
614 workerChoiceStrategy: WorkerChoiceStrategy,
615 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 616 ): void {
aee46736 617 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 618 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
619 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
620 this.opts.workerChoiceStrategy
621 )
622 if (workerChoiceStrategyOptions != null) {
623 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
624 }
aa9eede8 625 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 626 workerNode.resetUsage()
9edb9717 627 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 628 }
a20f0ba5
JB
629 }
630
631 /** @inheritDoc */
632 public setWorkerChoiceStrategyOptions (
633 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
634 ): void {
0d80593b 635 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
636 this.opts.workerChoiceStrategyOptions = {
637 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
638 ...workerChoiceStrategyOptions
639 }
a20f0ba5
JB
640 this.workerChoiceStrategyContext.setOptions(
641 this.opts.workerChoiceStrategyOptions
a35560ba
S
642 )
643 }
644
a20f0ba5 645 /** @inheritDoc */
8f52842f
JB
646 public enableTasksQueue (
647 enable: boolean,
648 tasksQueueOptions?: TasksQueueOptions
649 ): void {
a20f0ba5 650 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
651 this.unsetTaskStealing()
652 this.unsetTasksStealingOnBackPressure()
ef41a6e6 653 this.flushTasksQueues()
a20f0ba5
JB
654 }
655 this.opts.enableTasksQueue = enable
8f52842f 656 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
657 }
658
659 /** @inheritDoc */
8f52842f 660 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 661 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
662 this.checkValidTasksQueueOptions(tasksQueueOptions)
663 this.opts.tasksQueueOptions =
664 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 665 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416
JB
666 if (this.opts.tasksQueueOptions.taskStealing === true) {
667 this.setTaskStealing()
668 } else {
669 this.unsetTaskStealing()
670 }
671 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
672 this.setTasksStealingOnBackPressure()
673 } else {
674 this.unsetTasksStealingOnBackPressure()
675 }
5baee0d7 676 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
677 delete this.opts.tasksQueueOptions
678 }
679 }
680
9b38ab2d
JB
681 private buildTasksQueueOptions (
682 tasksQueueOptions: TasksQueueOptions
683 ): TasksQueueOptions {
684 return {
685 ...{
686 size: Math.pow(this.maxSize, 2),
687 concurrency: 1,
688 taskStealing: true,
689 tasksStealingOnBackPressure: true
690 },
691 ...tasksQueueOptions
692 }
693 }
694
5b49e864 695 private setTasksQueueSize (size: number): void {
20c6f652 696 for (const workerNode of this.workerNodes) {
ff3f866a 697 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
698 }
699 }
700
d6ca1416
JB
701 private setTaskStealing (): void {
702 for (const [workerNodeKey] of this.workerNodes.entries()) {
703 this.workerNodes[workerNodeKey].onEmptyQueue =
704 this.taskStealingOnEmptyQueue.bind(this)
705 }
706 }
707
708 private unsetTaskStealing (): void {
709 for (const [workerNodeKey] of this.workerNodes.entries()) {
710 delete this.workerNodes[workerNodeKey].onEmptyQueue
711 }
712 }
713
714 private setTasksStealingOnBackPressure (): void {
715 for (const [workerNodeKey] of this.workerNodes.entries()) {
716 this.workerNodes[workerNodeKey].onBackPressure =
717 this.tasksStealingOnBackPressure.bind(this)
718 }
719 }
720
721 private unsetTasksStealingOnBackPressure (): void {
722 for (const [workerNodeKey] of this.workerNodes.entries()) {
723 delete this.workerNodes[workerNodeKey].onBackPressure
724 }
725 }
726
c319c66b
JB
727 /**
728 * Whether the pool is full or not.
729 *
730 * The pool filling boolean status.
731 */
dea903a8
JB
732 protected get full (): boolean {
733 return this.workerNodes.length >= this.maxSize
734 }
c2ade475 735
c319c66b
JB
736 /**
737 * Whether the pool is busy or not.
738 *
739 * The pool busyness boolean status.
740 */
741 protected abstract get busy (): boolean
7c0ba920 742
6c6afb84 743 /**
3d76750a 744 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
745 *
746 * @returns Worker nodes busyness boolean status.
747 */
c2ade475 748 protected internalBusy (): boolean {
3d76750a
JB
749 if (this.opts.enableTasksQueue === true) {
750 return (
751 this.workerNodes.findIndex(
041dc05b 752 workerNode =>
3d76750a
JB
753 workerNode.info.ready &&
754 workerNode.usage.tasks.executing <
755 (this.opts.tasksQueueOptions?.concurrency as number)
756 ) === -1
757 )
3d76750a 758 }
419e8121
JB
759 return (
760 this.workerNodes.findIndex(
761 workerNode =>
762 workerNode.info.ready && workerNode.usage.tasks.executing === 0
763 ) === -1
764 )
cb70b19d
JB
765 }
766
e81c38f2 767 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
768 workerNodeKey: number,
769 message: MessageValue<Data>
770 ): Promise<boolean> {
771 const workerId = this.getWorkerInfo(workerNodeKey).id as number
772 return await new Promise<boolean>((resolve, reject) => {
773 this.registerWorkerMessageListener(workerNodeKey, message => {
774 if (
775 message.workerId === workerId &&
776 message.taskFunctionOperationStatus === true
777 ) {
778 resolve(true)
779 } else if (
780 message.workerId === workerId &&
781 message.taskFunctionOperationStatus === false
782 ) {
783 reject(
784 new Error(
785 `Task function operation ${
786 message.taskFunctionOperation as string
787 } failed on worker ${message.workerId}`
788 )
789 )
790 }
791 })
792 this.sendToWorker(workerNodeKey, message)
793 })
794 }
795
796 private async sendTaskFunctionOperationToWorkers (
e81c38f2
JB
797 message: Omit<MessageValue<Data>, 'workerId'>
798 ): Promise<boolean> {
799 return await new Promise<boolean>((resolve, reject) => {
800 const responsesReceived = new Array<MessageValue<Data | Response>>()
801 for (const [workerNodeKey] of this.workerNodes.entries()) {
802 this.registerWorkerMessageListener(workerNodeKey, message => {
803 if (message.taskFunctionOperationStatus != null) {
804 responsesReceived.push(message)
805 if (
806 responsesReceived.length === this.workerNodes.length &&
807 responsesReceived.every(
808 message => message.taskFunctionOperationStatus === true
809 )
810 ) {
811 resolve(true)
812 } else if (
813 responsesReceived.length === this.workerNodes.length &&
814 responsesReceived.some(
815 message => message.taskFunctionOperationStatus === false
816 )
817 ) {
818 reject(
819 new Error(
820 `Task function operation ${
821 message.taskFunctionOperation as string
72ae84a2 822 } failed on worker ${message.workerId as number}`
e81c38f2
JB
823 )
824 )
825 }
826 }
827 })
72ae84a2 828 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
829 }
830 })
6703b9f4
JB
831 }
832
833 /** @inheritDoc */
834 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
835 for (const workerNode of this.workerNodes) {
836 if (
837 Array.isArray(workerNode.info.taskFunctionNames) &&
838 workerNode.info.taskFunctionNames.includes(name)
839 ) {
840 return true
841 }
842 }
843 return false
6703b9f4
JB
844 }
845
846 /** @inheritDoc */
e81c38f2
JB
847 public async addTaskFunction (
848 name: string,
72ae84a2 849 taskFunction: TaskFunction<Data, Response>
e81c38f2 850 ): Promise<boolean> {
72ae84a2
JB
851 this.taskFunctions.set(name, taskFunction)
852 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
853 taskFunctionOperation: 'add',
854 taskFunctionName: name,
855 taskFunction: taskFunction.toString()
856 })
6703b9f4
JB
857 }
858
859 /** @inheritDoc */
e81c38f2 860 public async removeTaskFunction (name: string): Promise<boolean> {
72ae84a2
JB
861 this.taskFunctions.delete(name)
862 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
863 taskFunctionOperation: 'remove',
864 taskFunctionName: name
865 })
6703b9f4
JB
866 }
867
90d7d101 868 /** @inheritDoc */
6703b9f4 869 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
870 for (const workerNode of this.workerNodes) {
871 if (
6703b9f4
JB
872 Array.isArray(workerNode.info.taskFunctionNames) &&
873 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 874 ) {
6703b9f4 875 return workerNode.info.taskFunctionNames
f2dbbf95 876 }
90d7d101 877 }
f2dbbf95 878 return []
90d7d101
JB
879 }
880
6703b9f4 881 /** @inheritDoc */
e81c38f2 882 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 883 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
884 taskFunctionOperation: 'default',
885 taskFunctionName: name
886 })
6703b9f4
JB
887 }
888
375f7504
JB
889 private shallExecuteTask (workerNodeKey: number): boolean {
890 return (
891 this.tasksQueueSize(workerNodeKey) === 0 &&
892 this.workerNodes[workerNodeKey].usage.tasks.executing <
893 (this.opts.tasksQueueOptions?.concurrency as number)
894 )
895 }
896
afc003b2 897 /** @inheritDoc */
7d91a8cd
JB
898 public async execute (
899 data?: Data,
900 name?: string,
901 transferList?: TransferListItem[]
902 ): Promise<Response> {
52b71763 903 return await new Promise<Response>((resolve, reject) => {
15b176e0 904 if (!this.started) {
47352846 905 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 906 return
15b176e0 907 }
7d91a8cd
JB
908 if (name != null && typeof name !== 'string') {
909 reject(new TypeError('name argument must be a string'))
9d2d0da1 910 return
7d91a8cd 911 }
90d7d101
JB
912 if (
913 name != null &&
914 typeof name === 'string' &&
915 name.trim().length === 0
916 ) {
f58b60b9 917 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 918 return
90d7d101 919 }
b558f6b5
JB
920 if (transferList != null && !Array.isArray(transferList)) {
921 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 922 return
b558f6b5
JB
923 }
924 const timestamp = performance.now()
925 const workerNodeKey = this.chooseWorkerNode()
501aea93 926 const task: Task<Data> = {
52b71763
JB
927 name: name ?? DEFAULT_TASK_NAME,
928 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
929 data: data ?? ({} as Data),
7d91a8cd 930 transferList,
52b71763 931 timestamp,
7629bdf1 932 taskId: randomUUID()
52b71763 933 }
7629bdf1 934 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
935 resolve,
936 reject,
501aea93 937 workerNodeKey
2e81254d 938 })
52b71763 939 if (
4e377863
JB
940 this.opts.enableTasksQueue === false ||
941 (this.opts.enableTasksQueue === true &&
375f7504 942 this.shallExecuteTask(workerNodeKey))
52b71763 943 ) {
501aea93 944 this.executeTask(workerNodeKey, task)
4e377863
JB
945 } else {
946 this.enqueueTask(workerNodeKey, task)
52b71763 947 }
2e81254d 948 })
280c2a77 949 }
c97c7edb 950
47352846
JB
951 /** @inheritdoc */
952 public start (): void {
953 this.starting = true
954 while (
955 this.workerNodes.reduce(
956 (accumulator, workerNode) =>
957 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
958 0
959 ) < this.numberOfWorkers
960 ) {
961 this.createAndSetupWorkerNode()
962 }
963 this.starting = false
964 this.started = true
965 }
966
afc003b2 967 /** @inheritDoc */
c97c7edb 968 public async destroy (): Promise<void> {
1fbcaa7c 969 await Promise.all(
81c02522 970 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 971 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
972 })
973 )
33e6bb4c 974 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 975 this.started = false
c97c7edb
S
976 }
977
1e3214b6 978 protected async sendKillMessageToWorker (
72ae84a2 979 workerNodeKey: number
1e3214b6 980 ): Promise<void> {
9edb9717 981 await new Promise<void>((resolve, reject) => {
041dc05b 982 this.registerWorkerMessageListener(workerNodeKey, message => {
1e3214b6
JB
983 if (message.kill === 'success') {
984 resolve()
985 } else if (message.kill === 'failure') {
72ae84a2
JB
986 reject(
987 new Error(
988 `Worker ${
989 message.workerId as number
990 } kill message handling failed`
991 )
992 )
1e3214b6
JB
993 }
994 })
72ae84a2 995 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 996 })
1e3214b6
JB
997 }
998
4a6952ff 999 /**
aa9eede8 1000 * Terminates the worker node given its worker node key.
4a6952ff 1001 *
aa9eede8 1002 * @param workerNodeKey - The worker node key.
4a6952ff 1003 */
81c02522 1004 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1005
729c563d 1006 /**
6677a3d3
JB
1007 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1008 * Can be overridden.
afc003b2
JB
1009 *
1010 * @virtual
729c563d 1011 */
280c2a77 1012 protected setupHook (): void {
965df41c 1013 /* Intentionally empty */
280c2a77 1014 }
c97c7edb 1015
729c563d 1016 /**
280c2a77
S
1017 * Should return whether the worker is the main worker or not.
1018 */
1019 protected abstract isMain (): boolean
1020
1021 /**
2e81254d 1022 * Hook executed before the worker task execution.
bf9549ae 1023 * Can be overridden.
729c563d 1024 *
f06e48d8 1025 * @param workerNodeKey - The worker node key.
1c6fe997 1026 * @param task - The task to execute.
729c563d 1027 */
1c6fe997
JB
1028 protected beforeTaskExecutionHook (
1029 workerNodeKey: number,
1030 task: Task<Data>
1031 ): void {
94407def
JB
1032 if (this.workerNodes[workerNodeKey]?.usage != null) {
1033 const workerUsage = this.workerNodes[workerNodeKey].usage
1034 ++workerUsage.tasks.executing
1035 this.updateWaitTimeWorkerUsage(workerUsage, task)
1036 }
1037 if (
1038 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1039 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1040 task.name as string
1041 ) != null
1042 ) {
db0e38ee 1043 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1044 workerNodeKey
db0e38ee 1045 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1046 ++taskFunctionWorkerUsage.tasks.executing
1047 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1048 }
c97c7edb
S
1049 }
1050
c01733f1 1051 /**
2e81254d 1052 * Hook executed after the worker task execution.
bf9549ae 1053 * Can be overridden.
c01733f1 1054 *
501aea93 1055 * @param workerNodeKey - The worker node key.
38e795c1 1056 * @param message - The received message.
c01733f1 1057 */
2e81254d 1058 protected afterTaskExecutionHook (
501aea93 1059 workerNodeKey: number,
2740a743 1060 message: MessageValue<Response>
bf9549ae 1061 ): void {
94407def
JB
1062 if (this.workerNodes[workerNodeKey]?.usage != null) {
1063 const workerUsage = this.workerNodes[workerNodeKey].usage
1064 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1065 this.updateRunTimeWorkerUsage(workerUsage, message)
1066 this.updateEluWorkerUsage(workerUsage, message)
1067 }
1068 if (
1069 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1070 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1071 message.taskPerformance?.name as string
94407def
JB
1072 ) != null
1073 ) {
db0e38ee 1074 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1075 workerNodeKey
db0e38ee 1076 ].getTaskFunctionWorkerUsage(
0628755c 1077 message.taskPerformance?.name as string
b558f6b5 1078 ) as WorkerUsage
db0e38ee
JB
1079 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1080 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1081 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1082 }
1083 }
1084
db0e38ee
JB
1085 /**
1086 * Whether the worker node shall update its task function worker usage or not.
1087 *
1088 * @param workerNodeKey - The worker node key.
1089 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1090 */
1091 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1092 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1093 return (
94407def 1094 workerInfo != null &&
6703b9f4
JB
1095 Array.isArray(workerInfo.taskFunctionNames) &&
1096 workerInfo.taskFunctionNames.length > 2
b558f6b5 1097 )
f1c06930
JB
1098 }
1099
1100 private updateTaskStatisticsWorkerUsage (
1101 workerUsage: WorkerUsage,
1102 message: MessageValue<Response>
1103 ): void {
a4e07f72 1104 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1105 if (
1106 workerTaskStatistics.executing != null &&
1107 workerTaskStatistics.executing > 0
1108 ) {
1109 --workerTaskStatistics.executing
5bb5be17 1110 }
6703b9f4 1111 if (message.workerError == null) {
98e72cda
JB
1112 ++workerTaskStatistics.executed
1113 } else {
a4e07f72 1114 ++workerTaskStatistics.failed
2740a743 1115 }
f8eb0a2a
JB
1116 }
1117
a4e07f72
JB
1118 private updateRunTimeWorkerUsage (
1119 workerUsage: WorkerUsage,
f8eb0a2a
JB
1120 message: MessageValue<Response>
1121 ): void {
6703b9f4 1122 if (message.workerError != null) {
dc021bcc
JB
1123 return
1124 }
e4f20deb
JB
1125 updateMeasurementStatistics(
1126 workerUsage.runTime,
1127 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1128 message.taskPerformance?.runTime ?? 0
e4f20deb 1129 )
f8eb0a2a
JB
1130 }
1131
a4e07f72
JB
1132 private updateWaitTimeWorkerUsage (
1133 workerUsage: WorkerUsage,
1c6fe997 1134 task: Task<Data>
f8eb0a2a 1135 ): void {
1c6fe997
JB
1136 const timestamp = performance.now()
1137 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1138 updateMeasurementStatistics(
1139 workerUsage.waitTime,
1140 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1141 taskWaitTime
e4f20deb 1142 )
c01733f1 1143 }
1144
a4e07f72 1145 private updateEluWorkerUsage (
5df69fab 1146 workerUsage: WorkerUsage,
62c15a68
JB
1147 message: MessageValue<Response>
1148 ): void {
6703b9f4 1149 if (message.workerError != null) {
dc021bcc
JB
1150 return
1151 }
008512c7
JB
1152 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1153 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1154 updateMeasurementStatistics(
1155 workerUsage.elu.active,
008512c7 1156 eluTaskStatisticsRequirements,
dc021bcc 1157 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1158 )
1159 updateMeasurementStatistics(
1160 workerUsage.elu.idle,
008512c7 1161 eluTaskStatisticsRequirements,
dc021bcc 1162 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1163 )
008512c7 1164 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1165 if (message.taskPerformance?.elu != null) {
f7510105
JB
1166 if (workerUsage.elu.utilization != null) {
1167 workerUsage.elu.utilization =
1168 (workerUsage.elu.utilization +
1169 message.taskPerformance.elu.utilization) /
1170 2
1171 } else {
1172 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1173 }
62c15a68
JB
1174 }
1175 }
1176 }
1177
280c2a77 1178 /**
f06e48d8 1179 * Chooses a worker node for the next task.
280c2a77 1180 *
6c6afb84 1181 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1182 *
aa9eede8 1183 * @returns The chosen worker node key
280c2a77 1184 */
6c6afb84 1185 private chooseWorkerNode (): number {
930dcf12 1186 if (this.shallCreateDynamicWorker()) {
aa9eede8 1187 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1188 if (
b1aae695 1189 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1190 ) {
aa9eede8 1191 return workerNodeKey
6c6afb84 1192 }
17393ac8 1193 }
930dcf12
JB
1194 return this.workerChoiceStrategyContext.execute()
1195 }
1196
6c6afb84
JB
1197 /**
1198 * Conditions for dynamic worker creation.
1199 *
1200 * @returns Whether to create a dynamic worker or not.
1201 */
1202 private shallCreateDynamicWorker (): boolean {
930dcf12 1203 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1204 }
1205
280c2a77 1206 /**
aa9eede8 1207 * Sends a message to worker given its worker node key.
280c2a77 1208 *
aa9eede8 1209 * @param workerNodeKey - The worker node key.
38e795c1 1210 * @param message - The message.
7d91a8cd 1211 * @param transferList - The optional array of transferable objects.
280c2a77
S
1212 */
1213 protected abstract sendToWorker (
aa9eede8 1214 workerNodeKey: number,
7d91a8cd
JB
1215 message: MessageValue<Data>,
1216 transferList?: TransferListItem[]
280c2a77
S
1217 ): void
1218
729c563d 1219 /**
41344292 1220 * Creates a new worker.
6c6afb84
JB
1221 *
1222 * @returns Newly created worker.
729c563d 1223 */
280c2a77 1224 protected abstract createWorker (): Worker
c97c7edb 1225
4a6952ff 1226 /**
aa9eede8 1227 * Creates a new, completely set up worker node.
4a6952ff 1228 *
aa9eede8 1229 * @returns New, completely set up worker node key.
4a6952ff 1230 */
aa9eede8 1231 protected createAndSetupWorkerNode (): number {
bdacc2d2 1232 const worker = this.createWorker()
280c2a77 1233
fd04474e 1234 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1235 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1236 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1237 worker.on('error', error => {
aad6fb64 1238 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1239 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1240 workerInfo.ready = false
0dc838e3 1241 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1242 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1243 if (
b6bfca01 1244 this.started &&
9b38ab2d
JB
1245 !this.starting &&
1246 this.opts.restartWorkerOnError === true
15b176e0 1247 ) {
9b106837 1248 if (workerInfo.dynamic) {
aa9eede8 1249 this.createAndSetupDynamicWorkerNode()
8a1260a3 1250 } else {
aa9eede8 1251 this.createAndSetupWorkerNode()
8a1260a3 1252 }
5baee0d7 1253 }
9b38ab2d 1254 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1255 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1256 }
5baee0d7 1257 })
a35560ba 1258 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1259 worker.once('exit', () => {
f06e48d8 1260 this.removeWorkerNode(worker)
a974afa6 1261 })
280c2a77 1262
aa9eede8 1263 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1264
aa9eede8 1265 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1266
aa9eede8 1267 return workerNodeKey
c97c7edb 1268 }
be0676b3 1269
930dcf12 1270 /**
aa9eede8 1271 * Creates a new, completely set up dynamic worker node.
930dcf12 1272 *
aa9eede8 1273 * @returns New, completely set up dynamic worker node key.
930dcf12 1274 */
aa9eede8
JB
1275 protected createAndSetupDynamicWorkerNode (): number {
1276 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1277 this.registerWorkerMessageListener(workerNodeKey, message => {
aa9eede8
JB
1278 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1279 message.workerId
aad6fb64 1280 )
aa9eede8 1281 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1282 // Kill message received from worker
930dcf12
JB
1283 if (
1284 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1285 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1286 ((this.opts.enableTasksQueue === false &&
aa9eede8 1287 workerUsage.tasks.executing === 0) ||
7b56f532 1288 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1289 workerUsage.tasks.executing === 0 &&
1290 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1291 ) {
041dc05b 1292 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1293 this.emitter?.emit(PoolEvents.error, error)
1294 })
930dcf12
JB
1295 }
1296 })
46b0bb09 1297 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1298 this.sendToWorker(workerNodeKey, {
e9dd5b66 1299 checkActive: true
21f710aa 1300 })
72ae84a2
JB
1301 if (this.taskFunctions.size > 0) {
1302 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1303 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1304 taskFunctionOperation: 'add',
1305 taskFunctionName,
1306 taskFunction: taskFunction.toString()
1307 }).catch(error => {
1308 this.emitter?.emit(PoolEvents.error, error)
1309 })
1310 }
1311 }
b5e113f6 1312 workerInfo.dynamic = true
b1aae695
JB
1313 if (
1314 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1315 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1316 ) {
b5e113f6
JB
1317 workerInfo.ready = true
1318 }
33e6bb4c 1319 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1320 return workerNodeKey
930dcf12
JB
1321 }
1322
a2ed5053 1323 /**
aa9eede8 1324 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1325 *
aa9eede8 1326 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1327 * @param listener - The message listener callback.
1328 */
85aeb3f3
JB
1329 protected abstract registerWorkerMessageListener<
1330 Message extends Data | Response
aa9eede8
JB
1331 >(
1332 workerNodeKey: number,
1333 listener: (message: MessageValue<Message>) => void
1334 ): void
a2ed5053
JB
1335
1336 /**
aa9eede8 1337 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1338 * Can be overridden.
1339 *
aa9eede8 1340 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1341 */
aa9eede8 1342 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1343 // Listen to worker messages.
aa9eede8 1344 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1345 // Send the startup message to worker.
aa9eede8 1346 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1347 // Send the statistics message to worker.
1348 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1349 if (this.opts.enableTasksQueue === true) {
dbd73092 1350 if (this.opts.tasksQueueOptions?.taskStealing === true) {
47352846
JB
1351 this.workerNodes[workerNodeKey].onEmptyQueue =
1352 this.taskStealingOnEmptyQueue.bind(this)
1353 }
1354 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
1355 this.workerNodes[workerNodeKey].onBackPressure =
1356 this.tasksStealingOnBackPressure.bind(this)
1357 }
72695f86 1358 }
d2c73f82
JB
1359 }
1360
85aeb3f3 1361 /**
aa9eede8
JB
1362 * Sends the startup message to worker given its worker node key.
1363 *
1364 * @param workerNodeKey - The worker node key.
1365 */
1366 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1367
1368 /**
9edb9717 1369 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1370 *
aa9eede8 1371 * @param workerNodeKey - The worker node key.
85aeb3f3 1372 */
9edb9717 1373 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1374 this.sendToWorker(workerNodeKey, {
1375 statistics: {
1376 runTime:
1377 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1378 .runTime.aggregate,
1379 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1380 .elu.aggregate
72ae84a2 1381 }
aa9eede8
JB
1382 })
1383 }
a2ed5053
JB
1384
1385 private redistributeQueuedTasks (workerNodeKey: number): void {
1386 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1387 const destinationWorkerNodeKey = this.workerNodes.reduce(
1388 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1389 return workerNode.info.ready &&
1390 workerNode.usage.tasks.queued <
1391 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1392 ? workerNodeKey
1393 : minWorkerNodeKey
1394 },
1395 0
1396 )
72ae84a2 1397 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1398 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1399 this.executeTask(destinationWorkerNodeKey, task)
1400 } else {
1401 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1402 }
1403 }
1404 }
1405
b1838604
JB
1406 private updateTaskStolenStatisticsWorkerUsage (
1407 workerNodeKey: number,
b1838604
JB
1408 taskName: string
1409 ): void {
1a880eca 1410 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1411 if (workerNode?.usage != null) {
1412 ++workerNode.usage.tasks.stolen
1413 }
1414 if (
1415 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1416 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1417 ) {
1418 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1419 taskName
1420 ) as WorkerUsage
1421 ++taskFunctionWorkerUsage.tasks.stolen
1422 }
1423 }
1424
dd951876 1425 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b 1426 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
dd951876 1427 const workerNodes = this.workerNodes
a6b3272b 1428 .slice()
dd951876
JB
1429 .sort(
1430 (workerNodeA, workerNodeB) =>
1431 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1432 )
f201a0cd 1433 const sourceWorkerNode = workerNodes.find(
041dc05b 1434 workerNode =>
f201a0cd
JB
1435 workerNode.info.ready &&
1436 workerNode.info.id !== workerId &&
1437 workerNode.usage.tasks.queued > 0
1438 )
1439 if (sourceWorkerNode != null) {
72ae84a2 1440 const task = sourceWorkerNode.popTask() as Task<Data>
f201a0cd
JB
1441 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1442 this.executeTask(destinationWorkerNodeKey, task)
1443 } else {
1444 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1445 }
f201a0cd
JB
1446 this.updateTaskStolenStatisticsWorkerUsage(
1447 destinationWorkerNodeKey,
1448 task.name as string
1449 )
72695f86
JB
1450 }
1451 }
1452
1453 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1454 const sizeOffset = 1
1455 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1456 return
1457 }
72695f86
JB
1458 const sourceWorkerNode =
1459 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1460 const workerNodes = this.workerNodes
a6b3272b 1461 .slice()
72695f86
JB
1462 .sort(
1463 (workerNodeA, workerNodeB) =>
1464 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1465 )
1466 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1467 if (
0bc68267 1468 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1469 workerNode.info.ready &&
1470 workerNode.info.id !== workerId &&
0bc68267 1471 workerNode.usage.tasks.queued <
f778c355 1472 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1473 ) {
72ae84a2 1474 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1475 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1476 this.executeTask(workerNodeKey, task)
4de3d785 1477 } else {
dd951876 1478 this.enqueueTask(workerNodeKey, task)
4de3d785 1479 }
b1838604
JB
1480 this.updateTaskStolenStatisticsWorkerUsage(
1481 workerNodeKey,
b1838604
JB
1482 task.name as string
1483 )
10ecf8fd 1484 }
a2ed5053
JB
1485 }
1486 }
1487
be0676b3 1488 /**
aa9eede8 1489 * This method is the listener registered for each worker message.
be0676b3 1490 *
bdacc2d2 1491 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1492 */
1493 protected workerListener (): (message: MessageValue<Response>) => void {
041dc05b 1494 return message => {
21f710aa 1495 this.checkMessageWorkerId(message)
6703b9f4 1496 if (message.ready != null && message.taskFunctionNames != null) {
81c02522 1497 // Worker ready response received from worker
10e2aa7e 1498 this.handleWorkerReadyResponse(message)
7629bdf1 1499 } else if (message.taskId != null) {
81c02522 1500 // Task execution response received from worker
6b272951 1501 this.handleTaskExecutionResponse(message)
6703b9f4
JB
1502 } else if (message.taskFunctionNames != null) {
1503 // Task function names message received from worker
46b0bb09
JB
1504 this.getWorkerInfo(
1505 this.getWorkerNodeKeyByWorkerId(message.workerId)
6703b9f4 1506 ).taskFunctionNames = message.taskFunctionNames
6b272951
JB
1507 }
1508 }
1509 }
1510
10e2aa7e 1511 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c 1512 if (message.ready === false) {
72ae84a2
JB
1513 throw new Error(
1514 `Worker ${message.workerId as number} failed to initialize`
1515 )
f05ed93c 1516 }
a5d15204 1517 const workerInfo = this.getWorkerInfo(
aad6fb64 1518 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1519 )
a5d15204 1520 workerInfo.ready = message.ready as boolean
6703b9f4 1521 workerInfo.taskFunctionNames = message.taskFunctionNames
9b38ab2d
JB
1522 if (this.ready) {
1523 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1524 }
6b272951
JB
1525 }
1526
1527 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
6703b9f4 1528 const { taskId, workerError, data } = message
5441aea6 1529 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1530 if (promiseResponse != null) {
6703b9f4
JB
1531 if (workerError != null) {
1532 this.emitter?.emit(PoolEvents.taskError, workerError)
1533 promiseResponse.reject(workerError.message)
6b272951 1534 } else {
5441aea6 1535 promiseResponse.resolve(data as Response)
6b272951 1536 }
501aea93
JB
1537 const workerNodeKey = promiseResponse.workerNodeKey
1538 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1539 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1540 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1541 if (
1542 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1543 this.tasksQueueSize(workerNodeKey) > 0 &&
1544 this.workerNodes[workerNodeKey].usage.tasks.executing <
1545 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1546 ) {
1547 this.executeTask(
1548 workerNodeKey,
1549 this.dequeueTask(workerNodeKey) as Task<Data>
1550 )
be0676b3
APA
1551 }
1552 }
be0676b3 1553 }
7c0ba920 1554
a1763c54 1555 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1556 if (this.busy) {
1557 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1558 }
1559 }
1560
1561 private checkAndEmitTaskQueuingEvents (): void {
1562 if (this.hasBackPressure()) {
1563 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1564 }
1565 }
1566
33e6bb4c
JB
1567 private checkAndEmitDynamicWorkerCreationEvents (): void {
1568 if (this.type === PoolTypes.dynamic) {
1569 if (this.full) {
1570 this.emitter?.emit(PoolEvents.full, this.info)
1571 }
1572 }
1573 }
1574
8a1260a3 1575 /**
aa9eede8 1576 * Gets the worker information given its worker node key.
8a1260a3
JB
1577 *
1578 * @param workerNodeKey - The worker node key.
3f09ed9f 1579 * @returns The worker information.
8a1260a3 1580 */
46b0bb09
JB
1581 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1582 return this.workerNodes[workerNodeKey].info
e221309a
JB
1583 }
1584
a05c10de 1585 /**
b0a4db63 1586 * Adds the given worker in the pool worker nodes.
ea7a90d3 1587 *
38e795c1 1588 * @param worker - The worker.
aa9eede8
JB
1589 * @returns The added worker node key.
1590 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1591 */
b0a4db63 1592 private addWorkerNode (worker: Worker): number {
671d5154
JB
1593 const workerNode = new WorkerNode<Worker, Data>(
1594 worker,
ff3f866a 1595 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1596 )
b97d82d8 1597 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1598 if (this.starting) {
1599 workerNode.info.ready = true
1600 }
aa9eede8 1601 this.workerNodes.push(workerNode)
aad6fb64 1602 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1603 if (workerNodeKey === -1) {
86ed0598 1604 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1605 }
1606 return workerNodeKey
ea7a90d3 1607 }
c923ce56 1608
51fe3d3c 1609 /**
f06e48d8 1610 * Removes the given worker from the pool worker nodes.
51fe3d3c 1611 *
f06e48d8 1612 * @param worker - The worker.
51fe3d3c 1613 */
416fd65c 1614 private removeWorkerNode (worker: Worker): void {
aad6fb64 1615 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1616 if (workerNodeKey !== -1) {
1617 this.workerNodes.splice(workerNodeKey, 1)
1618 this.workerChoiceStrategyContext.remove(workerNodeKey)
1619 }
51fe3d3c 1620 }
adc3c320 1621
e2b31e32
JB
1622 /** @inheritDoc */
1623 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1624 return (
e2b31e32
JB
1625 this.opts.enableTasksQueue === true &&
1626 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1627 )
1628 }
1629
1630 private hasBackPressure (): boolean {
1631 return (
1632 this.opts.enableTasksQueue === true &&
1633 this.workerNodes.findIndex(
041dc05b 1634 workerNode => !workerNode.hasBackPressure()
a1763c54 1635 ) === -1
9e844245 1636 )
e2b31e32
JB
1637 }
1638
b0a4db63 1639 /**
aa9eede8 1640 * Executes the given task on the worker given its worker node key.
b0a4db63 1641 *
aa9eede8 1642 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1643 * @param task - The task to execute.
1644 */
2e81254d 1645 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1646 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1647 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1648 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1649 }
1650
f9f00b5f 1651 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1652 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1653 this.checkAndEmitTaskQueuingEvents()
1654 return tasksQueueSize
adc3c320
JB
1655 }
1656
416fd65c 1657 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1658 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1659 }
1660
416fd65c 1661 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1662 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1663 }
1664
81c02522 1665 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1666 while (this.tasksQueueSize(workerNodeKey) > 0) {
1667 this.executeTask(
1668 workerNodeKey,
1669 this.dequeueTask(workerNodeKey) as Task<Data>
1670 )
ff733df7 1671 }
4b628b48 1672 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1673 }
1674
ef41a6e6
JB
1675 private flushTasksQueues (): void {
1676 for (const [workerNodeKey] of this.workerNodes.entries()) {
1677 this.flushTasksQueue(workerNodeKey)
1678 }
1679 }
c97c7edb 1680}