test: fix continuous benchmark pool implementation name
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
59317253 15 isKillBehavior,
0d80593b 16 isPlainObject,
90d6701c 17 max,
afe0d5bf 18 median,
90d6701c 19 min,
e4f20deb
JB
20 round,
21 updateMeasurementStatistics
bbeadd16 22} from '../utils'
59317253 23import { KillBehaviors } from '../worker/worker-options'
c4855468 24import {
65d7a1c9 25 type IPool,
7c5a1080 26 PoolEmitter,
c4855468 27 PoolEvents,
6b27d407 28 type PoolInfo,
c4855468 29 type PoolOptions,
6b27d407
JB
30 type PoolType,
31 PoolTypes,
4b628b48 32 type TasksQueueOptions
c4855468 33} from './pool'
bbfa38a2
JB
34import type {
35 IWorker,
36 IWorkerNode,
37 WorkerInfo,
38 WorkerType,
39 WorkerUsage
e102732c 40} from './worker'
a35560ba 41import {
008512c7 42 type MeasurementStatisticsRequirements,
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
bdaf31cd
JB
47} from './selection-strategies/selection-strategies-types'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 49import { version } from './version'
4b628b48 50import { WorkerNode } from './worker-node'
23ccf9d7 51
729c563d 52/**
ea7a90d3 53 * Base class that implements some shared logic for all poolifier pools.
729c563d 54 *
38e795c1 55 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
56 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
57 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 58 */
c97c7edb 59export abstract class AbstractPool<
f06e48d8 60 Worker extends IWorker,
d3c8a1a8
S
61 Data = unknown,
62 Response = unknown
c4855468 63> implements IPool<Worker, Data, Response> {
afc003b2 64 /** @inheritDoc */
4b628b48 65 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 66
afc003b2 67 /** @inheritDoc */
7c0ba920
JB
68 public readonly emitter?: PoolEmitter
69
be0676b3 70 /**
52b71763 71 * The task execution response promise map.
be0676b3 72 *
2740a743 73 * - `key`: The message id of each submitted task.
a3445496 74 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 75 *
a3445496 76 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 77 */
501aea93
JB
78 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
79 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 80
a35560ba 81 /**
51fe3d3c 82 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
83 */
84 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
85 Worker,
86 Data,
87 Response
a35560ba
S
88 >
89
8735b4e5
JB
90 /**
91 * Dynamic pool maximum size property placeholder.
92 */
93 protected readonly max?: number
94
15b176e0
JB
95 /**
96 * Whether the pool is started or not.
97 */
98 private started: boolean
47352846
JB
99 /**
100 * Whether the pool is starting or not.
101 */
102 private starting: boolean
afe0d5bf
JB
103 /**
104 * The start timestamp of the pool.
105 */
106 private readonly startTimestamp
107
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the optional event emitter and the
 * worker choice strategy context are created, the setup hook is run and, when
 * `opts.startWorkers` is `true`, the pool is started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  const instantiatedFromMain = this.isMain()
  if (!instantiatedFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. `checkPoolOptions()` also writes defaults into `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The status flags are initialized before `start()`, which updates them.
  this.started = false
  this.starting = false
  const startImmediately = this.opts.startWorkers === true
  if (startImmediately) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
156
/**
 * Checks that the worker file path is a non-blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or the file does not exist.
 */
private checkFilePath (filePath: string): void {
  // Once the value is known to be a string, only the blank check remains.
  const missingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (missingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
169
8d3782fa
JB
/**
 * Checks the number of workers given at pool instantiation.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is `null` or `undefined`.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero worker is only rejected for a fixed pool.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
187
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing when the pool type is not `dynamic`.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum size is missing or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum size is zero, or is lower than or equal to the minimum size.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
213
/**
 * Checks the pool options and writes the default values into `this.opts`.
 *
 * @param opts - The pool options to check (the constructor passes `this.opts`).
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: defaults to round robin.
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  // Worker choice strategy options: given values override the defaults.
  const workerChoiceStrategyOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...opts.workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
242
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
252
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or `retries` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `retries` is negative.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of `weights` differs from the pool maximum size or `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { retries, weights, measurement } = workerChoiceStrategyOptions
  if (retries != null) {
    if (!Number.isSafeInteger(retries)) {
      throw new TypeError(
        'Invalid worker choice strategy options: retries must be an integer'
      )
    }
    if (retries < 0) {
      throw new RangeError(
        `Invalid worker choice strategy options: retries '${retries}' must be greater or equal than zero`
      )
    }
  }
  // One weight is expected per worker node, up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
296
/**
 * Checks the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or `concurrency` or `size` is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `concurrency` or `size` is negative or zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency != null) {
    if (!Number.isSafeInteger(concurrency)) {
      throw new TypeError(
        'Invalid worker node tasks concurrency: must be an integer'
      )
    }
    if (concurrency <= 0) {
      throw new RangeError(
        `Invalid worker node tasks concurrency: ${concurrency} is a negative integer or zero`
      )
    }
  }
  const size = tasksQueueOptions?.size
  if (size != null) {
    if (!Number.isSafeInteger(size)) {
      throw new TypeError(
        'Invalid worker node tasks queue size: must be an integer'
      )
    }
    if (size <= 0) {
      throw new RangeError(
        `Invalid worker node tasks queue size: ${size} is a negative integer or zero`
      )
    }
  }
}
333
/** @inheritDoc */
public get info (): PoolInfo {
  const strategyContext = this.workerChoiceStrategyContext
  // Sums a per worker node value over all the pool worker nodes.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )
  // Concatenates a per worker node measurement history over all the pool worker nodes.
  const collectHistory = (
    historyOf: (workerNode: IWorkerNode<Worker, Data>) => number[]
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) => histories.concat(historyOf(workerNode)),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate &&
      strategyContext.getTaskStatisticsRequirements().waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only reported when the tasks queue is enabled.
    ...(this.opts.enableTasksQueue === true && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(strategyContext.getTaskStatisticsRequirements().runTime.aggregate && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().runTime
          .average && {
          average: round(
            average(
              collectHistory(workerNode => workerNode.usage.runTime.history)
            )
          )
        }),
        ...(strategyContext.getTaskStatisticsRequirements().runTime
          .median && {
          median: round(
            median(
              collectHistory(workerNode => workerNode.usage.runTime.history)
            )
          )
        })
      }
    }),
    ...(strategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .average && {
          average: round(
            average(
              collectHistory(workerNode => workerNode.usage.waitTime.history)
            )
          )
        }),
        ...(strategyContext.getTaskStatisticsRequirements().waitTime
          .median && {
          median: round(
            median(
              collectHistory(workerNode => workerNode.usage.waitTime.history)
            )
          )
        })
      }
    })
  }
}
08f3f44c 489
aa9eede8
JB
/**
 * The pool readiness boolean status: `true` once the number of ready,
 * non dynamic worker nodes reaches the pool minimum size.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
504
/**
 * The approximate pool utilization: the aggregated tasks run time and wait
 * time of all worker nodes divided by the time elapsed since the start
 * timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    // Missing aggregates count as zero.
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
525
8881ae32 526 /**
aa9eede8 527 * The pool type.
8881ae32
JB
528 *
529 * If it is `'dynamic'`, it provides the `max` property.
530 */
531 protected abstract get type (): PoolType
532
184855e6 533 /**
aa9eede8 534 * The worker type.
184855e6
JB
535 */
536 protected abstract get worker (): WorkerType
537
/**
 * The pool minimum size: the number of workers given at instantiation.
 */
