chore: v2.6.19
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task responses, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, together with the `resolve` and `reject` callbacks of the promise returned to the caller.
 *
 * An entry is registered each time a task is submitted through `execute()`.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Context driving the worker choice strategy currently used by the pool.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * `true` only while the constructor is creating the initial workers.
 */
private readonly starting: boolean
/**
 * Timestamp, as given by `performance.now()`, taken once the pool has been started.
 */
private readonly startTimestamp
96
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the event emitter (if enabled) and the
 * worker choice strategy context are created, and finally the workers are started.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validate every argument before anything gets created.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  // The `starting` flag is only raised while the initial workers are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Checks that the worker file path is a non blank string designating an existing path.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank or does not exist.
 */
private checkFilePath (filePath: string): void {
  // `null` and `undefined` are rejected by the `typeof` test as well.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length !== 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  if (!existsSync(filePath)) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
/**
 * Checks that the requested number of workers is usable for this pool type.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Checks the size bounds of a dynamic pool. Does nothing for any other pool type.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If `max` is lower than `min`, equal to zero or equal to `min`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Validates the pool options and stores the defaulted values in `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, default worker choice strategy
 * options, worker restart on error enabled, events enabled, tasks queue disabled.
 *
 * @param opts - The pool options.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    // The tasks queue options only matter when the tasks queue is enabled.
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
218
/**
 * Checks that the given value is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
256
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object or the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate without an explicit concurrency.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
/**
 * Creates workers until the pool holds `numberOfWorkers` non dynamic worker nodes.
 */
private startPool (): void {
  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(({ info }) => !info.dynamic).length
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
292
/**
 * Pool information computed from the current state of the worker nodes.
 *
 * `utilization`, `runTime` and `waitTime` are only part of the result when the
 * worker choice strategy requires the matching measurements to be aggregated.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node figure over all the pool worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // While no task has been executed yet, dividing by the executed tasks count
  // would give NaN or Infinity: report a zero average instead.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? aggregate / executedTasks : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sumOverWorkerNodes(
              workerNode => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 429
/**
 * Whether the pool is ready, i.e. whether at least `minSize` non dynamic worker nodes are ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    ({ info }) => !info.dynamic && info.ready
  ).length
  return readyStaticWorkerNodes >= this.minSize
}
441
/**
 * Approximate pool utilization: the cumulated tasks run time and wait time divided by
 * the time elapsed since the pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
/**
 * The pool type.
 *
 * This class compares it against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 */
protected abstract get type (): PoolType
469
/**
 * The type of worker used by the pool, as reported in the pool information.
 */
protected abstract get worker (): WorkerType
474
/**
 * The minimum size of the pool.
 */
protected abstract get minSize (): number
ff733df7
JB
479
/**
 * The maximum size of the pool.
 */
protected abstract get maxSize (): number
a35560ba 484
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker whose worker node information carries that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    ({ info }) => info.id === workerId
  )
  return matchingWorkerNode?.worker
}
495
/**
 * Checks that the worker id carried by a received message, if any, designates a worker of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is unknown to the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // A message without worker id is not checked.
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
512
/**
 * Gets the key (index in the pool worker nodes) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
protected getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
524
/**
 * Switches the worker choice strategy, then resets the usage of every worker node
 * and sets up its statistics again.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  for (const workerNode of this.workerNodes) {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  }
}
543
/**
 * Validates the given options, stores them in the pool options and forwards them
 * to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
554
/**
 * Enables or disables the tasks queue. The tasks queues are flushed when an enabled queue gets disabled.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
566
/**
 * Stores the validated tasks queue options when the tasks queue is enabled,
 * drops any stored tasks queue options otherwise.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
577
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly missing.
 * @returns The tasks queue options with every default applied.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
585
/**
 * Whether the pool is full or not, i.e. whether it holds at least `maxSize` worker nodes.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 594
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be provided by each pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 601
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is idle, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    ({ usage }) => usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
614
/**
 * Submits a task to a worker node chosen by the worker choice strategy.
 *
 * The task is queued when the tasks queue is enabled and either the pool is busy or
 * the chosen worker node already executes as many tasks as the configured concurrency;
 * it is executed right away otherwise.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the promise callbacks under the task id before the task is dispatched.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 649
/**
 * Flushes the tasks queue of every worker node, destroys its worker and waits for
 * the worker `exit` event.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // Listen to the exit event before asking for the worker destruction.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(
    this.workerNodes.map(
      async (workerNode, workerNodeKey) =>
        await destroyWorkerNode(workerNode, workerNodeKey)
    )
  )
}
666
/**
 * Terminates the given worker.
 *
 * @param worker - A worker held by one of the pool `workerNodes`.
 * @returns Nothing, or a promise when the termination is asynchronous.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 673
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to set up by default
}
c97c7edb 683
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
688
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time statistics, for
 * the worker node usage first, then for its usage of the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const registerTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  registerTaskStart(this.workerNodes[workerNodeKey].usage)
  registerTaskStart(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      task.name as string
    ) as WorkerUsage
  )
}
709
/**
 * Hook executed after the worker task execution.
 * Updates the tasks, run time and ELU statistics, for the worker node usage first,
 * then for its usage of the task name.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // The tasks counters are updated first: the measurement updates read them.
  const registerTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  registerTaskEnd(this.workerNodes[workerNodeKey].usage)
  registerTaskEnd(
    this.workerNodes[workerNodeKey].getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
733
/**
 * Updates the tasks counters of a worker usage once a task is done: one executing
 * task less, and one more executed or failed task depending on the message task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
746
/**
 * Updates the run time statistics of a worker usage with the run time carried by
 * the message task performance (`0` when missing).
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
758
/**
 * Updates the wait time statistics of a worker usage with the time elapsed since
 * the task timestamp (`0` when the task carries no timestamp).
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const elapsedSinceSubmission = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    elapsedSinceSubmission,
    workerUsage.tasks.executed
  )
}
772
/**
 * Updates the ELU statistics of a worker usage from the message task performance.
 *
 * The active and idle measurements are always passed to `updateMeasurementStatistics()`.
 * The utilization is only updated when ELU aggregation is required and the message carries
 * ELU data: it becomes the mean of the previous and the new utilization, or the new one if
 * there was none.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  workerUsage.elu.utilization =
    workerUsage.elu.utilization != null
      ? (workerUsage.elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
}
804
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its worker node
 * is chosen directly if the strategy policy says so. In every other case the choice is
 * delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorker = this.createAndSetupDynamicWorker()
    const { useDynamicWorker } =
      this.workerChoiceStrategyContext.getStrategyPolicy()
    if (useDynamicWorker) {
      return this.getWorkerNodeKey(dynamicWorker)
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
823
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and all its
 * worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
832
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker the message is sent to.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
843
/**
 * Creates a new worker. Its setup in the pool is done by `createAndSetupWorker()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 850
/**
 * Creates a new worker and sets it up completely in the pool worker nodes: the user
 * provided handlers and the pool internal handlers are registered, then the worker
 * is added to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const newWorker = this.createWorker()

  newWorker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKey(newWorker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    // The errored worker node is flagged as not ready and its channel is closed.
    erroredWorkerInfo.ready = false
    this.workerNodes[erroredWorkerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker is created while the pool is starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  newWorker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  newWorker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  newWorker.once('exit', () => {
    this.removeWorkerNode(newWorker)
  })

  this.addWorkerNode(newWorker)

  this.afterWorkerSetup(newWorker)

  return newWorker
}
be0676b3 890
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * A message listener destroys the worker on a hard kill message, or on any other kill
 * message once the worker node executes no task (and has an empty tasks queue when the
 * tasks queue is enabled).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is only looked at for a non hard kill message.
    const executesNoTask = (): boolean =>
      this.workerNodes[workerNodeKey].usage.tasks.executing === 0
    const canBeKilled = (): boolean =>
      (this.opts.enableTasksQueue === false && executesNoTask()) ||
      (this.opts.enableTasksQueue === true &&
        executesNoTask() &&
        this.tasksQueueSize(workerNodeKey) === 0)
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && canBeKilled())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfoByWorker(worker)
  dynamicWorkerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
924
/**
 * Registers a message listener callback on the given worker.
 *
 * @param worker - The worker the listener is registered on.
 * @param listener - The callback invoked with each message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
a2ed5053
JB
934
/**
 * Hook run once a worker has been newly created and added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Build the pool message listener, then attach it to the worker.
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
  // The startup message goes out once the listener is attached.
  this.sendStartupMessageToWorker(worker)
  // Finally tell the worker which task statistics to compute.
  this.setWorkerStatistics(worker)
}
949
/**
 * Sends the startup message to the given worker.
 * Must be implemented by the concrete pool classes.
 *
 * @param worker - The worker that receives the startup message.
 */
