refactor: cleanup promise usage code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task id.
 *
 * - `key`: The id of each submitted task.
 * - `value`: The promise `resolve` and `reject` callbacks of the task, together with the key of
 *   the worker node the task was assigned to.
 *
 * The entry is looked up when a message bound to the same task id is received from a worker.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Response>
>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * Whether the pool is starting or not: `true` only while the constructor is creating the initial worker nodes.
 */
private readonly starting: boolean

/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp
95
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and the worker choice
 * strategy context are created, the setup hook is run and the worker nodes are started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not created from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, pool options defaults are applied here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
144
/**
 * Checks that the worker file path is a non blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isBlankOrMissing =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (isBlankOrMissing) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
157
/**
 * Checks the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only refused for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
175
/**
 * Checks the minimum and maximum sizes of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
201
/**
 * Checks the pool options and fills `this.opts` with the default value of each unset option.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
228
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
238
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, one is expected per worker node of a full pool.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
266
/**
 * Checks the tasks queue options. Unset options and unset concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
290
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length

  // The count is evaluated again before each creation.
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
302
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Adds up one numeric value per worker node over the whole pool.
  const sumOver = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sumOver((workerNode) => workerNode.usage.tasks.executed),
    executingTasks: sumOver((workerNode) => workerNode.usage.tasks.executing),
    // Queue figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOver((workerNode) => workerNode.usage.tasks.queued)
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOver(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOver((workerNode) => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.runTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOver((workerNode) => workerNode.usage.waitTime?.aggregate ?? 0) /
            sumOver((workerNode) => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 443
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
458
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all the worker nodes, divided by
 * the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
479
/**
 * The pool type, to be provided by each pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
486
/**
 * The worker type, to be provided by each pool implementation.
 */
protected abstract get worker (): WorkerType
491
/**
 * The pool minimum size, to be provided by each pool implementation.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size, to be provided by each pool implementation.
 */
