test: add missing pool destroy() calls
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
afe0d5bf 17 median,
e4f20deb
JB
18 round,
19 updateMeasurementStatistics
bbeadd16 20} from '../utils'
59317253 21import { KillBehaviors } from '../worker/worker-options'
c4855468 22import {
65d7a1c9 23 type IPool,
7c5a1080 24 PoolEmitter,
c4855468 25 PoolEvents,
6b27d407 26 type PoolInfo,
c4855468 27 type PoolOptions,
6b27d407
JB
28 type PoolType,
29 PoolTypes,
4b628b48 30 type TasksQueueOptions
c4855468 31} from './pool'
bbfa38a2
JB
32import type {
33 IWorker,
34 IWorkerNode,
35 WorkerInfo,
36 WorkerType,
37 WorkerUsage
e102732c 38} from './worker'
a35560ba 39import {
008512c7 40 type MeasurementStatisticsRequirements,
f0d7f803 41 Measurements,
a35560ba 42 WorkerChoiceStrategies,
a20f0ba5
JB
43 type WorkerChoiceStrategy,
44 type WorkerChoiceStrategyOptions
bdaf31cd
JB
45} from './selection-strategies/selection-strategies-types'
46import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 47import { version } from './version'
4b628b48 48import { WorkerNode } from './worker-node'
23ccf9d7 49
729c563d 50/**
ea7a90d3 51 * Base class that implements some shared logic for all poolifier pools.
729c563d 52 *
38e795c1 53 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
54 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
55 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 56 */
c97c7edb 57export abstract class AbstractPool<
f06e48d8 58 Worker extends IWorker,
d3c8a1a8
S
59 Data = unknown,
60 Response = unknown
c4855468 61> implements IPool<Worker, Data, Response> {
afc003b2 62 /** @inheritDoc */
4b628b48 63 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 64
afc003b2 65 /** @inheritDoc */
7c0ba920
JB
66 public readonly emitter?: PoolEmitter
67
be0676b3 68 /**
52b71763 69 * The task execution response promise map.
be0676b3 70 *
2740a743 71 * - `key`: The message id of each submitted task.
a3445496 72 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 73 *
a3445496 74 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 75 */
501aea93
JB
76 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
77 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
8735b4e5
JB
88 /**
89 * Dynamic pool maximum size property placeholder.
90 */
91 protected readonly max?: number
92
075e51d1 93 /**
adc9cc64 94 * Whether the pool is starting or not.
075e51d1
JB
95 */
96 private readonly starting: boolean
15b176e0
JB
97 /**
98 * Whether the pool is started or not.
99 */
100 private started: boolean
afe0d5bf
JB
101 /**
102 * The start timestamp of the pool.
103 */
104 private readonly startTimestamp
105
729c563d
S
/**
 * Builds a pool: validates the arguments, wires the optional event emitter and
 * the worker choice strategy context, then creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If called from a context where `isMain()` is `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // The parameter properties and the constructor arguments hold the same values.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods that are handed around as standalone callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is only true while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false
  this.started = true

  this.startTimestamp = performance.now()
}
153
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is not a non-blank string or the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // Anything that is not a string (including null/undefined) or is blank is rejected.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
166
8d3782fa
JB
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If it is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If it is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
184
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If `max` is missing or not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is zero, lower than `min` or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
210
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  // User supplied strategy options override the defaults key by key.
  poolOptions.workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    const requestedTasksQueueOptions =
      opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      requestedTasksQueueOptions
    )
  }
}
238
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
248
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `choiceRetries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `choiceRetries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `weights` does not have one entry per `maxSize` worker node or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { choiceRetries, weights, measurement } = workerChoiceStrategyOptions
  if (choiceRetries != null) {
    if (!Number.isSafeInteger(choiceRetries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: choice retries must be an integer'
      )
    }
    if (choiceRetries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: choice retries '${choiceRetries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node the pool can hold at most.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
292
/**
 * Validates the tasks queue options. Nullish options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency`/`size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the deprecated `queueMaxSize` option is set.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions == null) {
    // Nothing to validate.
    return
  }
  if (!isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const { concurrency, queueMaxSize, size } = tasksQueueOptions
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  if (queueMaxSize != null) {
    throw new Error(
      'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
    )
  }
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
334
e761c033
JB
/**
 * Creates worker nodes until the pool holds `numberOfWorkers` non-dynamic ones.
 */
private startPool (): void {
  // Recounted on every iteration, exactly as the loop condition requires.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter((workerNode) => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
346
/** @inheritDoc */
public get info (): PoolInfo {
  // Every statistics-dependent section below keys off the same requirements.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Number of worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  // Sum of a per worker node numeric value.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Pool wide statistics of one measurement, computed over all worker nodes.
  const buildMeasurementInfo = (measurement: 'runTime' | 'waitTime') => {
    const requirements = taskStatisticsRequirements[measurement]
    const mergedHistory = (): number[] =>
      this.workerNodes.reduce<number[]>(
        (merged, workerNode) =>
          merged.concat(workerNode.usage[measurement].history),
        []
      )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            (workerNode) => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            (workerNode) =>
              workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(mergedHistory()))
      }),
      ...(requirements.median && {
        median: round(median(mergedHistory()))
      })
    }
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.executing
    ),
    // Queue related figures only exist when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        (workerNode) => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      (workerNode) => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
