refactor: cleanup main worker checks
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, instantiates the worker choice strategy context,
 * runs the setup hook and then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Each check throws on invalid input; checkPoolOptions() also fills in the options defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Checks that the worker file path is a non blank string designating an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the file path is missing, is not a string, is blank or if the file does not exist.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileExists = existsSync(filePath)
  if (!workerFileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
8d3782fa
JB
/**
 * Checks that the number of workers is a safe, non negative integer and, for a fixed pool, not zero.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero in a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Checks the size boundaries of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is zero, inferior or equal to the minimum pool size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
199
/**
 * Checks the pool options and stores their defaulted values into the pool options.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
226
/**
 * Checks that the worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
236
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is not a known one.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, one weight is expected per worker node of a full pool.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
264
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  if (tasksQueueOptions?.concurrency != null) {
    if (!Number.isSafeInteger(tasksQueueOptions.concurrency)) {
      throw new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
    if (tasksQueueOptions.concurrency <= 0) {
      throw new Error(
        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
      )
    }
  }
}
288
e761c033
JB
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches the configured number of workers.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
300
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolved once: these requirements decide which optional sections are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node figure over all the pool worker nodes.
  const sumOverWorkerNodes = (
    figure: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + figure(workerNode),
      0
    )
  // Counts the pool worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    sumOverWorkerNodes((workerNode) => (predicate(workerNode) ? 1 : 0))
  // Average per executed task. Reports 0 rather than NaN or Infinity while
  // no task has been executed yet (division by zero).
  const averagePerExecutedTask = (aggregate: number): number => {
    const executedTasks = sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks?.executed ?? 0
    )
    return executedTasks > 0 ? aggregate / executedTasks : 0
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 441
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once the number of ready, non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
456
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
477
/**
 * The type of the pool, to be provided by the concrete pool implementation.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
484
/**
 * The type of worker used by the pool, to be provided by the concrete pool implementation.
 */
protected abstract get worker (): WorkerType
489
/**
 * The minimum size of the pool, to be provided by the concrete pool implementation.
 */
protected abstract get minSize (): number
ff733df7
JB
494
/**
 * The maximum size of the pool, to be provided by the concrete pool implementation.
 */
protected abstract get maxSize (): number
a35560ba 499
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker, if any, belongs to a pool worker node.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  // A message without worker id is not checked.
  if (message.workerId == null) {
    return
  }
  if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${message.workerId}'`
    )
  }
}
516
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  const ownsWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
528
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
540
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent the worker statistics message.
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    workerNode.resetUsage()
    this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
  }
}
559
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: invalid options are neither stored nor forwarded to the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
570
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  // Tasks queues are flushed before the tasks queue gets disabled.
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
582
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: previously stored tasks queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
593
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
601
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: numberOfWorkerNodes } = this.workerNodes
  return numberOfWorkerNodes >= this.maxSize
}
c2ade475 610
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 617
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it executes fewer tasks
 * than the tasks queue concurrency; otherwise it is available while it executes no task.
 *
 * @returns Worker nodes busyness boolean status: `true` if no ready worker node is available.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              (this.opts.tasksQueueOptions?.concurrency as number)
      )
      : this.workerNodes.some(
        (workerNode) =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
642
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: return explicitly so that
    // a task with invalid arguments is never registered nor sent to a worker.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The resolve/reject callbacks are stored under the task id, together with the chosen worker node key.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // The task is executed right away unless the tasks queue is enabled and
    // the chosen worker node already executes its tasks concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 685
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const destructions = this.workerNodes.map(
    async (_, workerNodeKey): Promise<void> =>
      await this.destroyWorkerNode(workerNodeKey)
  )
  await Promise.all(destructions)
}
694
/**
 * Terminates the worker node designated by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 701
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default, can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 711
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to create a pool when it returns `false`.
 */
protected abstract isMain (): boolean
716
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Accounts the task start in the worker node usage, then in its usage for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // One more executing task, plus its wait time statistics.
  const accountTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  accountTaskStart(this.workerNodes[workerNodeKey].usage)
  accountTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
737
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Accounts the task end in the worker node usage, then in its usage for the task name
 * carried by the message task performance (the default task name if absent).
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Tasks counters first, then run time and event loop utilization statistics.
  const accountTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  accountTaskEnd(this.workerNodes[workerNodeKey].usage)
  accountTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