protected abstract get maxSize (): number
a35560ba 501
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 * A message without worker id is accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
518
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    if (this.workerNodes[workerNodeKey].worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
530
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    if (this.workerNodes[workerNodeKey].info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
542
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node usage is reset and each worker receives a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
561
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Invalid options throw before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
572
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Queued tasks are flushed when an enabled tasks queue gets disabled.
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
584
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: previously stored tasks queue options are removed.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
595
/**
 * Builds the tasks queue options, with a concurrency defaulting to `1` when unset.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user.
 * @returns The tasks queue options with the default values applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
603
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 612
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by each pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 619
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the tasks queue concurrency;
 * otherwise a worker node is at quota as soon as it executes one task. Only ready worker
 * nodes are considered.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowConcurrency = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
    return !hasWorkerNodeBelowConcurrency
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    (workerNode) =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
644
/**
 * @inheritDoc
 *
 * The returned promise is rejected with a `TypeError`, and no task is submitted, when `name` is
 * not a string or when `transferList` is not an array.
 */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // reject() does not stop the promise executor: return explicitly so that an invalid
    // call neither chooses a worker node nor registers and submits a task.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id until the worker response is received.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen worker node
    // already executes as many tasks as the tasks queue concurrency allows.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 687
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Every worker node destruction is started before any of them is awaited.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
}
696
/**
 * Sends a kill message to a worker and waits for the worker answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
712
/**
 * Terminates the worker node given its worker node key.
 * To be provided by each pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 719
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 729
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to create a pool when it returns `false`.
 */
protected abstract isMain (): boolean
734
/**
 * Hook executed before the worker task execution.
 * Updates the executing tasks counter and the wait time statistics, both for the worker node
 * usage and for its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  recordTaskStart(this.workerNodes[workerNodeKey].usage)
  // The per task name usage is fetched once the worker node usage has been updated.
  const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  recordTaskStart(taskWorkerUsage)
}
755
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, both for the worker node usage and for its
 * per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  recordTaskEnd(this.workerNodes[workerNodeKey].usage)
  // The default task name is used when the message carries no task performance name.
  const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  ) as WorkerUsage
  recordTaskEnd(taskWorkerUsage)
}
778
/**
 * Updates the tasks counters of a worker usage once a task has ended: one executing task less
 * and one more executed task, or failed task if the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
791
/**
 * Updates the run time statistics of a worker usage with the task run time of the received message.
 * A run time of `0` is used when the message carries no task run time.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
803
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task timestamp.
 * A task without timestamp counts for a wait time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
817
/**
 * Updates the ELU statistics of a worker usage with the task ELU of the received message.
 *
 * The active and idle measurements are always updated, with `0` when the message carries no
 * ELU. The utilization is only updated when ELU aggregation is required and the message
 * carries an ELU: it becomes the mean of the stored and received utilization, or the received
 * one when none is stored yet.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
849
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions for it are met; it is chosen
 * directly if the strategy policy asks to use dynamic workers. Otherwise the choice is
 * delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
868
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
877
/**
 * Sends a message to worker given its worker node key.
 * To be provided by each pool implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
890
/**
 * Creates a new worker.
 * To be provided by each pool implementation.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 897
/**
 * Creates a new, completely set up worker node.
 *
 * The user handlers given in the pool options and the pool internal `'error'` and `'exit'`
 * handlers are attached to the new worker before it is added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker node is created while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && erroredWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool on the first worker exit.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 937
/**
 * Creates a new, completely set up worker node and turns it into a dynamic one:
 * a kill message listener is registered, an activity check is requested from the
 * worker and the worker information is flagged as dynamic.
 *
 * @returns The key of the new, completely set up dynamic worker node.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker: a hard kill always destroys the worker node.
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (
      !shallDestroy &&
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      workerUsage.tasks.executing === 0
    ) {
      // A soft kill destroys the worker node only when it executes nothing and,
      // if the tasks queue is enabled, when its tasks queue is empty too.
      shallDestroy =
        this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0)
    }
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Flag the worker as ready right away when the strategy policy uses dynamic workers.
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
976
/**
 * Registers a message listener callback on the worker designated by its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message received.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
989
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // Then send the startup message, followed by the statistics message.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1004
/**
 * Sends the startup message to the worker designated by its worker node key.
 * Abstract: the implementation is provided by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1011
/**
 * Sends the statistics message to the worker designated by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements of the worker choice strategy context.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
a2ed5053
JB
1029
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the first other ready worker node without queued tasks is
 * preferred; otherwise the other ready worker node with the fewest queued tasks is used.
 * The task is executed immediately when the chosen worker node has no queued tasks and
 * executes fewer tasks than the tasks queue concurrency; otherwise it is enqueued on it.
 *
 * If no other ready worker node exists, the redistribution stops and the remaining
 * tasks are left in the queue of the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other, ready worker nodes are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        // Nothing queued on that worker node: run the task right away if the
        // concurrency limit allows it, and stop searching.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No other ready worker node: moving a task back onto the same queue would
      // never shrink it and would loop forever, so stop here.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1073
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches worker ready responses
 * and task execution responses to their respective handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1091
/**
 * Handles a worker ready response: stores the ready flag carried by the message in
 * the worker information and emits the pool ready event if the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  // Nothing to emit without an emitter or while the pool is not ready.
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1100
/**
 * Handles a task execution response: settles the promise registered for the task,
 * runs the after task execution hook, executes the next queued task on the same
 * worker node if allowed and updates the worker choice strategy context.
 * Responses without a registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const promiseResponse = this.promiseResponseMap.get(
    message.taskId as string
  )
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(message.taskId as string)
  // With the tasks queue enabled, run the next queued task while the worker node
  // stays under the tasks queue concurrency.
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1129
/**
 * Emits the pool busy event when the pool is busy and the pool full event when a
 * dynamic pool is full. Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1140
/**
 * Looks up the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1150
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Worker nodes added at pool startup are flagged as ready.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1171
/**
 * Removes the worker node holding the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if no such worker node is found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1184
/**
 * Runs the before task execution hook, then sends the given task, along with its
 * transfer list, to the worker designated by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1195
/**
 * Enqueues the given task on the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1199
/**
 * Dequeues a task from the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1203
/**
 * Gets the tasks queue size of the worker node designated by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1207
/**
 * Executes every task queued on the worker node designated by its key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let queuedTasks = this.tasksQueueSize(workerNodeKey);
    queuedTasks > 0;
    queuedTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1217
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1223}