protected abstract sendStartupMessageToWorker (
  worker: Worker
): void
a2ed5053
JB
956
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first other ready worker node with no queued task or,
 * failing that, to the other ready worker node with the fewest queued tasks.
 * Redistribution stops as soon as no other ready worker node is available: the
 * remaining tasks stay queued on the given worker node instead of being dequeued
 * and re-enqueued onto that same node endlessly.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // -1 means "no eligible destination worker node found".
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
      if (queuedTasks === 0) {
        // An empty tasks queue cannot be beaten: stop searching.
        break
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other ready worker node: moving the task back onto its own queue
      // would never shrink it, so give up rather than loop forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
986
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the message:
 * a message carrying `ready` is handled as a worker ready response, otherwise a
 * message carrying `id` is handled as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1004
/**
 * Handles a worker ready response: stores the received ready flag in the worker
 * information, then emits the pool `ready` event if an emitter exists and the
 * pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId) as Worker
  const readyWorkerInfo = this.getWorkerInfoByWorker(readyWorker)
  readyWorkerInfo.ready = message.ready as boolean
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.ready) {
    emitter.emit(PoolEvents.ready, this.info)
  }
}
1013
/**
 * Handles a task execution response: settles the promise registered under the
 * message id, runs the after task execution hook, then starts the next queued
 * task of the worker node (if the tasks queue is enabled and not empty) and
 * updates the worker choice strategy context.
 *
 * A message whose id has no registered promise response is ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message, not the task error object.
    pendingResponse.reject(taskError.message)
  }
  const { worker } = pendingResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1038
/**
 * Emits the pool `busy` event when the pool is busy and the pool `full` event
 * when the pool is dynamic and full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1049
/**
 * Gets the worker information stored in the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1059
/**
 * Gets the worker information of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker information.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
 */
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey !== -1) {
    return this.workerNodes[nodeKey].info
  }
  // A key of -1 means the worker has no worker node in the pool.
  throw new Error('Worker not found')
}
1074
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length once the new worker node is added.
 */
private addWorkerNode (worker: Worker): number {
  const node = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created while the pool is starting are flagged as ready.
    node.info.ready = true
  }
  this.workerNodes.push(node)
  return this.workerNodes.length
}
c923ce56 1089
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 1102
/**
 * Executes the given task on the worker of the given worker node: runs the
 * before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1113
/**
 * Enqueues the given task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1117
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1121
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1125
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1135
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const nodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(nodeKey)
  }
}
b6b32453
JB
1141
/**
 * Sends to the given worker the task statistics it has to compute, taken from
 * the `aggregate` flags of the runtime and ELU task statistics requirements
 * of the worker choice strategy context.
 *
 * @param worker - The worker that receives the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: runTime.aggregate,
    elu: elu.aggregate
  }
  this.sendToWorker(worker, {
    statistics,
    workerId: this.getWorkerInfoByWorker(worker).id as number
  })
}
c97c7edb 1154}