perf: optimize worker node lookup by worker id
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
7c89e6a4 13 isAsyncFunction,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
e102732c
JB
31import type {
32 IWorker,
4b628b48 33 IWorkerNode,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
96 /**
97 * Constructs a new poolifier pool.
98 *
38e795c1 99 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 100 * @param filePath - Path to the worker file.
38e795c1 101 * @param opts - Options for the pool.
729c563d 102 */
c97c7edb 103 public constructor (
b4213b7f
JB
104 protected readonly numberOfWorkers: number,
105 protected readonly filePath: string,
106 protected readonly opts: PoolOptions<Worker>
c97c7edb 107 ) {
78cea37e 108 if (!this.isMain()) {
c97c7edb
S
109 throw new Error('Cannot start a pool from a worker!')
110 }
8d3782fa 111 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 112 this.checkFilePath(this.filePath)
7c0ba920 113 this.checkPoolOptions(this.opts)
1086026a 114
7254e419
JB
115 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
116 this.executeTask = this.executeTask.bind(this)
117 this.enqueueTask = this.enqueueTask.bind(this)
10ecf8fd 118 this.dequeueTask = this.dequeueTask.bind(this)
7254e419 119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 120
6bd72cd0 121 if (this.opts.enableEvents === true) {
7c0ba920
JB
122 this.emitter = new PoolEmitter()
123 }
d59df138
JB
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
da309861
JB
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
b6b32453
JB
133
134 this.setupHook()
135
075e51d1 136 this.starting = true
e761c033 137 this.startPool()
075e51d1 138 this.starting = false
afe0d5bf
JB
139
140 this.startTimestamp = performance.now()
c97c7edb
S
141 }
142
a35560ba 143 private checkFilePath (filePath: string): void {
ffcbbad8
JB
144 if (
145 filePath == null ||
3d6dd312 146 typeof filePath !== 'string' ||
ffcbbad8
JB
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
c510fea7
APA
149 throw new Error('Please specify a file with a worker implementation')
150 }
3d6dd312
JB
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
c510fea7
APA
154 }
155
8d3782fa
JB
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
78cea37e 161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 162 throw new TypeError(
0d80593b 163 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
164 )
165 } else if (numberOfWorkers < 0) {
473c717a 166 throw new RangeError(
8d3782fa
JB
167 'Cannot instantiate a pool with a negative number of workers'
168 )
6b27d407 169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 175 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
176 if (max == null) {
177 throw new Error(
178 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
179 )
180 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
181 throw new TypeError(
182 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
183 )
184 } else if (min > max) {
079de991
JB
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
187 )
b97d82d8 188 } else if (max === 0) {
079de991 189 throw new RangeError(
b97d82d8 190 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
191 )
192 } else if (min === max) {
193 throw new RangeError(
194 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
195 )
196 }
8d3782fa
JB
197 }
198 }
199
7c0ba920 200 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
201 if (isPlainObject(opts)) {
202 this.opts.workerChoiceStrategy =
203 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
204 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
205 this.opts.workerChoiceStrategyOptions =
206 opts.workerChoiceStrategyOptions ??
207 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
208 this.checkValidWorkerChoiceStrategyOptions(
209 this.opts.workerChoiceStrategyOptions
210 )
1f68cede 211 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
212 this.opts.enableEvents = opts.enableEvents ?? true
213 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
214 if (this.opts.enableTasksQueue) {
215 this.checkValidTasksQueueOptions(
216 opts.tasksQueueOptions as TasksQueueOptions
217 )
218 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
219 opts.tasksQueueOptions as TasksQueueOptions
220 )
221 }
222 } else {
223 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 224 }
aee46736
JB
225 }
226
227 private checkValidWorkerChoiceStrategy (
228 workerChoiceStrategy: WorkerChoiceStrategy
229 ): void {
230 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 231 throw new Error(
aee46736 232 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
233 )
234 }
7c0ba920
JB
235 }
236
0d80593b
JB
237 private checkValidWorkerChoiceStrategyOptions (
238 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
239 ): void {
240 if (!isPlainObject(workerChoiceStrategyOptions)) {
241 throw new TypeError(
242 'Invalid worker choice strategy options: must be a plain object'
243 )
244 }
49be33fe
JB
245 if (
246 workerChoiceStrategyOptions.weights != null &&
6b27d407 247 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
248 ) {
249 throw new Error(
250 'Invalid worker choice strategy options: must have a weight for each worker node'
251 )
252 }
f0d7f803
JB
253 if (
254 workerChoiceStrategyOptions.measurement != null &&
255 !Object.values(Measurements).includes(
256 workerChoiceStrategyOptions.measurement
257 )
258 ) {
259 throw new Error(
260 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
261 )
262 }
0d80593b
JB
263 }
264
a20f0ba5
JB
265 private checkValidTasksQueueOptions (
266 tasksQueueOptions: TasksQueueOptions
267 ): void {
0d80593b
JB
268 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
269 throw new TypeError('Invalid tasks queue options: must be a plain object')
270 }
f0d7f803
JB
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 !Number.isSafeInteger(tasksQueueOptions.concurrency)
274 ) {
275 throw new TypeError(
276 'Invalid worker tasks concurrency: must be an integer'
277 )
278 }
279 if (
280 tasksQueueOptions?.concurrency != null &&
281 tasksQueueOptions.concurrency <= 0
282 ) {
a20f0ba5 283 throw new Error(
f0d7f803 284 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
285 )
286 }
287 }
288
e761c033
JB
289 private startPool (): void {
290 while (
291 this.workerNodes.reduce(
292 (accumulator, workerNode) =>
293 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
294 0
295 ) < this.numberOfWorkers
296 ) {
aa9eede8 297 this.createAndSetupWorkerNode()
e761c033
JB
298 }
299 }
300
08f3f44c 301 /** @inheritDoc */
6b27d407
JB
302 public get info (): PoolInfo {
303 return {
23ccf9d7 304 version,
6b27d407 305 type: this.type,
184855e6 306 worker: this.worker,
2431bdb4
JB
307 ready: this.ready,
308 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
309 minSize: this.minSize,
310 maxSize: this.maxSize,
c05f0d50
JB
311 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
312 .runTime.aggregate &&
1305e9a8
JB
313 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
314 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
315 workerNodes: this.workerNodes.length,
316 idleWorkerNodes: this.workerNodes.reduce(
317 (accumulator, workerNode) =>
f59e1027 318 workerNode.usage.tasks.executing === 0
a4e07f72
JB
319 ? accumulator + 1
320 : accumulator,
6b27d407
JB
321 0
322 ),
323 busyWorkerNodes: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
f59e1027 325 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
326 0
327 ),
a4e07f72 328 executedTasks: this.workerNodes.reduce(
6b27d407 329 (accumulator, workerNode) =>
f59e1027 330 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
331 0
332 ),
333 executingTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
f59e1027 335 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
336 0
337 ),
338 queuedTasks: this.workerNodes.reduce(
df593701 339 (accumulator, workerNode) =>
f59e1027 340 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
341 0
342 ),
343 maxQueuedTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
b25a42cd 345 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 346 0
a4e07f72
JB
347 ),
348 failedTasks: this.workerNodes.reduce(
349 (accumulator, workerNode) =>
f59e1027 350 accumulator + workerNode.usage.tasks.failed,
a4e07f72 351 0
1dcf8b7b
JB
352 ),
353 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
354 .runTime.aggregate && {
355 runTime: {
98e72cda
JB
356 minimum: round(
357 Math.min(
358 ...this.workerNodes.map(
359 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
360 )
1dcf8b7b
JB
361 )
362 ),
98e72cda
JB
363 maximum: round(
364 Math.max(
365 ...this.workerNodes.map(
366 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
367 )
1dcf8b7b 368 )
98e72cda
JB
369 ),
370 average: round(
371 this.workerNodes.reduce(
372 (accumulator, workerNode) =>
373 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
374 0
375 ) /
376 this.workerNodes.reduce(
377 (accumulator, workerNode) =>
378 accumulator + (workerNode.usage.tasks?.executed ?? 0),
379 0
380 )
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.median && {
384 median: round(
385 median(
386 this.workerNodes.map(
387 workerNode => workerNode.usage.runTime?.median ?? 0
388 )
389 )
390 )
391 })
1dcf8b7b
JB
392 }
393 }),
394 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
395 .waitTime.aggregate && {
396 waitTime: {
98e72cda
JB
397 minimum: round(
398 Math.min(
399 ...this.workerNodes.map(
400 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
401 )
1dcf8b7b
JB
402 )
403 ),
98e72cda
JB
404 maximum: round(
405 Math.max(
406 ...this.workerNodes.map(
407 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
408 )
1dcf8b7b 409 )
98e72cda
JB
410 ),
411 average: round(
412 this.workerNodes.reduce(
413 (accumulator, workerNode) =>
414 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
415 0
416 ) /
417 this.workerNodes.reduce(
418 (accumulator, workerNode) =>
419 accumulator + (workerNode.usage.tasks?.executed ?? 0),
420 0
421 )
422 ),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.median && {
425 median: round(
426 median(
427 this.workerNodes.map(
428 workerNode => workerNode.usage.waitTime?.median ?? 0
429 )
430 )
431 )
432 })
1dcf8b7b
JB
433 }
434 })
6b27d407
JB
435 }
436 }
08f3f44c 437
aa9eede8
JB
438 /**
439 * The pool readiness boolean status.
440 */
2431bdb4
JB
441 private get ready (): boolean {
442 return (
b97d82d8
JB
443 this.workerNodes.reduce(
444 (accumulator, workerNode) =>
445 !workerNode.info.dynamic && workerNode.info.ready
446 ? accumulator + 1
447 : accumulator,
448 0
449 ) >= this.minSize
2431bdb4
JB
450 )
451 }
452
afe0d5bf 453 /**
aa9eede8 454 * The approximate pool utilization.
afe0d5bf
JB
455 *
456 * @returns The pool utilization.
457 */
458 private get utilization (): number {
8e5ca040 459 const poolTimeCapacity =
fe7d90db 460 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
461 const totalTasksRunTime = this.workerNodes.reduce(
462 (accumulator, workerNode) =>
71514351 463 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
464 0
465 )
466 const totalTasksWaitTime = this.workerNodes.reduce(
467 (accumulator, workerNode) =>
71514351 468 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
469 0
470 )
8e5ca040 471 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
472 }
473
8881ae32 474 /**
aa9eede8 475 * The pool type.
8881ae32
JB
476 *
477 * If it is `'dynamic'`, it provides the `max` property.
478 */
479 protected abstract get type (): PoolType
480
184855e6 481 /**
aa9eede8 482 * The worker type.
184855e6
JB
483 */
484 protected abstract get worker (): WorkerType
485
c2ade475 486 /**
aa9eede8 487 * The pool minimum size.
c2ade475 488 */
6b27d407 489 protected abstract get minSize (): number
ff733df7
JB
490
491 /**
aa9eede8 492 * The pool maximum size.
ff733df7 493 */
6b27d407 494 protected abstract get maxSize (): number
a35560ba 495
6b813701
JB
496 /**
497 * Checks if the worker id sent in the received message from a worker is valid.
498 *
499 * @param message - The received message.
500 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 */
21f710aa
JB
502 private checkMessageWorkerId (message: MessageValue<Response>): void {
503 if (
504 message.workerId != null &&
aad6fb64 505 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
506 ) {
507 throw new Error(
508 `Worker message received from unknown worker '${message.workerId}'`
509 )
510 }
511 }
512
ffcbbad8 513 /**
f06e48d8 514 * Gets the given worker its worker node key.
ffcbbad8
JB
515 *
516 * @param worker - The worker.
f59e1027 517 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 518 */
aad6fb64 519 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8
JB
520 return this.workerNodes.findIndex(
521 workerNode => workerNode.worker === worker
522 )
bf9549ae
JB
523 }
524
aa9eede8
JB
525 /**
526 * Gets the worker node key given its worker id.
527 *
528 * @param workerId - The worker id.
aad6fb64 529 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 530 */
aad6fb64
JB
531 private getWorkerNodeKeyByWorkerId (workerId: number): number {
532 return this.workerNodes.findIndex(
533 workerNode => workerNode.info.id === workerId
534 )
aa9eede8
JB
535 }
536
afc003b2 537 /** @inheritDoc */
a35560ba 538 public setWorkerChoiceStrategy (
59219cbb
JB
539 workerChoiceStrategy: WorkerChoiceStrategy,
540 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 541 ): void {
aee46736 542 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 543 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
544 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
545 this.opts.workerChoiceStrategy
546 )
547 if (workerChoiceStrategyOptions != null) {
548 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
549 }
aa9eede8 550 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 551 workerNode.resetUsage()
aa9eede8 552 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
59219cbb 553 }
a20f0ba5
JB
554 }
555
556 /** @inheritDoc */
557 public setWorkerChoiceStrategyOptions (
558 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
559 ): void {
0d80593b 560 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
561 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
562 this.workerChoiceStrategyContext.setOptions(
563 this.opts.workerChoiceStrategyOptions
a35560ba
S
564 )
565 }
566
a20f0ba5 567 /** @inheritDoc */
8f52842f
JB
568 public enableTasksQueue (
569 enable: boolean,
570 tasksQueueOptions?: TasksQueueOptions
571 ): void {
a20f0ba5 572 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 573 this.flushTasksQueues()
a20f0ba5
JB
574 }
575 this.opts.enableTasksQueue = enable
8f52842f 576 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
577 }
578
579 /** @inheritDoc */
8f52842f 580 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 581 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
582 this.checkValidTasksQueueOptions(tasksQueueOptions)
583 this.opts.tasksQueueOptions =
584 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 585 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
586 delete this.opts.tasksQueueOptions
587 }
588 }
589
590 private buildTasksQueueOptions (
591 tasksQueueOptions: TasksQueueOptions
592 ): TasksQueueOptions {
593 return {
594 concurrency: tasksQueueOptions?.concurrency ?? 1
595 }
596 }
597
c319c66b
JB
598 /**
599 * Whether the pool is full or not.
600 *
601 * The pool filling boolean status.
602 */
dea903a8
JB
603 protected get full (): boolean {
604 return this.workerNodes.length >= this.maxSize
605 }
c2ade475 606
c319c66b
JB
607 /**
608 * Whether the pool is busy or not.
609 *
610 * The pool busyness boolean status.
611 */
612 protected abstract get busy (): boolean
7c0ba920 613
6c6afb84
JB
614 /**
615 * Whether worker nodes are executing at least one task.
616 *
617 * @returns Worker nodes busyness boolean status.
618 */
c2ade475 619 protected internalBusy (): boolean {
e0ae6100
JB
620 return (
621 this.workerNodes.findIndex(workerNode => {
f59e1027 622 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
623 }) === -1
624 )
cb70b19d
JB
625 }
626
afc003b2 627 /** @inheritDoc */
a86b6df1 628 public async execute (data?: Data, name?: string): Promise<Response> {
52b71763
JB
629 return await new Promise<Response>((resolve, reject) => {
630 const timestamp = performance.now()
631 const workerNodeKey = this.chooseWorkerNode()
501aea93 632 const task: Task<Data> = {
52b71763
JB
633 name: name ?? DEFAULT_TASK_NAME,
634 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
635 data: data ?? ({} as Data),
636 timestamp,
637 workerId: this.getWorkerInfo(workerNodeKey).id as number,
638 id: randomUUID()
639 }
501aea93 640 this.promiseResponseMap.set(task.id as string, {
2e81254d
JB
641 resolve,
642 reject,
501aea93 643 workerNodeKey
2e81254d 644 })
52b71763
JB
645 if (
646 this.opts.enableTasksQueue === true &&
647 (this.busy ||
648 this.workerNodes[workerNodeKey].usage.tasks.executing >=
b5e113f6 649 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 650 ) {
501aea93 651 this.enqueueTask(workerNodeKey, task)
52b71763 652 } else {
501aea93 653 this.executeTask(workerNodeKey, task)
52b71763
JB
654 }
655 this.checkAndEmitEvents()
2e81254d 656 })
280c2a77 657 }
c97c7edb 658
afc003b2 659 /** @inheritDoc */
c97c7edb 660 public async destroy (): Promise<void> {
1fbcaa7c 661 await Promise.all(
875a7c37
JB
662 this.workerNodes.map(async (workerNode, workerNodeKey) => {
663 this.flushTasksQueue(workerNodeKey)
47aacbaa 664 // FIXME: wait for tasks to be finished
920278a2
JB
665 const workerExitPromise = new Promise<void>(resolve => {
666 workerNode.worker.on('exit', () => {
667 resolve()
668 })
669 })
aa9eede8 670 await this.destroyWorkerNode(workerNodeKey)
920278a2 671 await workerExitPromise
1fbcaa7c
JB
672 })
673 )
c97c7edb
S
674 }
675
4a6952ff 676 /**
aa9eede8 677 * Terminates the worker node given its worker node key.
4a6952ff 678 *
aa9eede8 679 * @param workerNodeKey - The worker node key.
4a6952ff 680 */
aa9eede8
JB
681 protected abstract destroyWorkerNode (
682 workerNodeKey: number
683 ): void | Promise<void>
c97c7edb 684
729c563d 685 /**
6677a3d3
JB
686 * Setup hook to execute code before worker nodes are created in the abstract constructor.
687 * Can be overridden.
afc003b2
JB
688 *
689 * @virtual
729c563d 690 */
280c2a77 691 protected setupHook (): void {
d99ba5a8 692 // Intentionally empty
280c2a77 693 }
c97c7edb 694
729c563d 695 /**
280c2a77
S
696 * Should return whether the worker is the main worker or not.
697 */
698 protected abstract isMain (): boolean
699
700 /**
2e81254d 701 * Hook executed before the worker task execution.
bf9549ae 702 * Can be overridden.
729c563d 703 *
f06e48d8 704 * @param workerNodeKey - The worker node key.
1c6fe997 705 * @param task - The task to execute.
729c563d 706 */
1c6fe997
JB
707 protected beforeTaskExecutionHook (
708 workerNodeKey: number,
709 task: Task<Data>
710 ): void {
f59e1027 711 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
712 ++workerUsage.tasks.executing
713 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 714 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
715 task.name as string
716 ) as WorkerUsage
eb8afc8a
JB
717 ++taskWorkerUsage.tasks.executing
718 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
719 }
720
c01733f1 721 /**
2e81254d 722 * Hook executed after the worker task execution.
bf9549ae 723 * Can be overridden.
c01733f1 724 *
501aea93 725 * @param workerNodeKey - The worker node key.
38e795c1 726 * @param message - The received message.
c01733f1 727 */
2e81254d 728 protected afterTaskExecutionHook (
501aea93 729 workerNodeKey: number,
2740a743 730 message: MessageValue<Response>
bf9549ae 731 ): void {
ff128cc9 732 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
733 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
734 this.updateRunTimeWorkerUsage(workerUsage, message)
735 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 736 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 737 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 738 ) as WorkerUsage
eb8afc8a
JB
739 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
740 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
741 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
742 }
743
744 private updateTaskStatisticsWorkerUsage (
745 workerUsage: WorkerUsage,
746 message: MessageValue<Response>
747 ): void {
a4e07f72
JB
748 const workerTaskStatistics = workerUsage.tasks
749 --workerTaskStatistics.executing
98e72cda
JB
750 if (message.taskError == null) {
751 ++workerTaskStatistics.executed
752 } else {
a4e07f72 753 ++workerTaskStatistics.failed
2740a743 754 }
f8eb0a2a
JB
755 }
756
a4e07f72
JB
757 private updateRunTimeWorkerUsage (
758 workerUsage: WorkerUsage,
f8eb0a2a
JB
759 message: MessageValue<Response>
760 ): void {
e4f20deb
JB
761 updateMeasurementStatistics(
762 workerUsage.runTime,
763 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
764 message.taskPerformance?.runTime ?? 0,
765 workerUsage.tasks.executed
766 )
f8eb0a2a
JB
767 }
768
a4e07f72
JB
769 private updateWaitTimeWorkerUsage (
770 workerUsage: WorkerUsage,
1c6fe997 771 task: Task<Data>
f8eb0a2a 772 ): void {
1c6fe997
JB
773 const timestamp = performance.now()
774 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
775 updateMeasurementStatistics(
776 workerUsage.waitTime,
777 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
778 taskWaitTime,
779 workerUsage.tasks.executed
780 )
c01733f1 781 }
782
a4e07f72 783 private updateEluWorkerUsage (
5df69fab 784 workerUsage: WorkerUsage,
62c15a68
JB
785 message: MessageValue<Response>
786 ): void {
008512c7
JB
787 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
788 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
789 updateMeasurementStatistics(
790 workerUsage.elu.active,
008512c7 791 eluTaskStatisticsRequirements,
e4f20deb
JB
792 message.taskPerformance?.elu?.active ?? 0,
793 workerUsage.tasks.executed
794 )
795 updateMeasurementStatistics(
796 workerUsage.elu.idle,
008512c7 797 eluTaskStatisticsRequirements,
e4f20deb
JB
798 message.taskPerformance?.elu?.idle ?? 0,
799 workerUsage.tasks.executed
800 )
008512c7 801 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 802 if (message.taskPerformance?.elu != null) {
f7510105
JB
803 if (workerUsage.elu.utilization != null) {
804 workerUsage.elu.utilization =
805 (workerUsage.elu.utilization +
806 message.taskPerformance.elu.utilization) /
807 2
808 } else {
809 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
810 }
62c15a68
JB
811 }
812 }
813 }
814
280c2a77 815 /**
f06e48d8 816 * Chooses a worker node for the next task.
280c2a77 817 *
6c6afb84 818 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 819 *
aa9eede8 820 * @returns The chosen worker node key
280c2a77 821 */
6c6afb84 822 private chooseWorkerNode (): number {
930dcf12 823 if (this.shallCreateDynamicWorker()) {
aa9eede8 824 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84
JB
825 if (
826 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
827 ) {
aa9eede8 828 return workerNodeKey
6c6afb84 829 }
17393ac8 830 }
930dcf12
JB
831 return this.workerChoiceStrategyContext.execute()
832 }
833
6c6afb84
JB
834 /**
835 * Conditions for dynamic worker creation.
836 *
837 * @returns Whether to create a dynamic worker or not.
838 */
839 private shallCreateDynamicWorker (): boolean {
930dcf12 840 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
841 }
842
280c2a77 843 /**
aa9eede8 844 * Sends a message to worker given its worker node key.
280c2a77 845 *
aa9eede8 846 * @param workerNodeKey - The worker node key.
38e795c1 847 * @param message - The message.
280c2a77
S
848 */
849 protected abstract sendToWorker (
aa9eede8 850 workerNodeKey: number,
280c2a77
S
851 message: MessageValue<Data>
852 ): void
853
729c563d 854 /**
41344292 855 * Creates a new worker.
6c6afb84
JB
856 *
857 * @returns Newly created worker.
729c563d 858 */
280c2a77 859 protected abstract createWorker (): Worker
c97c7edb 860
4a6952ff 861 /**
aa9eede8 862 * Creates a new, completely set up worker node.
4a6952ff 863 *
aa9eede8 864 * @returns New, completely set up worker node key.
4a6952ff 865 */
aa9eede8 866 protected createAndSetupWorkerNode (): number {
bdacc2d2 867 const worker = this.createWorker()
280c2a77 868
35cf1c03 869 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 870 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 871 worker.on('error', error => {
aad6fb64 872 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
9b106837
JB
873 const workerInfo = this.getWorkerInfo(workerNodeKey)
874 workerInfo.ready = false
0dc838e3 875 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 876 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 877 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 878 if (workerInfo.dynamic) {
aa9eede8 879 this.createAndSetupDynamicWorkerNode()
8a1260a3 880 } else {
aa9eede8 881 this.createAndSetupWorkerNode()
8a1260a3 882 }
5baee0d7 883 }
19dbc45b 884 if (this.opts.enableTasksQueue === true) {
9b106837 885 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 886 }
5baee0d7 887 })
a35560ba
S
888 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
889 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 890 worker.once('exit', () => {
f06e48d8 891 this.removeWorkerNode(worker)
a974afa6 892 })
280c2a77 893
aa9eede8 894 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 895
aa9eede8 896 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 897
aa9eede8 898 return workerNodeKey
c97c7edb 899 }
be0676b3 900
930dcf12 901 /**
aa9eede8 902 * Creates a new, completely set up dynamic worker node.
930dcf12 903 *
aa9eede8 904 * @returns New, completely set up dynamic worker node key.
930dcf12 905 */
aa9eede8
JB
906 protected createAndSetupDynamicWorkerNode (): number {
907 const workerNodeKey = this.createAndSetupWorkerNode()
908 this.registerWorkerMessageListener(workerNodeKey, message => {
909 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
910 message.workerId
aad6fb64 911 )
aa9eede8 912 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
930dcf12
JB
913 if (
914 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
915 (message.kill != null &&
916 ((this.opts.enableTasksQueue === false &&
aa9eede8 917 workerUsage.tasks.executing === 0) ||
7b56f532 918 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
919 workerUsage.tasks.executing === 0 &&
920 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12
JB
921 ) {
922 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
7c89e6a4
JB
923 const destroyWorkerNodeBounded = this.destroyWorkerNode.bind(this)
924 if (isAsyncFunction(destroyWorkerNodeBounded)) {
925 (
926 destroyWorkerNodeBounded as (workerNodeKey: number) => Promise<void>
927 )(localWorkerNodeKey).catch(EMPTY_FUNCTION)
928 } else {
929 (destroyWorkerNodeBounded as (workerNodeKey: number) => void)(
930 localWorkerNodeKey
931 )
932 }
930dcf12
JB
933 }
934 })
aa9eede8 935 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 936 this.sendToWorker(workerNodeKey, {
b0a4db63 937 checkActive: true,
21f710aa
JB
938 workerId: workerInfo.id as number
939 })
b5e113f6
JB
940 workerInfo.dynamic = true
941 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
942 workerInfo.ready = true
943 }
aa9eede8 944 return workerNodeKey
930dcf12
JB
945 }
946
a2ed5053 947 /**
aa9eede8 948 * Registers a listener callback on the worker given its worker node key.
a2ed5053 949 *
aa9eede8 950 * @param workerNodeKey - The worker node key.
a2ed5053
JB
951 * @param listener - The message listener callback.
952 */
85aeb3f3
JB
953 protected abstract registerWorkerMessageListener<
954 Message extends Data | Response
aa9eede8
JB
955 >(
956 workerNodeKey: number,
957 listener: (message: MessageValue<Message>) => void
958 ): void
a2ed5053
JB
959
960 /**
aa9eede8 961 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
962 * Can be overridden.
963 *
aa9eede8 964 * @param workerNodeKey - The newly created worker node key.
a2ed5053 965 */
aa9eede8 966 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 967 // Listen to worker messages.
aa9eede8 968 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 969 // Send the startup message to worker.
aa9eede8
JB
970 this.sendStartupMessageToWorker(workerNodeKey)
971 // Send the worker statistics message to worker.
972 this.sendWorkerStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
973 }
974
85aeb3f3 975 /**
aa9eede8
JB
976 * Sends the startup message to worker given its worker node key.
977 *
978 * @param workerNodeKey - The worker node key.
979 */
980 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
981
982 /**
983 * Sends the worker statistics message to worker given its worker node key.
85aeb3f3 984 *
aa9eede8 985 * @param workerNodeKey - The worker node key.
85aeb3f3 986 */
aa9eede8
JB
987 private sendWorkerStatisticsMessageToWorker (workerNodeKey: number): void {
988 this.sendToWorker(workerNodeKey, {
989 statistics: {
990 runTime:
991 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
992 .runTime.aggregate,
993 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
994 .elu.aggregate
995 },
996 workerId: this.getWorkerInfo(workerNodeKey).id as number
997 })
998 }
a2ed5053
JB
999
1000 private redistributeQueuedTasks (workerNodeKey: number): void {
1001 while (this.tasksQueueSize(workerNodeKey) > 0) {
1002 let targetWorkerNodeKey: number = workerNodeKey
1003 let minQueuedTasks = Infinity
10ecf8fd 1004 let executeTask = false
a2ed5053
JB
1005 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1006 const workerInfo = this.getWorkerInfo(workerNodeId)
1007 if (
1008 workerNodeId !== workerNodeKey &&
1009 workerInfo.ready &&
1010 workerNode.usage.tasks.queued === 0
1011 ) {
a5ed75b7
JB
1012 if (
1013 this.workerNodes[workerNodeId].usage.tasks.executing <
1014 (this.opts.tasksQueueOptions?.concurrency as number)
1015 ) {
10ecf8fd
JB
1016 executeTask = true
1017 }
a2ed5053
JB
1018 targetWorkerNodeKey = workerNodeId
1019 break
1020 }
1021 if (
1022 workerNodeId !== workerNodeKey &&
1023 workerInfo.ready &&
1024 workerNode.usage.tasks.queued < minQueuedTasks
1025 ) {
1026 minQueuedTasks = workerNode.usage.tasks.queued
1027 targetWorkerNodeKey = workerNodeId
1028 }
1029 }
10ecf8fd
JB
1030 if (executeTask) {
1031 this.executeTask(
1032 targetWorkerNodeKey,
1033 this.dequeueTask(workerNodeKey) as Task<Data>
1034 )
1035 } else {
1036 this.enqueueTask(
1037 targetWorkerNodeKey,
1038 this.dequeueTask(workerNodeKey) as Task<Data>
1039 )
1040 }
a2ed5053
JB
1041 }
1042 }
1043
be0676b3 1044 /**
aa9eede8 1045 * This method is the listener registered for each worker message.
be0676b3 1046 *
bdacc2d2 1047 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1048 */
1049 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1050 return message => {
21f710aa 1051 this.checkMessageWorkerId(message)
d2c73f82 1052 if (message.ready != null) {
10e2aa7e
JB
1053 // Worker ready response received
1054 this.handleWorkerReadyResponse(message)
f59e1027 1055 } else if (message.id != null) {
a3445496 1056 // Task execution response received
6b272951
JB
1057 this.handleTaskExecutionResponse(message)
1058 }
1059 }
1060 }
1061
10e2aa7e 1062 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
aa9eede8 1063 this.getWorkerInfo(
aad6fb64 1064 this.getWorkerNodeKeyByWorkerId(message.workerId)
e221309a 1065 ).ready = message.ready as boolean
2431bdb4
JB
1066 if (this.emitter != null && this.ready) {
1067 this.emitter.emit(PoolEvents.ready, this.info)
1068 }
6b272951
JB
1069 }
1070
1071 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1072 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1073 if (promiseResponse != null) {
1074 if (message.taskError != null) {
2a69b8c5 1075 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1076 promiseResponse.reject(message.taskError.message)
1077 } else {
1078 promiseResponse.resolve(message.data as Response)
1079 }
501aea93
JB
1080 const workerNodeKey = promiseResponse.workerNodeKey
1081 this.afterTaskExecutionHook(workerNodeKey, message)
6b272951 1082 this.promiseResponseMap.delete(message.id as string)
6b272951
JB
1083 if (
1084 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1085 this.tasksQueueSize(workerNodeKey) > 0 &&
1086 this.workerNodes[workerNodeKey].usage.tasks.executing <
1087 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1088 ) {
1089 this.executeTask(
1090 workerNodeKey,
1091 this.dequeueTask(workerNodeKey) as Task<Data>
1092 )
be0676b3 1093 }
6b272951 1094 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1095 }
be0676b3 1096 }
7c0ba920 1097
ff733df7 1098 private checkAndEmitEvents (): void {
1f68cede 1099 if (this.emitter != null) {
ff733df7 1100 if (this.busy) {
2845f2a5 1101 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1102 }
6b27d407 1103 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1104 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1105 }
164d950a
JB
1106 }
1107 }
1108
8a1260a3 1109 /**
aa9eede8 1110 * Gets the worker information given its worker node key.
8a1260a3
JB
1111 *
1112 * @param workerNodeKey - The worker node key.
3f09ed9f 1113 * @returns The worker information.
8a1260a3 1114 */
aa9eede8 1115 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dc02fc29 1116 return this.workerNodes[workerNodeKey].info
e221309a
JB
1117 }
1118
a05c10de 1119 /**
b0a4db63 1120 * Adds the given worker in the pool worker nodes.
ea7a90d3 1121 *
38e795c1 1122 * @param worker - The worker.
aa9eede8
JB
1123 * @returns The added worker node key.
1124 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1125 */
b0a4db63 1126 private addWorkerNode (worker: Worker): number {
cc3ab78b 1127 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1128 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1129 if (this.starting) {
1130 workerNode.info.ready = true
1131 }
aa9eede8 1132 this.workerNodes.push(workerNode)
aad6fb64 1133 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8
JB
1134 if (workerNodeKey === -1) {
1135 throw new Error('Worker node not found')
1136 }
1137 return workerNodeKey
ea7a90d3 1138 }
c923ce56 1139
51fe3d3c 1140 /**
f06e48d8 1141 * Removes the given worker from the pool worker nodes.
51fe3d3c 1142 *
f06e48d8 1143 * @param worker - The worker.
51fe3d3c 1144 */
416fd65c 1145 private removeWorkerNode (worker: Worker): void {
aad6fb64 1146 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1147 if (workerNodeKey !== -1) {
1148 this.workerNodes.splice(workerNodeKey, 1)
1149 this.workerChoiceStrategyContext.remove(workerNodeKey)
1150 }
51fe3d3c 1151 }
adc3c320 1152
b0a4db63 1153 /**
aa9eede8 1154 * Executes the given task on the worker given its worker node key.
b0a4db63 1155 *
aa9eede8 1156 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1157 * @param task - The task to execute.
1158 */
2e81254d 1159 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1160 this.beforeTaskExecutionHook(workerNodeKey, task)
aa9eede8 1161 this.sendToWorker(workerNodeKey, task)
2e81254d
JB
1162 }
1163
f9f00b5f 1164 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1165 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1166 }
1167
416fd65c 1168 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1169 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1170 }
1171
416fd65c 1172 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1173 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1174 }
1175
416fd65c 1176 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1177 while (this.tasksQueueSize(workerNodeKey) > 0) {
1178 this.executeTask(
1179 workerNodeKey,
1180 this.dequeueTask(workerNodeKey) as Task<Data>
1181 )
ff733df7 1182 }
4b628b48 1183 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1184 }
1185
ef41a6e6
JB
1186 private flushTasksQueues (): void {
1187 for (const [workerNodeKey] of this.workerNodes.entries()) {
1188 this.flushTasksQueue(workerNodeKey)
1189 }
1190 }
c97c7edb 1191}