protected get minSize (): number {
  const { numberOfWorkers } = this
  return numberOfWorkers
}
ff733df7
JB
544
/**
 * The pool maximum size: the `max` property when it is defined, the number
 * of workers given at instantiation otherwise.
 */
protected get maxSize (): number {
  const { max, numberOfWorkers } = this
  return max != null ? max : numberOfWorkers
}
a35560ba 551
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is
 * defined and belongs to one of the pool worker nodes.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
570
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorker (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
582
aa9eede8
JB
/**
 * Gets the worker node key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
594
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and is sent a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
613
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Given values override the defaults.
  const mergedOptions = {
    ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
    ...workerChoiceStrategyOptions
  }
  this.opts.workerChoiceStrategyOptions = mergedOptions
  this.workerChoiceStrategyContext.setOptions(mergedOptions)
}
627
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Queues are flushed when the tasks queue goes from enabled to disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
639
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = builtOptions
    this.setTasksQueueSize(builtOptions.size as number)
    return
  }
  // Tasks queue disabled: stale tasks queue options are dropped.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
651
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
657
a20f0ba5
JB
/**
 * Builds the tasks queue options: the given options override the defaults.
 *
 * @param tasksQueueOptions - The tasks queue options.
 * @returns The tasks queue options merged with the default values.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const defaultTasksQueueOptions = {
    // The default size is the square of the pool maximum size.
    size: this.maxSize ** 2,
    concurrency: 1,
    tasksStealing: true,
    tasksStealingOnBackPressure: true
  }
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
671
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 680
c319c66b
JB
681 /**
682 * Whether the pool is busy or not.
683 *
684 * The pool busyness boolean status.
685 */
686 protected abstract get busy (): boolean
7c0ba920 687
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled, the quota of a worker node is the tasks
 * queue `concurrency` option; otherwise it is a single task.
 *
 * @returns `true` if no ready worker node is below its quota, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            (this.opts.tasksQueueOptions?.concurrency as number)
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
712
90d7d101
JB
/** @inheritDoc */
public listTaskFunctions (): string[] {
  // The first worker node with a non empty task functions list provides the result.
  const workerNodeWithTaskFunctions = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctions) &&
      workerNode.info.taskFunctions.length > 0
  )
  return workerNodeWithTaskFunctions?.info.taskFunctions ?? []
}
725
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node:
 * its tasks queue must be empty and its number of executing tasks must be
 * below the tasks queue `concurrency` option.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  return executing < (this.opts.tasksQueueOptions?.concurrency as number)
}
733
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Preconditions, checked in order: the first failure rejects the promise.
    let preconditionError: Error | undefined
    if (!this.started) {
      preconditionError = new Error(
        'Cannot execute a task on not started pool'
      )
    } else if (name != null && typeof name !== 'string') {
      preconditionError = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      preconditionError = new TypeError(
        'name argument must not be an empty string'
      )
    } else if (transferList != null && !Array.isArray(transferList)) {
      preconditionError = new TypeError(
        'transferList argument must be an array'
      )
    }
    if (preconditionError != null) {
      reject(preconditionError)
      return
    }

    const submissionTimestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp: submissionTimestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      taskId: randomUUID()
    }
    // The promise callbacks are stored under the task id.
    this.promiseResponseMap.set(task.taskId as string, {
      resolve,
      reject,
      workerNodeKey
    })

    const tasksQueueEnabled = this.opts.enableTasksQueue
    const executeNow =
      tasksQueueEnabled === false ||
      (tasksQueueEnabled === true && this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 788
47352846
JB
/** @inheritDoc */
public start (): void {
  this.starting = true
  // Worker nodes are created until the non dynamic ones reach the number of workers.
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
804
/** @inheritDoc */
public async destroy (): Promise<void> {
  // All worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  // The destroy event is emitted before the pool is flagged as not started.
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.started = false
}
815
1e3214b6
JB
/**
 * Sends a kill message to the worker of the given worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param workerId - The worker id.
 * @returns A promise resolved when the worker answers `'success'` and rejected when it answers `'failure'`.
 */
protected async sendKillMessageToWorker (
  workerNodeKey: number,
  workerId: number
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // The listener is registered before the kill message is sent.
    this.registerWorkerMessageListener(workerNodeKey, message => {
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(new Error(`Worker ${workerId} kill message handling failed`))
          break
      }
    })
    this.sendToWorker(workerNodeKey, { kill: true, workerId })
  })
}
831
4a6952ff 832 /**
aa9eede8 833 * Terminates the worker node given its worker node key.
4a6952ff 834 *
aa9eede8 835 * @param workerNodeKey - The worker node key.
4a6952ff 836 */
81c02522 837 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 838
/**
 * Setup hook called by the abstract constructor before the pool is started.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally left empty */
}
c97c7edb 848
729c563d 849 /**
280c2a77
S
850 * Should return whether the worker is the main worker or not.
851 */
852 protected abstract isMain (): boolean
853
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time
 * statistics of the worker node usage and, when task function usage is
 * tracked for that worker node, of the matching task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(task.name as string)
    if (taskFunctionWorkerUsage != null) {
      ++taskFunctionWorkerUsage.tasks.executing
      this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
    }
  }
}
883
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics of the worker node usage
 * and, when task function usage is tracked for that worker node, of the
 * task function usage named by `message.taskPerformance.name`.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  if (workerUsage != null) {
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    // Single lookup, narrowed by the null check instead of a type assertion.
    const taskFunctionWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(message.taskPerformance?.name as string)
    if (taskFunctionWorkerUsage != null) {
      this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
      this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
    }
  }
}
917
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * The usage is updated only when the worker info lists more than two task functions.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctions = this.getWorkerInfo(workerNodeKey)?.taskFunctions
  return Array.isArray(taskFunctions) && taskFunctions.length > 2
}
932
/**
 * Updates the task counters of a worker usage after a task response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // Never let the executing counter go below zero.
  if (tasks.executing != null && tasks.executing > 0) {
    --tasks.executing
  }
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
950
/**
 * Updates the run time statistics of a worker usage.
 * Nothing is recorded when the task ended with an error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError == null) {
    const runTimeStatistics = workerUsage.runTime
    const runTimeRequirements =
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
    // A missing run time measurement is recorded as zero.
    const taskRunTime = message.taskPerformance?.runTime ?? 0
    updateMeasurementStatistics(
      runTimeStatistics,
      runTimeRequirements,
      taskRunTime
    )
  }
}
964
/**
 * Updates the wait time statistics of a worker usage.
 * The wait time is the delay between the task timestamp and now.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp is considered to have waited zero.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    taskWaitTime
  )
}
977
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Nothing is recorded when the task ended with an error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (message.taskError != null) {
    return
  }
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Missing active/idle measurements are recorded as zero.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0
  )
  if (eluTaskStatisticsRequirements.aggregate && taskElu != null) {
    // Mean of the previous utilization and the new one, or the new one
    // alone when no utilization was recorded yet.
    workerUsage.elu.utilization =
      workerUsage.elu.utilization != null
        ? (workerUsage.elu.utilization + taskElu.utilization) / 2
        : taskElu.utilization
  }
}
1010
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker node is created first when the creation conditions are
 * met; it is returned directly if the strategy policy has
 * `dynamicWorkerUsage` set. Otherwise the worker choice strategy decides.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  return this.workerChoiceStrategyContext.getStrategyPolicy()
    .dynamicWorkerUsage
    ? dynamicWorkerNodeKey
    : this.workerChoiceStrategyContext.execute()
}
1029
/**
 * Checks the conditions for creating a dynamic worker: the pool is
 * dynamic, not full, and internally busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
1038
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1051
/**
 * Creates a new worker.
 * Must be implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 1058
/**
 * Creates a worker, wires its event handlers, adds it to the pool worker
 * nodes and runs the post-setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const worker = this.createWorker()

  // User supplied handlers, falling back to a no-op.
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Pool error handling: flag the node as not ready, close its channel,
  // emit the error, optionally start a replacement, then move queued tasks.
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestartWorker =
      this.opts.restartWorkerOnError === true && this.started && !this.starting
    if (shallRestartWorker) {
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node once the worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  const addedWorkerNodeKey = this.addWorkerNode(worker)
  this.afterWorkerNodeSetup(addedWorkerNodeKey)
  return addedWorkerNodeKey
}
be0676b3 1102
/**
 * Creates a new, completely set up dynamic worker node.
 * On top of the regular setup, a listener destroys the worker node when the
 * worker sends a kill message, and the worker is asked to check its activity.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey].usage
    // A soft kill is honored only when nothing is executing and, with the
    // tasks queue enabled, nothing is queued either.
    const isWorkerIdle = (): boolean => {
      if (workerUsage.tasks.executing !== 0) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return true
      }
      return (
        this.opts.enableTasksQueue === true &&
        this.tasksQueueSize(localWorkerNodeKey) === 0
      )
    }
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && isWorkerIdle())
    ) {
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(workerNodeKey, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  workerInfo.dynamic = true
  const { dynamicWorkerReady, dynamicWorkerUsage } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (dynamicWorkerReady || dynamicWorkerUsage) {
    workerInfo.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1145
/**
 * Registers a message listener callback on the worker identified by its
 * worker node key. Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053
JB
1158
/**
 * Hook run right after a worker node has been created.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Install the tasks stealing callbacks enabled in the queue options.
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.tasksStealing === true) {
    workerNode.onEmptyQueue = this.taskStealingOnEmptyQueue.bind(this)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.onBackPressure = this.tasksStealingOnBackPressure.bind(this)
  }
}
1183
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1190
/**
 * Sends the statistics message to the worker identified by its worker node
 * key. The message carries the `aggregate` flags of the run time and ELU
 * task statistics requirements.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    },
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
}
a2ed5053
JB
1208
/**
 * Empties the tasks queue of a worker node by handing each queued task to
 * the ready worker node holding the fewest queued tasks.
 *
 * NOTE(review): the search starts from worker node key 0 without checking
 * that it is ready or differs from the source node - confirm this cannot
 * re-queue tasks on the source worker node.
 *
 * @param workerNodeKey - The worker node key whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = 0
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (
        candidate.info.ready &&
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
    // Re-target the task at the destination worker.
    const task = {
      ...(this.dequeueTask(workerNodeKey) as Task<Data>),
      workerId: destinationWorkerNode.info.id as number
    }
    if (this.shallExecuteTask(destinationWorkerNodeKey)) {
      this.executeTask(destinationWorkerNodeKey, task)
    } else {
      this.enqueueTask(destinationWorkerNodeKey, task)
    }
  }
}
1233
/**
 * Increments the stolen tasks counter of a worker node and, when
 * applicable, of the usage tracked for the given task function.
 *
 * @param workerNodeKey - The worker node key that received the stolen task.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    taskName
  ) as WorkerUsage
  ++taskFunctionWorkerUsage.tasks.stolen
}
1252
/**
 * Steals one task for a worker, taking it from the ready worker node
 * (other than itself) that holds the most queued tasks.
 *
 * @param workerId - The id of the worker that will receive the stolen task.
 */
