fix: ensure no task can be executed on destroyed pool
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
8 Task
9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
59317253 14 isKillBehavior,
0d80593b 15 isPlainObject,
afe0d5bf 16 median,
e4f20deb
JB
17 round,
18 updateMeasurementStatistics
bbeadd16 19} from '../utils'
59317253 20import { KillBehaviors } from '../worker/worker-options'
c4855468 21import {
65d7a1c9 22 type IPool,
7c5a1080 23 PoolEmitter,
c4855468 24 PoolEvents,
6b27d407 25 type PoolInfo,
c4855468 26 type PoolOptions,
6b27d407
JB
27 type PoolType,
28 PoolTypes,
4b628b48 29 type TasksQueueOptions
c4855468 30} from './pool'
bbfa38a2
JB
31import type {
32 IWorker,
33 IWorkerNode,
34 WorkerInfo,
35 WorkerType,
36 WorkerUsage
e102732c 37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
52b71763 68 * The task execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
501aea93
JB
75 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
76 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
8735b4e5
JB
87 /**
88 * Dynamic pool maximum size property placeholder.
89 */
90 protected readonly max?: number
91
075e51d1 92 /**
adc9cc64 93 * Whether the pool is starting or not.
075e51d1
JB
94 */
95 private readonly starting: boolean
15b176e0
JB
96 /**
97 * Whether the pool is started or not.
98 */
99 private started: boolean
afe0d5bf
JB
100 /**
101 * The start timestamp of the pool.
102 */
103 private readonly startTimestamp
104
729c563d
S
/**
 * Builds a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker
 * choice strategy context, runs `setupHook()` and finally creates the initial
 * worker nodes through `startPool()`.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation; checkPoolOptions() also fills in option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are handed over as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is only true while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
152
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or if no file exists at that path.
 */
private checkFilePath (filePath: string): void {
  // `typeof` also rejects null and undefined.
  const missingPath =
    typeof filePath !== 'string' || filePath.trim().length === 0
  if (missingPath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (existsSync(filePath)) {
    return
  }
  throw new Error(`Cannot find the worker file '${filePath}'`)
}
165
8d3782fa
JB
/**
 * Validates the number of workers given to the constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for a fixed pool.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
183
/**
 * Validates the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not `'dynamic'`.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is zero, inferior to `min` or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Checked before `min === max` so that 0/0 reports the zero maximum.
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
209
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  // User supplied strategy options override the defaults key by key.
  const workerChoiceStrategyOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
237
/**
 * Checks that the given worker choice strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
247
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `choiceRetries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `choiceRetries` is negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries <= 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater than zero`
      )
    }
  }
  // One weight is expected per worker node of a pool at its maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
291
a20f0ba5
JB
/**
 * Validates the tasks queue options. Unspecified options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `queueMaxSize` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `queueMaxSize` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const queueMaxSize = tasksQueueOptions?.queueMaxSize
  if (queueMaxSize != null) {
    if (!Number.isSafeInteger(queueMaxSize)) {
      throw new TypeError(
        'Invalid worker node tasks queue max size: must be an integer'
      )
    }
    if (queueMaxSize <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue max size: ${queueMaxSize} is a negative integer or zero`
      )
    }
  }
}
331
e761c033
JB
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  // Recomputed on every iteration since each creation changes `workerNodes`.
  const nonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (nonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
343
/** @inheritDoc */
public get info (): PoolInfo {
  // Left-to-right sum of a per worker node number, starting from zero.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number => {
    let total = 0
    for (const workerNode of this.workerNodes) {
      total += pick(workerNode)
    }
    return total
  }
  // Number of worker nodes matching the given predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number =>
    sumOverWorkerNodes((workerNode) => (matches(workerNode) ? 1 : 0))
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both the run time and the wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related fields only exist when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure()
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated run time divided by the number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.runTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // Aggregated wait time divided by the number of executed tasks.
        average: round(
          sumOverWorkerNodes(
            (workerNode) => workerNode.usage.waitTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              (workerNode) => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                (workerNode) => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 487
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * The pool is ready once at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyWorkerNodes.length >= this.minSize
}
502
/**
 * The approximate pool utilization.
 *
 * It is the tasks aggregated run time plus wait time, divided by the time
 * elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
523
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * The value is compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the size checks of this class.
 */
protected abstract get type (): PoolType
530
/**
 * The worker type. It is reported as the `worker` field of the pool information.
 */
protected abstract get worker (): WorkerType
535
/**
 * The pool minimum size.
 *
 * It is the number of workers given to the constructor.
 */
protected get minSize (): number {
  return this.numberOfWorkers
}
ff733df7
JB
542
/**
 * The pool maximum size.
 *
 * It is the `max` property when it is defined, the number of workers given to the constructor otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
a35560ba 549
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
568
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
580
aa9eede8
JB
/**
 * Gets the worker node key of the worker having the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
592
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node usage is reset, then the statistics message is sent to its worker.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
611
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // The given options override the defaults key by key.
  const mergedOptions: WorkerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
625
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Queued tasks are flushed when an enabled tasks queue gets disabled.
  const disablingEnabledQueue = this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
637
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // No tasks queue: any previously stored tasks queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.queueMaxSize as number)
}
651
20c6f652
JB
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param queueMaxSize - The value assigned to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueMaxSize (queueMaxSize: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = queueMaxSize
  })
}
657
a20f0ba5
JB
/**
 * Builds the tasks queue options: the given options on top of the defaults.
 *
 * The defaults are a queue max size equal to the square of the pool maximum size and a concurrency of 1.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The built tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions: TasksQueueOptions = {
    queueMaxSize: Math.pow(this.maxSize, 2),
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
669
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  return this.workerNodes.length >= this.maxSize
}
c2ade475 678
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 685
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a ready worker node executing no task means not busy.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const concurrency = this.opts.tasksQueueOptions?.concurrency as number
    // Busy when no ready worker node is below its concurrency quota.
    return !this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready && workerNode.usage.tasks.executing < concurrency
    )
  }
  // Busy when no ready worker node is idle.
  return !this.workerNodes.some(
    (workerNode) =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
}
710
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non empty task functions list wins.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
723
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Calling reject() does not stop the promise executor: each failed
    // precondition must return explicitly, otherwise the task would still be
    // registered in the promise response map and executed or enqueued.
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null && typeof name !== 'string') {
      reject(new TypeError('name argument must be a string'))
      return
    }
    if (typeof name === 'string' && name.trim().length === 0) {
      reject(new TypeError('name argument must not be an empty string'))
      return
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    // The timestamp is taken before choosing the worker node; it is later
    // used to compute the task wait time.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    if (
      name != null &&
      Array.isArray(workerInfo.taskFunctions) &&
      !workerInfo.taskFunctions.includes(name)
    ) {
      reject(
        new Error(`Task function '${name}' is not registered in the pool`)
      )
      return
    }
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id so that they can be
    // retrieved once the worker response message is received.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })
    // Execute right away unless the tasks queue is enabled and the chosen
    // worker node already executes its concurrency quota.
    if (
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    ) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 785
/** @inheritDoc */
public async destroy (): Promise<void> {
  // The pool is flagged as not started before the asynchronous teardown so
  // that execute() rejects tasks submitted while worker nodes are being destroyed.
  this.started = false
  await Promise.all(
    this.workerNodes.map(async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    })
  )
  this.emitter?.emit(PoolEvents.destroy, this.info)
}
796
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * The returned promise resolves on a message whose `kill` field is `'success'`
 * and rejects on a message whose `kill` field is `'failure'`.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id put in the kill message.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
812
/**
 * Terminates the worker node given its worker node key.
 * Called once per worker node when the pool is destroyed.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 819
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 829
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
834
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time, on the
 * worker node usage and, when applicable, on its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(task.name as string)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
  }
}
864
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, on the worker node usage
 * and, when applicable, on its task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // The task function name is taken from the task performance of the message.
  const taskFunctionWorkerUsage = this.workerNodes[
    workerNodeKey
  ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
  if (taskFunctionWorkerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
    this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
  }
}
898
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * It shall when its worker information lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
913
/**
 * Updates the tasks counters of a worker usage once a task response message is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the executing tasks counter is negative.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasks = workerUsage.tasks
  if (tasks.executing != null) {
    if (tasks.executing > 0) {
      --tasks.executing
    } else if (tasks.executing < 0) {
      throw new Error(
        'Worker usage statistic for tasks executing cannot be negative'
      )
    }
  }
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
938
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the task run time of a received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // A message without task performance is accounted with a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
950
a4e07f72
JB
/**
 * Feeds the time a task has been waiting into the wait time measurement
 * statistics of a worker usage.
 *
 * The wait time is the delay between the task timestamp and now. A task
 * without timestamp is accounted with a wait time of `0`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - taskTimestamp,
    workerUsage.tasks.executed
  )
}
964
/**
 * Feeds the event loop utilization (ELU) reported by the worker into the
 * ELU statistics of a worker usage.
 *
 * The active and idle measurements are always updated (a missing value is
 * accounted as `0`). The utilization is only updated when the ELU
 * requirements ask for aggregation and the message carries ELU data: it is
 * set to the reported value the first time, and to the mean of the stored
 * and the reported values afterwards.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
996
/**
 * Picks the worker node that will receive the next task.
 *
 * When the conditions for dynamic worker creation are met, a dynamic worker
 * node is created first; it is returned directly if the strategy policy
 * flags `dynamicWorkerUsage`. In every other case the choice is delegated to
 * the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const { dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1015
6c6afb84
JB
/**
 * Checks the conditions for dynamic worker creation: the pool is of dynamic
 * type, is not full and is internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1024
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The key of the worker node to send the message to.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1037
/**
 * Creates a new worker. Implemented by the concrete pool.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1044
/**
 * Creates a worker, wires its event handlers and registers it as a pool
 * worker node.
 *
 * The user supplied handlers from the pool options (or a no-op function)
 * are attached first. On worker error, the worker node is flagged as not
 * ready, its channel is closed and the error is emitted on the pool
 * emitter; a replacement worker node is created when restart on error is
 * enabled and the pool is started and not starting; queued tasks are
 * redistributed when the tasks queue is enabled. On worker exit, the worker
 * node is removed from the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', (error) => {
    const failedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const failedWorkerInfo = this.getWorkerInfo(
      failedWorkerNodeKey
    ) as WorkerInfo
    failedWorkerInfo.ready = false
    this.workerNodes[failedWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && !this.starting && this.started
    if (shallRestartWorker) {
      // Replace the failed worker with one of the same kind
      if (failedWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(failedWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1088
/**
 * Creates a worker node and sets it up as a dynamic one.
 *
 * A message listener is registered to destroy the worker node when the
 * worker sends a kill message: always for a hard kill, and for a soft kill
 * only when the worker node executes no task and, if the tasks queue is
 * enabled, has no queued task. A `checkActive` message is then sent to the
 * worker, the worker node is flagged as dynamic and, depending on the
 * strategy policy, as ready.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    const isHardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    // Evaluated lazily so that nothing more is checked on a hard kill
    const isSoftKillOfIdleWorker = (): boolean =>
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      workerUsage.tasks.executing === 0 &&
      (this.opts.enableTasksQueue === false ||
        (this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (isHardKill || isSoftKillOfIdleWorker()) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
  if (strategyPolicy.dynamicWorkerReady || strategyPolicy.dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1131
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key.
 *
 * @param workerNodeKey - The key of the worker node to listen to.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1144
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup
 * and statistics messages and, when the tasks queue is enabled, installs
 * the tasks stealing callback as the worker node `onBackPressure` handler.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1163
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The key of the worker node to send the message to.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1170
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU
 * task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: workerInfo.id as number
  })
}
a2ed5053
JB
1188
/**
 * Moves every task still queued on the given worker node to other worker
 * nodes.
 *
 * For each queued task the destination is the first other ready worker node
 * with an empty tasks queue or, failing that, the other ready worker node
 * with the fewest queued tasks. The task is executed right away when the
 * destination executes fewer tasks than the configured tasks queue
 * concurrency, and is enqueued on the destination otherwise.
 *
 * Redistribution stops when no other ready worker node exists: handing a
 * task back to the source worker node would either run it on the very node
 * being drained or re-enqueue it there forever.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is drained.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey || !workerNode.info.ready) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        // An empty queue cannot be beaten, stop searching
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other ready worker node: nothing can take the remaining tasks
      break
    }
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    // The decision depends on the load of the destination only
    if (
      this.workerNodes[destinationWorkerNodeKey].usage.tasks.executing <
      concurrency
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1231
/**
 * Offloads queued tasks of a worker node under back pressure to the other
 * worker nodes.
 *
 * The other worker nodes are visited by increasing number of queued tasks.
 * Each ready one that has no back pressure receives one task popped from the
 * source worker node, as long as the source still has queued tasks: the task
 * is executed right away when the receiver executes fewer tasks than the
 * configured tasks queue concurrency, and is enqueued on it otherwise.
 *
 * @param workerId - The id of the worker whose worker node has back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const concurrency = this.opts.tasksQueueOptions?.concurrency as number
  // filter() returns a copy, so sorting does not reorder the pool worker nodes
  const candidateWorkerNodes = this.workerNodes
    .filter((workerNode) => workerNode.info.id !== workerId)
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of candidateWorkerNodes) {
    if (
      !workerNode.info.ready ||
      sourceWorkerNode.usage.tasks.queued <= 0 ||
      workerNode.hasBackPressure()
    ) {
      continue
    }
    // The position in the sorted copy is not a worker node key: resolve the
    // key against the pool worker nodes array
    const workerNodeKey = this.workerNodes.indexOf(workerNode)
    if (workerNodeKey === -1) {
      continue
    }
    const task = sourceWorkerNode.popTask() as Task<Data>
    if (workerNode.usage.tasks.executing < concurrency) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  }
}
1267
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message:
 * a message with both `ready` and `taskFunctions` is a worker ready
 * response, a message with a `taskId` is a task execution response, and a
 * message with only `taskFunctions` updates the task functions stored in the
 * worker information.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1292
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool `ready` event if
 * an emitter exists and the pool is ready.
 *
 * @param message - The message received from the worker.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports `ready` as `false`.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const readyWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const readyWorkerInfo = this.getWorkerInfo(readyWorkerNodeKey) as WorkerInfo
  readyWorkerInfo.ready = ready as boolean
  readyWorkerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1306
/**
 * Handles a task execution response received from a worker.
 *
 * Nothing is done if no pending promise is registered for the task id.
 * Otherwise the promise is settled (rejected with the task error message
 * after emitting the `taskError` event, or resolved with the response data),
 * the after task execution hook is run and the promise is unregistered. When
 * the tasks queue is enabled, the worker node has queued tasks and executes
 * fewer tasks than the configured concurrency, its next queued task is
 * executed. Finally the worker choice strategy context is updated.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError, data } = message
  if (taskError == null) {
    pendingResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { workerNodeKey } = pendingResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1334
/**
 * Emits the pool `busy` event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1340
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1346
33e6bb4c
JB
/**
 * Emits the pool `full` event, with the pool information, when the pool is
 * of dynamic type and is full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1354
/**
 * Gets the information of the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if no worker node exists at this key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1364
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * The worker node tasks queue maximum size comes from the tasks queue
 * options or defaults to the square of the pool maximum size. The worker
 * node is flagged as ready right away while the pool is starting.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.queueMaxSize ?? Math.pow(this.maxSize, 2)
  const addedWorkerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueMaxSize
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    addedWorkerNode.info.ready = true
  }
  this.workerNodes.push(addedWorkerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker node added not found')
  }
  return addedWorkerNodeKey
}
c923ce56 1389
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing if the worker
 * has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1402
e2b31e32
JB
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1410
/**
 * Tells whether the pool has back pressure, i.e. the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1419
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task with its transfer list
 * to the worker, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1431
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1437
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1441
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1445
/**
 * Executes every task queued on the worker node identified by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const flushedWorkerNode = this.workerNodes[workerNodeKey]
  flushedWorkerNode.clearTasksQueue()
}
1455
ef41a6e6
JB
/**
 * Flushes the tasks queue of every pool worker node.
 */
private flushTasksQueues (): void {
  // The length is read at each iteration, as an array iterator would do
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1461}