build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal task handling methods to the pool,
 * creates the optional event emitter and the worker choice strategy context,
 * runs the setup hook and then starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false` or if one of the argument checks fails.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Argument validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // Also fills the defaults of the pool options in place.
  this.checkPoolOptions(this.opts)

  // Bind the task handling methods so that `this` always is the pool
  // (presumably because they are passed around as callbacks - TODO confirm).
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.dequeueTask = this.dequeueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Hook for subclasses, executed before any worker node is created.
  this.setupHook()

  // Flag the startup phase for the duration of the worker nodes creation.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point used by the `utilization` getter.
  this.startTimestamp = performance.now()
}
144
/**
 * Checks that the worker file path is a non blank string and that the file exists.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is nullish, not a string, blank, or if `existsSync()` does not find it.
 */
private checkFilePath (filePath: string): void {
  const isUsablePath =
    filePath != null &&
    typeof filePath === 'string' &&
    filePath.trim().length !== 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
157
8d3782fa
JB
/**
 * Checks that the number of workers is a safe, non negative integer,
 * and is not zero for a fixed pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  // Every failing check throws, so the checks can be read top to bottom.
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
175
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the maximum pool size is nullish.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum pool size is lower than the minimum one, equal to zero, or equal to the minimum one.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
201
/**
 * Checks the pool options and stores their defaulted values in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: round robin unless specified otherwise.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options: given values override the defaults.
  this.opts.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only checked and built when the queue is enabled.
  const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(givenTasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    givenTasksQueueOptions
  )
}
229
/**
 * Checks that the worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
239
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or if the choice retries are not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the choice retries are lower than or equal to zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
283
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object, or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    // RangeError (still an Error) to match the range checks done on the
    // worker choice strategy options.
    throw new RangeError(
      `Invalid worker tasks concurrency '${concurrency}' is a negative integer or zero`
    )
  }
}
307
e761c033
JB
/**
 * Starts the pool: creates and sets up worker nodes until the number of
 * non dynamic worker nodes reaches `numberOfWorkers`.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
319
/** @inheritDoc */
public get info (): PoolInfo {
  // Resolve the task statistics requirements once instead of once per field.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node numeric value over all the worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + valueOf(workerNode),
      0
    )
  // Averages an aggregated measurement over the executed tasks. Yields 0
  // instead of NaN (0 / 0) as long as no task has been executed.
  const averagePerExecutedTask = (aggregate: number): number => {
    const executedTasks = sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks?.executed ?? 0
    )
    return executedTasks > 0 ? round(aggregate / executedTasks) : 0
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes((workerNode) =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related counters are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      )
    }),
    ...(this.opts.enableTasksQueue === true && {
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 460
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
475
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time divided by the time elapsed since the pool start multiplied by the
 * pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
496
8881ae32 497 /**
aa9eede8 498 * The pool type.
8881ae32
JB
499 *
500 * If it is `'dynamic'`, it provides the `max` property.
501 */
502 protected abstract get type (): PoolType
503
184855e6 504 /**
aa9eede8 505 * The worker type.
184855e6
JB
506 */
507 protected abstract get worker (): WorkerType
508
c2ade475 509 /**
aa9eede8 510 * The pool minimum size.
c2ade475 511 */
6b27d407 512 protected abstract get minSize (): number
ff733df7
JB
513
514 /**
aa9eede8 515 * The pool maximum size.
ff733df7 516 */
6b27d407 517 protected abstract get maxSize (): number
a35560ba 518
6b813701
JB
/**
 * Checks that the message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
537
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
549
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
561
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send each worker a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
580
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given values override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
594
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before the queue gets switched off.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
606
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the stored options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
617
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly nullish.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
625
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 634
c319c66b
JB
635 /**
636 * Whether the pool is busy or not.
637 *
638 * The pool busyness boolean status.
639 */
640 protected abstract get busy (): boolean
7c0ba920 641
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node has spare capacity while
 * it executes fewer tasks than the configured concurrency. Otherwise it has
 * spare capacity only while it executes no task.
 *
 * @returns `true` if no ready worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    return !this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing < concurrency
    )
  }
  return !this.workerNodes.some(
    (workerNode) =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
}
666
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non empty task functions list answers for the pool.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
679
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the executor: each failed check must
    // return, otherwise the task would still be sent to a worker.
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (
      name != null &&
      typeof name === 'string' &&
      name.trim().length === 0
    ) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // Submission timestamp, used later on to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the promise.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already executes its concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 739
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Destroy all the worker nodes concurrently, then emit the destroy event.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy)
}
749
1e3214b6
JB
/**
 * Sends a kill message to a worker and waits for its answer.
 *
 * The returned promise resolves when the worker answers with a `'success'`
 * kill status and rejects when it answers with a `'failure'` one.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // Listen for the answer before sending the kill message.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      const killStatus = message.kill
      if (killStatus === 'success') {
        resolve()
        return
      }
      if (killStatus === 'failure') {
        reject(new Error(`Worker ${workerId} kill message handling failed`))
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
765
4a6952ff 766 /**
aa9eede8 767 * Terminates the worker node given its worker node key.
4a6952ff 768 *
aa9eede8 769 * @param workerNodeKey - The worker node key.
4a6952ff 770 */
81c02522 771 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 772
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * The constructor calls it after the worker choice strategy context creation
 * and before the pool start.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 782
729c563d 783 /**
280c2a77
S
784 * Should return whether the worker is the main worker or not.
785 */
786 protected abstract isMain (): boolean
787
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the
 * worker node usage and, when applicable, of the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey].usage
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    task.name as string
  ) as WorkerUsage
  taskFunctionWorkerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
}
810
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage
 * and, when applicable, of the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Applies the three statistics updates to a worker usage.
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  updateUsage(this.workerNodes[workerNodeKey].usage)
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionName =
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    updateUsage(
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      ) as WorkerUsage
    )
  }
}
837
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when the worker information lists more than two task functions.
 * NOTE(review): the threshold presumably accounts for default task function
 * entries in that list - confirm against the worker implementation.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const { taskFunctions } = this.getWorkerInfo(workerNodeKey)
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
851
/**
 * Updates the tasks counters of a worker usage once a task is done:
 * decrements the executing counter, then increments either the executed
 * or the failed counter.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const taskStatistics = workerUsage.tasks
  if (taskStatistics.executing != null) {
    if (taskStatistics.executing > 0) {
      --taskStatistics.executing
    } else if (taskStatistics.executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A task error in the message marks the task as failed.
  if (message.taskError != null) {
    ++taskStatistics.failed
  } else {
    ++taskStatistics.executed
  }
}
876
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the message, `0` when it is not reported.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
888
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, `0` when the task has no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime,
    workerUsage.tasks.executed
  )
}
902
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage:
 * the active and idle measurements, then the utilization when ELU
 * aggregation is required and the message reports an ELU.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The first reported utilization is stored as is, the following ones are
  // averaged with the stored value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
934
/**
 * Picks the worker node that will receive the next task.
 *
 * When a dynamic worker node shall be created, it is created first and its key
 * is returned if the strategy policy allows using it right away. In every other
 * case the choice is delegated to the worker choice strategy.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  return strategyPolicy.useDynamicWorker
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
953
6c6afb84
JB
/**
 * Tells whether a dynamic worker shall be created: the pool must be dynamic,
 * not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
962
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects sent along with the message.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
975
/**
 * Creates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 982
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker nodes
 * and runs the after setup hook.
 *
 * User supplied handlers from the pool options are registered before the
 * internal ones for the same event.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    // The errored worker is flagged as not ready and its channel is closed.
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement of the same kind is created first, queued tasks are moved afterwards.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (!erroredWorkerInfo.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once the worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1022
/**
 * Creates a worker node and sets it up as a dynamic one.
 *
 * A message listener destroys the worker node when it receives a kill message:
 * always for a HARD kill, and for a SOFT kill only when the worker node has no
 * executing task (and no queued task when the tasks queue is enabled).
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const senderUsage = this.workerNodes[senderWorkerNodeKey].usage
    // Evaluated lazily, only when a SOFT kill message has been received.
    const hasNoPendingWork = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return senderUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          senderUsage.tasks.executing === 0 &&
          this.tasksQueueSize(senderWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (shallDestroy) {
      this.destroyWorkerNode(senderWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(workerNodeKey)
  // Ask the worker to start its activity check.
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  dynamicWorkerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  return workerNodeKey
}
1061
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @typeParam Message - Type of the message payload, either the pool data or response type.
 * @param workerNodeKey - The worker node key.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1074
/**
 * Hook run once a worker node has been newly created and added to the pool.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker, then sends it the
 * startup message followed by the statistics message.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // The listener is registered before any message is sent to the worker.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(workerNodeKey, poolMessageListener)
  // Startup message first.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Then the statistics message.
  this.sendStatisticsMessageToWorker(workerNodeKey)
}
1089
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1096
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the run time and ELU `aggregate` flags of the current
 * task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
a2ed5053
JB
1114
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node
 * with an empty tasks queue, otherwise the other ready worker node with the
 * fewest queued tasks. The task is executed right away when the destination
 * has an empty queue and runs fewer tasks than the tasks queue concurrency,
 * and is enqueued on the destination otherwise.
 *
 * The redistribution stops as soon as no other ready worker node is available:
 * re-enqueueing a task on the source worker node would never drain its queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    let shallExecute = false
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // Best candidate: nothing queued, no need to look further.
        shallExecute = workerNode.usage.tasks.executing < concurrency
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node: leave the remaining tasks where they are
      // instead of looping forever on the source worker node queue.
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    if (shallExecute) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1158
/**
 * Builds the listener registered on each worker to handle its messages.
 *
 * After checking the message worker id, the listener dispatches on the message
 * content: worker ready response, task execution response or task functions update.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
        message.workerId
      )
      this.getWorkerInfo(senderWorkerNodeKey).taskFunctions = taskFunctions
    }
  }
}
1181
/**
 * Handles a worker ready response: stores the ready flag and the task functions
 * in the worker information, then emits the pool `ready` event once the pool is ready.
 *
 * @param message - The worker ready response received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports it failed to initialize.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const senderWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const senderWorkerInfo = this.getWorkerInfo(senderWorkerNodeKey)
  senderWorkerInfo.ready = message.ready as boolean
  senderWorkerInfo.taskFunctions = message.taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1195
/**
 * Handles a task execution response: settles the promise of the task, runs the
 * after task execution hook, starts the next queued task of the worker node if
 * any, and updates the worker choice strategy.
 *
 * Responses whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const settledTaskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(settledTaskId)
  if (pendingResponse == null) {
    return
  }
  if (message.taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    pendingResponse.reject(message.taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(settledTaskId)
  // Start the next queued task when the worker node has room for it.
  const canRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (canRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1223
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full` event
 * when a dynamic pool is full. Does nothing without emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1234
/**
 * Returns the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1244
/**
 * Wraps the given worker into a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    this.maxSize
  )
  // Worker nodes created while the pool is starting are flagged as ready right away.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
c923ce56 1269
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1282
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only meaningful when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1290
/**
 * Tells whether the pool as a whole has back pressure.
 *
 * The pool has back pressure when the tasks queue is enabled and every worker
 * node has back pressure. A single worker node without back pressure means the
 * pool can still absorb tasks.
 *
 * @returns `true` if all worker nodes have back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  return (
    this.opts.enableTasksQueue === true &&
    this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
  )
}
1299
/**
 * Runs the before task execution hook, then sends the task to the worker
 * identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message; its transfer list is passed alongside.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
}
1310
/**
 * Enqueues the task on the worker node at the given key and emits the pool
 * `backPressure` event when the pool reports back pressure afterwards.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node enqueue operation.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = targetWorkerNode.enqueueTask(task)
  if (this.hasBackPressure()) {
    this.emitter?.emit(PoolEvents.backPressure, this.info)
  }
  return queueSizeAfterEnqueue
}
1318
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1322
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1326
/**
 * Executes every task queued on the worker node at the given key, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1336
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1342}