private taskStealingOnEmptyQueue (workerId: number): void {
  const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
  // Copy sorted by descending queue size; the pool array is left untouched.
  const byQueuedTasksDesc = [...this.workerNodes].sort(
    (workerNodeA, workerNodeB) =>
      workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
  )
  const sourceWorkerNode = byQueuedTasksDesc.find(
    workerNode =>
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return
  }
  const task = {
    ...(sourceWorkerNode.popTask() as Task<Data>),
    workerId: destinationWorkerNode.info.id as number
  }
  if (this.shallExecuteTask(destinationWorkerNodeKey)) {
    this.executeTask(destinationWorkerNodeKey, task)
  } else {
    this.enqueueTask(destinationWorkerNodeKey, task)
  }
  this.updateTaskStolenStatisticsWorkerUsage(
    destinationWorkerNodeKey,
    task.name as string
  )
}
1284
/**
 * Relieves a worker node under back pressure by handing its queued tasks to
 * other ready worker nodes, least loaded first, as long as their queue stays
 * below the configured queue size minus one.
 *
 * @param workerId - The id of the worker under back pressure.
 */
private tasksStealingOnBackPressure (workerId: number): void {
  const sizeOffset = 1
  const tasksQueueMaxSize = this.opts.tasksQueueOptions?.size as number
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Copy sorted by ascending queue size; the pool array is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      // The position in the sorted copy is not the worker node key: resolve
      // the key against the pool worker nodes so the task reaches the worker
      // it is addressed to.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      if (workerNodeKey === -1) {
        continue
      }
      const task = {
        ...(sourceWorkerNode.popTask() as Task<Data>),
        workerId: workerNode.info.id as number
      }
      if (this.shallExecuteTask(workerNodeKey)) {
        this.executeTask(workerNodeKey, task)
      } else {
        this.enqueueTask(workerNodeKey, task)
      }
      this.updateTaskStolenStatisticsWorkerUsage(
        workerNodeKey,
        task.name as string
      )
    }
  }
}
1322
/**
 * Builds the listener registered for each worker message. It dispatches
 * worker ready responses, task execution responses and task functions
 * messages.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.taskFunctions != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.taskId != null) {
      // Task execution response received from worker
      this.handleTaskExecutionResponse(message)
      return
    }
    if (message.taskFunctions != null) {
      // Task functions message received from worker
      const workerInfo = this.getWorkerInfo(
        this.getWorkerNodeKeyByWorkerId(message.workerId)
      )
      workerInfo.taskFunctions = message.taskFunctions
    }
  }
}
1345
/**
 * Handles a worker ready response: records the readiness and the task
 * functions in the worker information, then emits the pool ready event
 * when the pool is ready.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  if (message.ready === false) {
    throw new Error(`Worker ${message.workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(message.workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.ready = message.ready as boolean
  workerInfo.taskFunctions = message.taskFunctions
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1359
/**
 * Handles a task execution response: settles the pending promise, updates
 * the usage statistics and the worker choice strategy, then runs the next
 * queued task of the worker node if its concurrency allows it.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { taskId, taskError, data } = message
  const promiseResponse = this.promiseResponseMap.get(taskId as string)
  if (promiseResponse == null) {
    // No pending promise for this task id: nothing to do.
    return
  }
  if (taskError == null) {
    promiseResponse.resolve(data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { workerNodeKey } = promiseResponse
  this.afterTaskExecutionHook(workerNodeKey, message)
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.promiseResponseMap.delete(taskId as string)
  const shallRunQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0 &&
    this.workerNodes[workerNodeKey].usage.tasks.executing <
      (this.opts.tasksQueueOptions?.concurrency as number)
  if (shallRunQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
}
7c0ba920 1387
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1393
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1399
/**
 * Emits the pool full event when the pool is dynamic and full.
 */
