perf: remove unneeded condition in tasks stealing code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
075e51d1 147 this.starting = true
e761c033 148 this.startPool()
075e51d1 149 this.starting = false
15b176e0 150 this.started = true
afe0d5bf
JB
151
152 this.startTimestamp = performance.now()
c97c7edb
S
153 }
154
a35560ba 155 private checkFilePath (filePath: string): void {
ffcbbad8
JB
156 if (
157 filePath == null ||
3d6dd312 158 typeof filePath !== 'string' ||
ffcbbad8
JB
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
c510fea7
APA
161 throw new Error('Please specify a file with a worker implementation')
162 }
3d6dd312
JB
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
c510fea7
APA
166 }
167
8d3782fa
JB
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
78cea37e 173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 174 throw new TypeError(
0d80593b 175 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
176 )
177 } else if (numberOfWorkers < 0) {
473c717a 178 throw new RangeError(
8d3782fa
JB
179 'Cannot instantiate a pool with a negative number of workers'
180 )
6b27d407 181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 187 if (this.type === PoolTypes.dynamic) {
a5ed75b7 188 if (max == null) {
e695d66f 189 throw new TypeError(
a5ed75b7
JB
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
079de991
JB
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
b97d82d8 200 } else if (max === 0) {
079de991 201 throw new RangeError(
d640b48b 202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
8d3782fa
JB
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
49be33fe
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 244 throw new Error(
aee46736 245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
246 )
247 }
7c0ba920
JB
248 }
249
0d80593b
JB
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
8990357d
JB
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
7e653ee0 268 workerChoiceStrategyOptions.choiceRetries < 0
8990357d
JB
269 ) {
270 throw new RangeError(
7e653ee0 271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater or equal than zero`
8990357d
JB
272 )
273 }
49be33fe
JB
274 if (
275 workerChoiceStrategyOptions.weights != null &&
6b27d407 276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
f0d7f803
JB
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
0d80593b
JB
292 }
293
a20f0ba5 294 private checkValidTasksQueueOptions (
ff3f866a 295 tasksQueueOptions: Writable<TasksQueueOptions>
a20f0ba5 296 ): void {
0d80593b
JB
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
f0d7f803
JB
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
20c6f652 305 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
e695d66f 312 throw new RangeError(
20c6f652
JB
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
68dbcdc0 316 if (tasksQueueOptions?.queueMaxSize != null) {
ff3f866a 317 throw new Error(
68dbcdc0 318 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
20c6f652
JB
319 )
320 }
321 if (
ff3f866a
JB
322 tasksQueueOptions?.size != null &&
323 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 324 ) {
ff3f866a 325 throw new TypeError(
68dbcdc0 326 'Invalid worker node tasks queue size: must be an integer'
ff3f866a
JB
327 )
328 }
329 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 330 throw new RangeError(
68dbcdc0 331 `Invalid worker node tasks queue size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
332 )
333 }
334 }
335
e761c033
JB
336 private startPool (): void {
337 while (
338 this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
341 0
342 ) < this.numberOfWorkers
343 ) {
aa9eede8 344 this.createAndSetupWorkerNode()
e761c033
JB
345 }
346 }
347
08f3f44c 348 /** @inheritDoc */
6b27d407
JB
349 public get info (): PoolInfo {
350 return {
23ccf9d7 351 version,
6b27d407 352 type: this.type,
184855e6 353 worker: this.worker,
2431bdb4
JB
354 ready: this.ready,
355 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
356 minSize: this.minSize,
357 maxSize: this.maxSize,
c05f0d50
JB
358 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
359 .runTime.aggregate &&
1305e9a8
JB
360 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
361 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
362 workerNodes: this.workerNodes.length,
363 idleWorkerNodes: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
f59e1027 365 workerNode.usage.tasks.executing === 0
a4e07f72
JB
366 ? accumulator + 1
367 : accumulator,
6b27d407
JB
368 0
369 ),
370 busyWorkerNodes: this.workerNodes.reduce(
371 (accumulator, workerNode) =>
f59e1027 372 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
373 0
374 ),
a4e07f72 375 executedTasks: this.workerNodes.reduce(
6b27d407 376 (accumulator, workerNode) =>
f59e1027 377 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
378 0
379 ),
380 executingTasks: this.workerNodes.reduce(
381 (accumulator, workerNode) =>
f59e1027 382 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
383 0
384 ),
daf86646
JB
385 ...(this.opts.enableTasksQueue === true && {
386 queuedTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
388 accumulator + workerNode.usage.tasks.queued,
389 0
390 )
391 }),
392 ...(this.opts.enableTasksQueue === true && {
393 maxQueuedTasks: this.workerNodes.reduce(
394 (accumulator, workerNode) =>
395 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
396 0
397 )
398 }),
a1763c54
JB
399 ...(this.opts.enableTasksQueue === true && {
400 backPressure: this.hasBackPressure()
401 }),
68cbdc84
JB
402 ...(this.opts.enableTasksQueue === true && {
403 stolenTasks: this.workerNodes.reduce(
404 (accumulator, workerNode) =>
405 accumulator + workerNode.usage.tasks.stolen,
406 0
407 )
408 }),
a4e07f72
JB
409 failedTasks: this.workerNodes.reduce(
410 (accumulator, workerNode) =>
f59e1027 411 accumulator + workerNode.usage.tasks.failed,
a4e07f72 412 0
1dcf8b7b
JB
413 ),
414 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
415 .runTime.aggregate && {
416 runTime: {
98e72cda
JB
417 minimum: round(
418 Math.min(
419 ...this.workerNodes.map(
8ebe6c30 420 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 421 )
1dcf8b7b
JB
422 )
423 ),
98e72cda
JB
424 maximum: round(
425 Math.max(
426 ...this.workerNodes.map(
8ebe6c30 427 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 428 )
1dcf8b7b 429 )
98e72cda 430 ),
3baa0837
JB
431 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
432 .runTime.average && {
433 average: round(
434 average(
435 this.workerNodes.reduce<number[]>(
436 (accumulator, workerNode) =>
437 accumulator.concat(workerNode.usage.runTime.history),
438 []
439 )
98e72cda 440 )
dc021bcc 441 )
3baa0837 442 }),
98e72cda
JB
443 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
444 .runTime.median && {
445 median: round(
446 median(
3baa0837
JB
447 this.workerNodes.reduce<number[]>(
448 (accumulator, workerNode) =>
449 accumulator.concat(workerNode.usage.runTime.history),
450 []
98e72cda
JB
451 )
452 )
453 )
454 })
1dcf8b7b
JB
455 }
456 }),
457 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
458 .waitTime.aggregate && {
459 waitTime: {
98e72cda
JB
460 minimum: round(
461 Math.min(
462 ...this.workerNodes.map(
8ebe6c30 463 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 464 )
1dcf8b7b
JB
465 )
466 ),
98e72cda
JB
467 maximum: round(
468 Math.max(
469 ...this.workerNodes.map(
8ebe6c30 470 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 471 )
1dcf8b7b 472 )
98e72cda 473 ),
3baa0837
JB
474 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
475 .waitTime.average && {
476 average: round(
477 average(
478 this.workerNodes.reduce<number[]>(
479 (accumulator, workerNode) =>
480 accumulator.concat(workerNode.usage.waitTime.history),
481 []
482 )
98e72cda 483 )
dc021bcc 484 )
3baa0837 485 }),
98e72cda
JB
486 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
487 .waitTime.median && {
488 median: round(
489 median(
3baa0837
JB
490 this.workerNodes.reduce<number[]>(
491 (accumulator, workerNode) =>
492 accumulator.concat(workerNode.usage.waitTime.history),
493 []
98e72cda
JB
494 )
495 )
496 )
497 })
1dcf8b7b
JB
498 }
499 })
6b27d407
JB
500 }
501 }
08f3f44c 502
aa9eede8
JB
503 /**
504 * The pool readiness boolean status.
505 */
2431bdb4
JB
506 private get ready (): boolean {
507 return (
b97d82d8
JB
508 this.workerNodes.reduce(
509 (accumulator, workerNode) =>
510 !workerNode.info.dynamic && workerNode.info.ready
511 ? accumulator + 1
512 : accumulator,
513 0
514 ) >= this.minSize
2431bdb4
JB
515 )
516 }
517
afe0d5bf 518 /**
aa9eede8 519 * The approximate pool utilization.
afe0d5bf
JB
520 *
521 * @returns The pool utilization.
522 */
523 private get utilization (): number {
8e5ca040 524 const poolTimeCapacity =
fe7d90db 525 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
526 const totalTasksRunTime = this.workerNodes.reduce(
527 (accumulator, workerNode) =>
71514351 528 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
529 0
530 )
531 const totalTasksWaitTime = this.workerNodes.reduce(
532 (accumulator, workerNode) =>
71514351 533 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
534 0
535 )
8e5ca040 536 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
537 }
538
8881ae32 539 /**
aa9eede8 540 * The pool type.
8881ae32
JB
541 *
542 * If it is `'dynamic'`, it provides the `max` property.
543 */
544 protected abstract get type (): PoolType
545
184855e6 546 /**
aa9eede8 547 * The worker type.
184855e6
JB
548 */
549 protected abstract get worker (): WorkerType
550
c2ade475 551 /**
aa9eede8 552 * The pool minimum size.
c2ade475 553 */
8735b4e5
JB
554 protected get minSize (): number {
555 return this.numberOfWorkers
556 }
ff733df7
JB
557
558 /**
aa9eede8 559 * The pool maximum size.
ff733df7 560 */
8735b4e5
JB
561 protected get maxSize (): number {
562 return this.max ?? this.numberOfWorkers
563 }
a35560ba 564
6b813701
JB
565 /**
566 * Checks if the worker id sent in the received message from a worker is valid.
567 *
568 * @param message - The received message.
569 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
570 */
21f710aa 571 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
572 if (message.workerId == null) {
573 throw new Error('Worker message received without worker id')
574 } else if (
21f710aa 575 message.workerId != null &&
aad6fb64 576 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
577 ) {
578 throw new Error(
579 `Worker message received from unknown worker '${message.workerId}'`
580 )
581 }
582 }
583
ffcbbad8 584 /**
f06e48d8 585 * Gets the given worker its worker node key.
ffcbbad8
JB
586 *
587 * @param worker - The worker.
f59e1027 588 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 589 */
aad6fb64 590 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 591 return this.workerNodes.findIndex(
8ebe6c30 592 (workerNode) => workerNode.worker === worker
f06e48d8 593 )
bf9549ae
JB
594 }
595
aa9eede8
JB
596 /**
597 * Gets the worker node key given its worker id.
598 *
599 * @param workerId - The worker id.
aad6fb64 600 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 601 */
aad6fb64
JB
602 private getWorkerNodeKeyByWorkerId (workerId: number): number {
603 return this.workerNodes.findIndex(
8ebe6c30 604 (workerNode) => workerNode.info.id === workerId
aad6fb64 605 )
aa9eede8
JB
606 }
607
afc003b2 608 /** @inheritDoc */
a35560ba 609 public setWorkerChoiceStrategy (
59219cbb
JB
610 workerChoiceStrategy: WorkerChoiceStrategy,
611 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 612 ): void {
aee46736 613 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 614 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
615 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
616 this.opts.workerChoiceStrategy
617 )
618 if (workerChoiceStrategyOptions != null) {
619 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
620 }
aa9eede8 621 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 622 workerNode.resetUsage()
9edb9717 623 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 624 }
a20f0ba5
JB
625 }
626
627 /** @inheritDoc */
628 public setWorkerChoiceStrategyOptions (
629 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
630 ): void {
0d80593b 631 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
632 this.opts.workerChoiceStrategyOptions = {
633 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
634 ...workerChoiceStrategyOptions
635 }
a20f0ba5
JB
636 this.workerChoiceStrategyContext.setOptions(
637 this.opts.workerChoiceStrategyOptions
a35560ba
S
638 )
639 }
640
a20f0ba5 641 /** @inheritDoc */
8f52842f
JB
642 public enableTasksQueue (
643 enable: boolean,
644 tasksQueueOptions?: TasksQueueOptions
645 ): void {
a20f0ba5 646 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 647 this.flushTasksQueues()
a20f0ba5
JB
648 }
649 this.opts.enableTasksQueue = enable
8f52842f 650 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
651 }
652
653 /** @inheritDoc */
8f52842f 654 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 655 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
656 this.checkValidTasksQueueOptions(tasksQueueOptions)
657 this.opts.tasksQueueOptions =
658 this.buildTasksQueueOptions(tasksQueueOptions)
ff3f866a 659 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 660 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
661 delete this.opts.tasksQueueOptions
662 }
663 }
664
ff3f866a 665 private setTasksQueueMaxSize (size: number): void {
20c6f652 666 for (const workerNode of this.workerNodes) {
ff3f866a 667 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
668 }
669 }
670
a20f0ba5
JB
671 private buildTasksQueueOptions (
672 tasksQueueOptions: TasksQueueOptions
673 ): TasksQueueOptions {
674 return {
20c6f652 675 ...{
ff3f866a 676 size: Math.pow(this.maxSize, 2),
20c6f652
JB
677 concurrency: 1
678 },
679 ...tasksQueueOptions
a20f0ba5
JB
680 }
681 }
682
c319c66b
JB
683 /**
684 * Whether the pool is full or not.
685 *
686 * The pool filling boolean status.
687 */
dea903a8
JB
688 protected get full (): boolean {
689 return this.workerNodes.length >= this.maxSize
690 }
c2ade475 691
c319c66b
JB
692 /**
693 * Whether the pool is busy or not.
694 *
695 * The pool busyness boolean status.
696 */
697 protected abstract get busy (): boolean
7c0ba920 698
6c6afb84 699 /**
3d76750a 700 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
701 *
702 * @returns Worker nodes busyness boolean status.
703 */
c2ade475 704 protected internalBusy (): boolean {
3d76750a
JB
705 if (this.opts.enableTasksQueue === true) {
706 return (
707 this.workerNodes.findIndex(
8ebe6c30 708 (workerNode) =>
3d76750a
JB
709 workerNode.info.ready &&
710 workerNode.usage.tasks.executing <
711 (this.opts.tasksQueueOptions?.concurrency as number)
712 ) === -1
713 )
714 } else {
715 return (
716 this.workerNodes.findIndex(
8ebe6c30 717 (workerNode) =>
3d76750a
JB
718 workerNode.info.ready && workerNode.usage.tasks.executing === 0
719 ) === -1
720 )
721 }
cb70b19d
JB
722 }
723
90d7d101
JB
724 /** @inheritDoc */
725 public listTaskFunctions (): string[] {
f2dbbf95
JB
726 for (const workerNode of this.workerNodes) {
727 if (
728 Array.isArray(workerNode.info.taskFunctions) &&
729 workerNode.info.taskFunctions.length > 0
730 ) {
731 return workerNode.info.taskFunctions
732 }
90d7d101 733 }
f2dbbf95 734 return []
90d7d101
JB
735 }
736
afc003b2 737 /** @inheritDoc */
7d91a8cd
JB
738 public async execute (
739 data?: Data,
740 name?: string,
741 transferList?: TransferListItem[]
742 ): Promise<Response> {
52b71763 743 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
744 if (!this.started) {
745 reject(new Error('Cannot execute a task on destroyed pool'))
9d2d0da1 746 return
15b176e0 747 }
7d91a8cd
JB
748 if (name != null && typeof name !== 'string') {
749 reject(new TypeError('name argument must be a string'))
9d2d0da1 750 return
7d91a8cd 751 }
90d7d101
JB
752 if (
753 name != null &&
754 typeof name === 'string' &&
755 name.trim().length === 0
756 ) {
f58b60b9 757 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 758 return
90d7d101 759 }
b558f6b5
JB
760 if (transferList != null && !Array.isArray(transferList)) {
761 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 762 return
b558f6b5
JB
763 }
764 const timestamp = performance.now()
765 const workerNodeKey = this.chooseWorkerNode()
94407def 766 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
501aea93 767 const task: Task<Data> = {
52b71763
JB
768 name: name ?? DEFAULT_TASK_NAME,
769 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
770 data: data ?? ({} as Data),
7d91a8cd 771 transferList,
52b71763 772 timestamp,
a5d15204 773 workerId: workerInfo.id as number,
7629bdf1 774 taskId: randomUUID()
52b71763 775 }
7629bdf1 776 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
777 resolve,
778 reject,
501aea93 779 workerNodeKey
2e81254d 780 })
52b71763 781 if (
4e377863
JB
782 this.opts.enableTasksQueue === false ||
783 (this.opts.enableTasksQueue === true &&
033f1776 784 this.tasksQueueSize(workerNodeKey) === 0 &&
4e377863 785 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 786 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 787 ) {
501aea93 788 this.executeTask(workerNodeKey, task)
4e377863
JB
789 } else {
790 this.enqueueTask(workerNodeKey, task)
52b71763 791 }
2e81254d 792 })
280c2a77 793 }
c97c7edb 794
afc003b2 795 /** @inheritDoc */
c97c7edb 796 public async destroy (): Promise<void> {
1fbcaa7c 797 await Promise.all(
81c02522 798 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 799 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
800 })
801 )
33e6bb4c 802 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 803 this.started = false
c97c7edb
S
804 }
805
1e3214b6
JB
806 protected async sendKillMessageToWorker (
807 workerNodeKey: number,
808 workerId: number
809 ): Promise<void> {
9edb9717 810 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
811 this.registerWorkerMessageListener(workerNodeKey, (message) => {
812 if (message.kill === 'success') {
813 resolve()
814 } else if (message.kill === 'failure') {
e1af34e6 815 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
816 }
817 })
9edb9717 818 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 819 })
1e3214b6
JB
820 }
821
4a6952ff 822 /**
aa9eede8 823 * Terminates the worker node given its worker node key.
4a6952ff 824 *
aa9eede8 825 * @param workerNodeKey - The worker node key.
4a6952ff 826 */
81c02522 827 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 828
729c563d 829 /**
6677a3d3
JB
830 * Setup hook to execute code before worker nodes are created in the abstract constructor.
831 * Can be overridden.
afc003b2
JB
832 *
833 * @virtual
729c563d 834 */
280c2a77 835 protected setupHook (): void {
033f1776 836 /** Intentionally empty */
280c2a77 837 }
c97c7edb 838
729c563d 839 /**
280c2a77
S
840 * Should return whether the worker is the main worker or not.
841 */
842 protected abstract isMain (): boolean
843
844 /**
2e81254d 845 * Hook executed before the worker task execution.
bf9549ae 846 * Can be overridden.
729c563d 847 *
f06e48d8 848 * @param workerNodeKey - The worker node key.
1c6fe997 849 * @param task - The task to execute.
729c563d 850 */
1c6fe997
JB
851 protected beforeTaskExecutionHook (
852 workerNodeKey: number,
853 task: Task<Data>
854 ): void {
94407def
JB
855 if (this.workerNodes[workerNodeKey]?.usage != null) {
856 const workerUsage = this.workerNodes[workerNodeKey].usage
857 ++workerUsage.tasks.executing
858 this.updateWaitTimeWorkerUsage(workerUsage, task)
859 }
860 if (
861 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
862 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
863 task.name as string
864 ) != null
865 ) {
db0e38ee 866 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 867 workerNodeKey
db0e38ee 868 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
869 ++taskFunctionWorkerUsage.tasks.executing
870 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 871 }
c97c7edb
S
872 }
873
c01733f1 874 /**
2e81254d 875 * Hook executed after the worker task execution.
bf9549ae 876 * Can be overridden.
c01733f1 877 *
501aea93 878 * @param workerNodeKey - The worker node key.
38e795c1 879 * @param message - The received message.
c01733f1 880 */
2e81254d 881 protected afterTaskExecutionHook (
501aea93 882 workerNodeKey: number,
2740a743 883 message: MessageValue<Response>
bf9549ae 884 ): void {
94407def
JB
885 if (this.workerNodes[workerNodeKey]?.usage != null) {
886 const workerUsage = this.workerNodes[workerNodeKey].usage
887 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
888 this.updateRunTimeWorkerUsage(workerUsage, message)
889 this.updateEluWorkerUsage(workerUsage, message)
890 }
891 if (
892 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
893 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 894 message.taskPerformance?.name as string
94407def
JB
895 ) != null
896 ) {
db0e38ee 897 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 898 workerNodeKey
db0e38ee 899 ].getTaskFunctionWorkerUsage(
0628755c 900 message.taskPerformance?.name as string
b558f6b5 901 ) as WorkerUsage
db0e38ee
JB
902 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
903 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
904 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
905 }
906 }
907
db0e38ee
JB
908 /**
909 * Whether the worker node shall update its task function worker usage or not.
910 *
911 * @param workerNodeKey - The worker node key.
912 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
913 */
914 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 915 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 916 return (
94407def 917 workerInfo != null &&
a5d15204 918 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 919 workerInfo.taskFunctions.length > 2
b558f6b5 920 )
f1c06930
JB
921 }
922
923 private updateTaskStatisticsWorkerUsage (
924 workerUsage: WorkerUsage,
925 message: MessageValue<Response>
926 ): void {
a4e07f72 927 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
928 if (
929 workerTaskStatistics.executing != null &&
930 workerTaskStatistics.executing > 0
931 ) {
932 --workerTaskStatistics.executing
5bb5be17 933 }
98e72cda
JB
934 if (message.taskError == null) {
935 ++workerTaskStatistics.executed
936 } else {
a4e07f72 937 ++workerTaskStatistics.failed
2740a743 938 }
f8eb0a2a
JB
939 }
940
a4e07f72
JB
941 private updateRunTimeWorkerUsage (
942 workerUsage: WorkerUsage,
f8eb0a2a
JB
943 message: MessageValue<Response>
944 ): void {
dc021bcc
JB
945 if (message.taskError != null) {
946 return
947 }
e4f20deb
JB
948 updateMeasurementStatistics(
949 workerUsage.runTime,
950 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 951 message.taskPerformance?.runTime ?? 0
e4f20deb 952 )
f8eb0a2a
JB
953 }
954
a4e07f72
JB
955 private updateWaitTimeWorkerUsage (
956 workerUsage: WorkerUsage,
1c6fe997 957 task: Task<Data>
f8eb0a2a 958 ): void {
1c6fe997
JB
959 const timestamp = performance.now()
960 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
961 updateMeasurementStatistics(
962 workerUsage.waitTime,
963 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 964 taskWaitTime
e4f20deb 965 )
c01733f1 966 }
967
a4e07f72 968 private updateEluWorkerUsage (
5df69fab 969 workerUsage: WorkerUsage,
62c15a68
JB
970 message: MessageValue<Response>
971 ): void {
dc021bcc
JB
972 if (message.taskError != null) {
973 return
974 }
008512c7
JB
975 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
976 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
977 updateMeasurementStatistics(
978 workerUsage.elu.active,
008512c7 979 eluTaskStatisticsRequirements,
dc021bcc 980 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
981 )
982 updateMeasurementStatistics(
983 workerUsage.elu.idle,
008512c7 984 eluTaskStatisticsRequirements,
dc021bcc 985 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 986 )
008512c7 987 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 988 if (message.taskPerformance?.elu != null) {
f7510105
JB
989 if (workerUsage.elu.utilization != null) {
990 workerUsage.elu.utilization =
991 (workerUsage.elu.utilization +
992 message.taskPerformance.elu.utilization) /
993 2
994 } else {
995 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
996 }
62c15a68
JB
997 }
998 }
999 }
1000
280c2a77 1001 /**
f06e48d8 1002 * Chooses a worker node for the next task.
280c2a77 1003 *
6c6afb84 1004 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1005 *
aa9eede8 1006 * @returns The chosen worker node key
280c2a77 1007 */
6c6afb84 1008 private chooseWorkerNode (): number {
930dcf12 1009 if (this.shallCreateDynamicWorker()) {
aa9eede8 1010 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1011 if (
b1aae695 1012 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1013 ) {
aa9eede8 1014 return workerNodeKey
6c6afb84 1015 }
17393ac8 1016 }
930dcf12
JB
1017 return this.workerChoiceStrategyContext.execute()
1018 }
1019
6c6afb84
JB
1020 /**
1021 * Conditions for dynamic worker creation.
1022 *
1023 * @returns Whether to create a dynamic worker or not.
1024 */
1025 private shallCreateDynamicWorker (): boolean {
930dcf12 1026 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1027 }
1028
280c2a77 1029 /**
aa9eede8 1030 * Sends a message to worker given its worker node key.
280c2a77 1031 *
aa9eede8 1032 * @param workerNodeKey - The worker node key.
38e795c1 1033 * @param message - The message.
7d91a8cd 1034 * @param transferList - The optional array of transferable objects.
280c2a77
S
1035 */
1036 protected abstract sendToWorker (
aa9eede8 1037 workerNodeKey: number,
7d91a8cd
JB
1038 message: MessageValue<Data>,
1039 transferList?: TransferListItem[]
280c2a77
S
1040 ): void
1041
729c563d 1042 /**
41344292 1043 * Creates a new worker.
6c6afb84
JB
1044 *
1045 * @returns Newly created worker.
729c563d 1046 */
280c2a77 1047 protected abstract createWorker (): Worker
c97c7edb 1048
4a6952ff 1049 /**
aa9eede8 1050 * Creates a new, completely set up worker node.
4a6952ff 1051 *
aa9eede8 1052 * @returns New, completely set up worker node key.
4a6952ff 1053 */
aa9eede8 1054 protected createAndSetupWorkerNode (): number {
bdacc2d2 1055 const worker = this.createWorker()
280c2a77 1056
fd04474e 1057 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1058 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1059 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1060 worker.on('error', (error) => {
aad6fb64 1061 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1062 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1063 workerInfo.ready = false
0dc838e3 1064 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1065 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1066 if (
1067 this.opts.restartWorkerOnError === true &&
1068 !this.starting &&
1069 this.started
1070 ) {
9b106837 1071 if (workerInfo.dynamic) {
aa9eede8 1072 this.createAndSetupDynamicWorkerNode()
8a1260a3 1073 } else {
aa9eede8 1074 this.createAndSetupWorkerNode()
8a1260a3 1075 }
5baee0d7 1076 }
19dbc45b 1077 if (this.opts.enableTasksQueue === true) {
9b106837 1078 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1079 }
5baee0d7 1080 })
a35560ba 1081 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1082 worker.once('exit', () => {
f06e48d8 1083 this.removeWorkerNode(worker)
a974afa6 1084 })
280c2a77 1085
aa9eede8 1086 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1087
aa9eede8 1088 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1089
aa9eede8 1090 return workerNodeKey
c97c7edb 1091 }
be0676b3 1092
930dcf12 1093 /**
aa9eede8 1094 * Creates a new, completely set up dynamic worker node.
930dcf12 1095 *
aa9eede8 1096 * @returns New, completely set up dynamic worker node key.
930dcf12 1097 */
aa9eede8
JB
1098 protected createAndSetupDynamicWorkerNode (): number {
1099 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1100 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1101 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1102 message.workerId
aad6fb64 1103 )
aa9eede8 1104 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1105 // Kill message received from worker
930dcf12
JB
1106 if (
1107 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1108 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1109 ((this.opts.enableTasksQueue === false &&
aa9eede8 1110 workerUsage.tasks.executing === 0) ||
7b56f532 1111 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1112 workerUsage.tasks.executing === 0 &&
1113 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1114 ) {
5270d253
JB
1115 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1116 this.emitter?.emit(PoolEvents.error, error)
1117 })
930dcf12
JB
1118 }
1119 })
94407def 1120 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1121 this.sendToWorker(workerNodeKey, {
b0a4db63 1122 checkActive: true,
21f710aa
JB
1123 workerId: workerInfo.id as number
1124 })
b5e113f6 1125 workerInfo.dynamic = true
b1aae695
JB
1126 if (
1127 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1128 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1129 ) {
b5e113f6
JB
1130 workerInfo.ready = true
1131 }
33e6bb4c 1132 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1133 return workerNodeKey
930dcf12
JB
1134 }
1135
a2ed5053 1136 /**
aa9eede8 1137 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1138 *
aa9eede8 1139 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1140 * @param listener - The message listener callback.
1141 */
85aeb3f3
JB
1142 protected abstract registerWorkerMessageListener<
1143 Message extends Data | Response
aa9eede8
JB
1144 >(
1145 workerNodeKey: number,
1146 listener: (message: MessageValue<Message>) => void
1147 ): void
a2ed5053
JB
1148
1149 /**
aa9eede8 1150 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1151 * Can be overridden.
1152 *
aa9eede8 1153 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1154 */
aa9eede8 1155 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1156 // Listen to worker messages.
aa9eede8 1157 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1158 // Send the startup message to worker.
aa9eede8 1159 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1160 // Send the statistics message to worker.
1161 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1162 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1163 this.workerNodes[workerNodeKey].onEmptyQueue =
1164 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1165 this.workerNodes[workerNodeKey].onBackPressure =
1166 this.tasksStealingOnBackPressure.bind(this)
1167 }
d2c73f82
JB
1168 }
1169
85aeb3f3 1170 /**
aa9eede8
JB
1171 * Sends the startup message to worker given its worker node key.
1172 *
1173 * @param workerNodeKey - The worker node key.
1174 */
1175 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1176
1177 /**
9edb9717 1178 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1179 *
aa9eede8 1180 * @param workerNodeKey - The worker node key.
85aeb3f3 1181 */
9edb9717 1182 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1183 this.sendToWorker(workerNodeKey, {
1184 statistics: {
1185 runTime:
1186 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1187 .runTime.aggregate,
1188 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1189 .elu.aggregate
1190 },
94407def 1191 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1192 })
1193 }
a2ed5053
JB
1194
1195 private redistributeQueuedTasks (workerNodeKey: number): void {
1196 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1197 let destinationWorkerNodeKey!: number
a2ed5053 1198 let minQueuedTasks = Infinity
a6b3272b 1199 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3 1200 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
58baffd3
JB
1201 if (workerNode.usage.tasks.queued === 0) {
1202 destinationWorkerNodeKey = workerNodeId
1203 break
1204 }
1205 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1206 minQueuedTasks = workerNode.usage.tasks.queued
1207 destinationWorkerNodeKey = workerNodeId
1208 }
a2ed5053
JB
1209 }
1210 }
0bc68267 1211 if (destinationWorkerNodeKey != null) {
033f1776 1212 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1213 const task = {
1214 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1215 workerId: destinationWorkerNode.info.id as number
0bc68267 1216 }
033f1776
JB
1217 if (
1218 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1219 destinationWorkerNode.usage.tasks.executing <
1220 (this.opts.tasksQueueOptions?.concurrency as number)
1221 ) {
0bc68267
JB
1222 this.executeTask(destinationWorkerNodeKey, task)
1223 } else {
1224 this.enqueueTask(destinationWorkerNodeKey, task)
1225 }
dd951876
JB
1226 }
1227 }
1228 }
1229
1230 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1231 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1232 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1233 const workerNodes = this.workerNodes
a6b3272b 1234 .slice()
dd951876
JB
1235 .sort(
1236 (workerNodeA, workerNodeB) =>
1237 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1238 )
dd951876 1239 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1240 if (sourceWorkerNode.usage.tasks.queued === 0) {
1241 break
1242 }
a6b3272b
JB
1243 if (
1244 sourceWorkerNode.info.ready &&
1245 sourceWorkerNode.info.id !== workerId &&
1246 sourceWorkerNode.usage.tasks.queued > 0
1247 ) {
1248 const task = {
1249 ...(sourceWorkerNode.popTask() as Task<Data>),
1250 workerId: destinationWorkerNode.info.id as number
1251 }
dd951876 1252 if (
2cb4ff45 1253 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
a6b3272b 1254 destinationWorkerNode.usage.tasks.executing <
033f1776 1255 (this.opts.tasksQueueOptions?.concurrency as number)
dd951876 1256 ) {
2cb4ff45
JB
1257 this.executeTask(destinationWorkerNodeKey, task)
1258 } else {
1259 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876 1260 }
b2559a74
JB
1261 if (destinationWorkerNode?.usage != null) {
1262 ++destinationWorkerNode.usage.tasks.stolen
1263 }
1264 if (
1265 this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
1266 destinationWorkerNode.getTaskFunctionWorkerUsage(
1267 task.name as string
1268 ) != null
1269 ) {
68cbdc84
JB
1270 const taskFunctionWorkerUsage =
1271 destinationWorkerNode.getTaskFunctionWorkerUsage(
1272 task.name as string
1273 ) as WorkerUsage
1274 ++taskFunctionWorkerUsage.tasks.stolen
1275 }
dd951876 1276 break
72695f86
JB
1277 }
1278 }
1279 }
1280
1281 private tasksStealingOnBackPressure (workerId: number): void {
68dbcdc0
JB
1282 if ((this.opts.tasksQueueOptions?.size as number) <= 1) {
1283 return
1284 }
72695f86
JB
1285 const sourceWorkerNode =
1286 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1287 const workerNodes = this.workerNodes
a6b3272b 1288 .slice()
72695f86
JB
1289 .sort(
1290 (workerNodeA, workerNodeB) =>
1291 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1292 )
1293 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1294 if (
0bc68267 1295 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1296 workerNode.info.ready &&
1297 workerNode.info.id !== workerId &&
0bc68267
JB
1298 workerNode.usage.tasks.queued <
1299 (this.opts.tasksQueueOptions?.size as number) - 1
72695f86 1300 ) {
dd951876
JB
1301 const task = {
1302 ...(sourceWorkerNode.popTask() as Task<Data>),
1303 workerId: workerNode.info.id as number
1304 }
8b80810a 1305 if (this.tasksQueueSize(workerNodeKey) === 0) {
dd951876 1306 this.executeTask(workerNodeKey, task)
4de3d785 1307 } else {
dd951876 1308 this.enqueueTask(workerNodeKey, task)
4de3d785 1309 }
b2559a74
JB
1310 if (workerNode?.usage != null) {
1311 ++workerNode.usage.tasks.stolen
1312 }
1313 if (
1314 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1315 workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
1316 ) {
68cbdc84
JB
1317 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1318 task.name as string
1319 ) as WorkerUsage
1320 ++taskFunctionWorkerUsage.tasks.stolen
1321 }
10ecf8fd 1322 }
a2ed5053
JB
1323 }
1324 }
1325
be0676b3 1326 /**
aa9eede8 1327 * This method is the listener registered for each worker message.
be0676b3 1328 *
bdacc2d2 1329 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1330 */
1331 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1332 return (message) => {
21f710aa 1333 this.checkMessageWorkerId(message)
a5d15204 1334 if (message.ready != null && message.taskFunctions != null) {
81c02522 1335 // Worker ready response received from worker
10e2aa7e 1336 this.handleWorkerReadyResponse(message)
7629bdf1 1337 } else if (message.taskId != null) {
81c02522 1338 // Task execution response received from worker
6b272951 1339 this.handleTaskExecutionResponse(message)
90d7d101
JB
1340 } else if (message.taskFunctions != null) {
1341 // Task functions message received from worker
94407def
JB
1342 (
1343 this.getWorkerInfo(
1344 this.getWorkerNodeKeyByWorkerId(message.workerId)
1345 ) as WorkerInfo
b558f6b5 1346 ).taskFunctions = message.taskFunctions
6b272951
JB
1347 }
1348 }
1349 }
1350
10e2aa7e 1351 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1352 if (message.ready === false) {
1353 throw new Error(`Worker ${message.workerId} failed to initialize`)
1354 }
a5d15204 1355 const workerInfo = this.getWorkerInfo(
aad6fb64 1356 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1357 ) as WorkerInfo
a5d15204
JB
1358 workerInfo.ready = message.ready as boolean
1359 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1360 if (this.emitter != null && this.ready) {
1361 this.emitter.emit(PoolEvents.ready, this.info)
1362 }
6b272951
JB
1363 }
1364
1365 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1366 const { taskId, taskError, data } = message
1367 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1368 if (promiseResponse != null) {
5441aea6
JB
1369 if (taskError != null) {
1370 this.emitter?.emit(PoolEvents.taskError, taskError)
1371 promiseResponse.reject(taskError.message)
6b272951 1372 } else {
5441aea6 1373 promiseResponse.resolve(data as Response)
6b272951 1374 }
501aea93
JB
1375 const workerNodeKey = promiseResponse.workerNodeKey
1376 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1377 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1378 if (
1379 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1380 this.tasksQueueSize(workerNodeKey) > 0 &&
1381 this.workerNodes[workerNodeKey].usage.tasks.executing <
1382 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1383 ) {
1384 this.executeTask(
1385 workerNodeKey,
1386 this.dequeueTask(workerNodeKey) as Task<Data>
1387 )
be0676b3 1388 }
6b272951 1389 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1390 }
be0676b3 1391 }
7c0ba920 1392
a1763c54 1393 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1394 if (this.busy) {
1395 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1396 }
1397 }
1398
1399 private checkAndEmitTaskQueuingEvents (): void {
1400 if (this.hasBackPressure()) {
1401 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1402 }
1403 }
1404
33e6bb4c
JB
1405 private checkAndEmitDynamicWorkerCreationEvents (): void {
1406 if (this.type === PoolTypes.dynamic) {
1407 if (this.full) {
1408 this.emitter?.emit(PoolEvents.full, this.info)
1409 }
1410 }
1411 }
1412
8a1260a3 1413 /**
aa9eede8 1414 * Gets the worker information given its worker node key.
8a1260a3
JB
1415 *
1416 * @param workerNodeKey - The worker node key.
3f09ed9f 1417 * @returns The worker information.
8a1260a3 1418 */
94407def
JB
1419 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1420 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1421 }
1422
a05c10de 1423 /**
b0a4db63 1424 * Adds the given worker in the pool worker nodes.
ea7a90d3 1425 *
38e795c1 1426 * @param worker - The worker.
aa9eede8
JB
1427 * @returns The added worker node key.
1428 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1429 */
b0a4db63 1430 private addWorkerNode (worker: Worker): number {
671d5154
JB
1431 const workerNode = new WorkerNode<Worker, Data>(
1432 worker,
1433 this.worker,
ff3f866a 1434 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1435 )
b97d82d8 1436 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1437 if (this.starting) {
1438 workerNode.info.ready = true
1439 }
aa9eede8 1440 this.workerNodes.push(workerNode)
aad6fb64 1441 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1442 if (workerNodeKey === -1) {
a5d15204 1443 throw new Error('Worker node added not found')
aa9eede8
JB
1444 }
1445 return workerNodeKey
ea7a90d3 1446 }
c923ce56 1447
51fe3d3c 1448 /**
f06e48d8 1449 * Removes the given worker from the pool worker nodes.
51fe3d3c 1450 *
f06e48d8 1451 * @param worker - The worker.
51fe3d3c 1452 */
416fd65c 1453 private removeWorkerNode (worker: Worker): void {
aad6fb64 1454 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1455 if (workerNodeKey !== -1) {
1456 this.workerNodes.splice(workerNodeKey, 1)
1457 this.workerChoiceStrategyContext.remove(workerNodeKey)
1458 }
51fe3d3c 1459 }
adc3c320 1460
e2b31e32
JB
1461 /** @inheritDoc */
1462 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1463 return (
e2b31e32
JB
1464 this.opts.enableTasksQueue === true &&
1465 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1466 )
1467 }
1468
1469 private hasBackPressure (): boolean {
1470 return (
1471 this.opts.enableTasksQueue === true &&
1472 this.workerNodes.findIndex(
1473 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1474 ) === -1
9e844245 1475 )
e2b31e32
JB
1476 }
1477
b0a4db63 1478 /**
aa9eede8 1479 * Executes the given task on the worker given its worker node key.
b0a4db63 1480 *
aa9eede8 1481 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1482 * @param task - The task to execute.
1483 */
2e81254d 1484 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1485 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1486 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1487 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1488 }
1489
f9f00b5f 1490 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1491 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1492 this.checkAndEmitTaskQueuingEvents()
1493 return tasksQueueSize
adc3c320
JB
1494 }
1495
416fd65c 1496 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1497 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1498 }
1499
416fd65c 1500 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1501 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1502 }
1503
81c02522 1504 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1505 while (this.tasksQueueSize(workerNodeKey) > 0) {
1506 this.executeTask(
1507 workerNodeKey,
1508 this.dequeueTask(workerNodeKey) as Task<Data>
1509 )
ff733df7 1510 }
4b628b48 1511 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1512 }
1513
ef41a6e6
JB
1514 private flushTasksQueues (): void {
1515 for (const [workerNodeKey] of this.workerNodes.entries()) {
1516 this.flushTasksQueue(workerNodeKey)
1517 }
1518 }
c97c7edb 1519}