08f3f44c 501
aa9eede8
JB
/**
 * Whether the pool is ready: at least `minSize` non-dynamic worker nodes report themselves ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    (workerNode) => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyNonDynamicWorkerNodes >= this.minSize
}
516
/**
 * The approximate pool utilization: aggregated tasks run and wait time divided
 * by the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // A missing aggregate counts as zero.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
537
8881ae32 538 /**
aa9eede8 539 * The pool type.
8881ae32
JB
540 *
541 * If it is `'dynamic'`, it provides the `max` property.
542 */
543 protected abstract get type (): PoolType
544
184855e6 545 /**
aa9eede8 546 * The worker type.
184855e6
JB
547 */
548 protected abstract get worker (): WorkerType
549
/**
 * The pool minimum size, i.e. the number of workers given at construction.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
556
/**
 * The pool maximum size: `max` when it is defined, the number of workers otherwise.
 */
protected get maxSize (): number {
  if (this.max != null) {
    return this.max
  }
  return this.numberOfWorkers
}
a35560ba 563
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is
 * present and belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
582
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
594
aa9eede8
JB
/**
 * Looks up a worker node key from a worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
606
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Each worker node has its usage reset and is sent a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
625
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given options take precedence over the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
639
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching an enabled queue off: flush the tasks queues first.
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
651
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
  this.setTasksQueueMaxSize(builtOptions.size as number)
}
663
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The size to assign to each worker node `tasksQueueBackPressureSize`.
 */