760
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message: a defined `taskError` marks the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // Never let the counter go negative: the usage decremented here is not
  // necessarily the one incremented at task start (the task usage is looked up
  // by `task.name` at start and by `message.taskPerformance?.name` at end), and
  // the usage is presumably zeroed by `resetUsage()` while tasks are in flight.
  // A negative value would be reported as neither idle nor busy.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
773
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time carried by the message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a missing run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
785
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed; a missing timestamp yields a wait time of `0`.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
799
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; missing ELU active and idle values count as `0`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    // First sample is taken as is, then averaged with the previously stored utilization.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
831
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when the conditions are met; it is the chosen one
 * if the strategy policy uses dynamic workers. Otherwise the worker choice strategy decides.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
850
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and its worker nodes are busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
859
/**
 * Sends a message to the worker designated by the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
872
/**
 * Creates a new worker, to be provided by the concrete pool implementation.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 879
/**
 * Creates a new, completely set up worker node.
 *
 * The worker gets the user provided handlers and the pool internal ones registered
 * before being added to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Pool internal error handling: flag the worker node as not ready, close its channel,
  // emit the error event, optionally replace the worker node and redistribute its queued tasks.
  const handleWorkerError = (error: Error): void => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No worker node replacement while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  }

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', handleWorkerError)
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(workerNodeKey)

  return workerNodeKey
}
be0676b3 919
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroys the worker node
 * on kill message, and the worker is sent a `checkActive` message.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    // The worker node key is resolved again from the worker id carried by the message.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // No executing task and, with the tasks queue enabled, no queued task either.
    const hasNoPendingTask = (): boolean =>
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker: a hard kill is always honored,
    // any other kill only once the worker node has no pending task.
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask())
    if (shallDestroy) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(EMPTY_FUNCTION)
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // The dynamic worker node is flagged ready right away when the strategy policy uses dynamic workers.
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
956
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
969
/**
 * Hook called once a worker node has been newly created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Start listening to the worker messages first.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Then send the startup message, followed by the worker statistics message.
  this.sendStartupMessageToWorker(workerNodeKey)
  this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
}
984
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool class.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
991
/**
 * Sends the worker statistics message to the worker identified by its worker node key.
 * The message carries the run time and ELU aggregate flags taken from the
 * worker choice strategy task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
  const aggregateRunTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const aggregateElu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime: aggregateRunTime, elu: aggregateElu },
    workerId
  })
}
a2ed5053
JB
1009
/**
 * Redistributes the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the target is the first other ready worker node with an
 * empty tasks queue, else the other ready worker node with the fewest queued tasks.
 * The task is executed right away when the target has an empty tasks queue and
 * fewer executing tasks than the tasks queue concurrency; otherwise it is enqueued
 * on the target.
 *
 * If no other ready worker node exists, redistribution stops and the remaining
 * tasks are left in the given worker node tasks queue. Without that guard each task
 * would be dequeued and enqueued again on the same worker node, and the loop would
 * never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      // Only the other worker nodes that are ready are eligible targets.
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Empty tasks queue: best possible target, run the task now if concurrency allows.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target: stop here instead of looping forever on the same queue.
      return
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1053
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches worker ready
 * responses and task execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
    }
  }
}
1071
/**
 * Handles a worker ready response: stores the ready flag on the sender worker
 * information and emits the pool ready event once the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1080
/**
 * Handles a task execution response: settles the promise registered for the
 * task, runs the after task execution hook, and executes the next queued task on
 * the same worker node when the tasks queue is enabled and concurrency allows it.
 * Responses whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const canExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1109
/**
 * Emits the pool busy event when the pool is busy and the pool full event when
 * a dynamic pool is full. Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1120
/**
 * Retrieves the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1130
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // At pool startup, worker nodes are flagged as ready right away.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1151
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1164
/**
 * Runs the before task execution hook, then sends the given task to the worker
 * identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1175
/**
 * Enqueues the given task on the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1179
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1183
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1187
/**
 * Executes every task queued on the worker node identified by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  for (
    let remaining = this.tasksQueueSize(workerNodeKey);
    remaining > 0;
    remaining = this.tasksQueueSize(workerNodeKey)
  ) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1197
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1203}