chore: v2.6.18
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
e102732c 33 MessageHandler,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
008512c7 39 type MeasurementStatisticsRequirements,
f0d7f803 40 Measurements,
a35560ba 41 WorkerChoiceStrategies,
a20f0ba5
JB
42 type WorkerChoiceStrategy,
43 type WorkerChoiceStrategyOptions
bdaf31cd
JB
44} from './selection-strategies/selection-strategies-types'
45import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 46import { version } from './version'
4b628b48 47import { WorkerNode } from './worker-node'
23ccf9d7 48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
4b628b48 62 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
a3445496 68 * The execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
c923ce56
JB
75 protected promiseResponseMap: Map<
76 string,
77 PromiseResponseWrapper<Worker, Response>
78 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
075e51d1 89 /**
adc9cc64 90 * Whether the pool is starting or not.
075e51d1
JB
91 */
92 private readonly starting: boolean
afe0d5bf
JB
93 /**
94 * The start timestamp of the pool.
95 */
96 private readonly startTimestamp
97
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker
 * choice strategy context, runs the setup hook and finally starts the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the caller is not the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled in the options.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // `starting` is true only while the initial worker nodes are being created.
  this.starting = true
  this.startPool()
  this.starting = false

  // Reference point used by the pool utilization computation.
  this.startTimestamp = performance.now()
}
143
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, not a string, blank, or points to a file that does not exist.
 */