private setTasksQueueMaxSize (size: number): void {
  this.workerNodes.forEach((workerNode) => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
669
a20f0ba5
JB
/**
 * Builds the effective tasks queue options: the given options on top of the
 * defaults (`size` = pool maximum size squared, `concurrency` = 1).
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    size: this.maxSize ** 2,
    concurrency: 1
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
681
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. whether it holds at least `maxSize` worker nodes.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return this.maxSize <= workerNodesCount
}
c2ade475 690
c319c66b
JB
691 /**
692 * Whether the pool is busy or not.
693 *
694 * The pool busyness boolean status.
695 */
696 protected abstract get busy (): boolean
7c0ba920 697
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, a ready worker node is available while it
 * executes fewer tasks than the configured concurrency; otherwise it is
 * available only while it executes no task.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  let hasAvailableWorkerNode: boolean
  if (this.opts.enableTasksQueue === true) {
    hasAvailableWorkerNode = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
    )
  } else {
    hasAvailableWorkerNode = this.workerNodes.some(
      (workerNode) =>
        workerNode.info.ready && workerNode.usage.tasks.executing === 0
    )
  }
  return !hasAvailableWorkerNode
}
722
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node advertising a non-empty list provides the answer.
  const advertisingWorkerNode = this.workerNodes.find(
    (workerNode) =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return advertisingWorkerNode?.info.taskFunctions ?? []
}
735
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    if (!this.started) {
      reject(new Error('Cannot execute a task on destroyed pool'))
      return
    }
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      workerId: workerInfo.id as number,
      taskId
    }
    // The response handler is registered under the task id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey
    })
    // Run right away when there is no queue, or when the chosen worker node
    // has an empty queue and spare concurrency; queue the task otherwise.
    const runImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(workerNodeKey) === 0 &&
        this.workerNodes[workerNodeKey].usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number))
    if (runImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 793
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  if (this.emitter != null) {
    this.emitter.emit(PoolEvents.destroy, this.info)
  }
  this.started = false
}
804
1e3214b6
JB
/**
 * Sends a kill message to a worker and waits for its acknowledgement.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved on a `'success'` kill response and rejected on a `'failure'` one.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before sending the kill message.
    this.registerWorkerMessageListener(workerNodeKey, (message) => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
820
4a6952ff 821 /**
aa9eede8 822 * Terminates the worker node given its worker node key.
4a6952ff 823 *
aa9eede8 824 * @param workerNodeKey - The worker node key.
4a6952ff 825 */
81c02522 826 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 827
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 837
729c563d 838 /**
280c2a77
S
839 * Should return whether the worker is the main worker or not.
840 */
841 protected abstract isMain (): boolean
842
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter and records the task wait time, on the worker node usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Shared by the worker node usage and the task function usage.
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    recordTaskStart(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      recordTaskStart(taskFunctionWorkerUsage)
    }
  }
}
872
/**
 * Hook executed after the worker task execution: updates the tasks, run time
 * and ELU statistics of the worker node usage and, when applicable, of the
 * task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  // Same three updates, in the same order, for either usage.
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    recordTaskEnd(workerNode.usage)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      recordTaskEnd(taskFunctionWorkerUsage)
    }
  }
}
906
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 * It does when its worker info lists more than two task functions.
 * NOTE(review): the reason for the threshold of two is not visible here — confirm against the worker info contents.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
921
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The executing counter never goes below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    tasks.executing -= 1
  }
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
939
/**
 * Feeds the task run time into the run time statistics of a worker usage.
 * Nothing is recorded when the task ended in error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    // A missing run time measurement is recorded as 0.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      workerUsage.runTime,
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
      taskRunTime
    )
  }
}
953
/**
 * Feeds the task wait time into the wait time statistics of a worker usage.
 *
 * The wait time is the difference between now and the task timestamp; a task
 * without timestamp yields a wait time of 0.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskTimestamp = task.timestamp ?? now
  updateMeasurementStatistics(
    workerUsage.waitTime,
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
    now - taskTimestamp
  )
}
966
/**
 * Feeds the task event loop utilization (ELU) into the ELU statistics of a worker usage.
 * Nothing is recorded when the task ended in error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  // The utilization is the mean of the previous value and the new one,
  // or the new one when there is no previous value.
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
999
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy has `dynamicWorkerUsage` set.
 * In every other case the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1018
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic || this.full) {
    return false
  }
  return this.internalBusy()
}
1027
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1040
/**
 * Creates a new worker.
 * Must be implemented by concrete pools.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1047
/**
 * Creates a new worker, attaches the event handlers to it, adds it to the
 * pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // Handlers coming from the pool options, falling back to EMPTY_FUNCTION.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool internal error handling.
  worker.on('error', (error) => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(
      erroredWorkerNodeKey
    ) as WorkerInfo
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement worker is only created on a started pool that is not starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting && this.started
    if (shallRestart && erroredWorkerInfo.dynamic) {
      this.createAndSetupDynamicWorkerNode()
    } else if (shallRestart) {
      this.createAndSetupWorkerNode()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const workerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1091
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular setup, a message listener destroying the worker node
 * on kill messages is registered, a `checkActive` message is sent to the
 * worker and the worker information is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, (message) => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // Kill message received from worker:
    // - hard kill: always destroy
    // - soft kill: destroy only if no task is executing and, with the tasks
    //   queue enabled, no task is queued
    const shallDestroy = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    if (shallDestroy()) {
      this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  // Some strategy policies require the dynamic worker to be flagged ready right away.
  const strategyContext = this.workerChoiceStrategyContext
  if (
    strategyContext.getStrategyPolicy().dynamicWorkerReady ||
    strategyContext.getStrategyPolicy().dynamicWorkerUsage
  ) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1134
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1147
/**
 * Hook run once a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener, sends the startup and statistics
 * messages to the worker and, when the tasks queue is enabled, plugs the
 * task stealing callbacks on the worker node.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
}
1168
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1175
/**
 * Sends the statistics message to the worker identified by its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  const strategyContext = this.workerChoiceStrategyContext
  const statistics = {
    runTime: strategyContext.getTaskStatisticsRequirements().runTime.aggregate,
    elu: strategyContext.getTaskStatisticsRequirements().elu.aggregate
  }
  this.sendToWorker(workerNodeKey, {
    statistics,
    workerId: workerInfo.id as number
  })
}
a2ed5053
JB
1193
/**
 * Moves every task queued on the given worker node to other worker nodes.
 *
 * For each queued task, the destination is the first ready worker node, other
 * than the source, with no queued task; failing that, the ready one with the
 * fewest queued tasks. The task is executed right away when the destination
 * queue is empty and its executing tasks count is below the configured
 * concurrency, otherwise it is enqueued on the destination.
 *
 * If no destination worker node is available, the redistribution stops and
 * the remaining tasks stay in the source queue.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (!workerNode.info.ready || workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        destinationWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        destinationWorkerNodeKey = workerNodeId
      }
    }
    if (destinationWorkerNodeKey == null) {
      // No other ready worker node: nothing can be dequeued, so looping again
      // would never terminate.
      break
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    ) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1228
/**
 * Steals one task for the worker whose id is given.
 *
 * Worker nodes are scanned from the most to the least queued; the first ready
 * one, other than the destination, with queued tasks gives away one task.
 * The task is executed right away when the destination queue is empty and its
 * executing tasks count is below the configured concurrency, otherwise it is
 * enqueued on the destination. The stolen tasks counters are then incremented.
 *
 * @param workerId - The id of the worker receiving the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  const byQueuedTasksDescending = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  for (const sourceWorkerNode of byQueuedTasksDescending) {
    // Sorted in descending order: no task left to steal past this point.
    if (sourceWorkerNode.usage.tasks.queued === 0) {
      return
    }
    const isCandidate =
      sourceWorkerNode.info.ready &&
      sourceWorkerNode.info.id !== workerId &&
      sourceWorkerNode.usage.tasks.queued > 0
    if (!isCandidate) {
      continue
    }
    const task = {
      ...(sourceWorkerNode.popTask() as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    const canExecuteNow =
      this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
      destinationWorkerNode.usage.tasks.executing <
        (this.opts.tasksQueueOptions?.concurrency as number)
    if (canExecuteNow) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
    if (destinationWorkerNode?.usage != null) {
      destinationWorkerNode.usage.tasks.stolen += 1
    }
    if (
      this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
      destinationWorkerNode.getTaskFunctionWorkerUsage(task.name as string) !=
        null
    ) {
      const taskFunctionWorkerUsage =
        destinationWorkerNode.getTaskFunctionWorkerUsage(
          task.name as string
        ) as WorkerUsage
      taskFunctionWorkerUsage.tasks.stolen += 1
    }
    // Only one task is stolen per call.
    return
  }
}
1279
/**
 * Hands tasks queued on the worker whose id is given over to other worker nodes.
 *
 * Does nothing when the configured tasks queue size is lower than or equal to 1.
 * Worker nodes are scanned from the least to the most queued; each ready one,
 * other than the source, whose queued tasks count is below the configured
 * tasks queue size minus one receives one task, as long as the source still
 * has queued tasks. A received task is executed right away when the receiving
 * queue is empty and its executing tasks count is below the configured
 * concurrency, otherwise it is enqueued. The stolen tasks counters of the
 * receiving worker node are then incremented.
 *
 * @param workerId - The id of the worker giving tasks away.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        (this.opts.tasksQueueOptions?.size as number) - 1
    ) {
      // The position in the sorted copy is not a valid key into
      // this.workerNodes: resolve the real worker node key.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (
        this.tasksQueueSize(workerNodeKey) === 0 &&
        workerNode.usage.tasks.executing <
          (this.opts.tasksQueueOptions?.concurrency as number)
      ) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      if (workerNode?.usage != null) {
        ++workerNode.usage.tasks.stolen
      }
      if (
        this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
        workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
      ) {
        const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
          task.name as string
        ) as WorkerUsage
        ++taskFunctionWorkerUsage.tasks.stolen
      }
    }
  }
}
1328
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches on the message
 * content: worker ready response, task execution response or task functions
 * message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message) => {
    this.checkMessageWorkerId(message)
    const { ready, taskId, taskFunctions } = message
    if (ready != null && taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      ) as WorkerInfo
      workerInfo.taskFunctions = taskFunctions
    }
  }
}
1353
/**
 * Handles a worker ready response: stores the ready flag and the task
 * functions in the worker information, then emits the pool ready event when
 * an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { ready, taskFunctions } = message
  if (ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
  workerInfo.ready = ready as boolean
  workerInfo.taskFunctions = taskFunctions
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1367
/**
 * Handles a task execution response.
 *
 * Settles the promise registered for the task id, runs the after task
 * execution hook, forgets the promise, executes the next queued task of the
 * worker node when the tasks queue is enabled and the configured concurrency
 * allows it, and finally updates the worker choice strategy context.
 * Responses with no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.taskId as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError, data } = message
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.promiseResponseMap.delete(taskId)
  const shallExecuteQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallExecuteQueuedTask) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1395
/**
 * Emits the pool busy event, with the pool information, when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1401
/**
 * Emits the pool back pressure event, with the pool information, when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1407
/**
 * Emits the pool full event, with the pool information, when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1415
/**
 * Gets the worker information given its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` when there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode?.info
}
1425
/**
 * Wraps the given worker in a worker node and adds it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Falls back to the squared pool maximum size when no tasks queue size is configured.
  const tasksQueueSizeOption =
    this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
  const workerNode = new WorkerNode<Worker, Data>(
    worker,
    this.worker,
    tasksQueueSizeOption
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker node added not found')
}
c923ce56 1450
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1463
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure is only reported when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1471
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled and
 * no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every((workerNode) => workerNode.hasBackPressure())
}
1480
/**
 * Executes the given task on the worker identified by its worker node key:
 * runs the before task execution hook, sends the task to the worker along
 * with its transfer list, then checks and emits the task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1492
/**
 * Enqueues the given task on the worker node identified by its key, then
 * checks and emits the task queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const enqueueResult = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return enqueueResult
}
1498
/**
 * Dequeues a task from the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1502
/**
 * Gets the tasks queue size of the worker node identified by its key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1506
/**
 * Executes every task queued on the worker node identified by its key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1516
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1522}