test: cover new pool methods
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
6703b9f4 24import type { TaskFunction } from '../worker/task-functions'
c4855468 25import {
65d7a1c9 26 type IPool,
7c5a1080 27 PoolEmitter,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
bbfa38a2
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
39 WorkerType,
40 WorkerUsage
e102732c 41} from './worker'
a35560ba 42import {
008512c7 43 type MeasurementStatisticsRequirements,
f0d7f803 44 Measurements,
a35560ba 45 WorkerChoiceStrategies,
a20f0ba5
JB
46 type WorkerChoiceStrategy,
47 type WorkerChoiceStrategyOptions
bdaf31cd
JB
48} from './selection-strategies/selection-strategies-types'
49import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 50import { version } from './version'
4b628b48 51import { WorkerNode } from './worker-node'
23ccf9d7 52
729c563d 53/**
ea7a90d3 54 * Base class that implements some shared logic for all poolifier pools.
729c563d 55 *
38e795c1 56 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
57 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
58 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 59 */
c97c7edb 60export abstract class AbstractPool<
f06e48d8 61 Worker extends IWorker,
d3c8a1a8
S
62 Data = unknown,
63 Response = unknown
c4855468 64> implements IPool<Worker, Data, Response> {
afc003b2 65 /** @inheritDoc */
4b628b48 66 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 67
afc003b2 68 /** @inheritDoc */
7c0ba920
JB
69 public readonly emitter?: PoolEmitter
70
be0676b3 71 /**
419e8121 72 * The task execution response promise map:
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
72ae84a2
JB
95 /**
96 * The task functions added at runtime map:
97 * - `key`: The task function name.
98 * - `value`: The task function itself.
99 */
100 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
101
15b176e0
JB
102 /**
103 * Whether the pool is started or not.
104 */
105 private started: boolean
47352846
JB
106 /**
107 * Whether the pool is starting or not.
108 */
109 private starting: boolean
afe0d5bf
JB
110 /**
111 * The start timestamp of the pool.
112 */
113 private readonly startTimestamp
114
/**
 * Constructs a new poolifier pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Validate the constructor arguments before building any state from them.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // Besides validating, this writes the default values of unset options.
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Runs before any worker node is created by start() below.
  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  this.started = false
  this.starting = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Taken after the optional start(): the utilization getter measures
  // elapsed time from this point.
  this.startTimestamp = performance.now()
}
165
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
178
/**
 * Validates the number of workers given to the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is nullish.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
196
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if the maximum size is nullish or not a safe integer.
 * @throws RangeError if the maximum size is below the minimum size, equal to zero, or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
222
/**
 * Validates the pool options and writes the default value of each unset
 * option onto `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate first, then default.
  this.checkValidWorkerChoiceStrategy(
    opts.workerChoiceStrategy as WorkerChoiceStrategy
  )
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN

  // Worker choice strategy options: validate first, then merge over the defaults.
  this.checkValidWorkerChoiceStrategyOptions(
    opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
  )
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  this.checkValidTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
253
/**
 * Validates a worker choice strategy. A nullish strategy is accepted.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not one of the `WorkerChoiceStrategies` values.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  if (workerChoiceStrategy == null) {
    return
  }
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
266
/**
 * Validates worker choice strategy options. Nullish options are accepted.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `retries` is not a safe integer.
 * @throws RangeError if `retries` is negative.
 * @throws Error if the number of `weights` differs from the pool maximum size or `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
313
/**
 * Validates tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws RangeError if `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
350
/** @inheritDoc */
public get info (): PoolInfo {
  // The snapshot is recomputed from the worker nodes on every access.
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when the strategy requires both the run
    // time and the wait time aggregates.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    // A worker node is counted idle when it executes no task, busy otherwise.
    idleWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing === 0
          ? accumulator + 1
          : accumulator,
      0
    ),
    busyWorkerNodes: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
      0
    ),
    executedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executed,
      0
    ),
    executingTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.executing,
      0
    ),
    // The queue related fields below are only present when the tasks queue
    // is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.queued,
        0
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
        0
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      backPressure: this.hasBackPressure()
    }),
    ...(this.opts.enableTasksQueue === true && {
      stolenTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.stolen,
        0
      )
    }),
    failedTasks: this.workerNodes.reduce(
      (accumulator, workerNode) =>
        accumulator + workerNode.usage.tasks.failed,
      0
    ),
    // Run time statistics: present only when the strategy requires the run
    // time aggregate; average and median have their own requirement flags.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.average && {
          // Computed over the concatenated run time history of all worker nodes.
          average: round(
            average(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.runTime.history),
                []
              )
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.runTime.history),
                []
              )
            )
          )
        })
      }
    }),
    // Wait time statistics: same layout and conditions as the run time ones,
    // driven by the wait time requirement flags.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average && {
          average: round(
            average(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.waitTime.history),
                []
              )
            )
          )
        }),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.reduce<number[]>(
                (accumulator, workerNode) =>
                  accumulator.concat(workerNode.usage.waitTime.history),
                []
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 506
/**
 * The pool readiness boolean status: `true` when the number of ready and
 * non dynamic worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
521
/**
 * The approximate pool utilization: the summed tasks run time and wait time
 * of all worker nodes divided by the pool time capacity, i.e. the time
 * elapsed since the start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
542
8881ae32 543 /**
aa9eede8 544 * The pool type.
8881ae32
JB
545 *
546 * If it is `'dynamic'`, it provides the `max` property.
547 */
548 protected abstract get type (): PoolType
549
184855e6 550 /**
aa9eede8 551 * The worker type.
184855e6
JB
552 */
553 protected abstract get worker (): WorkerType
554
/**
 * The pool minimum size: the number of workers given to the constructor.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
ff733df7
JB
561
/**
 * The pool maximum size: the `max` property when it is defined, the number
 * of workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  return this.max ?? this.numberOfWorkers
}
a35560ba 568
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
587
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
599
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
611
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it the statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
630
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults; unset options fall back to them.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
644
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Remove the stealing callbacks and flush the queues before the switch off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
658
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueSize(builtOptions.size as number)
  if (builtOptions.taskStealing === true) {
    this.setTaskStealing()
  } else {
    this.unsetTaskStealing()
  }
  if (builtOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  } else {
    this.unsetTasksStealingOnBackPressure()
  }
}
680
/**
 * Builds the tasks queue options: the given options override the defaults,
 * which are a queue size of the pool maximum size squared, a concurrency of
 * one, and both task stealing flavors enabled.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The tasks queue options with the defaults applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    taskStealing: true,
    tasksStealingOnBackPressure: true,
    ...tasksQueueOptions
  }
}
694
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
700
/**
 * Installs the task stealing callback as the `onEmptyQueue` handler of every
 * worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
}
707
/**
 * Removes the `onEmptyQueue` handler of every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onEmptyQueue
  }
}
713
/**
 * Installs the tasks stealing callback as the `onBackPressure` handler of
 * every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
720
/**
 * Removes the `onBackPressure` handler of every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    delete workerNode.onBackPressure
  }
}
726
/**
 * Whether the pool is full or not.
 *
 * The pool filling boolean status: `true` when the number of worker nodes
 * has reached the pool maximum size.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
c2ade475 735
c319c66b
JB
736 /**
737 * Whether the pool is busy or not.
738 *
739 * The pool busyness boolean status.
740 */
741 protected abstract get busy (): boolean
7c0ba920 742
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A ready worker node has spare capacity when it executes fewer tasks than
 * the tasks queue concurrency (tasks queue enabled) or no task at all
 * (tasks queue disabled). The pool is busy when no such worker node exists.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    return !this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing < concurrency
    )
  }
  return !this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
}
766
/**
 * Sends a task function operation message to one worker and waits for its
 * operation status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success and rejected when it reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  return await new Promise<boolean>((resolve, reject) => {
    this.registerWorkerMessageListener(workerNodeKey, response => {
      // Only responses carrying the expected worker id are considered.
      if (response.workerId !== workerId) {
        return
      }
      if (response.taskFunctionOperationStatus === true) {
        resolve(true)
      } else if (response.taskFunctionOperationStatus === false) {
        reject(
          new Error(
            `Task function operation ${
              response.taskFunctionOperation as string
            } failed on worker ${response.workerId}`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, message)
  })
}
795
/**
 * Sends a task function operation message to every worker node and waits
 * until each of them has answered with an operation status.
 *
 * @param message - The task function operation message, without worker id.
 * @returns A promise resolved with `true` when every worker reports success, and rejected with an error naming the first worker that reported a failure otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: Omit<MessageValue<Data>, 'workerId'>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without any worker node no response can ever arrive: settle right away
    // instead of leaving the promise pending forever.
    // NOTE(review): vacuous success is assumed to be the wanted outcome for
    // an empty pool - confirm.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Data | Response>>()
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(workerNodeKey, response => {
        if (response.taskFunctionOperationStatus == null) {
          return
        }
        responsesReceived.push(response)
        // Wait until every worker node has answered.
        if (responsesReceived.length !== this.workerNodes.length) {
          return
        }
        // Report the worker that actually failed, not the one whose response
        // happened to arrive last.
        const failedResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        if (failedResponse == null) {
          resolve(true)
        } else {
          reject(
            new Error(
              `Task function operation ${
                failedResponse.taskFunctionOperation as string
              } failed on worker ${failedResponse.workerId as number}`
            )
          )
        }
      })
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
832
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.includes(name)
  )
}
845
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  taskFunction: TaskFunction<Data, Response>
): Promise<boolean> {
  // Reject invalid arguments up front instead of serializing garbage with
  // toString() and sending it to the workers.
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof taskFunction !== 'function') {
    throw new TypeError('taskFunction argument must be a function')
  }
  const operationResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: taskFunction.toString()
  })
  // Only record the task function on the pool side once the workers have
  // confirmed the operation, so a failure leaves the map untouched.
  this.taskFunctions.set(name, taskFunction)
  return operationResult
}
858
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  if (!this.taskFunctions.has(name)) {
    throw new Error(
      'Cannot remove a task function that does not exist on the pool side'
    )
  }
  const operationResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  // Only forget the task function on the pool side once the workers have
  // confirmed the removal, so a failure leaves the map untouched.
  this.taskFunctions.delete(name)
  return operationResult
}
872
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names come from the first worker node reporting a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    ({ info }) =>
      Array.isArray(info.taskFunctionNames) &&
      info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
885
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  // Sends the 'default' task function operation to every worker node; the
  // pool side task functions map is not modified here.
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
}
893
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue is empty and it executes fewer tasks than the tasks queue
 * concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  return executingTasks < concurrency
}
901
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failure rejects.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The promise callbacks are stored under the task id for later lookup.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the tasks queue is disabled or the chosen
    // worker node can take the task, enqueue otherwise.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 955
47352846
JB
/** @inheritdoc */
public start (): void {
  this.starting = true
  // Only non dynamic worker nodes count towards the number of workers to create.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
971
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently.
  const workerNodesDestruction = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestruction)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
