Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
afe0d5bf 17 median,
e4f20deb
JB
18 round,
19 updateMeasurementStatistics
bbeadd16 20} from '../utils'
59317253 21import { KillBehaviors } from '../worker/worker-options'
c4855468 22import {
65d7a1c9 23 type IPool,
7c5a1080 24 PoolEmitter,
c4855468 25 PoolEvents,
6b27d407 26 type PoolInfo,
c4855468 27 type PoolOptions,
6b27d407
JB
28 type PoolType,
29 PoolTypes,
4b628b48 30 type TasksQueueOptions
c4855468 31} from './pool'
bbfa38a2
JB
32import type {
33 IWorker,
34 IWorkerNode,
35 WorkerInfo,
36 WorkerType,
37 WorkerUsage
e102732c 38} from './worker'
a35560ba 39import {
008512c7 40 type MeasurementStatisticsRequirements,
f0d7f803 41 Measurements,
a35560ba 42 WorkerChoiceStrategies,
a20f0ba5
JB
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
bdaf31cd
JB
45} from './selection-strategies/selection-strategies-types'
46import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 47import { version } from './version'
4b628b48 48import { WorkerNode } from './worker-node'
23ccf9d7 49
729c563d 50/**
ea7a90d3 51 * Base class that implements some shared logic for all poolifier pools.
729c563d 52 *
38e795c1 53 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 56 */
c97c7edb 57export abstract class AbstractPool<
f06e48d8 58 Worker extends IWorker,
d3c8a1a8
S
59 Data = unknown,
60 Response = unknown
c4855468 61> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task id.
 *
 * - `key`: The id generated for a submitted task.
 * - `value`: The key of the worker node the task was assigned to, together with the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * The entry is looked up again when a worker answers, so that the matching promise can be settled.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Response>
>()

/**
 * Context holding the worker choice strategy implementation currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Placeholder for the maximum size of a dynamic pool.
 * When left undefined, the maximum size falls back to the number of workers.
 */
protected readonly max?: number

/**
 * `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean
/**
 * `true` once the constructor has completed, `false` again after the pool is destroyed.
 */
private started: boolean
/**
 * High resolution timestamp taken at the end of the pool construction.
 */
