Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
52b71763 71 * The task execution response promise map.
be0676b3 72 *
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
075e51d1 95 /**
adc9cc64 96 * Whether the pool is starting or not.
075e51d1
JB
97 */
98 private readonly starting: boolean
15b176e0
JB
99 /**
100 * Whether the pool is started or not.
101 */
102 private started: boolean
afe0d5bf
JB
103 /**
104 * The start timestamp of the pool.
105 */
106 private readonly startTimestamp
107
729c563d
S
108 /**
109 * Constructs a new poolifier pool.
110 *
38e795c1 111 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 112 * @param filePath - Path to the worker file.
38e795c1 113 * @param opts - Options for the pool.
729c563d 114 */
c97c7edb 115 public constructor (
b4213b7f
JB
116 protected readonly numberOfWorkers: number,
117 protected readonly filePath: string,
118 protected readonly opts: PoolOptions<Worker>
c97c7edb 119 ) {
78cea37e 120 if (!this.isMain()) {
04f45163 121 throw new Error(
8c6d4acf 122 'Cannot start a pool from a worker with the same type as the pool'
04f45163 123 )
c97c7edb 124 }
8d3782fa 125 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 126 this.checkFilePath(this.filePath)
7c0ba920 127 this.checkPoolOptions(this.opts)
1086026a 128
7254e419
JB
129 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
130 this.executeTask = this.executeTask.bind(this)
131 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 132
6bd72cd0 133 if (this.opts.enableEvents === true) {
7c0ba920
JB
134 this.emitter = new PoolEmitter()
135 }
d59df138
JB
136 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
137 Worker,
138 Data,
139 Response
da309861
JB
140 >(
141 this,
142 this.opts.workerChoiceStrategy,
143 this.opts.workerChoiceStrategyOptions
144 )
b6b32453
JB
145
146 this.setupHook()
147
075e51d1 148 this.starting = true
e761c033 149 this.startPool()
075e51d1 150 this.starting = false
15b176e0 151 this.started = true
afe0d5bf
JB
152
153 this.startTimestamp = performance.now()
c97c7edb
S
154 }
155
a35560ba 156 private checkFilePath (filePath: string): void {
ffcbbad8
JB
157 if (
158 filePath == null ||
3d6dd312 159 typeof filePath !== 'string' ||
ffcbbad8
JB
160 (typeof filePath === 'string' && filePath.trim().length === 0)
161 ) {
c510fea7
APA
162 throw new Error('Please specify a file with a worker implementation')
163 }
3d6dd312
JB
164 if (!existsSync(filePath)) {
165 throw new Error(`Cannot find the worker file '${filePath}'`)
166 }
c510fea7
APA
167 }
168
8d3782fa
JB
169 private checkNumberOfWorkers (numberOfWorkers: number): void {
170 if (numberOfWorkers == null) {
171 throw new Error(
172 'Cannot instantiate a pool without specifying the number of workers'
173 )
78cea37e 174 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 175 throw new TypeError(
0d80593b 176 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
177 )
178 } else if (numberOfWorkers < 0) {
473c717a 179 throw new RangeError(
8d3782fa
JB
180 'Cannot instantiate a pool with a negative number of workers'
181 )
6b27d407 182 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
183 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
184 }
185 }
186
187 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 188 if (this.type === PoolTypes.dynamic) {
a5ed75b7 189 if (max == null) {
e695d66f 190 throw new TypeError(
a5ed75b7
JB
191 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
192 )
193 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
194 throw new TypeError(
195 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
196 )
197 } else if (min > max) {
079de991
JB
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
200 )
b97d82d8 201 } else if (max === 0) {
079de991 202 throw new RangeError(
d640b48b 203 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
204 )
205 } else if (min === max) {
206 throw new RangeError(
207 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
208 )
209 }
8d3782fa
JB
210 }
211 }
212
7c0ba920 213 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
214 if (isPlainObject(opts)) {
215 this.opts.workerChoiceStrategy =
216 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
217 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
218 this.opts.workerChoiceStrategyOptions = {
219 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
220 ...opts.workerChoiceStrategyOptions
221 }
49be33fe
JB
222 this.checkValidWorkerChoiceStrategyOptions(
223 this.opts.workerChoiceStrategyOptions
224 )
1f68cede 225 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
226 this.opts.enableEvents = opts.enableEvents ?? true
227 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
228 if (this.opts.enableTasksQueue) {
229 this.checkValidTasksQueueOptions(
230 opts.tasksQueueOptions as TasksQueueOptions
231 )
232 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
233 opts.tasksQueueOptions as TasksQueueOptions
234 )
235 }
236 } else {
237 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 238 }
aee46736
JB
239 }
240
241 private checkValidWorkerChoiceStrategy (
242 workerChoiceStrategy: WorkerChoiceStrategy
243 ): void {
244 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 245 throw new Error(
aee46736 246 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
247 )
248 }
7c0ba920
JB
249 }
250
0d80593b
JB
251 private checkValidWorkerChoiceStrategyOptions (
252 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
253 ): void {
254 if (!isPlainObject(workerChoiceStrategyOptions)) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: must be a plain object'
257 )
258 }
8990357d 259 if (
8c0b113f
JB
260 workerChoiceStrategyOptions.retries != null &&
261 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
262 ) {
263 throw new TypeError(
8c0b113f 264 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
265 )
266 }
267 if (
8c0b113f
JB
268 workerChoiceStrategyOptions.retries != null &&
269 workerChoiceStrategyOptions.retries < 0
8990357d
JB
270 ) {
271 throw new RangeError(
8c0b113f 272 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
273 )
274 }
49be33fe
JB
275 if (
276 workerChoiceStrategyOptions.weights != null &&
6b27d407 277 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
278 ) {
279 throw new Error(
280 'Invalid worker choice strategy options: must have a weight for each worker node'
281 )
282 }
f0d7f803
JB
283 if (
284 workerChoiceStrategyOptions.measurement != null &&
285 !Object.values(Measurements).includes(
286 workerChoiceStrategyOptions.measurement
287 )
288 ) {
289 throw new Error(
290 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
291 )
292 }
0d80593b
JB
293 }
294
a20f0ba5 295 private checkValidTasksQueueOptions (
76d91ea0 296 tasksQueueOptions: TasksQueueOptions
a20f0ba5 297 ): void {
0d80593b
JB
298 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
299 throw new TypeError('Invalid tasks queue options: must be a plain object')
300 }
f0d7f803 301 if (
b7d085c4
JB
302 tasksQueueOptions?.concurrency != null &&
303 !Number.isSafeInteger(tasksQueueOptions?.concurrency)
f0d7f803
JB
304 ) {
305 throw new TypeError(
20c6f652 306 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
307 )
308 }
309 if (
b7d085c4
JB
310 tasksQueueOptions?.concurrency != null &&
311 tasksQueueOptions?.concurrency <= 0
f0d7f803 312 ) {
e695d66f 313 throw new RangeError(
b7d085c4 314 `Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
20c6f652
JB
315 )
316 }
b7d085c4 317 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 318 throw new Error(
68dbcdc0 319 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
320 )
321 }
322 if (
b7d085c4
JB
323 tasksQueueOptions?.size != null &&
324 !Number.isSafeInteger(tasksQueueOptions?.size)
20c6f652 325 ) {
ff3f866a 326 throw new TypeError(
68dbcdc0 327 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
328 )
329 }
b7d085c4 330 if (tasksQueueOptions?.size != null && tasksQueueOptions?.size <= 0) {
20c6f652 331 throw new RangeError(
b7d085c4 332 `Invalid worker node tasks queue size: ${tasksQueueOptions?.size} is a negative integer or zero`
a20f0ba5
JB
333 )
334 }
335 }
336
e761c033
JB
337 private startPool (): void {
338 while (
339 this.workerNodes.reduce(
340 (accumulator, workerNode) =>
341 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
342 0
343 ) < this.numberOfWorkers
344 ) {
aa9eede8 345 this.createAndSetupWorkerNode()
e761c033
JB
346 }
347 }
348
08f3f44c 349 /** @inheritDoc */
6b27d407
JB
350 public get info (): PoolInfo {
351 return {
23ccf9d7 352 version,
6b27d407 353 type: this.type,
184855e6 354 worker: this.worker,
2431bdb4
JB
355 ready: this.ready,
356 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
357 minSize: this.minSize,
358 maxSize: this.maxSize,
c05f0d50
JB
359 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
360 .runTime.aggregate &&
1305e9a8
JB
361 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
362 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
363 workerNodes: this.workerNodes.length,
364 idleWorkerNodes: this.workerNodes.reduce(
365 (accumulator, workerNode) =>
f59e1027 366 workerNode.usage.tasks.executing === 0
a4e07f72
JB
367 ? accumulator + 1
368 : accumulator,
6b27d407
JB
369 0
370 ),
371 busyWorkerNodes: this.workerNodes.reduce(
372 (accumulator, workerNode) =>
f59e1027 373 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
374 0
375 ),
a4e07f72 376 executedTasks: this.workerNodes.reduce(
6b27d407 377 (accumulator, workerNode) =>
f59e1027 378 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
379 0
380 ),
381 executingTasks: this.workerNodes.reduce(
382 (accumulator, workerNode) =>
f59e1027 383 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
384 0
385 ),
daf86646
JB
386 ...(this.opts.enableTasksQueue === true && {
387 queuedTasks: this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + workerNode.usage.tasks.queued,
390 0
391 )
392 }),
393 ...(this.opts.enableTasksQueue === true && {
394 maxQueuedTasks: this.workerNodes.reduce(
395 (accumulator, workerNode) =>
396 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
397 0
398 )
399 }),
a1763c54
JB
400 ...(this.opts.enableTasksQueue === true && {
401 backPressure: this.hasBackPressure()
402 }),
68cbdc84
JB
403 ...(this.opts.enableTasksQueue === true && {
404 stolenTasks: this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + workerNode.usage.tasks.stolen,
407 0
408 )
409 }),
a4e07f72
JB
410 failedTasks: this.workerNodes.reduce(
411 (accumulator, workerNode) =>
f59e1027 412 accumulator + workerNode.usage.tasks.failed,
a4e07f72 413 0
1dcf8b7b
JB
414 ),
415 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 .runTime.aggregate && {
417 runTime: {
98e72cda 418 minimum: round(
90d6701c 419 min(
98e72cda 420 ...this.workerNodes.map(
8ebe6c30 421 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 422 )
1dcf8b7b
JB
423 )
424 ),
98e72cda 425 maximum: round(
90d6701c 426 max(
98e72cda 427 ...this.workerNodes.map(
8ebe6c30 428 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 429 )
1dcf8b7b 430 )
98e72cda 431 ),
3baa0837
JB
432 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
433 .runTime.average && {
434 average: round(
435 average(
436 this.workerNodes.reduce<number[]>(
437 (accumulator, workerNode) =>
438 accumulator.concat(workerNode.usage.runTime.history),
439 []
440 )
98e72cda 441 )
dc021bcc 442 )
3baa0837 443 }),
98e72cda
JB
444 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
445 .runTime.median && {
446 median: round(
447 median(
3baa0837
JB
448 this.workerNodes.reduce<number[]>(
449 (accumulator, workerNode) =>
450 accumulator.concat(workerNode.usage.runTime.history),
451 []
98e72cda
JB
452 )
453 )
454 )
455 })
1dcf8b7b
JB
456 }
457 }),
458 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
459 .waitTime.aggregate && {
460 waitTime: {
98e72cda 461 minimum: round(
90d6701c 462 min(
98e72cda 463 ...this.workerNodes.map(
8ebe6c30 464 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 465 )
1dcf8b7b
JB
466 )
467 ),
98e72cda 468 maximum: round(
90d6701c 469 max(
98e72cda 470 ...this.workerNodes.map(
8ebe6c30 471 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 472 )
1dcf8b7b 473 )
98e72cda 474 ),
3baa0837
JB
475 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
476 .waitTime.average && {
477 average: round(
478 average(
479 this.workerNodes.reduce<number[]>(
480 (accumulator, workerNode) =>
481 accumulator.concat(workerNode.usage.waitTime.history),
482 []
483 )
98e72cda 484 )
dc021bcc 485 )
3baa0837 486 }),
98e72cda
JB
487 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
488 .waitTime.median && {
489 median: round(
490 median(
3baa0837
JB
491 this.workerNodes.reduce<number[]>(
492 (accumulator, workerNode) =>
493 accumulator.concat(workerNode.usage.waitTime.history),
494 []
98e72cda
JB
495 )
496 )
497 )
498 })
1dcf8b7b
JB
499 }
500 })
6b27d407
JB
501 }
502 }
08f3f44c 503
aa9eede8
JB
504 /**
505 * The pool readiness boolean status.
506 */
2431bdb4
JB
507 private get ready (): boolean {
508 return (
b97d82d8
JB
509 this.workerNodes.reduce(
510 (accumulator, workerNode) =>
511 !workerNode.info.dynamic && workerNode.info.ready
512 ? accumulator + 1
513 : accumulator,
514 0
515 ) >= this.minSize
2431bdb4
JB
516 )
517 }
518
afe0d5bf 519 /**
aa9eede8 520 * The approximate pool utilization.
afe0d5bf
JB
521 *
522 * @returns The pool utilization.
523 */
524 private get utilization (): number {
8e5ca040 525 const poolTimeCapacity =
fe7d90db 526 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
527 const totalTasksRunTime = this.workerNodes.reduce(
528 (accumulator, workerNode) =>
71514351 529 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
530 0
531 )
532 const totalTasksWaitTime = this.workerNodes.reduce(
533 (accumulator, workerNode) =>
71514351 534 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
535 0
536 )
8e5ca040 537 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
538 }
539
8881ae32 540 /**
aa9eede8 541 * The pool type.
8881ae32
JB
542 *
543 * If it is `'dynamic'`, it provides the `max` property.
544 */
545 protected abstract get type (): PoolType
546
184855e6 547 /**
aa9eede8 548 * The worker type.
184855e6
JB
549 */
550 protected abstract get worker (): WorkerType
551
c2ade475 552 /**
aa9eede8 553 * The pool minimum size.
c2ade475 554 */
8735b4e5
JB
555 protected get minSize (): number {
556 return this.numberOfWorkers
557 }
ff733df7
JB
558
559 /**
aa9eede8 560 * The pool maximum size.
ff733df7 561 */
8735b4e5
JB
562 protected get maxSize (): number {
563 return this.max ?? this.numberOfWorkers
564 }
a35560ba 565
6b813701
JB
566 /**
567 * Checks if the worker id sent in the received message from a worker is valid.
568 *
569 * @param message - The received message.
570 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
571 */
21f710aa 572 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
573 if (message.workerId == null) {
574 throw new Error('Worker message received without worker id')
575 } else if (
21f710aa 576 message.workerId != null &&
aad6fb64 577 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
578 ) {
579 throw new Error(
580 `Worker message received from unknown worker '${message.workerId}'`
581 )
582 }
583 }
584
ffcbbad8 585 /**
f06e48d8 586 * Gets the given worker its worker node key.
ffcbbad8
JB
587 *
588 * @param worker - The worker.
f59e1027 589 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 590 */
aad6fb64 591 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 592 return this.workerNodes.findIndex(
8ebe6c30 593 (workerNode) => workerNode.worker === worker
f06e48d8 594 )
bf9549ae
JB
595 }
596
aa9eede8
JB
597 /**
598 * Gets the worker node key given its worker id.
599 *
600 * @param workerId - The worker id.
aad6fb64 601 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 602 */
aad6fb64
JB
603 private getWorkerNodeKeyByWorkerId (workerId: number): number {
604 return this.workerNodes.findIndex(
8ebe6c30 605 (workerNode) => workerNode.info.id === workerId
aad6fb64 606 )
aa9eede8
JB
607 }
608
afc003b2 609 /** @inheritDoc */
a35560ba 610 public setWorkerChoiceStrategy (
59219cbb
JB
611 workerChoiceStrategy: WorkerChoiceStrategy,
612 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 613 ): void {
aee46736 614 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 615 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
616 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
617 this.opts.workerChoiceStrategy
618 )
619 if (workerChoiceStrategyOptions != null) {
620 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
621 }
aa9eede8 622 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 623 workerNode.resetUsage()
9edb9717 624 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 625 }
a20f0ba5
JB
626 }
627
628 /** @inheritDoc */
629 public setWorkerChoiceStrategyOptions (
630 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
631 ): void {
0d80593b 632 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
633 this.opts.workerChoiceStrategyOptions = {
634 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
635 ...workerChoiceStrategyOptions
636 }
a20f0ba5
JB
637 this.workerChoiceStrategyContext.setOptions(
638 this.opts.workerChoiceStrategyOptions
a35560ba
S
639 )
640 }
641
a20f0ba5 642 /** @inheritDoc */
8f52842f
JB
643 public enableTasksQueue (
644 enable: boolean,
645 tasksQueueOptions?: TasksQueueOptions
646 ): void {
a20f0ba5 647 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 648 this.flushTasksQueues()
a20f0ba5
JB
649 }
650 this.opts.enableTasksQueue = enable
8f52842f 651 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
652 }
653
654 /** @inheritDoc */
8f52842f 655 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 656 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
657 this.checkValidTasksQueueOptions(tasksQueueOptions)
658 this.opts.tasksQueueOptions =
659 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 660 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 661 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
662 delete this.opts.tasksQueueOptions
663 }
664 }
665
5b49e864 666 private setTasksQueueSize (size: number): void {
20c6f652 667 for (const workerNode of this.workerNodes) {
ff3f866a 668 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
669 }
670 }
671
a20f0ba5
JB
672 private buildTasksQueueOptions (
673 tasksQueueOptions: TasksQueueOptions
674 ): TasksQueueOptions {
675 return {
20c6f652 676 ...{
ff3f866a 677 size: Math.pow(this.maxSize, 2),
20c6f652
JB
678 concurrency: 1
679 },
680 ...tasksQueueOptions
a20f0ba5
JB
681 }
682 }
683
c319c66b
JB
684 /**
685 * Whether the pool is full or not.
686 *
687 * The pool filling boolean status.
688 */
dea903a8
JB
689 protected get full (): boolean {
690 return this.workerNodes.length >= this.maxSize
691 }
c2ade475 692
c319c66b
JB
693 /**
694 * Whether the pool is busy or not.
695 *
696 * The pool busyness boolean status.
697 */
698 protected abstract get busy (): boolean
7c0ba920 699
6c6afb84 700 /**
3d76750a 701 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
702 *
703 * @returns Worker nodes busyness boolean status.
704 */
c2ade475 705 protected internalBusy (): boolean {
3d76750a
JB
706 if (this.opts.enableTasksQueue === true) {
707 return (
708 this.workerNodes.findIndex(
8ebe6c30 709 (workerNode) =>
3d76750a
JB
710 workerNode.info.ready &&
711 workerNode.usage.tasks.executing <
712 (this.opts.tasksQueueOptions?.concurrency as number)
713 ) === -1
714 )
715 } else {
716 return (
717 this.workerNodes.findIndex(
8ebe6c30 718 (workerNode) =>
3d76750a
JB
719 workerNode.info.ready && workerNode.usage.tasks.executing === 0
720 ) === -1
721 )
722 }
cb70b19d
JB
723 }
724
90d7d101
JB
725 /** @inheritDoc */
726 public listTaskFunctions (): string[] {
f2dbbf95
JB
727 for (const workerNode of this.workerNodes) {
728 if (
729 Array.isArray(workerNode.info.taskFunctions) &&
730 workerNode.info.taskFunctions.length > 0
731 ) {
732 return workerNode.info.taskFunctions
733 }
90d7d101 734 }
f2dbbf95 735 return []
90d7d101
JB
736 }
737
375f7504
JB
738 private shallExecuteTask (workerNodeKey: number): boolean {
739 return (
740 this.tasksQueueSize(workerNodeKey) === 0 &&
741 this.workerNodes[workerNodeKey].usage.tasks.executing <
742 (this.opts.tasksQueueOptions?.concurrency as number)
743 )
744 }
745
afc003b2 746 /** @inheritDoc */
7d91a8cd
JB
747 public async execute (
748 data?: Data,
749 name?: string,
750 transferList?: TransferListItem[]
751 ): Promise<Response> {
52b71763 752 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
753 if (!this.started) {
754 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 755 return
15b176e0 756 }
7d91a8cd
JB
757 if (name != null && typeof name !== 'string') {
758 reject(new TypeError('name argument must be a string'))
9d2d0da1 759 return
7d91a8cd 760 }
90d7d101
JB
761 if (
762 name != null &&
763 typeof name === 'string' &&
764 name.trim().length === 0
765 ) {
f58b60b9 766 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 767 return
90d7d101 768 }
b558f6b5
JB
769 if (transferList != null && !Array.isArray(transferList)) {
770 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 771 return
b558f6b5
JB
772 }
773 const timestamp = performance.now()
774 const workerNodeKey = this.chooseWorkerNode()
46b0bb09 775 const workerInfo = this.getWorkerInfo(workerNodeKey)
501aea93 776 const task: Task<Data> = {
52b71763
JB
777 name: name ?? DEFAULT_TASK_NAME,
778 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
779 data: data ?? ({} as Data),
7d91a8cd 780 transferList,
52b71763 781 timestamp,
a5d15204 782 workerId: workerInfo.id as number,
7629bdf1 783 taskId: randomUUID()
52b71763 784 }
7629bdf1 785 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
786 resolve,
787 reject,
501aea93 788 workerNodeKey
2e81254d 789 })
52b71763 790 if (
4e377863
JB
791 this.opts.enableTasksQueue === false ||
792 (this.opts.enableTasksQueue === true &&
375f7504 793 this.shallExecuteTask(workerNodeKey))
52b71763 794 ) {
501aea93 795 this.executeTask(workerNodeKey, task)
4e377863
JB
796 } else {
797 this.enqueueTask(workerNodeKey, task)
52b71763 798 }
2e81254d 799 })
280c2a77 800 }
c97c7edb 801
afc003b2 802 /** @inheritDoc */
c97c7edb 803 public async destroy (): Promise<void> {
1fbcaa7c 804 await Promise.all(
81c02522 805 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 806 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
807 })
808 )
33e6bb4c 809 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 810 this.started = false
c97c7edb
S
811 }
812
1e3214b6
JB
813 protected async sendKillMessageToWorker (
814 workerNodeKey: number,
815 workerId: number
816 ): Promise<void> {
9edb9717 817 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
818 this.registerWorkerMessageListener(workerNodeKey, (message) => {
819 if (message.kill === 'success') {
820 resolve()
821 } else if (message.kill === 'failure') {
e1af34e6 822 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
823 }
824 })
9edb9717 825 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 826 })
1e3214b6
JB
827 }
828
4a6952ff 829 /**
aa9eede8 830 * Terminates the worker node given its worker node key.
4a6952ff 831 *
aa9eede8 832 * @param workerNodeKey - The worker node key.
4a6952ff 833 */
81c02522 834 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 835
729c563d 836 /**
6677a3d3
JB
837 * Setup hook to execute code before worker nodes are created in the abstract constructor.
838 * Can be overridden.
afc003b2
JB
839 *
840 * @virtual
729c563d 841 */
280c2a77 842 protected setupHook (): void {
965df41c 843 /* Intentionally empty */
280c2a77 844 }
c97c7edb 845
729c563d 846 /**
280c2a77
S
847 * Should return whether the worker is the main worker or not.
848 */
849 protected abstract isMain (): boolean
850
851 /**
2e81254d 852 * Hook executed before the worker task execution.
bf9549ae 853 * Can be overridden.
729c563d 854 *
f06e48d8 855 * @param workerNodeKey - The worker node key.
1c6fe997 856 * @param task - The task to execute.
729c563d 857 */
1c6fe997
JB
858 protected beforeTaskExecutionHook (
859 workerNodeKey: number,
860 task: Task<Data>
861 ): void {
94407def
JB
862 if (this.workerNodes[workerNodeKey]?.usage != null) {
863 const workerUsage = this.workerNodes[workerNodeKey].usage
864 ++workerUsage.tasks.executing
865 this.updateWaitTimeWorkerUsage(workerUsage, task)
866 }
867 if (
868 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
869 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
870 task.name as string
871 ) != null
872 ) {
db0e38ee 873 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 874 workerNodeKey
db0e38ee 875 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
876 ++taskFunctionWorkerUsage.tasks.executing
877 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 878 }
c97c7edb
S
879 }
880
c01733f1 881 /**
2e81254d 882 * Hook executed after the worker task execution.
bf9549ae 883 * Can be overridden.
c01733f1 884 *
501aea93 885 * @param workerNodeKey - The worker node key.
38e795c1 886 * @param message - The received message.
c01733f1 887 */
2e81254d 888 protected afterTaskExecutionHook (
501aea93 889 workerNodeKey: number,
2740a743 890 message: MessageValue<Response>
bf9549ae 891 ): void {
94407def
JB
892 if (this.workerNodes[workerNodeKey]?.usage != null) {
893 const workerUsage = this.workerNodes[workerNodeKey].usage
894 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
895 this.updateRunTimeWorkerUsage(workerUsage, message)
896 this.updateEluWorkerUsage(workerUsage, message)
897 }
898 if (
899 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
900 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 901 message.taskPerformance?.name as string
94407def
JB
902 ) != null
903 ) {
db0e38ee 904 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 905 workerNodeKey
db0e38ee 906 ].getTaskFunctionWorkerUsage(
0628755c 907 message.taskPerformance?.name as string
b558f6b5 908 ) as WorkerUsage
db0e38ee
JB
909 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
910 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
911 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
912 }
913 }
914
db0e38ee
JB
915 /**
916 * Whether the worker node shall update its task function worker usage or not.
917 *
918 * @param workerNodeKey - The worker node key.
919 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
920 */
921 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 922 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 923 return (
94407def 924 workerInfo != null &&
a5d15204 925 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 926 workerInfo.taskFunctions.length > 2
b558f6b5 927 )
f1c06930
JB
928 }
929
930 private updateTaskStatisticsWorkerUsage (
931 workerUsage: WorkerUsage,
932 message: MessageValue<Response>
933 ): void {
a4e07f72 934 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
935 if (
936 workerTaskStatistics.executing != null &&
937 workerTaskStatistics.executing > 0
938 ) {
939 --workerTaskStatistics.executing
5bb5be17 940 }
98e72cda
JB
941 if (message.taskError == null) {
942 ++workerTaskStatistics.executed
943 } else {
a4e07f72 944 ++workerTaskStatistics.failed
2740a743 945 }
f8eb0a2a
JB
946 }
947
a4e07f72
JB
948 private updateRunTimeWorkerUsage (
949 workerUsage: WorkerUsage,
f8eb0a2a
JB
950 message: MessageValue<Response>
951 ): void {
dc021bcc
JB
952 if (message.taskError != null) {
953 return
954 }
e4f20deb
JB
955 updateMeasurementStatistics(
956 workerUsage.runTime,
957 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 958 message.taskPerformance?.runTime ?? 0
e4f20deb 959 )
f8eb0a2a
JB
960 }
961
a4e07f72
JB
962 private updateWaitTimeWorkerUsage (
963 workerUsage: WorkerUsage,
1c6fe997 964 task: Task<Data>
f8eb0a2a 965 ): void {
1c6fe997
JB
966 const timestamp = performance.now()
967 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
968 updateMeasurementStatistics(
969 workerUsage.waitTime,
970 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 971 taskWaitTime
e4f20deb 972 )
c01733f1 973 }
974
a4e07f72 975 private updateEluWorkerUsage (
5df69fab 976 workerUsage: WorkerUsage,
62c15a68
JB
977 message: MessageValue<Response>
978 ): void {
dc021bcc
JB
979 if (message.taskError != null) {
980 return
981 }
008512c7
JB
982 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
983 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
984 updateMeasurementStatistics(
985 workerUsage.elu.active,
008512c7 986 eluTaskStatisticsRequirements,
dc021bcc 987 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
988 )
989 updateMeasurementStatistics(
990 workerUsage.elu.idle,
008512c7 991 eluTaskStatisticsRequirements,
dc021bcc 992 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 993 )
008512c7 994 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 995 if (message.taskPerformance?.elu != null) {
f7510105
JB
996 if (workerUsage.elu.utilization != null) {
997 workerUsage.elu.utilization =
998 (workerUsage.elu.utilization +
999 message.taskPerformance.elu.utilization) /
1000 2
1001 } else {
1002 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1003 }
62c15a68
JB
1004 }
1005 }
1006 }
1007
280c2a77 1008 /**
f06e48d8 1009 * Chooses a worker node for the next task.
280c2a77 1010 *
6c6afb84 1011 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1012 *
aa9eede8 1013 * @returns The chosen worker node key
280c2a77 1014 */
6c6afb84 1015 private chooseWorkerNode (): number {
930dcf12 1016 if (this.shallCreateDynamicWorker()) {
aa9eede8 1017 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1018 if (
b1aae695 1019 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1020 ) {
aa9eede8 1021 return workerNodeKey
6c6afb84 1022 }
17393ac8 1023 }
930dcf12
JB
1024 return this.workerChoiceStrategyContext.execute()
1025 }
1026
6c6afb84
JB
1027 /**
1028 * Conditions for dynamic worker creation.
1029 *
1030 * @returns Whether to create a dynamic worker or not.
1031 */
1032 private shallCreateDynamicWorker (): boolean {
930dcf12 1033 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1034 }
1035
280c2a77 1036 /**
aa9eede8 1037 * Sends a message to worker given its worker node key.
280c2a77 1038 *
aa9eede8 1039 * @param workerNodeKey - The worker node key.
38e795c1 1040 * @param message - The message.
7d91a8cd 1041 * @param transferList - The optional array of transferable objects.
280c2a77
S
1042 */
1043 protected abstract sendToWorker (
aa9eede8 1044 workerNodeKey: number,
7d91a8cd
JB
1045 message: MessageValue<Data>,
1046 transferList?: TransferListItem[]
280c2a77
S
1047 ): void
1048
729c563d 1049 /**
41344292 1050 * Creates a new worker.
6c6afb84
JB
1051 *
1052 * @returns Newly created worker.
729c563d 1053 */
280c2a77 1054 protected abstract createWorker (): Worker
c97c7edb 1055
4a6952ff 1056 /**
aa9eede8 1057 * Creates a new, completely set up worker node.
4a6952ff 1058 *
aa9eede8 1059 * @returns New, completely set up worker node key.
4a6952ff 1060 */
aa9eede8 1061 protected createAndSetupWorkerNode (): number {
bdacc2d2 1062 const worker = this.createWorker()
280c2a77 1063
fd04474e 1064 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1065 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1066 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1067 worker.on('error', (error) => {
aad6fb64 1068 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
46b0bb09 1069 const workerInfo = this.getWorkerInfo(workerNodeKey)
9b106837 1070 workerInfo.ready = false
0dc838e3 1071 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1072 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1073 if (
1074 this.opts.restartWorkerOnError === true &&
1075 !this.starting &&
1076 this.started
1077 ) {
9b106837 1078 if (workerInfo.dynamic) {
aa9eede8 1079 this.createAndSetupDynamicWorkerNode()
8a1260a3 1080 } else {
aa9eede8 1081 this.createAndSetupWorkerNode()
8a1260a3 1082 }
5baee0d7 1083 }
19dbc45b 1084 if (this.opts.enableTasksQueue === true) {
9b106837 1085 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1086 }
5baee0d7 1087 })
a35560ba 1088 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1089 worker.once('exit', () => {
f06e48d8 1090 this.removeWorkerNode(worker)
a974afa6 1091 })
280c2a77 1092
aa9eede8 1093 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1094
aa9eede8 1095 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1096
aa9eede8 1097 return workerNodeKey
c97c7edb 1098 }
be0676b3 1099
930dcf12 1100 /**
aa9eede8 1101 * Creates a new, completely set up dynamic worker node.
930dcf12 1102 *
aa9eede8 1103 * @returns New, completely set up dynamic worker node key.
930dcf12 1104 */
aa9eede8
JB
1105 protected createAndSetupDynamicWorkerNode (): number {
1106 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1107 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1108 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1109 message.workerId
aad6fb64 1110 )
aa9eede8 1111 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1112 // Kill message received from worker
930dcf12
JB
1113 if (
1114 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1115 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1116 ((this.opts.enableTasksQueue === false &&
aa9eede8 1117 workerUsage.tasks.executing === 0) ||
7b56f532 1118 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1119 workerUsage.tasks.executing === 0 &&
1120 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1121 ) {
5270d253
JB
1122 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1123 this.emitter?.emit(PoolEvents.error, error)
1124 })
930dcf12
JB
1125 }
1126 })
46b0bb09 1127 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1128 this.sendToWorker(workerNodeKey, {
b0a4db63 1129 checkActive: true,
21f710aa
JB
1130 workerId: workerInfo.id as number
1131 })
b5e113f6 1132 workerInfo.dynamic = true
b1aae695
JB
1133 if (
1134 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1135 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1136 ) {
b5e113f6
JB
1137 workerInfo.ready = true
1138 }
33e6bb4c 1139 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1140 return workerNodeKey
930dcf12
JB
1141 }
1142
a2ed5053 1143 /**
aa9eede8 1144 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1145 *
aa9eede8 1146 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1147 * @param listener - The message listener callback.
1148 */
85aeb3f3
JB
1149 protected abstract registerWorkerMessageListener<
1150 Message extends Data | Response
aa9eede8
JB
1151 >(
1152 workerNodeKey: number,
1153 listener: (message: MessageValue<Message>) => void
1154 ): void
a2ed5053
JB
1155
1156 /**
aa9eede8 1157 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1158 * Can be overridden.
1159 *
aa9eede8 1160 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1161 */
aa9eede8 1162 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1163 // Listen to worker messages.
aa9eede8 1164 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1165 // Send the startup message to worker.
aa9eede8 1166 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1167 // Send the statistics message to worker.
1168 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1169 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1170 this.workerNodes[workerNodeKey].onEmptyQueue =
1171 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1172 this.workerNodes[workerNodeKey].onBackPressure =
1173 this.tasksStealingOnBackPressure.bind(this)
1174 }
d2c73f82
JB
1175 }
1176
85aeb3f3 1177 /**
aa9eede8
JB
1178 * Sends the startup message to worker given its worker node key.
1179 *
1180 * @param workerNodeKey - The worker node key.
1181 */
1182 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1183
1184 /**
9edb9717 1185 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1186 *
aa9eede8 1187 * @param workerNodeKey - The worker node key.
85aeb3f3 1188 */
9edb9717 1189 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1190 this.sendToWorker(workerNodeKey, {
1191 statistics: {
1192 runTime:
1193 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1194 .runTime.aggregate,
1195 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1196 .elu.aggregate
1197 },
46b0bb09 1198 workerId: this.getWorkerInfo(workerNodeKey).id as number
aa9eede8
JB
1199 })
1200 }
a2ed5053
JB
1201
1202 private redistributeQueuedTasks (workerNodeKey: number): void {
1203 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1204 const destinationWorkerNodeKey = this.workerNodes.reduce(
1205 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1206 if (!workerNode.info.ready) {
1207 return minWorkerNodeKey
58baffd3 1208 }
f201a0cd
JB
1209 return workerNode.usage.tasks.queued <
1210 workerNodes[minWorkerNodeKey].usage.tasks.queued
1211 ? workerNodeKey
1212 : minWorkerNodeKey
1213 },
1214 0
1215 )
0bc68267 1216 if (destinationWorkerNodeKey != null) {
033f1776 1217 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1218 const task = {
1219 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1220 workerId: destinationWorkerNode.info.id as number
0bc68267 1221 }
375f7504 1222 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
0bc68267
JB
1223 this.executeTask(destinationWorkerNodeKey, task)
1224 } else {
1225 this.enqueueTask(destinationWorkerNodeKey, task)
1226 }
dd951876
JB
1227 }
1228 }
1229 }
1230
b1838604
JB
1231 private updateTaskStolenStatisticsWorkerUsage (
1232 workerNodeKey: number,
b1838604
JB
1233 taskName: string
1234 ): void {
1a880eca 1235 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1236 if (workerNode?.usage != null) {
1237 ++workerNode.usage.tasks.stolen
1238 }
1239 if (
1240 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1241 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1242 ) {
1243 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1244 taskName
1245 ) as WorkerUsage
1246 ++taskFunctionWorkerUsage.tasks.stolen
1247 }
1248 }
1249
dd951876 1250 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1251 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1252 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1253 const workerNodes = this.workerNodes
a6b3272b 1254 .slice()
dd951876
JB
1255 .sort(
1256 (workerNodeA, workerNodeB) =>
1257 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1258 )
f201a0cd
JB
1259 const sourceWorkerNode = workerNodes.find(
1260 (workerNode) =>
1261 workerNode.info.ready &&
1262 workerNode.info.id !== workerId &&
1263 workerNode.usage.tasks.queued > 0
1264 )
1265 if (sourceWorkerNode != null) {
1266 const task = {
1267 ...(sourceWorkerNode.popTask() as Task<Data>),
1268 workerId: destinationWorkerNode.info.id as number
0bc68267 1269 }
f201a0cd
JB
1270 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1271 this.executeTask(destinationWorkerNodeKey, task)
1272 } else {
1273 this.enqueueTask(destinationWorkerNodeKey, task)
72695f86 1274 }
f201a0cd
JB
1275 this.updateTaskStolenStatisticsWorkerUsage(
1276 destinationWorkerNodeKey,
1277 task.name as string
1278 )
72695f86
JB
1279 }
1280 }
1281
1282 private tasksStealingOnBackPressure (workerId: number): void {
f778c355
JB
1283 const sizeOffset = 1
1284 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1285 return
1286 }
72695f86
JB
1287 const sourceWorkerNode =
1288 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1289 const workerNodes = this.workerNodes
a6b3272b 1290 .slice()
72695f86
JB
1291 .sort(
1292 (workerNodeA, workerNodeB) =>
1293 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1294 )
1295 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1296 if (
0bc68267 1297 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1298 workerNode.info.ready &&
1299 workerNode.info.id !== workerId &&
0bc68267 1300 workerNode.usage.tasks.queued <
f778c355 1301 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1302 ) {
dd951876
JB
1303 const task = {
1304 ...(sourceWorkerNode.popTask() as Task<Data>),
1305 workerId: workerNode.info.id as number
1306 }
375f7504 1307 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1308 this.executeTask(workerNodeKey, task)
4de3d785 1309 } else {
dd951876 1310 this.enqueueTask(workerNodeKey, task)
4de3d785 1311 }
b1838604
JB
1312 this.updateTaskStolenStatisticsWorkerUsage(
1313 workerNodeKey,
b1838604
JB
1314 task.name as string
1315 )
10ecf8fd 1316 }
a2ed5053
JB
1317 }
1318 }
1319
be0676b3 1320 /**
aa9eede8 1321 * This method is the listener registered for each worker message.
be0676b3 1322 *
bdacc2d2 1323 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1324 */
1325 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1326 return (message) => {
21f710aa 1327 this.checkMessageWorkerId(message)
a5d15204 1328 if (message.ready != null && message.taskFunctions != null) {
81c02522 1329 // Worker ready response received from worker
10e2aa7e 1330 this.handleWorkerReadyResponse(message)
7629bdf1 1331 } else if (message.taskId != null) {
81c02522 1332 // Task execution response received from worker
6b272951 1333 this.handleTaskExecutionResponse(message)
90d7d101
JB
1334 } else if (message.taskFunctions != null) {
1335 // Task functions message received from worker
46b0bb09
JB
1336 this.getWorkerInfo(
1337 this.getWorkerNodeKeyByWorkerId(message.workerId)
b558f6b5 1338 ).taskFunctions = message.taskFunctions
6b272951
JB
1339 }
1340 }
1341 }
1342
10e2aa7e 1343 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1344 if (message.ready === false) {
1345 throw new Error(`Worker ${message.workerId} failed to initialize`)
1346 }
a5d15204 1347 const workerInfo = this.getWorkerInfo(
aad6fb64 1348 this.getWorkerNodeKeyByWorkerId(message.workerId)
46b0bb09 1349 )
a5d15204
JB
1350 workerInfo.ready = message.ready as boolean
1351 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1352 if (this.emitter != null && this.ready) {
1353 this.emitter.emit(PoolEvents.ready, this.info)
1354 }
6b272951
JB
1355 }
1356
1357 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1358 const { taskId, taskError, data } = message
1359 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1360 if (promiseResponse != null) {
5441aea6
JB
1361 if (taskError != null) {
1362 this.emitter?.emit(PoolEvents.taskError, taskError)
1363 promiseResponse.reject(taskError.message)
6b272951 1364 } else {
5441aea6 1365 promiseResponse.resolve(data as Response)
6b272951 1366 }
501aea93
JB
1367 const workerNodeKey = promiseResponse.workerNodeKey
1368 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1369 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1370 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1371 if (
1372 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1373 this.tasksQueueSize(workerNodeKey) > 0 &&
1374 this.workerNodes[workerNodeKey].usage.tasks.executing <
1375 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1376 ) {
1377 this.executeTask(
1378 workerNodeKey,
1379 this.dequeueTask(workerNodeKey) as Task<Data>
1380 )
be0676b3
APA
1381 }
1382 }
be0676b3 1383 }
7c0ba920 1384
a1763c54 1385 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1386 if (this.busy) {
1387 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1388 }
1389 }
1390
1391 private checkAndEmitTaskQueuingEvents (): void {
1392 if (this.hasBackPressure()) {
1393 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1394 }
1395 }
1396
33e6bb4c
JB
1397 private checkAndEmitDynamicWorkerCreationEvents (): void {
1398 if (this.type === PoolTypes.dynamic) {
1399 if (this.full) {
1400 this.emitter?.emit(PoolEvents.full, this.info)
1401 }
1402 }
1403 }
1404
8a1260a3 1405 /**
aa9eede8 1406 * Gets the worker information given its worker node key.
8a1260a3
JB
1407 *
1408 * @param workerNodeKey - The worker node key.
3f09ed9f 1409 * @returns The worker information.
8a1260a3 1410 */
46b0bb09
JB
1411 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
1412 return this.workerNodes[workerNodeKey].info
e221309a
JB
1413 }
1414
a05c10de 1415 /**
b0a4db63 1416 * Adds the given worker in the pool worker nodes.
ea7a90d3 1417 *
38e795c1 1418 * @param worker - The worker.
aa9eede8
JB
1419 * @returns The added worker node key.
1420 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1421 */
b0a4db63 1422 private addWorkerNode (worker: Worker): number {
671d5154
JB
1423 const workerNode = new WorkerNode<Worker, Data>(
1424 worker,
ff3f866a 1425 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1426 )
b97d82d8 1427 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1428 if (this.starting) {
1429 workerNode.info.ready = true
1430 }
aa9eede8 1431 this.workerNodes.push(workerNode)
aad6fb64 1432 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1433 if (workerNodeKey === -1) {
86ed0598 1434 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1435 }
1436 return workerNodeKey
ea7a90d3 1437 }
c923ce56 1438
51fe3d3c 1439 /**
f06e48d8 1440 * Removes the given worker from the pool worker nodes.
51fe3d3c 1441 *
f06e48d8 1442 * @param worker - The worker.
51fe3d3c 1443 */
416fd65c 1444 private removeWorkerNode (worker: Worker): void {
aad6fb64 1445 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1446 if (workerNodeKey !== -1) {
1447 this.workerNodes.splice(workerNodeKey, 1)
1448 this.workerChoiceStrategyContext.remove(workerNodeKey)
1449 }
51fe3d3c 1450 }
adc3c320 1451
e2b31e32
JB
1452 /** @inheritDoc */
1453 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1454 return (
e2b31e32
JB
1455 this.opts.enableTasksQueue === true &&
1456 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1457 )
1458 }
1459
1460 private hasBackPressure (): boolean {
1461 return (
1462 this.opts.enableTasksQueue === true &&
1463 this.workerNodes.findIndex(
1464 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1465 ) === -1
9e844245 1466 )
e2b31e32
JB
1467 }
1468
b0a4db63 1469 /**
aa9eede8 1470 * Executes the given task on the worker given its worker node key.
b0a4db63 1471 *
aa9eede8 1472 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1473 * @param task - The task to execute.
1474 */
2e81254d 1475 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1476 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1477 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1478 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1479 }
1480
f9f00b5f 1481 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1482 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1483 this.checkAndEmitTaskQueuingEvents()
1484 return tasksQueueSize
adc3c320
JB
1485 }
1486
416fd65c 1487 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1488 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1489 }
1490
416fd65c 1491 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1492 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1493 }
1494
81c02522 1495 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1496 while (this.tasksQueueSize(workerNodeKey) > 0) {
1497 this.executeTask(
1498 workerNodeKey,
1499 this.dequeueTask(workerNodeKey) as Task<Data>
1500 )
ff733df7 1501 }
4b628b48 1502 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1503 }
1504
ef41a6e6
JB
1505 private flushTasksQueues (): void {
1506 for (const [workerNodeKey] of this.workerNodes.entries()) {
1507 this.flushTasksQueue(workerNodeKey)
1508 }
1509 }
c97c7edb 1510}