982
/**
 * Sends a kill message to the worker given its worker node key and waits for its answer.
 *
 * The returned promise resolves on a `success` kill answer and rejects on a `failure` one.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      if (message.kill === 'success') {
        resolve()
        return
      }
      if (message.kill === 'failure') {
        reject(
          new Error(
            `Worker ${
              message.workerId as number
            } kill message handling failed`
          )
        )
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1003
/**
 * Terminates the worker node identified by the given worker node key.
 *
 * @param workerNodeKey - The key of the worker node to terminate.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1010
/**
 * Setup hook meant to run code before the worker nodes are created in the abstract constructor.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1020
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1025
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counters and updates the wait time statistics.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Per task function usage.
  const taskName = task.name as string
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
1055
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics from the received message.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Per task function usage.
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) != null
  ) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) as WorkerUsage
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
1089
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker information holds an array of more than two task function names, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1104
/**
 * Updates the tasks counters of the given worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  // Never let the executing tasks counter go below zero.
  if (taskStatistics.executing != null && taskStatistics.executing > 0) {
    --taskStatistics.executing
  }
  if (message.workerError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
1122
a4e07f72
JB
/**
 * Updates the run time statistics of the given worker usage.
 * Nothing is updated if the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError == null) {
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      message.taskPerformance?.runTime ?? 0
    )
  }
}
1136
a4e07f72
JB
/**
 * Updates the wait time statistics of the given worker usage.
 * The wait time is the time elapsed since the task timestamp, zero if the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - taskTimestamp
  )
}
1149
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage.
 * Nothing is updated if the message carries a worker error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.workerError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    // Average the previous utilization, if any, with the task one.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1182
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is chosen
 * if the strategy policy enables dynamic worker usage. Otherwise the worker choice
 * strategy picks the worker node.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1201
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1210
/**
 * Sends a message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1223
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1230
/**
 * Creates a new, completely set up worker node.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Each pool option handler is registered before the pool internal listener of the same event.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    if (!this.started) {
      return
    }
    // Replace the failed worker with one of the same kind.
    if (!this.starting && this.opts.restartWorkerOnError === true) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1274
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is only honored once the worker node has nothing executing
    // and, when the tasks queue is enabled, nothing queued.
    const hasNoPendingTasks = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        hasNoPendingTasks())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  workerInfo.dynamic = true
  if (
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
    this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1327
/**
 * Registers a message listener callback on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1340
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Wire the tasks stealing callbacks enabled in the tasks queue options.
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1365
/**
 * Sends the startup message to the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1372
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the run time and ELU `aggregate` requirements of the worker choice strategy.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
a2ed5053
JB
1389
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the ready worker node, other than the source one, with the
 * fewest queued tasks. It is executed there if that worker node can take it
 * right away, and enqueued otherwise.
 *
 * The source worker node is never a destination. Selecting it would put tasks
 * back on the queue being drained, which never empties, or run them on the
 * worker node being drained. If no other ready worker node exists, the
 * remaining tasks are left in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means no eligible destination has been found yet.
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node: stop instead of looping forever.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1410
b1838604
JB
/**
 * Increments the stolen tasks counters of the worker node that received a stolen task.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage.
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  // Per task function usage.
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  ) {
    const stolenTaskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      taskName
    ) as WorkerUsage
    ++stolenTaskFunctionUsage.tasks.stolen
  }
}
1429
/**
 * Steals one task for the worker whose tasks queue got empty.
 *
 * The task is popped from the ready worker node, other than the destination one,
 * with the most queued tasks, if any has queued tasks.
 *
 * @param workerId - The identifier of the worker whose tasks queue is empty.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  // Sort a copy from the most to the least queued worker node.
  const mostQueuedFirst = this.workerNodes
    .slice()
    .sort((a, b) => b.usage.tasks.queued - a.usage.tasks.queued)
  const sourceWorkerNode = mostQueuedFirst.find(
    candidate =>
      candidate.info.ready &&
      candidate.info.id !== workerId &&
      candidate.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const task = sourceWorkerNode.popTask() as Task<Data>
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, task)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, task)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    task.name as string
  )
}
1457
/**
 * Moves queued tasks away from the worker node reporting back pressure.
 *
 * Ready worker nodes other than the source one are visited from the least to the
 * most queued. Each receives at most one task, provided the source still has
 * queued tasks and the visited node holds fewer than `tasksQueueOptions.size - 1`
 * queued tasks. Nothing is done when the tasks queue size is less than or equal to 1.
 *
 * @param workerId - The identifier of the worker whose tasks queue is under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy from the least to the most queued worker node.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - sizeOffset
    ) {
      // Indexes of the sorted copy are not worker node keys: resolve the key
      // from the pool worker nodes array.
      const destinationWorkerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = sourceWorkerNode.popTask() as Task<Data>
      if (this.shallExecuteTask(destinationWorkerNodeKey)) {
        this.executeTask(destinationWorkerNodeKey, task)
      } else {
        this.enqueueTask(destinationWorkerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        destinationWorkerNodeKey,
        task.name as string
      )
    }
  }
}
1492
/**
 * Builds the listener registered for each worker message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctionNames } = message
    if (ready != null && taskFunctionNames != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctionNames != null) {
      // Task function names message received from worker
      this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ).taskFunctionNames = taskFunctionNames
    }
  }
}
1515
/**
 * Handles the ready response received from a worker.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, workerId, taskFunctionNames } = message
  if (ready === false) {
    throw new Error(`Worker ${workerId as number} failed to initialize`)
  }
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctionNames = taskFunctionNames
  // Emit the ready event once the pool as a whole is ready.
  if (this.ready) {
    this.emitter?.emit(PoolEvents.ready, this.info)
  }
}
1531
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task, updates the worker usage and the
 * worker choice strategy, then runs the next queued task of the worker node if any can run.
 * Messages whose task identifier has no registered promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, workerError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    promiseResponse.reject(workerError.message)
  } else {
    promiseResponse.resolve(data as Response)
  }
  const workerNodeKey = promiseResponse.workerNodeKey
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  // Run the next queued task if the worker node concurrency allows it.
  const canRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
7c0ba920 1559
/**
 * Emits the `busy` pool event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1565
/**
 * Emits the `backPressure` pool event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1571
33e6bb4c
JB
/**
 * Emits the `full` pool event if the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1579
/**
 * Gets the information of the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1589
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Second argument: the configured tasks queue size, or the squared pool maximum size by default.
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1613
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy.
 * Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1626
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1634
/**
 * Whether the pool has back pressure or not.
 *
 * @returns `true` if the tasks queue is enabled and no worker node is free of back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1643
/**
 * Executes the given task on the worker identified by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Update the worker usage before the task is handed over to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1655
/**
 * Enqueues the given task on the worker node identified by the given key,
 * then emits the task queuing events if needed.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1661
/**
 * Dequeues a task from the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1665
/**
 * Gets the tasks queue size of the worker node identified by the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1669
/**
 * Executes every task queued on the worker node identified by the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1679
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1685}