feat: untangle worker choice strategies tasks distribution and dynamic worker creatio...
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
8735b4e5
JB
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
075e51d1 92 /**
adc9cc64 93 * Whether the pool is starting or not.
075e51d1
JB
94 */
95 private readonly starting: boolean
afe0d5bf
JB
96 /**
97 * The start timestamp of the pool.
98 */
99 private readonly startTimestamp
100
729c563d
S
101 /**
102 * Constructs a new poolifier pool.
103 *
38e795c1 104 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 105 * @param filePath - Path to the worker file.
38e795c1 106 * @param opts - Options for the pool.
729c563d 107 */
c97c7edb 108 public constructor (
b4213b7f
JB
109 protected readonly numberOfWorkers: number,
110 protected readonly filePath: string,
111 protected readonly opts: PoolOptions<Worker>
c97c7edb 112 ) {
78cea37e 113 if (!this.isMain()) {
04f45163 114 throw new Error(
8c6d4acf 115 'Cannot start a pool from a worker with the same type as the pool'
04f45163 116 )
c97c7edb 117 }
8d3782fa 118 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 119 this.checkFilePath(this.filePath)
7c0ba920 120 this.checkPoolOptions(this.opts)
1086026a 121
7254e419
JB
122 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
123 this.executeTask = this.executeTask.bind(this)
124 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 125
6bd72cd0 126 if (this.opts.enableEvents === true) {
7c0ba920
JB
127 this.emitter = new PoolEmitter()
128 }
d59df138
JB
129 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
130 Worker,
131 Data,
132 Response
da309861
JB
133 >(
134 this,
135 this.opts.workerChoiceStrategy,
136 this.opts.workerChoiceStrategyOptions
137 )
b6b32453
JB
138
139 this.setupHook()
140
075e51d1 141 this.starting = true
e761c033 142 this.startPool()
075e51d1 143 this.starting = false
afe0d5bf
JB
144
145 this.startTimestamp = performance.now()
c97c7edb
S
146 }
147
a35560ba 148 private checkFilePath (filePath: string): void {
ffcbbad8
JB
149 if (
150 filePath == null ||
3d6dd312 151 typeof filePath !== 'string' ||
ffcbbad8
JB
152 (typeof filePath === 'string' && filePath.trim().length === 0)
153 ) {
c510fea7
APA
154 throw new Error('Please specify a file with a worker implementation')
155 }
3d6dd312
JB
156 if (!existsSync(filePath)) {
157 throw new Error(`Cannot find the worker file '${filePath}'`)
158 }
c510fea7
APA
159 }
160
8d3782fa
JB
161 private checkNumberOfWorkers (numberOfWorkers: number): void {
162 if (numberOfWorkers == null) {
163 throw new Error(
164 'Cannot instantiate a pool without specifying the number of workers'
165 )
78cea37e 166 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 167 throw new TypeError(
0d80593b 168 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
169 )
170 } else if (numberOfWorkers < 0) {
473c717a 171 throw new RangeError(
8d3782fa
JB
172 'Cannot instantiate a pool with a negative number of workers'
173 )
6b27d407 174 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
175 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
176 }
177 }
178
179 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 180 if (this.type === PoolTypes.dynamic) {
a5ed75b7
JB
181 if (max == null) {
182 throw new Error(
183 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
184 )
185 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
186 throw new TypeError(
187 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
188 )
189 } else if (min > max) {
079de991
JB
190 throw new RangeError(
191 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
192 )
b97d82d8 193 } else if (max === 0) {
079de991 194 throw new RangeError(
d640b48b 195 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
196 )
197 } else if (min === max) {
198 throw new RangeError(
199 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
200 )
201 }
8d3782fa
JB
202 }
203 }
204
7c0ba920 205 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
206 if (isPlainObject(opts)) {
207 this.opts.workerChoiceStrategy =
208 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
209 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
210 this.opts.workerChoiceStrategyOptions = {
211 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
212 ...opts.workerChoiceStrategyOptions
213 }
49be33fe
JB
214 this.checkValidWorkerChoiceStrategyOptions(
215 this.opts.workerChoiceStrategyOptions
216 )
1f68cede 217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
221 this.checkValidTasksQueueOptions(
222 opts.tasksQueueOptions as TasksQueueOptions
223 )
224 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
225 opts.tasksQueueOptions as TasksQueueOptions
226 )
227 }
228 } else {
229 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 230 }
aee46736
JB
231 }
232
233 private checkValidWorkerChoiceStrategy (
234 workerChoiceStrategy: WorkerChoiceStrategy
235 ): void {
236 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 237 throw new Error(
aee46736 238 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
239 )
240 }
7c0ba920
JB
241 }
242
0d80593b
JB
243 private checkValidWorkerChoiceStrategyOptions (
244 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
245 ): void {
246 if (!isPlainObject(workerChoiceStrategyOptions)) {
247 throw new TypeError(
248 'Invalid worker choice strategy options: must be a plain object'
249 )
250 }
8990357d
JB
251 if (
252 workerChoiceStrategyOptions.choiceRetries != null &&
253 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
254 ) {
255 throw new TypeError(
256 'Invalid worker choice strategy options: choice retries must be an integer'
257 )
258 }
259 if (
260 workerChoiceStrategyOptions.choiceRetries != null &&
261 workerChoiceStrategyOptions.choiceRetries <= 0
262 ) {
263 throw new RangeError(
264 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
265 )
266 }
49be33fe
JB
267 if (
268 workerChoiceStrategyOptions.weights != null &&
6b27d407 269 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
270 ) {
271 throw new Error(
272 'Invalid worker choice strategy options: must have a weight for each worker node'
273 )
274 }
f0d7f803
JB
275 if (
276 workerChoiceStrategyOptions.measurement != null &&
277 !Object.values(Measurements).includes(
278 workerChoiceStrategyOptions.measurement
279 )
280 ) {
281 throw new Error(
282 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
283 )
284 }
0d80593b
JB
285 }
286
a20f0ba5
JB
287 private checkValidTasksQueueOptions (
288 tasksQueueOptions: TasksQueueOptions
289 ): void {
0d80593b
JB
290 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
291 throw new TypeError('Invalid tasks queue options: must be a plain object')
292 }
f0d7f803
JB
293 if (
294 tasksQueueOptions?.concurrency != null &&
295 !Number.isSafeInteger(tasksQueueOptions.concurrency)
296 ) {
297 throw new TypeError(
298 'Invalid worker tasks concurrency: must be an integer'
299 )
300 }
301 if (
302 tasksQueueOptions?.concurrency != null &&
303 tasksQueueOptions.concurrency <= 0
304 ) {
a20f0ba5 305 throw new Error(
8735b4e5 306 `Invalid worker tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
a20f0ba5
JB
307 )
308 }
309 }
310
e761c033
JB
311 private startPool (): void {
312 while (
313 this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
316 0
317 ) < this.numberOfWorkers
318 ) {
aa9eede8 319 this.createAndSetupWorkerNode()
e761c033
JB
320 }
321 }
322
08f3f44c 323 /** @inheritDoc */
6b27d407
JB
324 public get info (): PoolInfo {
325 return {
23ccf9d7 326 version,
6b27d407 327 type: this.type,
184855e6 328 worker: this.worker,
2431bdb4
JB
329 ready: this.ready,
330 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
331 minSize: this.minSize,
332 maxSize: this.maxSize,
c05f0d50
JB
333 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
334 .runTime.aggregate &&
1305e9a8
JB
335 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
336 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
337 workerNodes: this.workerNodes.length,
338 idleWorkerNodes: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
f59e1027 340 workerNode.usage.tasks.executing === 0
a4e07f72
JB
341 ? accumulator + 1
342 : accumulator,
6b27d407
JB
343 0
344 ),
345 busyWorkerNodes: this.workerNodes.reduce(
346 (accumulator, workerNode) =>
f59e1027 347 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
348 0
349 ),
a4e07f72 350 executedTasks: this.workerNodes.reduce(
6b27d407 351 (accumulator, workerNode) =>
f59e1027 352 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
353 0
354 ),
355 executingTasks: this.workerNodes.reduce(
356 (accumulator, workerNode) =>
f59e1027 357 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
358 0
359 ),
daf86646
JB
360 ...(this.opts.enableTasksQueue === true && {
361 queuedTasks: this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 accumulator + workerNode.usage.tasks.queued,
364 0
365 )
366 }),
367 ...(this.opts.enableTasksQueue === true && {
368 maxQueuedTasks: this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
371 0
372 )
373 }),
a1763c54
JB
374 ...(this.opts.enableTasksQueue === true && {
375 backPressure: this.hasBackPressure()
376 }),
a4e07f72
JB
377 failedTasks: this.workerNodes.reduce(
378 (accumulator, workerNode) =>
f59e1027 379 accumulator + workerNode.usage.tasks.failed,
a4e07f72 380 0
1dcf8b7b
JB
381 ),
382 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
383 .runTime.aggregate && {
384 runTime: {
98e72cda
JB
385 minimum: round(
386 Math.min(
387 ...this.workerNodes.map(
8ebe6c30 388 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 389 )
1dcf8b7b
JB
390 )
391 ),
98e72cda
JB
392 maximum: round(
393 Math.max(
394 ...this.workerNodes.map(
8ebe6c30 395 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 396 )
1dcf8b7b 397 )
98e72cda
JB
398 ),
399 average: round(
400 this.workerNodes.reduce(
401 (accumulator, workerNode) =>
402 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
403 0
404 ) /
405 this.workerNodes.reduce(
406 (accumulator, workerNode) =>
407 accumulator + (workerNode.usage.tasks?.executed ?? 0),
408 0
409 )
410 ),
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .runTime.median && {
413 median: round(
414 median(
415 this.workerNodes.map(
8ebe6c30 416 (workerNode) => workerNode.usage.runTime?.median ?? 0
98e72cda
JB
417 )
418 )
419 )
420 })
1dcf8b7b
JB
421 }
422 }),
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.aggregate && {
425 waitTime: {
98e72cda
JB
426 minimum: round(
427 Math.min(
428 ...this.workerNodes.map(
8ebe6c30 429 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 430 )
1dcf8b7b
JB
431 )
432 ),
98e72cda
JB
433 maximum: round(
434 Math.max(
435 ...this.workerNodes.map(
8ebe6c30 436 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 437 )
1dcf8b7b 438 )
98e72cda
JB
439 ),
440 average: round(
441 this.workerNodes.reduce(
442 (accumulator, workerNode) =>
443 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
444 0
445 ) /
446 this.workerNodes.reduce(
447 (accumulator, workerNode) =>
448 accumulator + (workerNode.usage.tasks?.executed ?? 0),
449 0
450 )
451 ),
452 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
453 .waitTime.median && {
454 median: round(
455 median(
456 this.workerNodes.map(
8ebe6c30 457 (workerNode) => workerNode.usage.waitTime?.median ?? 0
98e72cda
JB
458 )
459 )
460 )
461 })
1dcf8b7b
JB
462 }
463 })
6b27d407
JB
464 }
465 }
08f3f44c 466
aa9eede8
JB
467 /**
468 * The pool readiness boolean status.
469 */
2431bdb4
JB
470 private get ready (): boolean {
471 return (
b97d82d8
JB
472 this.workerNodes.reduce(
473 (accumulator, workerNode) =>
474 !workerNode.info.dynamic && workerNode.info.ready
475 ? accumulator + 1
476 : accumulator,
477 0
478 ) >= this.minSize
2431bdb4
JB
479 )
480 }
481
afe0d5bf 482 /**
aa9eede8 483 * The approximate pool utilization.
afe0d5bf
JB
484 *
485 * @returns The pool utilization.
486 */
487 private get utilization (): number {
8e5ca040 488 const poolTimeCapacity =
fe7d90db 489 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
490 const totalTasksRunTime = this.workerNodes.reduce(
491 (accumulator, workerNode) =>
71514351 492 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
493 0
494 )
495 const totalTasksWaitTime = this.workerNodes.reduce(
496 (accumulator, workerNode) =>
71514351 497 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
498 0
499 )
8e5ca040 500 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
501 }
502
8881ae32 503 /**
aa9eede8 504 * The pool type.
8881ae32
JB
505 *
506 * If it is `'dynamic'`, it provides the `max` property.
507 */
508 protected abstract get type (): PoolType
509
184855e6 510 /**
aa9eede8 511 * The worker type.
184855e6
JB
512 */
513 protected abstract get worker (): WorkerType
514
c2ade475 515 /**
aa9eede8 516 * The pool minimum size.
c2ade475 517 */
8735b4e5
JB
518 protected get minSize (): number {
519 return this.numberOfWorkers
520 }
ff733df7
JB
521
522 /**
aa9eede8 523 * The pool maximum size.
ff733df7 524 */
8735b4e5
JB
525 protected get maxSize (): number {
526 return this.max ?? this.numberOfWorkers
527 }
a35560ba 528
6b813701
JB
529 /**
530 * Checks if the worker id sent in the received message from a worker is valid.
531 *
532 * @param message - The received message.
533 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
534 */
21f710aa 535 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
536 if (message.workerId == null) {
537 throw new Error('Worker message received without worker id')
538 } else if (
21f710aa 539 message.workerId != null &&
aad6fb64 540 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
541 ) {
542 throw new Error(
543 `Worker message received from unknown worker '${message.workerId}'`
544 )
545 }
546 }
547
ffcbbad8 548 /**
f06e48d8 549 * Gets the given worker its worker node key.
ffcbbad8
JB
550 *
551 * @param worker - The worker.
f59e1027 552 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 553 */
aad6fb64 554 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 555 return this.workerNodes.findIndex(
8ebe6c30 556 (workerNode) => workerNode.worker === worker
f06e48d8 557 )
bf9549ae
JB
558 }
559
aa9eede8
JB
560 /**
561 * Gets the worker node key given its worker id.
562 *
563 * @param workerId - The worker id.
aad6fb64 564 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 565 */
aad6fb64
JB
566 private getWorkerNodeKeyByWorkerId (workerId: number): number {
567 return this.workerNodes.findIndex(
8ebe6c30 568 (workerNode) => workerNode.info.id === workerId
aad6fb64 569 )
aa9eede8
JB
570 }
571
afc003b2 572 /** @inheritDoc */
a35560ba 573 public setWorkerChoiceStrategy (
59219cbb
JB
574 workerChoiceStrategy: WorkerChoiceStrategy,
575 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 576 ): void {
aee46736 577 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 578 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
579 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
580 this.opts.workerChoiceStrategy
581 )
582 if (workerChoiceStrategyOptions != null) {
583 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
584 }
aa9eede8 585 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 586 workerNode.resetUsage()
9edb9717 587 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 588 }
a20f0ba5
JB
589 }
590
591 /** @inheritDoc */
592 public setWorkerChoiceStrategyOptions (
593 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
594 ): void {
0d80593b 595 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
596 this.opts.workerChoiceStrategyOptions = {
597 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
598 ...workerChoiceStrategyOptions
599 }
a20f0ba5
JB
600 this.workerChoiceStrategyContext.setOptions(
601 this.opts.workerChoiceStrategyOptions
a35560ba
S
602 )
603 }
604
a20f0ba5 605 /** @inheritDoc */
8f52842f
JB
606 public enableTasksQueue (
607 enable: boolean,
608 tasksQueueOptions?: TasksQueueOptions
609 ): void {
a20f0ba5 610 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 611 this.flushTasksQueues()
a20f0ba5
JB
612 }
613 this.opts.enableTasksQueue = enable
8f52842f 614 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
615 }
616
617 /** @inheritDoc */
8f52842f 618 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 619 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
620 this.checkValidTasksQueueOptions(tasksQueueOptions)
621 this.opts.tasksQueueOptions =
622 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 623 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
624 delete this.opts.tasksQueueOptions
625 }
626 }
627
628 private buildTasksQueueOptions (
629 tasksQueueOptions: TasksQueueOptions
630 ): TasksQueueOptions {
631 return {
632 concurrency: tasksQueueOptions?.concurrency ?? 1
633 }
634 }
635
c319c66b
JB
636 /**
637 * Whether the pool is full or not.
638 *
639 * The pool filling boolean status.
640 */
dea903a8
JB
641 protected get full (): boolean {
642 return this.workerNodes.length >= this.maxSize
643 }
c2ade475 644
c319c66b
JB
645 /**
646 * Whether the pool is busy or not.
647 *
648 * The pool busyness boolean status.
649 */
650 protected abstract get busy (): boolean
7c0ba920 651
6c6afb84 652 /**
3d76750a 653 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
654 *
655 * @returns Worker nodes busyness boolean status.
656 */
c2ade475 657 protected internalBusy (): boolean {
3d76750a
JB
658 if (this.opts.enableTasksQueue === true) {
659 return (
660 this.workerNodes.findIndex(
8ebe6c30 661 (workerNode) =>
3d76750a
JB
662 workerNode.info.ready &&
663 workerNode.usage.tasks.executing <
664 (this.opts.tasksQueueOptions?.concurrency as number)
665 ) === -1
666 )
667 } else {
668 return (
669 this.workerNodes.findIndex(
8ebe6c30 670 (workerNode) =>
3d76750a
JB
671 workerNode.info.ready && workerNode.usage.tasks.executing === 0
672 ) === -1
673 )
674 }
cb70b19d
JB
675 }
676
90d7d101
JB
677 /** @inheritDoc */
678 public listTaskFunctions (): string[] {
f2dbbf95
JB
679 for (const workerNode of this.workerNodes) {
680 if (
681 Array.isArray(workerNode.info.taskFunctions) &&
682 workerNode.info.taskFunctions.length > 0
683 ) {
684 return workerNode.info.taskFunctions
685 }
90d7d101 686 }
f2dbbf95 687 return []
90d7d101
JB
688 }
689
afc003b2 690 /** @inheritDoc */
7d91a8cd
JB
691 public async execute (
692 data?: Data,
693 name?: string,
694 transferList?: TransferListItem[]
695 ): Promise<Response> {
52b71763 696 return await new Promise<Response>((resolve, reject) => {
7d91a8cd
JB
697 if (name != null && typeof name !== 'string') {
698 reject(new TypeError('name argument must be a string'))
699 }
90d7d101
JB
700 if (
701 name != null &&
702 typeof name === 'string' &&
703 name.trim().length === 0
704 ) {
f58b60b9 705 reject(new TypeError('name argument must not be an empty string'))
90d7d101 706 }
b558f6b5
JB
707 if (transferList != null && !Array.isArray(transferList)) {
708 reject(new TypeError('transferList argument must be an array'))
709 }
710 const timestamp = performance.now()
711 const workerNodeKey = this.chooseWorkerNode()
94407def 712 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
cea399c8
JB
713 if (
714 name != null &&
a5d15204
JB
715 Array.isArray(workerInfo.taskFunctions) &&
716 !workerInfo.taskFunctions.includes(name)
cea399c8 717 ) {
90d7d101
JB
718 reject(
719 new Error(`Task function '${name}' is not registered in the pool`)
720 )
721 }
501aea93 722 const task: Task<Data> = {
52b71763
JB
723 name: name ?? DEFAULT_TASK_NAME,
724 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
725 data: data ?? ({} as Data),
7d91a8cd 726 transferList,
52b71763 727 timestamp,
a5d15204 728 workerId: workerInfo.id as number,
7629bdf1 729 taskId: randomUUID()
52b71763 730 }
7629bdf1 731 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
732 resolve,
733 reject,
501aea93 734 workerNodeKey
2e81254d 735 })
52b71763 736 if (
4e377863
JB
737 this.opts.enableTasksQueue === false ||
738 (this.opts.enableTasksQueue === true &&
739 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 740 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 741 ) {
501aea93 742 this.executeTask(workerNodeKey, task)
4e377863
JB
743 } else {
744 this.enqueueTask(workerNodeKey, task)
52b71763 745 }
2e81254d 746 })
280c2a77 747 }
c97c7edb 748
afc003b2 749 /** @inheritDoc */
c97c7edb 750 public async destroy (): Promise<void> {
1fbcaa7c 751 await Promise.all(
81c02522 752 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 753 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
754 })
755 )
33e6bb4c 756 this.emitter?.emit(PoolEvents.destroy, this.info)
c97c7edb
S
757 }
758
1e3214b6
JB
759 protected async sendKillMessageToWorker (
760 workerNodeKey: number,
761 workerId: number
762 ): Promise<void> {
9edb9717 763 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
764 this.registerWorkerMessageListener(workerNodeKey, (message) => {
765 if (message.kill === 'success') {
766 resolve()
767 } else if (message.kill === 'failure') {
e1af34e6 768 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
769 }
770 })
9edb9717 771 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 772 })
1e3214b6
JB
773 }
774
4a6952ff 775 /**
aa9eede8 776 * Terminates the worker node given its worker node key.
4a6952ff 777 *
aa9eede8 778 * @param workerNodeKey - The worker node key.
4a6952ff 779 */
81c02522 780 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 781
729c563d 782 /**
6677a3d3
JB
783 * Setup hook to execute code before worker nodes are created in the abstract constructor.
784 * Can be overridden.
afc003b2
JB
785 *
786 * @virtual
729c563d 787 */
280c2a77 788 protected setupHook (): void {
d99ba5a8 789 // Intentionally empty
280c2a77 790 }
c97c7edb 791
729c563d 792 /**
280c2a77
S
793 * Should return whether the worker is the main worker or not.
794 */
795 protected abstract isMain (): boolean
796
797 /**
2e81254d 798 * Hook executed before the worker task execution.
bf9549ae 799 * Can be overridden.
729c563d 800 *
f06e48d8 801 * @param workerNodeKey - The worker node key.
1c6fe997 802 * @param task - The task to execute.
729c563d 803 */
1c6fe997
JB
804 protected beforeTaskExecutionHook (
805 workerNodeKey: number,
806 task: Task<Data>
807 ): void {
94407def
JB
808 if (this.workerNodes[workerNodeKey]?.usage != null) {
809 const workerUsage = this.workerNodes[workerNodeKey].usage
810 ++workerUsage.tasks.executing
811 this.updateWaitTimeWorkerUsage(workerUsage, task)
812 }
813 if (
814 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
815 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
816 task.name as string
817 ) != null
818 ) {
db0e38ee 819 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 820 workerNodeKey
db0e38ee 821 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
94407def
JB
822 if (taskFunctionWorkerUsage != null) {
823 ++taskFunctionWorkerUsage.tasks.executing
824 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
825 }
b558f6b5 826 }
c97c7edb
S
827 }
828
c01733f1 829 /**
2e81254d 830 * Hook executed after the worker task execution.
bf9549ae 831 * Can be overridden.
c01733f1 832 *
501aea93 833 * @param workerNodeKey - The worker node key.
38e795c1 834 * @param message - The received message.
c01733f1 835 */
2e81254d 836 protected afterTaskExecutionHook (
501aea93 837 workerNodeKey: number,
2740a743 838 message: MessageValue<Response>
bf9549ae 839 ): void {
94407def
JB
840 if (this.workerNodes[workerNodeKey]?.usage != null) {
841 const workerUsage = this.workerNodes[workerNodeKey].usage
842 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
843 this.updateRunTimeWorkerUsage(workerUsage, message)
844 this.updateEluWorkerUsage(workerUsage, message)
845 }
846 if (
847 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
848 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
849 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
850 ) != null
851 ) {
db0e38ee 852 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 853 workerNodeKey
db0e38ee 854 ].getTaskFunctionWorkerUsage(
b558f6b5
JB
855 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
856 ) as WorkerUsage
db0e38ee
JB
857 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
858 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
859 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
860 }
861 }
862
db0e38ee
JB
863 /**
864 * Whether the worker node shall update its task function worker usage or not.
865 *
866 * @param workerNodeKey - The worker node key.
867 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
868 */
869 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 870 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 871 return (
94407def 872 workerInfo != null &&
a5d15204 873 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 874 workerInfo.taskFunctions.length > 2
b558f6b5 875 )
f1c06930
JB
876 }
877
878 private updateTaskStatisticsWorkerUsage (
879 workerUsage: WorkerUsage,
880 message: MessageValue<Response>
881 ): void {
a4e07f72 882 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
883 if (
884 workerTaskStatistics.executing != null &&
885 workerTaskStatistics.executing > 0
886 ) {
887 --workerTaskStatistics.executing
888 } else if (
889 workerTaskStatistics.executing != null &&
890 workerTaskStatistics.executing < 0
891 ) {
892 throw new Error(
cf6e3991 893 'Worker usage statistic for tasks executing cannot be negative'
5bb5be17
JB
894 )
895 }
98e72cda
JB
896 if (message.taskError == null) {
897 ++workerTaskStatistics.executed
898 } else {
a4e07f72 899 ++workerTaskStatistics.failed
2740a743 900 }
f8eb0a2a
JB
901 }
902
a4e07f72
JB
903 private updateRunTimeWorkerUsage (
904 workerUsage: WorkerUsage,
f8eb0a2a
JB
905 message: MessageValue<Response>
906 ): void {
e4f20deb
JB
907 updateMeasurementStatistics(
908 workerUsage.runTime,
909 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
910 message.taskPerformance?.runTime ?? 0,
911 workerUsage.tasks.executed
912 )
f8eb0a2a
JB
913 }
914
a4e07f72
JB
915 private updateWaitTimeWorkerUsage (
916 workerUsage: WorkerUsage,
1c6fe997 917 task: Task<Data>
f8eb0a2a 918 ): void {
1c6fe997
JB
919 const timestamp = performance.now()
920 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
921 updateMeasurementStatistics(
922 workerUsage.waitTime,
923 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
924 taskWaitTime,
925 workerUsage.tasks.executed
926 )
c01733f1 927 }
928
a4e07f72 929 private updateEluWorkerUsage (
5df69fab 930 workerUsage: WorkerUsage,
62c15a68
JB
931 message: MessageValue<Response>
932 ): void {
008512c7
JB
933 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
934 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
935 updateMeasurementStatistics(
936 workerUsage.elu.active,
008512c7 937 eluTaskStatisticsRequirements,
e4f20deb
JB
938 message.taskPerformance?.elu?.active ?? 0,
939 workerUsage.tasks.executed
940 )
941 updateMeasurementStatistics(
942 workerUsage.elu.idle,
008512c7 943 eluTaskStatisticsRequirements,
e4f20deb
JB
944 message.taskPerformance?.elu?.idle ?? 0,
945 workerUsage.tasks.executed
946 )
008512c7 947 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 948 if (message.taskPerformance?.elu != null) {
f7510105
JB
949 if (workerUsage.elu.utilization != null) {
950 workerUsage.elu.utilization =
951 (workerUsage.elu.utilization +
952 message.taskPerformance.elu.utilization) /
953 2
954 } else {
955 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
956 }
62c15a68
JB
957 }
958 }
959 }
960
280c2a77 961 /**
f06e48d8 962 * Chooses a worker node for the next task.
280c2a77 963 *
6c6afb84 964 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 965 *
aa9eede8 966 * @returns The chosen worker node key
280c2a77 967 */
6c6afb84 968 private chooseWorkerNode (): number {
930dcf12 969 if (this.shallCreateDynamicWorker()) {
aa9eede8 970 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 971 if (
b1aae695 972 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 973 ) {
aa9eede8 974 return workerNodeKey
6c6afb84 975 }
17393ac8 976 }
930dcf12
JB
977 return this.workerChoiceStrategyContext.execute()
978 }
979
6c6afb84
JB
980 /**
981 * Conditions for dynamic worker creation.
982 *
983 * @returns Whether to create a dynamic worker or not.
984 */
985 private shallCreateDynamicWorker (): boolean {
930dcf12 986 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
987 }
988
280c2a77 989 /**
aa9eede8 990 * Sends a message to worker given its worker node key.
280c2a77 991 *
aa9eede8 992 * @param workerNodeKey - The worker node key.
38e795c1 993 * @param message - The message.
7d91a8cd 994 * @param transferList - The optional array of transferable objects.
280c2a77
S
995 */
996 protected abstract sendToWorker (
aa9eede8 997 workerNodeKey: number,
7d91a8cd
JB
998 message: MessageValue<Data>,
999 transferList?: TransferListItem[]
280c2a77
S
1000 ): void
1001
729c563d 1002 /**
41344292 1003 * Creates a new worker.
6c6afb84
JB
1004 *
1005 * @returns Newly created worker.
729c563d 1006 */
280c2a77 1007 protected abstract createWorker (): Worker
c97c7edb 1008
4a6952ff 1009 /**
aa9eede8 1010 * Creates a new, completely set up worker node.
4a6952ff 1011 *
aa9eede8 1012 * @returns New, completely set up worker node key.
4a6952ff 1013 */
aa9eede8 1014 protected createAndSetupWorkerNode (): number {
bdacc2d2 1015 const worker = this.createWorker()
280c2a77 1016
fd04474e 1017 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1018 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1019 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1020 worker.on('error', (error) => {
aad6fb64 1021 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1022 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1023 workerInfo.ready = false
0dc838e3 1024 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1025 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 1026 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 1027 if (workerInfo.dynamic) {
aa9eede8 1028 this.createAndSetupDynamicWorkerNode()
8a1260a3 1029 } else {
aa9eede8 1030 this.createAndSetupWorkerNode()
8a1260a3 1031 }
5baee0d7 1032 }
19dbc45b 1033 if (this.opts.enableTasksQueue === true) {
9b106837 1034 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1035 }
5baee0d7 1036 })
a35560ba 1037 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1038 worker.once('exit', () => {
f06e48d8 1039 this.removeWorkerNode(worker)
a974afa6 1040 })
280c2a77 1041
aa9eede8 1042 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1043
aa9eede8 1044 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1045
aa9eede8 1046 return workerNodeKey
c97c7edb 1047 }
be0676b3 1048
930dcf12 1049 /**
aa9eede8 1050 * Creates a new, completely set up dynamic worker node.
930dcf12 1051 *
aa9eede8 1052 * @returns New, completely set up dynamic worker node key.
930dcf12 1053 */
aa9eede8
JB
1054 protected createAndSetupDynamicWorkerNode (): number {
1055 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1056 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1057 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1058 message.workerId
aad6fb64 1059 )
aa9eede8 1060 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1061 // Kill message received from worker
930dcf12
JB
1062 if (
1063 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1064 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1065 ((this.opts.enableTasksQueue === false &&
aa9eede8 1066 workerUsage.tasks.executing === 0) ||
7b56f532 1067 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1068 workerUsage.tasks.executing === 0 &&
1069 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1070 ) {
5270d253
JB
1071 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1072 this.emitter?.emit(PoolEvents.error, error)
1073 })
930dcf12
JB
1074 }
1075 })
94407def 1076 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1077 this.sendToWorker(workerNodeKey, {
b0a4db63 1078 checkActive: true,
21f710aa
JB
1079 workerId: workerInfo.id as number
1080 })
b5e113f6 1081 workerInfo.dynamic = true
b1aae695
JB
1082 if (
1083 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1084 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1085 ) {
b5e113f6
JB
1086 workerInfo.ready = true
1087 }
33e6bb4c 1088 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1089 return workerNodeKey
930dcf12
JB
1090 }
1091
a2ed5053 1092 /**
aa9eede8 1093 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1094 *
aa9eede8 1095 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1096 * @param listener - The message listener callback.
1097 */
85aeb3f3
JB
1098 protected abstract registerWorkerMessageListener<
1099 Message extends Data | Response
aa9eede8
JB
1100 >(
1101 workerNodeKey: number,
1102 listener: (message: MessageValue<Message>) => void
1103 ): void
a2ed5053
JB
1104
1105 /**
aa9eede8 1106 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1107 * Can be overridden.
1108 *
aa9eede8 1109 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1110 */
aa9eede8 1111 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1112 // Listen to worker messages.
aa9eede8 1113 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1114 // Send the startup message to worker.
aa9eede8 1115 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1116 // Send the statistics message to worker.
1117 this.sendStatisticsMessageToWorker(workerNodeKey)
d2c73f82
JB
1118 }
1119
85aeb3f3 1120 /**
aa9eede8
JB
1121 * Sends the startup message to worker given its worker node key.
1122 *
1123 * @param workerNodeKey - The worker node key.
1124 */
1125 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1126
1127 /**
9edb9717 1128 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1129 *
aa9eede8 1130 * @param workerNodeKey - The worker node key.
85aeb3f3 1131 */
9edb9717 1132 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1133 this.sendToWorker(workerNodeKey, {
1134 statistics: {
1135 runTime:
1136 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1137 .runTime.aggregate,
1138 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1139 .elu.aggregate
1140 },
94407def 1141 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1142 })
1143 }
a2ed5053
JB
1144
1145 private redistributeQueuedTasks (workerNodeKey: number): void {
1146 while (this.tasksQueueSize(workerNodeKey) > 0) {
1147 let targetWorkerNodeKey: number = workerNodeKey
1148 let minQueuedTasks = Infinity
10ecf8fd 1149 let executeTask = false
a2ed5053 1150 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
94407def 1151 const workerInfo = this.getWorkerInfo(workerNodeId) as WorkerInfo
a2ed5053
JB
1152 if (
1153 workerNodeId !== workerNodeKey &&
1154 workerInfo.ready &&
1155 workerNode.usage.tasks.queued === 0
1156 ) {
a5ed75b7
JB
1157 if (
1158 this.workerNodes[workerNodeId].usage.tasks.executing <
1159 (this.opts.tasksQueueOptions?.concurrency as number)
1160 ) {
10ecf8fd
JB
1161 executeTask = true
1162 }
a2ed5053
JB
1163 targetWorkerNodeKey = workerNodeId
1164 break
1165 }
1166 if (
1167 workerNodeId !== workerNodeKey &&
1168 workerInfo.ready &&
1169 workerNode.usage.tasks.queued < minQueuedTasks
1170 ) {
1171 minQueuedTasks = workerNode.usage.tasks.queued
1172 targetWorkerNodeKey = workerNodeId
1173 }
1174 }
10ecf8fd
JB
1175 if (executeTask) {
1176 this.executeTask(
1177 targetWorkerNodeKey,
1178 this.dequeueTask(workerNodeKey) as Task<Data>
1179 )
1180 } else {
1181 this.enqueueTask(
1182 targetWorkerNodeKey,
1183 this.dequeueTask(workerNodeKey) as Task<Data>
1184 )
1185 }
a2ed5053
JB
1186 }
1187 }
1188
be0676b3 1189 /**
aa9eede8 1190 * This method is the listener registered for each worker message.
be0676b3 1191 *
bdacc2d2 1192 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1193 */
1194 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1195 return (message) => {
21f710aa 1196 this.checkMessageWorkerId(message)
a5d15204 1197 if (message.ready != null && message.taskFunctions != null) {
81c02522 1198 // Worker ready response received from worker
10e2aa7e 1199 this.handleWorkerReadyResponse(message)
7629bdf1 1200 } else if (message.taskId != null) {
81c02522 1201 // Task execution response received from worker
6b272951 1202 this.handleTaskExecutionResponse(message)
90d7d101
JB
1203 } else if (message.taskFunctions != null) {
1204 // Task functions message received from worker
94407def
JB
1205 (
1206 this.getWorkerInfo(
1207 this.getWorkerNodeKeyByWorkerId(message.workerId)
1208 ) as WorkerInfo
b558f6b5 1209 ).taskFunctions = message.taskFunctions
6b272951
JB
1210 }
1211 }
1212 }
1213
10e2aa7e 1214 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1215 if (message.ready === false) {
1216 throw new Error(`Worker ${message.workerId} failed to initialize`)
1217 }
a5d15204 1218 const workerInfo = this.getWorkerInfo(
aad6fb64 1219 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1220 ) as WorkerInfo
a5d15204
JB
1221 workerInfo.ready = message.ready as boolean
1222 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1223 if (this.emitter != null && this.ready) {
1224 this.emitter.emit(PoolEvents.ready, this.info)
1225 }
6b272951
JB
1226 }
1227
1228 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1229 const { taskId, taskError, data } = message
1230 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1231 if (promiseResponse != null) {
5441aea6
JB
1232 if (taskError != null) {
1233 this.emitter?.emit(PoolEvents.taskError, taskError)
1234 promiseResponse.reject(taskError.message)
6b272951 1235 } else {
5441aea6 1236 promiseResponse.resolve(data as Response)
6b272951 1237 }
501aea93
JB
1238 const workerNodeKey = promiseResponse.workerNodeKey
1239 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1240 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1241 if (
1242 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1243 this.tasksQueueSize(workerNodeKey) > 0 &&
1244 this.workerNodes[workerNodeKey].usage.tasks.executing <
1245 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1246 ) {
1247 this.executeTask(
1248 workerNodeKey,
1249 this.dequeueTask(workerNodeKey) as Task<Data>
1250 )
be0676b3 1251 }
6b272951 1252 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1253 }
be0676b3 1254 }
7c0ba920 1255
a1763c54 1256 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1257 if (this.busy) {
1258 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1259 }
1260 }
1261
1262 private checkAndEmitTaskQueuingEvents (): void {
1263 if (this.hasBackPressure()) {
1264 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1265 }
1266 }
1267
33e6bb4c
JB
1268 private checkAndEmitDynamicWorkerCreationEvents (): void {
1269 if (this.type === PoolTypes.dynamic) {
1270 if (this.full) {
1271 this.emitter?.emit(PoolEvents.full, this.info)
1272 }
1273 }
1274 }
1275
8a1260a3 1276 /**
aa9eede8 1277 * Gets the worker information given its worker node key.
8a1260a3
JB
1278 *
1279 * @param workerNodeKey - The worker node key.
3f09ed9f 1280 * @returns The worker information.
8a1260a3 1281 */
94407def
JB
1282 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1283 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1284 }
1285
a05c10de 1286 /**
b0a4db63 1287 * Adds the given worker in the pool worker nodes.
ea7a90d3 1288 *
38e795c1 1289 * @param worker - The worker.
aa9eede8
JB
1290 * @returns The added worker node key.
1291 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1292 */
b0a4db63 1293 private addWorkerNode (worker: Worker): number {
671d5154
JB
1294 const workerNode = new WorkerNode<Worker, Data>(
1295 worker,
1296 this.worker,
1297 this.maxSize
1298 )
b97d82d8 1299 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1300 if (this.starting) {
1301 workerNode.info.ready = true
1302 }
aa9eede8 1303 this.workerNodes.push(workerNode)
aad6fb64 1304 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1305 if (workerNodeKey === -1) {
a5d15204 1306 throw new Error('Worker node added not found')
aa9eede8
JB
1307 }
1308 return workerNodeKey
ea7a90d3 1309 }
c923ce56 1310
51fe3d3c 1311 /**
f06e48d8 1312 * Removes the given worker from the pool worker nodes.
51fe3d3c 1313 *
f06e48d8 1314 * @param worker - The worker.
51fe3d3c 1315 */
416fd65c 1316 private removeWorkerNode (worker: Worker): void {
aad6fb64 1317 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1318 if (workerNodeKey !== -1) {
1319 this.workerNodes.splice(workerNodeKey, 1)
1320 this.workerChoiceStrategyContext.remove(workerNodeKey)
1321 }
51fe3d3c 1322 }
adc3c320 1323
e2b31e32
JB
1324 /** @inheritDoc */
1325 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1326 return (
e2b31e32
JB
1327 this.opts.enableTasksQueue === true &&
1328 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1329 )
1330 }
1331
1332 private hasBackPressure (): boolean {
1333 return (
1334 this.opts.enableTasksQueue === true &&
1335 this.workerNodes.findIndex(
1336 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1337 ) === -1
9e844245 1338 )
e2b31e32
JB
1339 }
1340
b0a4db63 1341 /**
aa9eede8 1342 * Executes the given task on the worker given its worker node key.
b0a4db63 1343 *
aa9eede8 1344 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1345 * @param task - The task to execute.
1346 */
2e81254d 1347 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1348 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1349 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1350 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1351 }
1352
f9f00b5f 1353 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1354 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1355 this.checkAndEmitTaskQueuingEvents()
1356 return tasksQueueSize
adc3c320
JB
1357 }
1358
416fd65c 1359 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1360 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1361 }
1362
416fd65c 1363 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1364 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1365 }
1366
81c02522 1367 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1368 while (this.tasksQueueSize(workerNodeKey) > 0) {
1369 this.executeTask(
1370 workerNodeKey,
1371 this.dequeueTask(workerNodeKey) as Task<Data>
1372 )
ff733df7 1373 }
4b628b48 1374 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1375 }
1376
ef41a6e6
JB
1377 private flushTasksQueues (): void {
1378 for (const [workerNodeKey] of this.workerNodes.entries()) {
1379 this.flushTasksQueue(workerNodeKey)
1380 }
1381 }
c97c7edb 1382}