private checkAndEmitDynamicWorkerCreationEvents (): void {
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
1407
/**
 * Returns the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1417
/**
 * Wraps the given worker in a worker node and appends it to the pool
 * worker nodes.
 *
 * @param worker - The worker.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (worker: Worker): number {
  // Without a configured queue size, the square of the pool maximum size is used.
  const tasksQueueMaxSize =
    this.opts.tasksQueueOptions?.size ?? this.maxSize ** 2
  const workerNode = new WorkerNode<Worker, Data>(worker, tasksQueueMaxSize)
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (addedWorkerNodeKey === -1) {
    throw new Error('Worker added not found in worker nodes')
  }
  return addedWorkerNodeKey
}
c923ce56 1441
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing when the
 * worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedWorkerNodeKey = this.getWorkerNodeKeyByWorker(worker)
  if (removedWorkerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedWorkerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedWorkerNodeKey)
}
adc3c320 1454
/** @inheritDoc */
public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
  // Back pressure only exists when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes[workerNodeKey].hasBackPressure()
}
1462
/**
 * Tells whether the pool has back pressure: the tasks queue is enabled
 * and no worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1471
/**
 * Runs the given task on the worker identified by its worker node key:
 * pre-execution hook, message dispatch, then task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The task itself is the message; its transfer list goes along with it.
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1483
/**
 * Queues the given task on the worker node, then emits the task queuing
 * events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1489
/**
 * Takes the next task out of the worker node tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1493
/**
 * Gets the tasks queue size of a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1497
/**
 * Executes every task queued on a worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1507
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1513}