build: silence linter
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
7d91a8cd
JB
31import {
32 type IWorker,
33 type IWorkerNode,
34 type WorkerInfo,
35 type WorkerType,
36 WorkerTypes,
37 type WorkerUsage
e102732c 38} from './worker'
a35560ba 39import {
008512c7 40 type MeasurementStatisticsRequirements,
f0d7f803 41 Measurements,
a35560ba 42 WorkerChoiceStrategies,
a20f0ba5
JB
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
bdaf31cd
JB
45} from './selection-strategies/selection-strategies-types'
46import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 47import { version } from './version'
4b628b48 48import { WorkerNode } from './worker-node'
23ccf9d7 49
729c563d 50/**
ea7a90d3 51 * Base class that implements some shared logic for all poolifier pools.
729c563d 52 *
38e795c1 53 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 56 */
c97c7edb 57export abstract class AbstractPool<
f06e48d8 58 Worker extends IWorker,
d3c8a1a8
S
59 Data = unknown,
60 Response = unknown
c4855468 61> implements IPool<Worker, Data, Response> {
afc003b2 62 /** @inheritDoc */
4b628b48 63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 64
afc003b2 65 /** @inheritDoc */
7c0ba920
JB
66 public readonly emitter?: PoolEmitter
67
be0676b3 68 /**
52b71763 69 * The task execution response promise map.
be0676b3 70 *
2740a743 71 * - `key`: The message id of each submitted task.
a3445496 72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 73 *
a3445496 74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 75 */
501aea93
JB
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
075e51d1 88 /**
adc9cc64 89 * Whether the pool is starting or not.
075e51d1
JB
90 */
91 private readonly starting: boolean
afe0d5bf
JB
92 /**
93 * The start timestamp of the pool.
94 */
95 private readonly startTimestamp
96
729c563d
S
97 /**
98 * Constructs a new poolifier pool.
99 *
38e795c1 100 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 101 * @param filePath - Path to the worker file.
38e795c1 102 * @param opts - Options for the pool.
729c563d 103 */
c97c7edb 104 public constructor (
b4213b7f
JB
105 protected readonly numberOfWorkers: number,
106 protected readonly filePath: string,
107 protected readonly opts: PoolOptions<Worker>
c97c7edb 108 ) {
78cea37e 109 if (!this.isMain()) {
c97c7edb
S
110 throw new Error('Cannot start a pool from a worker!')
111 }
8d3782fa 112 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 113 this.checkFilePath(this.filePath)
7c0ba920 114 this.checkPoolOptions(this.opts)
1086026a 115
7254e419
JB
116 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
117 this.executeTask = this.executeTask.bind(this)
118 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 119 this.dequeueTask = this.dequeueTask.bind(this)
7254e419 120 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 121
6bd72cd0 122 if (this.opts.enableEvents === true) {
7c0ba920
JB
123 this.emitter = new PoolEmitter()
124 }
d59df138
JB
125 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
126 Worker,
127 Data,
128 Response
da309861
JB
129 >(
130 this,
131 this.opts.workerChoiceStrategy,
132 this.opts.workerChoiceStrategyOptions
133 )
b6b32453
JB
134
135 this.setupHook()
136
075e51d1 137 this.starting = true
e761c033 138 this.startPool()
075e51d1 139 this.starting = false
afe0d5bf
JB
140
141 this.startTimestamp = performance.now()
c97c7edb
S
142 }
143
a35560ba 144 private checkFilePath (filePath: string): void {
ffcbbad8
JB
145 if (
146 filePath == null ||
3d6dd312 147 typeof filePath !== 'string' ||
ffcbbad8
JB
148 (typeof filePath === 'string' && filePath.trim().length === 0)
149 ) {
c510fea7
APA
150 throw new Error('Please specify a file with a worker implementation')
151 }
3d6dd312
JB
152 if (!existsSync(filePath)) {
153 throw new Error(`Cannot find the worker file '${filePath}'`)
154 }
c510fea7
APA
155 }
156
8d3782fa
JB
157 private checkNumberOfWorkers (numberOfWorkers: number): void {
158 if (numberOfWorkers == null) {
159 throw new Error(
160 'Cannot instantiate a pool without specifying the number of workers'
161 )
78cea37e 162 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 163 throw new TypeError(
0d80593b 164 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
165 )
166 } else if (numberOfWorkers < 0) {
473c717a 167 throw new RangeError(
8d3782fa
JB
168 'Cannot instantiate a pool with a negative number of workers'
169 )
6b27d407 170 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
171 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
172 }
173 }
174
175 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 176 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
177 if (max == null) {
178 throw new Error(
179 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
180 )
181 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
182 throw new TypeError(
183 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
184 )
185 } else if (min > max) {
079de991
JB
186 throw new RangeError(
187 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
188 )
b97d82d8 189 } else if (max === 0) {
079de991 190 throw new RangeError(
d640b48b 191 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
192 )
193 } else if (min === max) {
194 throw new RangeError(
195 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
196 )
197 }
8d3782fa
JB
198 }
199 }
200
7c0ba920 201 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
202 if (isPlainObject(opts)) {
203 this.opts.workerChoiceStrategy =
204 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
205 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
206 this.opts.workerChoiceStrategyOptions =
207 opts.workerChoiceStrategyOptions ??
208 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
209 this.checkValidWorkerChoiceStrategyOptions(
210 this.opts.workerChoiceStrategyOptions
211 )
1f68cede 212 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
213 this.opts.enableEvents = opts.enableEvents ?? true
214 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
215 if (this.opts.enableTasksQueue) {
216 this.checkValidTasksQueueOptions(
217 opts.tasksQueueOptions as TasksQueueOptions
218 )
219 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
220 opts.tasksQueueOptions as TasksQueueOptions
221 )
222 }
223 } else {
224 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 225 }
aee46736
JB
226 }
227
228 private checkValidWorkerChoiceStrategy (
229 workerChoiceStrategy: WorkerChoiceStrategy
230 ): void {
231 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 232 throw new Error(
aee46736 233 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
234 )
235 }
7c0ba920
JB
236 }
237
0d80593b
JB
238 private checkValidWorkerChoiceStrategyOptions (
239 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
240 ): void {
241 if (!isPlainObject(workerChoiceStrategyOptions)) {
242 throw new TypeError(
243 'Invalid worker choice strategy options: must be a plain object'
244 )
245 }
49be33fe
JB
246 if (
247 workerChoiceStrategyOptions.weights != null &&
6b27d407 248 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
249 ) {
250 throw new Error(
251 'Invalid worker choice strategy options: must have a weight for each worker node'
252 )
253 }
f0d7f803
JB
254 if (
255 workerChoiceStrategyOptions.measurement != null &&
256 !Object.values(Measurements).includes(
257 workerChoiceStrategyOptions.measurement
258 )
259 ) {
260 throw new Error(
261 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
262 )
263 }
0d80593b
JB
264 }
265
a20f0ba5
JB
266 private checkValidTasksQueueOptions (
267 tasksQueueOptions: TasksQueueOptions
268 ): void {
0d80593b
JB
269 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
270 throw new TypeError('Invalid tasks queue options: must be a plain object')
271 }
f0d7f803
JB
272 if (
273 tasksQueueOptions?.concurrency != null &&
274 !Number.isSafeInteger(tasksQueueOptions.concurrency)
275 ) {
276 throw new TypeError(
277 'Invalid worker tasks concurrency: must be an integer'
278 )
279 }
280 if (
281 tasksQueueOptions?.concurrency != null &&
282 tasksQueueOptions.concurrency <= 0
283 ) {
a20f0ba5 284 throw new Error(
f0d7f803 285 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
286 )
287 }
288 }
289
e761c033
JB
290 private startPool (): void {
291 while (
292 this.workerNodes.reduce(
293 (accumulator, workerNode) =>
294 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
295 0
296 ) < this.numberOfWorkers
297 ) {
aa9eede8 298 this.createAndSetupWorkerNode()
e761c033
JB
299 }
300 }
301
08f3f44c 302 /** @inheritDoc */
6b27d407
JB
303 public get info (): PoolInfo {
304 return {
23ccf9d7 305 version,
6b27d407 306 type: this.type,
184855e6 307 worker: this.worker,
2431bdb4
JB
308 ready: this.ready,
309 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
310 minSize: this.minSize,
311 maxSize: this.maxSize,
c05f0d50
JB
312 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
313 .runTime.aggregate &&
1305e9a8
JB
314 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
315 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
316 workerNodes: this.workerNodes.length,
317 idleWorkerNodes: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
f59e1027 319 workerNode.usage.tasks.executing === 0
a4e07f72
JB
320 ? accumulator + 1
321 : accumulator,
6b27d407
JB
322 0
323 ),
324 busyWorkerNodes: this.workerNodes.reduce(
325 (accumulator, workerNode) =>
f59e1027 326 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
327 0
328 ),
a4e07f72 329 executedTasks: this.workerNodes.reduce(
6b27d407 330 (accumulator, workerNode) =>
f59e1027 331 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
332 0
333 ),
334 executingTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
f59e1027 336 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
337 0
338 ),
daf86646
JB
339 ...(this.opts.enableTasksQueue === true && {
340 queuedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + workerNode.usage.tasks.queued,
343 0
344 )
345 }),
346 ...(this.opts.enableTasksQueue === true && {
347 maxQueuedTasks: this.workerNodes.reduce(
348 (accumulator, workerNode) =>
349 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
350 0
351 )
352 }),
a4e07f72
JB
353 failedTasks: this.workerNodes.reduce(
354 (accumulator, workerNode) =>
f59e1027 355 accumulator + workerNode.usage.tasks.failed,
a4e07f72 356 0
1dcf8b7b
JB
357 ),
358 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
359 .runTime.aggregate && {
360 runTime: {
98e72cda
JB
361 minimum: round(
362 Math.min(
363 ...this.workerNodes.map(
364 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
365 )
1dcf8b7b
JB
366 )
367 ),
98e72cda
JB
368 maximum: round(
369 Math.max(
370 ...this.workerNodes.map(
371 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
372 )
1dcf8b7b 373 )
98e72cda
JB
374 ),
375 average: round(
376 this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
379 0
380 ) /
381 this.workerNodes.reduce(
382 (accumulator, workerNode) =>
383 accumulator + (workerNode.usage.tasks?.executed ?? 0),
384 0
385 )
386 ),
387 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
388 .runTime.median && {
389 median: round(
390 median(
391 this.workerNodes.map(
392 workerNode => workerNode.usage.runTime?.median ?? 0
393 )
394 )
395 )
396 })
1dcf8b7b
JB
397 }
398 }),
399 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
400 .waitTime.aggregate && {
401 waitTime: {
98e72cda
JB
402 minimum: round(
403 Math.min(
404 ...this.workerNodes.map(
405 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
406 )
1dcf8b7b
JB
407 )
408 ),
98e72cda
JB
409 maximum: round(
410 Math.max(
411 ...this.workerNodes.map(
412 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
413 )
1dcf8b7b 414 )
98e72cda
JB
415 ),
416 average: round(
417 this.workerNodes.reduce(
418 (accumulator, workerNode) =>
419 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
420 0
421 ) /
422 this.workerNodes.reduce(
423 (accumulator, workerNode) =>
424 accumulator + (workerNode.usage.tasks?.executed ?? 0),
425 0
426 )
427 ),
428 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
429 .waitTime.median && {
430 median: round(
431 median(
432 this.workerNodes.map(
433 workerNode => workerNode.usage.waitTime?.median ?? 0
434 )
435 )
436 )
437 })
1dcf8b7b
JB
438 }
439 })
6b27d407
JB
440 }
441 }
08f3f44c 442
aa9eede8
JB
443 /**
444 * The pool readiness boolean status.
445 */
2431bdb4
JB
446 private get ready (): boolean {
447 return (
b97d82d8
JB
448 this.workerNodes.reduce(
449 (accumulator, workerNode) =>
450 !workerNode.info.dynamic && workerNode.info.ready
451 ? accumulator + 1
452 : accumulator,
453 0
454 ) >= this.minSize
2431bdb4
JB
455 )
456 }
457
afe0d5bf 458 /**
aa9eede8 459 * The approximate pool utilization.
afe0d5bf
JB
460 *
461 * @returns The pool utilization.
462 */
463 private get utilization (): number {
8e5ca040 464 const poolTimeCapacity =
fe7d90db 465 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
466 const totalTasksRunTime = this.workerNodes.reduce(
467 (accumulator, workerNode) =>
71514351 468 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
469 0
470 )
471 const totalTasksWaitTime = this.workerNodes.reduce(
472 (accumulator, workerNode) =>
71514351 473 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
474 0
475 )
8e5ca040 476 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
477 }
478
8881ae32 479 /**
aa9eede8 480 * The pool type.
8881ae32
JB
481 *
482 * If it is `'dynamic'`, it provides the `max` property.
483 */
484 protected abstract get type (): PoolType
485
184855e6 486 /**
aa9eede8 487 * The worker type.
184855e6
JB
488 */
489 protected abstract get worker (): WorkerType
490
c2ade475 491 /**
aa9eede8 492 * The pool minimum size.
c2ade475 493 */
6b27d407 494 protected abstract get minSize (): number
ff733df7
JB
495
496 /**
aa9eede8 497 * The pool maximum size.
ff733df7 498 */
6b27d407 499 protected abstract get maxSize (): number
a35560ba 500
6b813701
JB
501 /**
502 * Checks if the worker id sent in the received message from a worker is valid.
503 *
504 * @param message - The received message.
505 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
506 */
21f710aa
JB
507 private checkMessageWorkerId (message: MessageValue<Response>): void {
508 if (
509 message.workerId != null &&
aad6fb64 510 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
511 ) {
512 throw new Error(
513 `Worker message received from unknown worker '${message.workerId}'`
514 )
515 }
516 }
517
ffcbbad8 518 /**
f06e48d8 519 * Gets the given worker its worker node key.
ffcbbad8
JB
520 *
521 * @param worker - The worker.
f59e1027 522 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 523 */
aad6fb64 524 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8
JB
525 return this.workerNodes.findIndex(
526 workerNode => workerNode.worker === worker
527 )
bf9549ae
JB
528 }
529
aa9eede8
JB
530 /**
531 * Gets the worker node key given its worker id.
532 *
533 * @param workerId - The worker id.
aad6fb64 534 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 535 */
aad6fb64
JB
536 private getWorkerNodeKeyByWorkerId (workerId: number): number {
537 return this.workerNodes.findIndex(
538 workerNode => workerNode.info.id === workerId
539 )
aa9eede8
JB
540 }
541
afc003b2 542 /** @inheritDoc */
a35560ba 543 public setWorkerChoiceStrategy (
59219cbb
JB
544 workerChoiceStrategy: WorkerChoiceStrategy,
545 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 546 ): void {
aee46736 547 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 548 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
549 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
550 this.opts.workerChoiceStrategy
551 )
552 if (workerChoiceStrategyOptions != null) {
553 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
554 }
aa9eede8 555 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 556 workerNode.resetUsage()
aa9eede8 557 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
59219cbb 558 }
a20f0ba5
JB
559 }
560
561 /** @inheritDoc */
562 public setWorkerChoiceStrategyOptions (
563 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
564 ): void {
0d80593b 565 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
566 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
567 this.workerChoiceStrategyContext.setOptions(
568 this.opts.workerChoiceStrategyOptions
a35560ba
S
569 )
570 }
571
a20f0ba5 572 /** @inheritDoc */
8f52842f
JB
573 public enableTasksQueue (
574 enable: boolean,
575 tasksQueueOptions?: TasksQueueOptions
576 ): void {
a20f0ba5 577 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 578 this.flushTasksQueues()
a20f0ba5
JB
579 }
580 this.opts.enableTasksQueue = enable
8f52842f 581 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
582 }
583
584 /** @inheritDoc */
8f52842f 585 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 586 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
587 this.checkValidTasksQueueOptions(tasksQueueOptions)
588 this.opts.tasksQueueOptions =
589 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 590 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
591 delete this.opts.tasksQueueOptions
592 }
593 }
594
595 private buildTasksQueueOptions (
596 tasksQueueOptions: TasksQueueOptions
597 ): TasksQueueOptions {
598 return {
599 concurrency: tasksQueueOptions?.concurrency ?? 1
600 }
601 }
602
c319c66b
JB
603 /**
604 * Whether the pool is full or not.
605 *
606 * The pool filling boolean status.
607 */
dea903a8
JB
608 protected get full (): boolean {
609 return this.workerNodes.length >= this.maxSize
610 }
c2ade475 611
c319c66b
JB
612 /**
613 * Whether the pool is busy or not.
614 *
615 * The pool busyness boolean status.
616 */
617 protected abstract get busy (): boolean
7c0ba920 618
6c6afb84 619 /**
3d76750a 620 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
621 *
622 * @returns Worker nodes busyness boolean status.
623 */
c2ade475 624 protected internalBusy (): boolean {
3d76750a
JB
625 if (this.opts.enableTasksQueue === true) {
626 return (
627 this.workerNodes.findIndex(
628 workerNode =>
629 workerNode.info.ready &&
630 workerNode.usage.tasks.executing <
631 (this.opts.tasksQueueOptions?.concurrency as number)
632 ) === -1
633 )
634 } else {
635 return (
636 this.workerNodes.findIndex(
637 workerNode =>
638 workerNode.info.ready && workerNode.usage.tasks.executing === 0
639 ) === -1
640 )
641 }
cb70b19d
JB
642 }
643
afc003b2 644 /** @inheritDoc */
7d91a8cd
JB
645 public async execute (
646 data?: Data,
647 name?: string,
648 transferList?: TransferListItem[]
649 ): Promise<Response> {
52b71763 650 return await new Promise<Response>((resolve, reject) => {
7d91a8cd
JB
651 if (name != null && typeof name !== 'string') {
652 reject(new TypeError('name argument must be a string'))
653 }
654 if (transferList != null && !Array.isArray(transferList)) {
655 reject(new TypeError('transferList argument must be an array'))
656 }
52b71763
JB
657 const timestamp = performance.now()
658 const workerNodeKey = this.chooseWorkerNode()
501aea93 659 const task: Task<Data> = {
52b71763
JB
660 name: name ?? DEFAULT_TASK_NAME,
661 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
662 data: data ?? ({} as Data),
7d91a8cd 663 transferList,
52b71763
JB
664 timestamp,
665 workerId: this.getWorkerInfo(workerNodeKey).id as number,
7629bdf1 666 taskId: randomUUID()
52b71763 667 }
7629bdf1 668 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
669 resolve,
670 reject,
501aea93 671 workerNodeKey
2e81254d 672 })
52b71763 673 if (
4e377863
JB
674 this.opts.enableTasksQueue === false ||
675 (this.opts.enableTasksQueue === true &&
676 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 677 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 678 ) {
501aea93 679 this.executeTask(workerNodeKey, task)
4e377863
JB
680 } else {
681 this.enqueueTask(workerNodeKey, task)
52b71763
JB
682 }
683 this.checkAndEmitEvents()
2e81254d 684 })
280c2a77 685 }
c97c7edb 686
afc003b2 687 /** @inheritDoc */
c97c7edb 688 public async destroy (): Promise<void> {
1fbcaa7c 689 await Promise.all(
81c02522 690 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 691 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
692 })
693 )
c97c7edb
S
694 }
695
4a6952ff 696 /**
aa9eede8 697 * Terminates the worker node given its worker node key.
4a6952ff 698 *
aa9eede8 699 * @param workerNodeKey - The worker node key.
4a6952ff 700 */
81c02522 701 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 702
729c563d 703 /**
6677a3d3
JB
704 * Setup hook to execute code before worker nodes are created in the abstract constructor.
705 * Can be overridden.
afc003b2
JB
706 *
707 * @virtual
729c563d 708 */
280c2a77 709 protected setupHook (): void {
d99ba5a8 710 // Intentionally empty
280c2a77 711 }
c97c7edb 712
729c563d 713 /**
280c2a77
S
714 * Should return whether the worker is the main worker or not.
715 */
716 protected abstract isMain (): boolean
717
718 /**
2e81254d 719 * Hook executed before the worker task execution.
bf9549ae 720 * Can be overridden.
729c563d 721 *
f06e48d8 722 * @param workerNodeKey - The worker node key.
1c6fe997 723 * @param task - The task to execute.
729c563d 724 */
1c6fe997
JB
725 protected beforeTaskExecutionHook (
726 workerNodeKey: number,
727 task: Task<Data>
728 ): void {
f59e1027 729 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
730 ++workerUsage.tasks.executing
731 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 732 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
733 task.name as string
734 ) as WorkerUsage
eb8afc8a
JB
735 ++taskWorkerUsage.tasks.executing
736 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
737 }
738
c01733f1 739 /**
2e81254d 740 * Hook executed after the worker task execution.
bf9549ae 741 * Can be overridden.
c01733f1 742 *
501aea93 743 * @param workerNodeKey - The worker node key.
38e795c1 744 * @param message - The received message.
c01733f1 745 */
2e81254d 746 protected afterTaskExecutionHook (
501aea93 747 workerNodeKey: number,
2740a743 748 message: MessageValue<Response>
bf9549ae 749 ): void {
ff128cc9 750 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
751 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
752 this.updateRunTimeWorkerUsage(workerUsage, message)
753 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 754 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 755 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 756 ) as WorkerUsage
eb8afc8a
JB
757 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
758 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
759 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
760 }
761
762 private updateTaskStatisticsWorkerUsage (
763 workerUsage: WorkerUsage,
764 message: MessageValue<Response>
765 ): void {
a4e07f72
JB
766 const workerTaskStatistics = workerUsage.tasks
767 --workerTaskStatistics.executing
98e72cda
JB
768 if (message.taskError == null) {
769 ++workerTaskStatistics.executed
770 } else {
a4e07f72 771 ++workerTaskStatistics.failed
2740a743 772 }
f8eb0a2a
JB
773 }
774
a4e07f72
JB
775 private updateRunTimeWorkerUsage (
776 workerUsage: WorkerUsage,
f8eb0a2a
JB
777 message: MessageValue<Response>
778 ): void {
e4f20deb
JB
779 updateMeasurementStatistics(
780 workerUsage.runTime,
781 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
782 message.taskPerformance?.runTime ?? 0,
783 workerUsage.tasks.executed
784 )
f8eb0a2a
JB
785 }
786
a4e07f72
JB
787 private updateWaitTimeWorkerUsage (
788 workerUsage: WorkerUsage,
1c6fe997 789 task: Task<Data>
f8eb0a2a 790 ): void {
1c6fe997
JB
791 const timestamp = performance.now()
792 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
793 updateMeasurementStatistics(
794 workerUsage.waitTime,
795 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
796 taskWaitTime,
797 workerUsage.tasks.executed
798 )
c01733f1 799 }
800
a4e07f72 801 private updateEluWorkerUsage (
5df69fab 802 workerUsage: WorkerUsage,
62c15a68
JB
803 message: MessageValue<Response>
804 ): void {
008512c7
JB
805 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
806 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
807 updateMeasurementStatistics(
808 workerUsage.elu.active,
008512c7 809 eluTaskStatisticsRequirements,
e4f20deb
JB
810 message.taskPerformance?.elu?.active ?? 0,
811 workerUsage.tasks.executed
812 )
813 updateMeasurementStatistics(
814 workerUsage.elu.idle,
008512c7 815 eluTaskStatisticsRequirements,
e4f20deb
JB
816 message.taskPerformance?.elu?.idle ?? 0,
817 workerUsage.tasks.executed
818 )
008512c7 819 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 820 if (message.taskPerformance?.elu != null) {
f7510105
JB
821 if (workerUsage.elu.utilization != null) {
822 workerUsage.elu.utilization =
823 (workerUsage.elu.utilization +
824 message.taskPerformance.elu.utilization) /
825 2
826 } else {
827 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
828 }
62c15a68
JB
829 }
830 }
831 }
832
280c2a77 833 /**
f06e48d8 834 * Chooses a worker node for the next task.
280c2a77 835 *
6c6afb84 836 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 837 *
aa9eede8 838 * @returns The chosen worker node key
280c2a77 839 */
6c6afb84 840 private chooseWorkerNode (): number {
930dcf12 841 if (this.shallCreateDynamicWorker()) {
aa9eede8 842 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
843 if (
844 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
845 ) {
aa9eede8 846 return workerNodeKey
6c6afb84 847 }
17393ac8 848 }
930dcf12
JB
849 return this.workerChoiceStrategyContext.execute()
850 }
851
6c6afb84
JB
852 /**
853 * Conditions for dynamic worker creation.
854 *
855 * @returns Whether to create a dynamic worker or not.
856 */
857 private shallCreateDynamicWorker (): boolean {
930dcf12 858 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
859 }
860
280c2a77 861 /**
aa9eede8 862 * Sends a message to worker given its worker node key.
280c2a77 863 *
aa9eede8 864 * @param workerNodeKey - The worker node key.
38e795c1 865 * @param message - The message.
7d91a8cd 866 * @param transferList - The optional array of transferable objects.
280c2a77
S
867 */
868 protected abstract sendToWorker (
aa9eede8 869 workerNodeKey: number,
7d91a8cd
JB
870 message: MessageValue<Data>,
871 transferList?: TransferListItem[]
280c2a77
S
872 ): void
873
729c563d 874 /**
41344292 875 * Creates a new worker.
6c6afb84
JB
876 *
877 * @returns Newly created worker.
729c563d 878 */
280c2a77 879 protected abstract createWorker (): Worker
c97c7edb 880
4a6952ff 881 /**
aa9eede8 882 * Creates a new, completely set up worker node.
4a6952ff 883 *
aa9eede8 884 * @returns New, completely set up worker node key.
4a6952ff 885 */
aa9eede8 886 protected createAndSetupWorkerNode (): number {
bdacc2d2 887 const worker = this.createWorker()
280c2a77 888
35cf1c03 889 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 890 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 891 worker.on('error', error => {
aad6fb64 892 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
9b106837
JB
893 const workerInfo = this.getWorkerInfo(workerNodeKey)
894 workerInfo.ready = false
0dc838e3 895 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 896 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 897 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 898 if (workerInfo.dynamic) {
aa9eede8 899 this.createAndSetupDynamicWorkerNode()
8a1260a3 900 } else {
aa9eede8 901 this.createAndSetupWorkerNode()
8a1260a3 902 }
5baee0d7 903 }
19dbc45b 904 if (this.opts.enableTasksQueue === true) {
9b106837 905 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 906 }
5baee0d7 907 })
a35560ba
S
908 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
909 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 910 worker.once('exit', () => {
f06e48d8 911 this.removeWorkerNode(worker)
a974afa6 912 })
280c2a77 913
aa9eede8 914 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 915
aa9eede8 916 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 917
aa9eede8 918 return workerNodeKey
c97c7edb 919 }
be0676b3 920
930dcf12 921 /**
aa9eede8 922 * Creates a new, completely set up dynamic worker node.
930dcf12 923 *
aa9eede8 924 * @returns New, completely set up dynamic worker node key.
930dcf12 925 */
aa9eede8
JB
926 protected createAndSetupDynamicWorkerNode (): number {
927 const workerNodeKey = this.createAndSetupWorkerNode()
928 this.registerWorkerMessageListener(workerNodeKey, message => {
929 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
930 message.workerId
aad6fb64 931 )
aa9eede8 932 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 933 // Kill message received from worker
930dcf12
JB
934 if (
935 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
936 (message.kill != null &&
937 ((this.opts.enableTasksQueue === false &&
aa9eede8 938 workerUsage.tasks.executing === 0) ||
7b56f532 939 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
940 workerUsage.tasks.executing === 0 &&
941 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 942 ) {
81c02522 943 this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
930dcf12
JB
944 }
945 })
aa9eede8 946 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 947 this.sendToWorker(workerNodeKey, {
b0a4db63 948 checkActive: true,
21f710aa
JB
949 workerId: workerInfo.id as number
950 })
b5e113f6
JB
951 workerInfo.dynamic = true
952 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
953 workerInfo.ready = true
954 }
aa9eede8 955 return workerNodeKey
930dcf12
JB
956 }
957
a2ed5053 958 /**
aa9eede8 959 * Registers a listener callback on the worker given its worker node key.
a2ed5053 960 *
aa9eede8 961 * @param workerNodeKey - The worker node key.
a2ed5053
JB
962 * @param listener - The message listener callback.
963 */
85aeb3f3
JB
964 protected abstract registerWorkerMessageListener<
965 Message extends Data | Response
aa9eede8
JB
966 >(
967 workerNodeKey: number,
968 listener: (message: MessageValue<Message>) => void
969 ): void
a2ed5053
JB
970
971 /**
aa9eede8 972 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
973 * Can be overridden.
974 *
aa9eede8 975 * @param workerNodeKey - The newly created worker node key.
a2ed5053 976 */
aa9eede8 977 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 978 // Listen to worker messages.
aa9eede8 979 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 980 // Send the startup message to worker.
aa9eede8
JB
981 this.sendStartupMessageToWorker(workerNodeKey)
982 // Send the worker statistics message to worker.
983 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
984 }
985
85aeb3f3 986 /**
aa9eede8
JB
987 * Sends the startup message to worker given its worker node key.
988 *
989 * @param workerNodeKey - The worker node key.
990 */
991 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
992
993 /**
994 * Sends the worker statistics message to worker given its worker node key.
85aeb3f3 995 *
aa9eede8 996 * @param workerNodeKey - The worker node key.
85aeb3f3 997 */
aa9eede8
JB
998 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
999 this.sendToWorker(workerNodeKey, {
1000 statistics: {
1001 runTime:
1002 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1003 .runTime.aggregate,
1004 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1005 .elu.aggregate
1006 },
1007 workerId: this.getWorkerInfo(workerNodeKey).id as number
1008 })
1009 }
a2ed5053
JB
1010
1011 private redistributeQueuedTasks (workerNodeKey: number): void {
1012 while (this.tasksQueueSize(workerNodeKey) > 0) {
1013 let targetWorkerNodeKey: number = workerNodeKey
1014 let minQueuedTasks = Infinity
10ecf8fd 1015 let executeTask = false
a2ed5053
JB
1016 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1017 const workerInfo = this.getWorkerInfo(workerNodeId)
1018 if (
1019 workerNodeId !== workerNodeKey &&
1020 workerInfo.ready &&
1021 workerNode.usage.tasks.queued === 0
1022 ) {
a5ed75b7
JB
1023 if (
1024 this.workerNodes[workerNodeId].usage.tasks.executing <
1025 (this.opts.tasksQueueOptions?.concurrency as number)
1026 ) {
10ecf8fd
JB
1027 executeTask = true
1028 }
a2ed5053
JB
1029 targetWorkerNodeKey = workerNodeId
1030 break
1031 }
1032 if (
1033 workerNodeId !== workerNodeKey &&
1034 workerInfo.ready &&
1035 workerNode.usage.tasks.queued < minQueuedTasks
1036 ) {
1037 minQueuedTasks = workerNode.usage.tasks.queued
1038 targetWorkerNodeKey = workerNodeId
1039 }
1040 }
10ecf8fd
JB
1041 if (executeTask) {
1042 this.executeTask(
1043 targetWorkerNodeKey,
1044 this.dequeueTask(workerNodeKey) as Task<Data>
1045 )
1046 } else {
1047 this.enqueueTask(
1048 targetWorkerNodeKey,
1049 this.dequeueTask(workerNodeKey) as Task<Data>
1050 )
1051 }
a2ed5053
JB
1052 }
1053 }
1054
be0676b3 1055 /**
aa9eede8 1056 * This method is the listener registered for each worker message.
be0676b3 1057 *
bdacc2d2 1058 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1059 */
1060 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1061 return message => {
21f710aa 1062 this.checkMessageWorkerId(message)
d2c73f82 1063 if (message.ready != null) {
81c02522 1064 // Worker ready response received from worker
10e2aa7e 1065 this.handleWorkerReadyResponse(message)
7629bdf1 1066 } else if (message.taskId != null) {
81c02522 1067 // Task execution response received from worker
6b272951
JB
1068 this.handleTaskExecutionResponse(message)
1069 }
1070 }
1071 }
1072
10e2aa7e 1073 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
aa9eede8 1074 this.getWorkerInfo(
aad6fb64 1075 this.getWorkerNodeKeyByWorkerId(message.workerId)
e221309a 1076 ).ready = message.ready as boolean
2431bdb4
JB
1077 if (this.emitter != null && this.ready) {
1078 this.emitter.emit(PoolEvents.ready, this.info)
1079 }
6b272951
JB
1080 }
1081
1082 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
7629bdf1
JB
1083 const promiseResponse = this.promiseResponseMap.get(
1084 message.taskId as string
1085 )
6b272951
JB
1086 if (promiseResponse != null) {
1087 if (message.taskError != null) {
2a69b8c5 1088 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1089 promiseResponse.reject(message.taskError.message)
1090 } else {
1091 promiseResponse.resolve(message.data as Response)
1092 }
501aea93
JB
1093 const workerNodeKey = promiseResponse.workerNodeKey
1094 this.afterTaskExecutionHook(workerNodeKey, message)
7629bdf1 1095 this.promiseResponseMap.delete(message.taskId as string)
6b272951
JB
1096 if (
1097 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1098 this.tasksQueueSize(workerNodeKey) > 0 &&
1099 this.workerNodes[workerNodeKey].usage.tasks.executing <
1100 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1101 ) {
1102 this.executeTask(
1103 workerNodeKey,
1104 this.dequeueTask(workerNodeKey) as Task<Data>
1105 )
be0676b3 1106 }
6b272951 1107 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1108 }
be0676b3 1109 }
7c0ba920 1110
ff733df7 1111 private checkAndEmitEvents (): void {
1f68cede 1112 if (this.emitter != null) {
ff733df7 1113 if (this.busy) {
2845f2a5 1114 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1115 }
6b27d407 1116 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1117 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1118 }
164d950a
JB
1119 }
1120 }
1121
8a1260a3 1122 /**
aa9eede8 1123 * Gets the worker information given its worker node key.
8a1260a3
JB
1124 *
1125 * @param workerNodeKey - The worker node key.
3f09ed9f 1126 * @returns The worker information.
8a1260a3 1127 */
aa9eede8 1128 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1129 return this.workerNodes[workerNodeKey].info
e221309a
JB
1130 }
1131
a05c10de 1132 /**
b0a4db63 1133 * Adds the given worker in the pool worker nodes.
ea7a90d3 1134 *
38e795c1 1135 * @param worker - The worker.
aa9eede8
JB
1136 * @returns The added worker node key.
1137 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1138 */
b0a4db63 1139 private addWorkerNode (worker: Worker): number {
cc3ab78b 1140 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1141 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1142 if (this.starting) {
1143 workerNode.info.ready = true
1144 }
aa9eede8 1145 this.workerNodes.push(workerNode)
aad6fb64 1146 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8
JB
1147 if (workerNodeKey === -1) {
1148 throw new Error('Worker node not found')
1149 }
1150 return workerNodeKey
ea7a90d3 1151 }
c923ce56 1152
51fe3d3c 1153 /**
f06e48d8 1154 * Removes the given worker from the pool worker nodes.
51fe3d3c 1155 *
f06e48d8 1156 * @param worker - The worker.
51fe3d3c 1157 */
416fd65c 1158 private removeWorkerNode (worker: Worker): void {
aad6fb64 1159 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1160 if (workerNodeKey !== -1) {
1161 this.workerNodes.splice(workerNodeKey, 1)
1162 this.workerChoiceStrategyContext.remove(workerNodeKey)
1163 }
51fe3d3c 1164 }
adc3c320 1165
b0a4db63 1166 /**
aa9eede8 1167 * Executes the given task on the worker given its worker node key.
b0a4db63 1168 *
aa9eede8 1169 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1170 * @param task - The task to execute.
1171 */
2e81254d 1172 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1173 this.beforeTaskExecutionHook(workerNodeKey, task)
7d91a8cd
JB
1174 this.sendToWorker(
1175 workerNodeKey,
1176 task,
1177 this.worker === WorkerTypes.thread && task.transferList != null
1178 ? task.transferList
1179 : undefined
1180 )
2e81254d
JB
1181 }
1182
f9f00b5f 1183 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1184 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1185 }
1186
416fd65c 1187 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1188 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1189 }
1190
416fd65c 1191 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1192 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1193 }
1194
81c02522 1195 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1196 while (this.tasksQueueSize(workerNodeKey) > 0) {
1197 this.executeTask(
1198 workerNodeKey,
1199 this.dequeueTask(workerNodeKey) as Task<Data>
1200 )
ff733df7 1201 }
4b628b48 1202 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1203 }
1204
ef41a6e6
JB
1205 private flushTasksQueues (): void {
1206 for (const [workerNodeKey] of this.workerNodes.entries()) {
1207 this.flushTasksQueue(workerNodeKey)
1208 }
1209 }
c97c7edb 1210}