private readonly startTimestamp
105
729c563d
S
106 /**
107 * Constructs a new poolifier pool.
108 *
38e795c1 109 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 110 * @param filePath - Path to the worker file.
38e795c1 111 * @param opts - Options for the pool.
729c563d 112 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // Refuse to build a pool when not running as the main worker.
  if (!this.isMain()) {
    const message =
      'Cannot start a pool from a worker with the same type as the pool'
    throw new Error(message)
  }

  // Argument validation. checkPoolOptions() also writes the defaults into the options.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached when these methods are handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Extension point that runs before any worker node exists.
  this.setupHook()

  // `starting` is only true while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
153
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, is not a string, is blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
166
8d3782fa
JB
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is not specified.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
184
/**
 * Validates the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if `max` is not specified or is not a safe integer.
 * @throws RangeError if `max` is lower than `min`, equal to zero, or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
210
/**
 * Validates the pool options and writes the default value of every unset option into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  // User supplied strategy options take precedence over the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!poolOptions.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and completed when the queue is enabled.
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  poolOptions.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
238
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
248
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object or `choiceRetries` is not a safe integer.
 * @throws RangeError if `choiceRetries` is lower than or equal to zero.
 * @throws Error if the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
292
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * Side effect: when `queueMaxSize` is set, its value is copied into `size` on the given object.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws RangeError if `concurrency` or `size` is lower than or equal to zero.
 * @throws Error if both `queueMaxSize` and `size` are specified.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, queueMaxSize } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (queueMaxSize != null) {
    if (tasksQueueOptions.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    // From here on `size` carries the value given through `queueMaxSize`.
    tasksQueueOptions.size = queueMaxSize
  }
  const { size } = tasksQueueOptions
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
340
e761c033
JB
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
352
08f3f44c 353 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  const nodes = this.workerNodes
  const strategyContext = this.workerChoiceStrategyContext
  // Adds up one numeric value per worker node.
  const sumOf = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    nodes.reduce((total, workerNode) => total + pick(workerNode), 0)
  // Counts the worker nodes matching a condition.
  const countIf = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    nodes.reduce(
      (total, workerNode) => (matches(workerNode) ? total + 1 : total),
      0
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: countIf(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countIf(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOf((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sumOf((workerNode) => workerNode.usage.tasks.executing),
    // Queue related figures only exist when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOf((workerNode) => workerNode.usage.tasks.queued),
      maxQueuedTasks: sumOf(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOf((workerNode) => workerNode.usage.tasks.failed),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...nodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...nodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOf((workerNode) => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOf((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(
              nodes.map((workerNode) => workerNode.usage.runTime?.median ?? 0)
            )
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...nodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...nodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOf((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOf((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .median && {
          median: round(
            median(
              nodes.map((workerNode) => workerNode.usage.waitTime?.median ?? 0)
            )
          )
        })
      }
    })
  }
}
08f3f44c 496
aa9eede8
JB
497 /**
498 * The pool readiness boolean status.
499 */
2431bdb4
JB
private get ready (): boolean {
  // Only non dynamic worker nodes that reported ready are counted.
  const readyStaticWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
511
afe0d5bf 512 /**
aa9eede8 513 * The approximate pool utilization.
afe0d5bf
JB
514 *
515 * @returns The pool utilization.
516 */
private get utilization (): number {
  // Time elapsed since the pool start, multiplied by the maximum number of worker nodes.
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
532
8881ae32 533 /**
aa9eede8 534 * The pool type.
8881ae32
JB
535 *
536 * If it is `'dynamic'`, it provides the `max` property.
537 */
538 protected abstract get type (): PoolType
539
184855e6 540 /**
aa9eede8 541 * The worker type.
184855e6
JB
542 */
543 protected abstract get worker (): WorkerType
544
c2ade475 545 /**
aa9eede8 546 * The pool minimum size.
c2ade475 547 */
8735b4e5
JB
protected get minSize (): number {
  // The minimum size is the number of workers given to the constructor.
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
551
552 /**
aa9eede8 553 * The pool maximum size.
ff733df7 554 */
8735b4e5
JB
protected get maxSize (): number {
  // `max` takes precedence when set; otherwise the size is the number of workers.
  const { max, numberOfWorkers } = this
  return max ?? numberOfWorkers
}
a35560ba 558
6b813701
JB
559 /**
560 * Checks if the worker id sent in the received message from a worker is valid.
561 *
562 * @param message - The received message.
563 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
564 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  // A key of -1 means no worker node of this pool has that worker id.
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
577
ffcbbad8 578 /**
f06e48d8 579 * Gets the given worker its worker node key.
ffcbbad8
JB
580 *
581 * @param worker - The worker.
f59e1027 582 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 583 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  // The worker node key is the position of the worker node in `workerNodes`.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
589
aa9eede8
JB
590 /**
591 * Gets the worker node key given its worker id.
592 *
593 * @param workerId - The worker id.
aad6fb64 594 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 595 */
aad6fb64
JB
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  // The worker node key is the position of the worker node in `workerNodes`.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
601
afc003b2 602 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    poolOptions.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
620
621 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
634
a20f0ba5 635 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the queues first when the tasks queue goes from enabled to disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
646
647 /** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  const poolOptions = this.opts
  if (poolOptions.enableTasksQueue !== true) {
    // Without a tasks queue, previously stored queue options are dropped.
    if (poolOptions.tasksQueueOptions != null) {
      delete poolOptions.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  poolOptions.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueMaxSize(poolOptions.tasksQueueOptions.size as number)
}
658
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The value assigned to `tasksQueueBackPressureSize` on each worker node.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
664
a20f0ba5
JB
/**
 * Completes the given tasks queue options with the default values.
 *
 * Defaults: a `size` equal to the square of the pool maximum size and a `concurrency` of 1.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns A new object where the given options override the defaults.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
676
c319c66b
JB
677 /**
678 * Whether the pool is full or not.
679 *
680 * The pool filling boolean status.
681 */
dea903a8
JB
protected get full (): boolean {
  // Full once the pool holds at least `maxSize` worker nodes.
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 685
c319c66b
JB
686 /**
687 * Whether the pool is busy or not.
688 *
689 * The pool busyness boolean status.
690 */
691 protected abstract get busy (): boolean
7c0ba920 692
6c6afb84 693 /**
3d76750a 694 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
695 *
696 * @returns Worker nodes busyness boolean status.
697 */
protected internalBusy (): boolean {
  // With a tasks queue, a ready worker node is available while it executes fewer
  // tasks than the configured concurrency; without one, only while it executes none.
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
717
90d7d101
JB
718 /** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node exposing a non empty task functions list provides the answer.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
730
afc003b2 731 /** @inheritDoc */
7d91a8cd
JB
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // reject() only settles the promise, it does not stop this executor.
    // Every failed check therefore returns right after rejecting, otherwise
    // the invalid task would still be registered and sent to a worker node.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Taken before the worker node choice; later used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the promise.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when there is no tasks queue or when the chosen worker
    // node runs fewer tasks than the configured concurrency; enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 792
afc003b2 793 /** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  // The destroy event carries the pool information as it is at that point.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
803
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id, sent along with the kill message.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before sending so the answer cannot be missed.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
819
4a6952ff 820 /**
aa9eede8 821 * Terminates the worker node given its worker node key.
4a6952ff 822 *
aa9eede8 823 * @param workerNodeKey - The worker node key.
4a6952ff 824 */
81c02522 825 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 826
729c563d 827 /**
6677a3d3
JB
828 * Setup hook to execute code before worker nodes are created in the abstract constructor.
829 * Can be overridden.
afc003b2
JB
830 *
831 * @virtual
729c563d 832 */
protected setupHook (): void {
  /* Nothing to do by default: subclasses may override this hook. */
}
c97c7edb 836
729c563d 837 /**
280c2a77
S
838 * Should return whether the worker is the main worker or not.
839 */
840 protected abstract isMain (): boolean
841
842 /**
2e81254d 843 * Hook executed before the worker task execution.
bf9549ae 844 * Can be overridden.
729c563d 845 *
f06e48d8 846 * @param workerNodeKey - The worker node key.
1c6fe997 847 * @param task - The task to execute.
729c563d 848 */
1c6fe997
JB
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage: one more executing task, plus its wait time.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  // Per task function usage, only when the worker node tracks it.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(task.name as string) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
}
871
c01733f1 872 /**
2e81254d 873 * Hook executed after the worker task execution.
bf9549ae 874 * Can be overridden.
c01733f1 875 *
501aea93 876 * @param workerNodeKey - The worker node key.
38e795c1 877 * @param message - The received message.
c01733f1 878 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage: task counters, run time and event loop utilization.
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  // Per task function usage, only when the worker node tracks it.
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (
    workerNode.getTaskFunctionWorkerUsage(
      message.taskPerformance?.name as string
    ) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    message.taskPerformance?.name as string
  ) as WorkerUsage
  this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
  this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
  this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
}
905
db0e38ee
JB
906 /**
907 * Whether the worker node shall update its task function worker usage or not.
908 *
909 * @param workerNodeKey - The worker node key.
910 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
911 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  // Requires a known worker whose task functions list has more than two entries.
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
920
/**
 * Updates the task counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  // A message carrying a task error counts as a failure, any other as an execution.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
938
/**
 * Feeds the task run time carried by the message into the run time measurement of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message. A missing run time is counted as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
950
/**
 * Feeds the time elapsed since the task timestamp into the wait time measurement of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task. A task without timestamp yields a wait time of `0`.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const waitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    waitTime,
    workerUsage.tasks.executed
  )
}
964
/**
 * Feeds the event loop utilization (ELU) carried by the message into a worker usage.
 *
 * The active and idle measurements are always updated (missing values count as `0`).
 * The utilization is only updated when ELU aggregation is required and the message
 * carries ELU data: it becomes the mean of the previous and the new utilization, or
 * the new utilization when none was recorded yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
996
/**
 * Picks the worker node that will run the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is returned
 * if the strategy policy asks for dynamic worker usage. In every other case the choice
 * is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1015
/**
 * Checks the conditions for creating a dynamic worker: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1024
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1037
/**
 * Creates a new worker. Implemented by the concrete pool classes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1044
/**
 * Creates a worker, wires its event handlers, registers it as a worker node and runs
 * the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers are registered before the pool internal ones.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement is only spawned once the pool is started and no longer starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true &&
      !this.starting &&
      this.started
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 1088
/**
 * Creates a worker node, registers the kill message listener that destroys it, asks
 * the worker to check its activity and flags the worker node as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // A hard kill is always honored; a soft kill only when nothing is executing
    // and, with the tasks queue enabled, nothing is queued either.
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
        senderUsage.tasks.executing === 0 &&
        (this.opts.enableTasksQueue === false ||
          (this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0)))
    if (!shallDestroy) {
      return
    }
    this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1131
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1144
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Startup message first, statistics message second.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // With the tasks queue enabled, back pressure triggers tasks stealing.
  this.workerNodes[workerNodeKey].onBackPressure =
    this.tasksStealingOnBackPressure.bind(this)
}
1163
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1170
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements, along with the worker id.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
a2ed5053
JB
1188
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node without
 * queued tasks, or else the ready worker node with the fewest queued tasks. The task
 * is executed right away if the destination executes fewer tasks than the configured
 * concurrency, and enqueued on the destination otherwise.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks stay queued on the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    // Iterate the pool worker nodes array itself so that the loop index is a real
    // worker node key (the index in a filtered copy is shifted past the excluded node).
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey || !workerNode.info.ready) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // Re-enqueueing on the source worker node would never drain its queue.
      break
    }
    // NOTE(review): the task is forwarded unchanged; if it embeds the id of the
    // source worker, it may need to be rewritten for the destination - confirm.
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // The capacity check applies to the chosen destination only.
    if (
      this.workerNodes[targetWorkerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1229
/**
 * Moves tasks queued on the worker with the given id to the other worker nodes.
 *
 * The other worker nodes are visited by ascending number of queued tasks. Each ready
 * worker node that has no back pressure receives one task popped from the source
 * worker node: the task is executed right away if the receiver executes fewer tasks
 * than the configured concurrency, and enqueued on the receiver otherwise.
 *
 * @param workerId - The id of the worker whose queued tasks are moved.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Keep the pool worker node key next to each worker node: once filtered and
  // sorted, the position in the array is no longer a valid worker node key.
  const candidates = this.workerNodes
    .map((workerNode, workerNodeKey) => ({ workerNodeKey, workerNode }))
    .filter(({ workerNode }) => workerNode.info.id !== workerId)
    .sort(
      (candidateA, candidateB) =>
        candidateA.workerNode.usage.tasks.queued -
        candidateB.workerNode.usage.tasks.queued
    )
  for (const { workerNodeKey, workerNode } of candidates) {
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      // Nothing left to move.
      break
    }
    if (!workerNode.info.ready || workerNode.hasBackPressure()) {
      continue
    }
    // NOTE(review): the task is forwarded unchanged; if it embeds the id of the
    // source worker, it may need to be rewritten for the receiver - confirm.
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (
      workerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  }
}
1262
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message content:
 * worker ready response, task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1287
/**
 * Handles a worker ready response: records the ready flag and the task functions in
 * the worker information, then emits the pool ready event if the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctions = message.taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1301
/**
 * Handles a task execution response: settles the pending promise of the task, runs
 * the after task execution hook, runs the next queued task of the worker node if
 * allowed and updates the worker choice strategy context.
 *
 * Responses without a pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  // Run the next queued task while the worker node is below the concurrency limit.
  const shallRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1329
/**
 * Emits the pool busy event, carrying the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1335
/**
 * Emits the pool back pressure event, carrying the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1341
/**
 * Emits the pool full event, carrying the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1349
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1359
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // The configured tasks queue size wins, else the squared pool maximum size is used.
  const queueSize = this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    queueSize
  )
  // Worker nodes added while the pool is starting are flagged as ready.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return workerNodeKey
}
c923ce56 1384
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1397
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without the tasks queue there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1405
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1414
/**
 * Runs the before task execution hook, sends the task to the worker identified by
 * its worker node key, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1426
/**
 * Enqueues the task on the worker node identified by its key, then checks and emits
 * the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node enqueue operation.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const size = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return size
}
1432
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1436
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1440
/**
 * Executes every task queued on the worker node identified by its key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  let pending = this.tasksQueueSize(workerNodeKey)
  while (pending > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    pending = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1450
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1456}