perf: remove unneeded branching in message handling code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
90d7d101
JB
95 /**
96 * The task function names.
97 */
98 private taskFunctions!: string[]
afe0d5bf 99
729c563d
S
/**
 * Builds a poolifier pool: validates the arguments, prepares the internal
 * helpers and then creates the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // Refuse to build a pool when `isMain()` reports that we are not the main worker.
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Argument validation; each check throws on invalid input.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods so they keep the pool as `this` when passed around.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  // The emitter only exists when events are enabled in the pool options.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Subclass hook, invoked before any worker node is created.
  this.setupHook()

  // `starting` is true only while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
148
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
161
8d3782fa
JB
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools reject a zero size here.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
179
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws Error if `max` is nullish.
 * @throws TypeError if `max` is not a safe integer.
 * @throws RangeError if `max` is lower than `min`, equal to zero, or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
205
/**
 * Validates the pool options and fills in defaults for the unset ones on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy, defaulting to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
232
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
242
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size,
 * or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When provided, the weights must have as many entries as the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
270
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are non-nullish but not a plain object,
 * or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
294
e761c033
JB
/**
 * Creates worker nodes until the number of non-dynamic worker nodes
 * reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number => {
    let count = 0
    for (const workerNode of this.workerNodes) {
      if (!workerNode.info.dynamic) {
        ++count
      }
    }
    return count
  }
  // The count is recomputed on every iteration, as in a plain while condition.
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
306
/** @inheritDoc */
public get info (): PoolInfo {
  // Sums a numeric value picked from every worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => (matches(workerNode) ? total + 1 : total),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is reported only when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue figures are reported only when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    // Run time statistics, reported only when the strategy aggregates run time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Total aggregated run time divided by the total number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    // Wait time statistics, reported only when the strategy aggregates wait time.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Total aggregated wait time divided by the total number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 447
aa9eede8
JB
/**
 * Whether the pool is ready: true once the number of ready, non-dynamic
 * worker nodes is at least the pool minimum size.
 */
private get ready (): boolean {
  let readyNonDynamicWorkerNodes = 0
  for (const workerNode of this.workerNodes) {
    if (!workerNode.info.dynamic && workerNode.info.ready) {
      ++readyNonDynamicWorkerNodes
    }
  }
  return readyNonDynamicWorkerNodes >= this.minSize
}
462
/**
 * The approximate pool utilization: aggregated tasks run time plus wait time,
 * divided by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedSinceStart = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedSinceStart * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
483
8881ae32 484 /**
aa9eede8 485 * The pool type.
8881ae32
JB
486 *
487 * If it is `'dynamic'`, it provides the `max` property.
488 */
489 protected abstract get type (): PoolType
490
184855e6 491 /**
aa9eede8 492 * The worker type.
184855e6
JB
493 */
494 protected abstract get worker (): WorkerType
495
c2ade475 496 /**
aa9eede8 497 * The pool minimum size.
c2ade475 498 */
6b27d407 499 protected abstract get minSize (): number
ff733df7
JB
500
501 /**
aa9eede8 502 * The pool maximum size.
ff733df7 503 */
6b27d407 504 protected abstract get maxSize (): number
a35560ba 505
6b813701
JB
/**
 * Verifies that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isUnknownWorker = this.getWorkerNodeKeyByWorkerId(workerId) === -1
  if (isUnknownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
524
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
536
aa9eede8
JB
/**
 * Looks up a worker node key from a worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
548
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Throws on an unknown strategy before anything is changed.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send each one a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
567
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Throws on invalid options before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  // Forward the stored options to the worker choice strategy context.
  const storedOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
578
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
590
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
601
/**
 * Builds the tasks queue options, defaulting the concurrency to `1` when unset.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly nullish.
 * @returns The tasks queue options with a concurrency always set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
609
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 618
c319c66b
JB
619 /**
620 * Whether the pool is busy or not.
621 *
622 * The pool busyness boolean status.
623 */
624 protected abstract get busy (): boolean
7c0ba920 625
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while it
 * executes fewer tasks than the configured concurrency; otherwise only while it
 * executes no task at all.
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return this.workerNodes.findIndex(hasSpareCapacity) === -1
}
650
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // Fall back to an empty list while no task function names are set.
  return this.taskFunctions ?? []
}
659
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Argument validation. Each failure rejects AND returns: calling reject()
    // does not stop the executor, so without the early return an invalid task
    // would still be assigned a worker node and executed or enqueued.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (
      name != null &&
      this.taskFunctions != null &&
      !this.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // Store the settle callbacks under the task id, together with the chosen worker node key.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away when the queue is disabled or the worker node is
    // below its concurrency; enqueue otherwise.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 718
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Start the destruction of every worker node, then wait for all of them.
  const pendingDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(pendingDestructions)
  this.emitter?.emit(PoolEvents.destroy)
}
728
1e3214b6
JB
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill answer and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
744
4a6952ff 745 /**
aa9eede8 746 * Terminates the worker node given its worker node key.
4a6952ff 747 *
aa9eede8 748 * @param workerNodeKey - The worker node key.
4a6952ff 749 */
81c02522 750 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 751
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * The default implementation does nothing; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default
}
c97c7edb 761
729c563d 762 /**
280c2a77
S
763 * Should return whether the worker is the main worker or not.
764 */
765 protected abstract isMain (): boolean
766
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time statistics,
 * first on the worker node usage, then on the usage tracked for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node level usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Task function level usage.
  const perTaskUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++perTaskUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(perTaskUsage, task)
}
787
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task counters, run time and ELU statistics, first on the worker
 * node usage, then on the usage tracked for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const applyMessageTo = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  // Worker node level usage.
  applyMessageTo(this.workerNodes[workerNodeKey].usage)
  // Task function level usage; falls back to the default task name.
  applyMessageTo(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
810
/**
 * Updates the task counters of a worker usage from a received message:
 * one less executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  // A message carrying a task error counts as a failure.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
823
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage from a received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeStatistics = workerUsage.runTime
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // A missing task run time is recorded as 0.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    runTimeStatistics,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
835
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage for a task.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp yields a wait time of 0.
  const submittedAt = task.timestamp ?? now
  const taskWaitTime = now - submittedAt
  const waitTimeStatistics = workerUsage.waitTime
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    waitTimeStatistics,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
849
/**
 * Updates the ELU statistics (active, idle and utilization) of a worker usage
 * from a received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  // Missing active/idle values in the message are recorded as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    message.taskPerformance?.elu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    message.taskPerformance?.elu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  const taskElu = message.taskPerformance?.elu
  // Utilization is only tracked when aggregation is required and the message carries an ELU.
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const previousUtilization = workerUsage.elu.utilization
  // First value is taken as is; later ones are averaged with the stored value.
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
881
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker node is created first when `shallCreateDynamicWorker()` allows it,
 * and is chosen directly if the strategy policy says to use dynamic workers.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
900
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and its worker nodes are internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
909
280c2a77 910 /**
aa9eede8 911 * Sends a message to worker given its worker node key.
280c2a77 912 *
aa9eede8 913 * @param workerNodeKey - The worker node key.
38e795c1 914 * @param message - The message.
7d91a8cd 915 * @param transferList - The optional array of transferable objects.
280c2a77
S
916 */
917 protected abstract sendToWorker (
aa9eede8 918 workerNodeKey: number,
7d91a8cd
JB
919 message: MessageValue<Data>,
920 transferList?: TransferListItem[]
280c2a77
S
921 ): void
922
729c563d 923 /**
41344292 924 * Creates a new worker.
6c6afb84
JB
925 *
926 * @returns Newly created worker.
729c563d 927 */
280c2a77 928 protected abstract createWorker (): Worker
c97c7edb 929
/**
 * Creates a worker, attaches its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, each falling back to a no-op when unset.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling, registered after the user error handler.
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(failedWorkerNodeKey)
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No restart while the pool is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && failedWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Remove the worker node from the pool on the first exit event.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const createdWorkerNodeKey = this.addWorkerNode(worker)

  this.afterWorkerNodeSetup(createdWorkerNodeKey)

  return createdWorkerNodeKey
}
be0676b3 969
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered that
 * destroys the worker node when the message `kill` field matches the HARD kill
 * behavior, or matches the SOFT kill behavior while the worker node has no executing
 * task (and, when the tasks queue is enabled, no queued task either).
 * The worker is then sent a `checkActive` message and its info is flagged as dynamic;
 * it is also flagged ready when the worker choice strategy policy uses dynamic workers.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const usage = this.workerNodes[senderWorkerNodeKey].usage
    // Whether the worker node has nothing left to do, given the tasks queue setting.
    const isIdle = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return usage.tasks.executing === 0
        case true:
          return (
            usage.tasks.executing === 0 &&
            this.tasksQueueSize(senderWorkerNodeKey) === 0
          )
        default:
          return false
      }
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isIdle())
    ) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    workerInfo.ready = true
  }
  return workerNodeKey
}
1008
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * Must be implemented by concrete pools.
 *
 * @typeParam Message - Type of the message payload, either `Data` or `Response`.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1021
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, then sends it the startup
 * message followed by the statistics message.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // 1. Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, listener)
  // 2. Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // 3. Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1036
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1043
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements reported by the worker choice strategy context, plus the worker id.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu },
    workerId
  })
}
a2ed5053
JB
1061
/**
 * Moves the tasks still queued on the given worker node to other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node with an
 * empty tasks queue, or else the other ready worker node with the fewest queued tasks.
 * The task is executed right away when the destination has an empty queue and runs
 * fewer tasks than the tasks queue concurrency; otherwise it is enqueued there.
 *
 * Redistribution stops as soon as no other ready worker node is available: the
 * remaining tasks are left in the source queue. Without that guard, each task would
 * be dequeued from and re-enqueued onto the same worker node forever.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means that no eligible destination worker node has been found.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let executeTask = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Best possible destination: take it and stop searching.
        executeTask =
          workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node: moving tasks onto the source itself would never end.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (executeTask) {
      this.executeTask(targetWorkerNodeKey, task)
    } else {
      this.enqueueTask(targetWorkerNodeKey, task)
    }
  }
}
1105
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches on the first
 * field that is set, in this order: `ready`, `taskId`, `taskFunctions`.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      this.taskFunctions = message.taskFunctions
    }
  }
}
1126
/**
 * Handles a worker ready response: stores the `ready` flag of the message in the info
 * of the worker that sent it, then emits the pool `ready` event when an emitter is
 * set and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1135
/**
 * Handles a task execution response received from a worker.
 *
 * Messages whose task id has no pending promise response are ignored. Otherwise the
 * promise is rejected with the task error message (after emitting the `taskError`
 * event) or resolved with the message data, the after task execution hook is run,
 * the pending entry is removed, the next queued task is executed when the tasks queue
 * is enabled and the worker node is below the queue concurrency, and the worker
 * choice strategy context is updated.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const workerNodeKey = promiseResponse.workerNodeKey
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1164
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full` event when
 * the pool is dynamic and full. Does nothing when no emitter is set.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1175
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1185
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node not found')
}
c923ce56 1206
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing when the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1219
/**
 * Runs the before task execution hook, then sends the task, along with its transfer
 * list, to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1230
/**
 * Enqueues the task on the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1234
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1238
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1242
/**
 * Executes every task queued on the worker node identified by its key, then clears
 * the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1252
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1258}