private checkFilePath (filePath: string): void {
  // A nullish value fails the `typeof` test, so one predicate covers every invalid form.
  const isNonBlankString =
    typeof filePath === 'string' && filePath.trim().length > 0
  if (!isNonBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
156
8d3782fa
JB
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is nullish.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
174
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws Error if the maximum pool size is nullish.
 * @throws TypeError if the maximum pool size is not a safe integer.
 * @throws RangeError if the maximum is below the minimum, is zero, or equals the minimum.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  // Without these two checks an undefined, NaN or fractional maximum passes
  // every comparison below, and a `length >= maxSize` test such as the one in
  // `full` then never holds or holds at the wrong size.
  if (max == null) {
    throw new Error(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
192
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (!this.opts.enableTasksQueue) {
    return
  }
  this.checkValidTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
219
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
229
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have exactly `maxSize` keys, or if `measurement` is not a `Measurements` value.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, one is expected per worker node up to the pool maximum size.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
257
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate when no concurrency is given.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
281
e761c033
JB
/**
 * Creates and sets up workers until the pool holds `numberOfWorkers`
 * worker nodes that are not flagged as dynamic.
 */
private startPool (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
293
/** @inheritDoc */
public get info (): PoolInfo {
  // Statistics requirements of the current worker choice strategy: they decide
  // which optional sections (utilization, runTime, waitTime, median) are reported.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Average per executed task. Reports 0 instead of NaN (0 / 0) as long as
  // no task has been executed yet.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? round(aggregate / executedTasks) : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        // Worker nodes without a recorded value contribute the neutral
        // element of min / max.
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 430
2431bdb4
JB
/**
 * Whether the pool is ready: at least `minSize` worker nodes are both
 * non dynamic and flagged as ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
442
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the aggregated tasks run time plus
 * wait time of all worker nodes, divided by the time elapsed since the pool
 * start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
463
8881ae32
JB
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
470
184855e6
JB
/**
 * The type of worker used by the pool.
 */
protected abstract get worker (): WorkerType
475
/**
 * The pool minimum size.
 */
protected abstract get minSize (): number
ff733df7
JB
480
/**
 * The pool maximum size.
 */
protected abstract get maxSize (): number
a35560ba 485
f59e1027
JB
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
496
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker
 * belongs to a worker of this pool. Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
513
/**
 * Gets the worker node key of the given worker, i.e. its index in `workerNodes`.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
525
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  // Reject unknown strategies before touching any state.
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and its statistics set up again.
  for (const { resetUsage, worker } of this.workerNodes.map(workerNode => ({
    resetUsage: () => {
      workerNode.resetUsage()
    },
    worker: workerNode.worker
  }))) {
    resetUsage()
    this.setWorkerStatistics(worker)
  }
}
544
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validation throws on invalid options, leaving the current ones untouched.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
555
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
567
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
578
/**
 * Builds the effective tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
586
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 595
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 602
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
615
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Submission timestamp, later used to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the resolve/reject callbacks under the task id so the response
  // can be settled once the matching worker message is received.
  const response = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // the chosen worker node already runs as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 650
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the worker node tasks queue, destroys its worker and waits for
  // the worker 'exit' event.
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(
    this.workerNodes.map(
      async (workerNode, workerNodeKey) =>
        await destroyWorkerNode(workerNode, workerNodeKey)
    )
  )
}
667
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 * @returns Nothing, or a promise when the termination is asynchronous.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 674
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default behavior
}
c97c7edb 684
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to create a pool when this returns `false`.
 */
protected abstract isMain (): boolean
689
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time
 * statistics, first on the worker node usage, then on the usage tracked
 * for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const recordTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  recordTaskStart(workerNode.usage)
  recordTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
710
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU statistics, first on the worker node
 * usage, then on the usage tracked for the task name.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const recordTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  recordTaskEnd(workerNode.usage)
  // Falls back to the default task name when the message carries none.
  const taskName = message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  recordTaskEnd(workerNode.getTaskWorkerUsage(taskName) as WorkerUsage)
}
734
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one task less executing, one more executed or failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a non nullish `taskError` marks the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // Never let the counter go negative: the usage may have been reset while
  // the task was in flight (setWorkerChoiceStrategy() calls resetUsage() on
  // every worker node; presumably it zeroes the counters). A negative value
  // would break the `executing === 0` / `executing > 0` checks used elsewhere.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
747
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the received message (`0` when none is reported).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
759
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp is measured against `now`, i.e. has no wait time.
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt,
    workerUsage.tasks.executed
  )
}
773
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU reported in the received message.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  // Active then idle measurements; a missing value counts as 0.
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // First reported utilization is taken as is; later ones are averaged
  // with the stored value.
  const storedUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    storedUtilization != null
      ? (storedUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
805
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and, if the
 * strategy policy uses dynamic workers, chosen right away. Otherwise the
 * worker choice strategy picks the worker node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
824
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
833
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
844
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 851
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * registers the event handlers, adds the worker node and runs the after setup hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler,
    errorHandler,
    onlineHandler,
    exitHandler
  } = this.opts

  // Handlers given in the pool options, falling back to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool error handling: flag the worker node as not ready, emit the error,
  // optionally replace the worker and move its queued tasks elsewhere.
  worker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKey(worker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    erroredWorkerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      // The replacement has the same kind (dynamic or not) as the errored worker.
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 890
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, the worker gets a message listener that
 * destroys it on kill messages, its worker node is flagged as dynamic and a
 * `checkActive` message is sent to it.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily, only on the branches that need it.
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    const shallDestroyWorker = (): boolean => {
      // A hard kill always destroys the worker.
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      // Any other kill message only destroys a worker without executing
      // tasks and, with the tasks queue enabled, without queued tasks.
      switch (this.opts.enableTasksQueue) {
        case false:
          return hasNoExecutingTask()
        case true:
          return (
            hasNoExecutingTask() && this.tasksQueueSize(workerNodeKey) === 0
          )
        default:
          return false
      }
    }
    if (shallDestroyWorker()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
924
a2ed5053
JB
/**
 * Registers a `'message'` listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
937
/**
 * Hook run once a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
  // Send startup message to worker.
  this.sendWorkerStartupMessage(worker)
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
952
/**
 * Sends to the given worker a message carrying `ready: false` and its worker id.
 *
 * @param worker - The worker receiving the startup message.
 */
private sendWorkerStartupMessage (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { id } = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(worker, {
    ready: false,
    workerId: id as number
  })
}
959
/**
 * Moves the tasks queued on the given worker node to the tasks queues of the
 * other ready worker nodes.
 *
 * Each task goes to the first other ready worker node with an empty queue or,
 * failing that, to the other ready worker node with the fewest queued tasks.
 * If there is no other ready worker node, the remaining tasks are left in place.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop scanning.
        break
      }
    }
    if (targetWorkerNodeKey == null) {
      // No other ready worker node: re-enqueueing on the source worker node
      // would never shrink its queue and loop forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
989
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches worker ready
 * responses and task execution responses to their handlers.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1007
/**
 * Stores the ready flag carried by the message on the sending worker info,
 * then emits the pool ready event if an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1016
/**
 * Settles the promise associated with the task id carried by the message,
 * runs the after task execution hook, then executes the next queued task of
 * the worker node (if tasks queueing is enabled) and updates the worker choice strategy.
 * Messages whose task id has no registered promise response are ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1041
/**
 * Emits the pool busy event when the pool is busy and, for a dynamic pool,
 * the pool full event when the pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1052
8a1260a3
JB
/**
 * Gets the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1061
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const node = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    node.info.ready = true
  }
  this.workerNodes.push(node)
  return this.workerNodes.length
}
c923ce56 1076
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 1089
b0a4db63
JB
/**
 * Runs the before task execution hook, then sends the given task to the
 * worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1100
/**
 * Enqueues the given task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the tasks queue size, to be confirmed against `WorkerNode`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1104
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1108
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1112
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  const hasQueuedTasks = (): boolean => this.tasksQueueSize(workerNodeKey) > 0
  while (hasQueuedTasks()) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1122
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1128
/**
 * Sends to the given worker the statistics it has to compute, taken from the
 * `aggregate` flags of the run time and ELU task statistics requirements of
 * the worker choice strategy context.
 *
 * @param worker - The worker receiving the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, { statistics, workerId })
}
c97c7edb 1141}