Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the setup hook
 * and then starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, the pool options get their defaults filled in here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bound once so that the methods can be handed over as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is only raised while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
154
/**
 * Checks that the worker file path is a non blank string that points to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the file path is missing, is not a string, is blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are not strings, so one typeof test covers them too.
  const isMissingOrBlank =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
167
8d3782fa
JB
/**
 * Checks that the number of workers is a safe, non negative integer and
 * that a fixed pool is not instantiated with zero worker.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
185
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, or is inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
211
/**
 * Checks the pool options and stores them, completed with their default values, in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  // User supplied strategy options take precedence over the defaults.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const userTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(userTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
239
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
249
0d80593b
JB
/**
 * Checks the worker choice strategy options: choice retries, weights and measurement.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the choice retries are not an integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are not greater than zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights do not match the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold at most.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
293
/**
 * Checks the tasks queue options.
 *
 * Side effect: when `queueMaxSize` is set, its value is copied into the `size`
 * property of the given options object before `size` is checked.
 *
 * @param tasksQueueOptions - The tasks queue options to check. Nullish options are accepted as is.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or the concurrency or the size is not an integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency or the size is negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If both `queueMaxSize` and `size` are specified.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: Writable<TasksQueueOptions>
): void {
  // Nothing to validate when no options are given.
  if (tasksQueueOptions == null) {
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const { queueMaxSize } = tasksQueueOptions
  if (queueMaxSize != null) {
    if (tasksQueueOptions.size != null) {
      throw new Error(
        'Invalid tasks queue options: cannot specify both queueMaxSize and size'
      )
    }
    tasksQueueOptions.size = queueMaxSize
  }
  const { size } = tasksQueueOptions
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${size} is a negative integer or zero`
      )
    }
  }
}
341
e761c033
JB
/**
 * Creates and sets up worker nodes until the number of non dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recomputed on each iteration since creating a worker node changes the count.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
353
/** @inheritDoc */
public get info (): PoolInfo {
  type PoolWorkerNode = IWorkerNode<Worker, Data>
  const strategyContext = this.workerChoiceStrategyContext
  // Number of worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (workerNode: PoolWorkerNode) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Sum over all worker nodes of the picked numeric value.
  const sumOverWorkerNodes = (
    pick: (workerNode: PoolWorkerNode) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Tasks queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // The average is computed over the run time histories of all worker nodes.
        average: round(
          average(
            this.workerNodes.reduce<number[]>(
              (histories, workerNode) =>
                histories.concat(workerNode.usage.runTime.history),
              []
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // The average is computed over the wait time histories of all worker nodes.
        average: round(
          average(
            this.workerNodes.reduce<number[]>(
              (histories, workerNode) =>
                histories.concat(workerNode.usage.waitTime.history),
              []
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 491
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` when the number of ready, non dynamic
 * worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
506
/**
 * The approximate pool utilization: the aggregated tasks run time and wait time of
 * all worker nodes divided by the elapsed time since the pool start multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
527
8881ae32 528 /**
aa9eede8 529 * The pool type.
8881ae32
JB
530 *
531 * If it is `'dynamic'`, it provides the `max` property.
532 */
533 protected abstract get type (): PoolType
534
184855e6 535 /**
aa9eede8 536 * The worker type.
184855e6
JB
537 */
538 protected abstract get worker (): WorkerType
539
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
546
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  const definedMaximum = this.max
  return definedMaximum ?? this.numberOfWorkers
}
a35560ba 553
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not belong to a pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
572
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
584
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
596
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
615
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options are layered on top of the default ones.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
629
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching the tasks queue off flushes the tasks queues first.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
641
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop previously stored tasks queue options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
}
653
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to assign to `tasksQueueBackPressureSize`.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
659
a20f0ba5
JB
/**
 * Builds the tasks queue options: the given options layered on top of the defaults,
 * i.e. a size equal to the square of the pool maximum size and a concurrency of one.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  return {
    size: Math.pow(this.maxSize, 2),
    concurrency: 1,
    ...tasksQueueOptions
  }
}
671
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 680
c319c66b
JB
681 /**
682 * Whether the pool is busy or not.
683 *
684 * The pool busyness boolean status.
685 */
686 protected abstract get busy (): boolean
7c0ba920 687
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * The quota is the tasks queue concurrency when the tasks queue is enabled;
 * otherwise a ready worker node is available only when it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
712
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non empty task functions list is used.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
725
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: each validation failure must
    // return explicitly, otherwise the task would still be registered and
    // dispatched to a worker node although the promise is already rejected.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node choice so that the task wait
    // time is measured from this point.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id until the worker responds.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the tasks queue is disabled or the worker node is
    // below its concurrency, enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 787
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
798
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
814
4a6952ff 815 /**
aa9eede8 816 * Terminates the worker node given its worker node key.
4a6952ff 817 *
aa9eede8 818 * @param workerNodeKey - The worker node key.
4a6952ff 819 */
81c02522 820 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 821
/**
 * Setup hook called in the abstract constructor before the worker nodes are created.
 * Does nothing by default, can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
c97c7edb 831
729c563d 832 /**
280c2a77
S
833 * Should return whether the worker is the main worker or not.
834 */
835 protected abstract isMain (): boolean
836
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time of the worker
 * node usage and, when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
866
/**
 * Hook executed after the worker task execution.
 * Updates the tasks statistics, the run time and the ELU of the worker node usage
 * and, when applicable, of the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three usage updates to the given worker usage.
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    updateUsage(workerUsage)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    updateUsage(taskFunctionWorkerUsage)
  }
}
900
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not,
 * i.e. whether its worker info lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
915
/**
 * Updates the tasks counters of the given worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  // The executing counter is never decremented below zero.
  const hasExecutingTasks =
    taskStatistics.executing != null && taskStatistics.executing > 0
  if (hasExecutingTasks) {
    --taskStatistics.executing
  }
  if (message.taskError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
933
/**
 * Feeds the task run time into the run time measurement statistics of the given worker usage.
 * Nothing is recorded when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time is accounted as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      runTimeRequirements,
      taskRunTime
    )
  }
}
947
/**
 * Feeds the task wait time into the wait time measurement statistics of the given worker usage.
 * The wait time is the difference between the current `performance.now()` value and the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a wait time of 0.
  const taskTimestamp = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp
  )
}
960
/**
 * Feeds the task event loop utilization (ELU) into the ELU statistics of the given worker usage.
 * Nothing is recorded when the task failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active or idle values are accounted as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The first sample is stored as is, the following ones are averaged with the stored value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
993
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker node shall be created, it is created first and its key is returned if
 * the strategy policy enables dynamic worker usage. In every other case the choice is
 * delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1012
/**
 * Checks the conditions for a dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1021
/**
 * Sends a message to the worker identified by its worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1034
/**
 * Creates a new worker.
 * To be implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1041
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes and
 * runs the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts

  // User supplied handlers, defaulting to a no-op.
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(
      erroredWorkerNodeKey
    ) as WorkerInfo
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // Only restart a worker node on a pool that is started and no longer starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting && this.started
    if (shallRestart && erroredWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // Pool internal exit handling: drop the worker node.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1085
/**
 * Creates a worker node, registers the kill message listener on it, asks the worker to
 * check its activity and flags the worker node as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: a hard kill is always honored.
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    // A soft kill is honored only if no task is executing and, with tasks queueing enabled, no task is queued.
    if (
      !shallDestroy &&
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      usage.tasks.executing === 0
    ) {
      shallDestroy =
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0)
    }
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const info = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: info.id as number
  })
  info.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  // The strategy policy may require the dynamic worker node to be flagged as ready right away.
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1128
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1141
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Tasks queueing enabled: plug the tasks stealing callbacks on the worker node.
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1162
/**
 * Sends the startup message to the worker identified by its worker node key.
 * To be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1169
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const { id } = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: id as number
  })
}
a2ed5053
JB
1187
/**
 * Redistributes the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first ready worker node without queued tasks or,
 * failing that, the ready worker node with the fewest queued tasks. The task is executed right
 * away if the destination worker node executes fewer tasks than the configured concurrency,
 * otherwise it is enqueued on the destination worker node.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining tasks are
 * left in the queue of the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (!workerNode.info.ready || workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No eligible destination: nothing would be dequeued, so looping again would never terminate.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    // The execute or enqueue decision depends on the chosen destination only,
    // not on the spare capacity of any other candidate met while scanning.
    if (
      destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1225
/**
 * Steals one task for the worker whose id is given, from another worker node.
 *
 * The worker nodes are scanned by decreasing number of queued tasks. The first ready worker
 * node other than the destination that has queued tasks gives one task away through `popTask()`.
 * The task is executed right away if the destination worker node executes fewer tasks than
 * the configured concurrency, otherwise it is enqueued on the destination worker node.
 *
 * @param workerId - The id of the worker that receives the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  const byDescendingQueuedTasks = [...this.workerNodes].sort(
    (nodeA, nodeB) => nodeB.usage.tasks.queued - nodeA.usage.tasks.queued
  )
  for (const sourceWorkerNode of byDescendingQueuedTasks) {
    // Sorted by decreasing queue size: no task can be stolen from here on.
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      break
    }
    if (
      !sourceWorkerNode.info.ready ||
      sourceWorkerNode.info.id === workerId ||
      sourceWorkerNode.usage.tasks.queued <= 0
    ) {
      continue
    }
    const stolenTask = {
      ...(sourceWorkerNode.popTask() as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    const hasSpareCapacity =
      destinationWorkerNode.usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
    if (hasSpareCapacity) {
      this.executeTask(destinationWorkerNodeKey, stolenTask)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, stolenTask)
    }
    // Only one task is stolen per call.
    break
  }
}
1260
/**
 * Moves tasks away from the worker whose id is given to the other worker nodes.
 *
 * The worker nodes are scanned by increasing number of queued tasks. Each ready worker node
 * other than the source, whose number of queued tasks is below the configured tasks queue
 * size minus one, receives one task taken from the source through `popTask()`. The task is
 * executed right away if the receiving worker node executes fewer tasks than the configured
 * concurrency, otherwise it is enqueued on it.
 *
 * @param workerId - The id of the worker whose tasks are given away.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    // Nothing is left to give away.
    if (sourceWorkerNode.usage.tasks.queued <= 0) {
      break
    }
    if (
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - 1
    ) {
      // The position in the sorted copy is not the worker node key:
      // resolve the key against the pool worker nodes array.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (
        workerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
    }
  }
}
1293
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches on the message content.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1318
/**
 * Handles a worker ready response: stores the ready flag and the task functions in the
 * worker information, then emits the pool ready event if the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const info = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  info.ready = ready as boolean
  info.taskFunctions = taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1332
/**
 * Handles a task execution response: settles the pending task promise, runs the after task
 * execution hook, executes the next queued task if any and updates the worker choice strategy context.
 * A response whose task id has no pending promise is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId as string)
  // With tasks queueing enabled, run the next queued task if the worker node has spare capacity.
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1360
/**
 * Emits the pool busy event, with the pool information, if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1366
/**
 * Emits the pool back pressure event, with the pool information, if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1372
/**
 * Emits the pool full event, with the pool information, if the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1380
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1390
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Default to the squared pool maximum size when no tasks queue size is configured.
  const tasksQueueSize =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
c923ce56 1415
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1428
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Without tasks queueing there is no back pressure.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1436
/**
 * Tells whether the pool has back pressure: tasks queueing is enabled and every worker node has back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1445
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task to the worker along with its
 * transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // Hook first, before the task leaves the pool.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Hand the task over to the worker.
  this.sendToWorker(workerNodeKey, task, task.transferList)
  // Finally, emit the task execution events if needed.
  this.checkAndEmitTaskExecutionEvents()
}
1457
/**
 * Enqueues the given task on the worker node identified by its key, then checks and emits
 * the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1463
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `dequeueTask()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1467
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the worker node `tasksQueueSize()`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1471
/**
 * Executes every task queued on the worker node identified by its key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1481
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1487}