refactor: factor out measurement statistics update
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
e102732c 33 MessageHandler,
8a1260a3 34 WorkerInfo,
4b628b48 35 WorkerType,
e102732c
JB
36 WorkerUsage
37} from './worker'
a35560ba 38import {
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
a3445496 67 * The execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
c923ce56
JB
74 protected promiseResponseMap: Map<
75 string,
76 PromiseResponseWrapper<Worker, Response>
77 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
075e51d1 88 /**
adc9cc64 89 * Whether the pool is starting or not.
075e51d1
JB
90 */
91 private readonly starting: boolean
afe0d5bf
JB
92 /**
93 * The start timestamp of the pool.
94 */
95 private readonly startTimestamp
96
729c563d
S
97 /**
98 * Constructs a new poolifier pool.
99 *
38e795c1 100 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 101 * @param filePath - Path to the worker file.
38e795c1 102 * @param opts - Options for the pool.
729c563d 103 */
c97c7edb 104 public constructor (
b4213b7f
JB
105 protected readonly numberOfWorkers: number,
106 protected readonly filePath: string,
107 protected readonly opts: PoolOptions<Worker>
c97c7edb 108 ) {
78cea37e 109 if (!this.isMain()) {
c97c7edb
S
110 throw new Error('Cannot start a pool from a worker!')
111 }
8d3782fa 112 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 113 this.checkFilePath(this.filePath)
7c0ba920 114 this.checkPoolOptions(this.opts)
1086026a 115
7254e419
JB
116 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
117 this.executeTask = this.executeTask.bind(this)
118 this.enqueueTask = this.enqueueTask.bind(this)
119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 120
6bd72cd0 121 if (this.opts.enableEvents === true) {
7c0ba920
JB
122 this.emitter = new PoolEmitter()
123 }
d59df138
JB
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
da309861
JB
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
b6b32453
JB
133
134 this.setupHook()
135
075e51d1 136 this.starting = true
e761c033 137 this.startPool()
075e51d1 138 this.starting = false
afe0d5bf
JB
139
140 this.startTimestamp = performance.now()
c97c7edb
S
141 }
142
a35560ba 143 private checkFilePath (filePath: string): void {
ffcbbad8
JB
144 if (
145 filePath == null ||
3d6dd312 146 typeof filePath !== 'string' ||
ffcbbad8
JB
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
c510fea7
APA
149 throw new Error('Please specify a file with a worker implementation')
150 }
3d6dd312
JB
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
c510fea7
APA
154 }
155
8d3782fa
JB
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
78cea37e 161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 162 throw new TypeError(
0d80593b 163 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
164 )
165 } else if (numberOfWorkers < 0) {
473c717a 166 throw new RangeError(
8d3782fa
JB
167 'Cannot instantiate a pool with a negative number of workers'
168 )
6b27d407 169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
175 if (this.type === PoolTypes.dynamic) {
176 if (min > max) {
177 throw new RangeError(
178 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
179 )
b97d82d8 180 } else if (max === 0) {
079de991 181 throw new RangeError(
b97d82d8 182 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
183 )
184 } else if (min === max) {
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
187 )
188 }
8d3782fa
JB
189 }
190 }
191
7c0ba920 192 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
193 if (isPlainObject(opts)) {
194 this.opts.workerChoiceStrategy =
195 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
197 this.opts.workerChoiceStrategyOptions =
198 opts.workerChoiceStrategyOptions ??
199 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
200 this.checkValidWorkerChoiceStrategyOptions(
201 this.opts.workerChoiceStrategyOptions
202 )
1f68cede 203 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
204 this.opts.enableEvents = opts.enableEvents ?? true
205 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
206 if (this.opts.enableTasksQueue) {
207 this.checkValidTasksQueueOptions(
208 opts.tasksQueueOptions as TasksQueueOptions
209 )
210 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
211 opts.tasksQueueOptions as TasksQueueOptions
212 )
213 }
214 } else {
215 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 216 }
aee46736
JB
217 }
218
219 private checkValidWorkerChoiceStrategy (
220 workerChoiceStrategy: WorkerChoiceStrategy
221 ): void {
222 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 223 throw new Error(
aee46736 224 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
225 )
226 }
7c0ba920
JB
227 }
228
0d80593b
JB
229 private checkValidWorkerChoiceStrategyOptions (
230 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
231 ): void {
232 if (!isPlainObject(workerChoiceStrategyOptions)) {
233 throw new TypeError(
234 'Invalid worker choice strategy options: must be a plain object'
235 )
236 }
49be33fe
JB
237 if (
238 workerChoiceStrategyOptions.weights != null &&
6b27d407 239 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
240 ) {
241 throw new Error(
242 'Invalid worker choice strategy options: must have a weight for each worker node'
243 )
244 }
f0d7f803
JB
245 if (
246 workerChoiceStrategyOptions.measurement != null &&
247 !Object.values(Measurements).includes(
248 workerChoiceStrategyOptions.measurement
249 )
250 ) {
251 throw new Error(
252 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
253 )
254 }
0d80593b
JB
255 }
256
a20f0ba5
JB
257 private checkValidTasksQueueOptions (
258 tasksQueueOptions: TasksQueueOptions
259 ): void {
0d80593b
JB
260 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
261 throw new TypeError('Invalid tasks queue options: must be a plain object')
262 }
f0d7f803
JB
263 if (
264 tasksQueueOptions?.concurrency != null &&
265 !Number.isSafeInteger(tasksQueueOptions.concurrency)
266 ) {
267 throw new TypeError(
268 'Invalid worker tasks concurrency: must be an integer'
269 )
270 }
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 tasksQueueOptions.concurrency <= 0
274 ) {
a20f0ba5 275 throw new Error(
f0d7f803 276 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
277 )
278 }
279 }
280
e761c033
JB
281 private startPool (): void {
282 while (
283 this.workerNodes.reduce(
284 (accumulator, workerNode) =>
285 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
286 0
287 ) < this.numberOfWorkers
288 ) {
289 this.createAndSetupWorker()
290 }
291 }
292
08f3f44c 293 /** @inheritDoc */
6b27d407
JB
294 public get info (): PoolInfo {
295 return {
23ccf9d7 296 version,
6b27d407 297 type: this.type,
184855e6 298 worker: this.worker,
2431bdb4
JB
299 ready: this.ready,
300 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
301 minSize: this.minSize,
302 maxSize: this.maxSize,
c05f0d50
JB
303 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 .runTime.aggregate &&
1305e9a8
JB
305 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
306 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
307 workerNodes: this.workerNodes.length,
308 idleWorkerNodes: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
f59e1027 310 workerNode.usage.tasks.executing === 0
a4e07f72
JB
311 ? accumulator + 1
312 : accumulator,
6b27d407
JB
313 0
314 ),
315 busyWorkerNodes: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
f59e1027 317 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
318 0
319 ),
a4e07f72 320 executedTasks: this.workerNodes.reduce(
6b27d407 321 (accumulator, workerNode) =>
f59e1027 322 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
323 0
324 ),
325 executingTasks: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
f59e1027 327 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
328 0
329 ),
330 queuedTasks: this.workerNodes.reduce(
df593701 331 (accumulator, workerNode) =>
f59e1027 332 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
333 0
334 ),
335 maxQueuedTasks: this.workerNodes.reduce(
336 (accumulator, workerNode) =>
b25a42cd 337 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 338 0
a4e07f72
JB
339 ),
340 failedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
f59e1027 342 accumulator + workerNode.usage.tasks.failed,
a4e07f72 343 0
1dcf8b7b
JB
344 ),
345 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
346 .runTime.aggregate && {
347 runTime: {
98e72cda
JB
348 minimum: round(
349 Math.min(
350 ...this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
352 )
1dcf8b7b
JB
353 )
354 ),
98e72cda
JB
355 maximum: round(
356 Math.max(
357 ...this.workerNodes.map(
358 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
359 )
1dcf8b7b 360 )
98e72cda
JB
361 ),
362 average: round(
363 this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
366 0
367 ) /
368 this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.executed ?? 0),
371 0
372 )
373 ),
374 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
375 .runTime.median && {
376 median: round(
377 median(
378 this.workerNodes.map(
379 workerNode => workerNode.usage.runTime?.median ?? 0
380 )
381 )
382 )
383 })
1dcf8b7b
JB
384 }
385 }),
386 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
387 .waitTime.aggregate && {
388 waitTime: {
98e72cda
JB
389 minimum: round(
390 Math.min(
391 ...this.workerNodes.map(
392 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
393 )
1dcf8b7b
JB
394 )
395 ),
98e72cda
JB
396 maximum: round(
397 Math.max(
398 ...this.workerNodes.map(
399 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
400 )
1dcf8b7b 401 )
98e72cda
JB
402 ),
403 average: round(
404 this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
407 0
408 ) /
409 this.workerNodes.reduce(
410 (accumulator, workerNode) =>
411 accumulator + (workerNode.usage.tasks?.executed ?? 0),
412 0
413 )
414 ),
415 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 .waitTime.median && {
417 median: round(
418 median(
419 this.workerNodes.map(
420 workerNode => workerNode.usage.waitTime?.median ?? 0
421 )
422 )
423 )
424 })
1dcf8b7b
JB
425 }
426 })
6b27d407
JB
427 }
428 }
08f3f44c 429
2431bdb4
JB
430 private get ready (): boolean {
431 return (
b97d82d8
JB
432 this.workerNodes.reduce(
433 (accumulator, workerNode) =>
434 !workerNode.info.dynamic && workerNode.info.ready
435 ? accumulator + 1
436 : accumulator,
437 0
438 ) >= this.minSize
2431bdb4
JB
439 )
440 }
441
afe0d5bf
JB
442 /**
443 * Gets the approximate pool utilization.
444 *
445 * @returns The pool utilization.
446 */
447 private get utilization (): number {
8e5ca040 448 const poolTimeCapacity =
fe7d90db 449 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
450 const totalTasksRunTime = this.workerNodes.reduce(
451 (accumulator, workerNode) =>
71514351 452 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
453 0
454 )
455 const totalTasksWaitTime = this.workerNodes.reduce(
456 (accumulator, workerNode) =>
71514351 457 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
458 0
459 )
8e5ca040 460 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
461 }
462
8881ae32
JB
463 /**
464 * Pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
184855e6
JB
470 /**
471 * Gets the worker type.
472 */
473 protected abstract get worker (): WorkerType
474
c2ade475 475 /**
6b27d407 476 * Pool minimum size.
c2ade475 477 */
6b27d407 478 protected abstract get minSize (): number
ff733df7
JB
479
480 /**
6b27d407 481 * Pool maximum size.
ff733df7 482 */
6b27d407 483 protected abstract get maxSize (): number
a35560ba 484
f59e1027
JB
485 /**
486 * Get the worker given its id.
487 *
488 * @param workerId - The worker id.
489 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
490 */
491 private getWorkerById (workerId: number): Worker | undefined {
492 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
493 ?.worker
494 }
495
6b813701
JB
496 /**
497 * Checks if the worker id sent in the received message from a worker is valid.
498 *
499 * @param message - The received message.
500 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 */
21f710aa
JB
502 private checkMessageWorkerId (message: MessageValue<Response>): void {
503 if (
504 message.workerId != null &&
505 this.getWorkerById(message.workerId) == null
506 ) {
507 throw new Error(
508 `Worker message received from unknown worker '${message.workerId}'`
509 )
510 }
511 }
512
ffcbbad8 513 /**
f06e48d8 514 * Gets the given worker its worker node key.
ffcbbad8
JB
515 *
516 * @param worker - The worker.
f59e1027 517 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 518 */
f06e48d8
JB
519 private getWorkerNodeKey (worker: Worker): number {
520 return this.workerNodes.findIndex(
521 workerNode => workerNode.worker === worker
522 )
bf9549ae
JB
523 }
524
afc003b2 525 /** @inheritDoc */
a35560ba 526 public setWorkerChoiceStrategy (
59219cbb
JB
527 workerChoiceStrategy: WorkerChoiceStrategy,
528 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 529 ): void {
aee46736 530 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 531 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
532 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
533 this.opts.workerChoiceStrategy
534 )
535 if (workerChoiceStrategyOptions != null) {
536 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
537 }
8a1260a3 538 for (const workerNode of this.workerNodes) {
4b628b48 539 workerNode.resetUsage()
b6b32453 540 this.setWorkerStatistics(workerNode.worker)
59219cbb 541 }
a20f0ba5
JB
542 }
543
544 /** @inheritDoc */
545 public setWorkerChoiceStrategyOptions (
546 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
547 ): void {
0d80593b 548 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
549 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
550 this.workerChoiceStrategyContext.setOptions(
551 this.opts.workerChoiceStrategyOptions
a35560ba
S
552 )
553 }
554
a20f0ba5 555 /** @inheritDoc */
8f52842f
JB
556 public enableTasksQueue (
557 enable: boolean,
558 tasksQueueOptions?: TasksQueueOptions
559 ): void {
a20f0ba5 560 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 561 this.flushTasksQueues()
a20f0ba5
JB
562 }
563 this.opts.enableTasksQueue = enable
8f52842f 564 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
565 }
566
567 /** @inheritDoc */
8f52842f 568 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 569 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
570 this.checkValidTasksQueueOptions(tasksQueueOptions)
571 this.opts.tasksQueueOptions =
572 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 573 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
574 delete this.opts.tasksQueueOptions
575 }
576 }
577
578 private buildTasksQueueOptions (
579 tasksQueueOptions: TasksQueueOptions
580 ): TasksQueueOptions {
581 return {
582 concurrency: tasksQueueOptions?.concurrency ?? 1
583 }
584 }
585
c319c66b
JB
586 /**
587 * Whether the pool is full or not.
588 *
589 * The pool filling boolean status.
590 */
dea903a8
JB
591 protected get full (): boolean {
592 return this.workerNodes.length >= this.maxSize
593 }
c2ade475 594
c319c66b
JB
595 /**
596 * Whether the pool is busy or not.
597 *
598 * The pool busyness boolean status.
599 */
600 protected abstract get busy (): boolean
7c0ba920 601
6c6afb84
JB
602 /**
603 * Whether worker nodes are executing at least one task.
604 *
605 * @returns Worker nodes busyness boolean status.
606 */
c2ade475 607 protected internalBusy (): boolean {
e0ae6100
JB
608 return (
609 this.workerNodes.findIndex(workerNode => {
f59e1027 610 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
611 }) === -1
612 )
cb70b19d
JB
613 }
614
afc003b2 615 /** @inheritDoc */
a86b6df1 616 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 617 const timestamp = performance.now()
20dcad1a 618 const workerNodeKey = this.chooseWorkerNode()
adc3c320 619 const submittedTask: Task<Data> = {
ff128cc9 620 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
621 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
622 data: data ?? ({} as Data),
b6b32453 623 timestamp,
21f710aa 624 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 625 id: randomUUID()
adc3c320 626 }
2e81254d 627 const res = new Promise<Response>((resolve, reject) => {
02706357 628 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
629 resolve,
630 reject,
20dcad1a 631 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
632 })
633 })
ff733df7
JB
634 if (
635 this.opts.enableTasksQueue === true &&
7171d33f 636 (this.busy ||
f59e1027 637 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 638 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 639 .concurrency as number))
ff733df7 640 ) {
26a929d7
JB
641 this.enqueueTask(workerNodeKey, submittedTask)
642 } else {
2e81254d 643 this.executeTask(workerNodeKey, submittedTask)
adc3c320 644 }
ff733df7 645 this.checkAndEmitEvents()
78cea37e 646 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
647 return res
648 }
c97c7edb 649
afc003b2 650 /** @inheritDoc */
c97c7edb 651 public async destroy (): Promise<void> {
1fbcaa7c 652 await Promise.all(
875a7c37
JB
653 this.workerNodes.map(async (workerNode, workerNodeKey) => {
654 this.flushTasksQueue(workerNodeKey)
47aacbaa 655 // FIXME: wait for tasks to be finished
920278a2
JB
656 const workerExitPromise = new Promise<void>(resolve => {
657 workerNode.worker.on('exit', () => {
658 resolve()
659 })
660 })
f06e48d8 661 await this.destroyWorker(workerNode.worker)
920278a2 662 await workerExitPromise
1fbcaa7c
JB
663 })
664 )
c97c7edb
S
665 }
666
4a6952ff 667 /**
6c6afb84 668 * Terminates the given worker.
4a6952ff 669 *
f06e48d8 670 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
671 */
672 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 673
729c563d 674 /**
6677a3d3
JB
675 * Setup hook to execute code before worker nodes are created in the abstract constructor.
676 * Can be overridden.
afc003b2
JB
677 *
678 * @virtual
729c563d 679 */
280c2a77 680 protected setupHook (): void {
d99ba5a8 681 // Intentionally empty
280c2a77 682 }
c97c7edb 683
729c563d 684 /**
280c2a77
S
685 * Should return whether the worker is the main worker or not.
686 */
687 protected abstract isMain (): boolean
688
689 /**
2e81254d 690 * Hook executed before the worker task execution.
bf9549ae 691 * Can be overridden.
729c563d 692 *
f06e48d8 693 * @param workerNodeKey - The worker node key.
1c6fe997 694 * @param task - The task to execute.
729c563d 695 */
1c6fe997
JB
696 protected beforeTaskExecutionHook (
697 workerNodeKey: number,
698 task: Task<Data>
699 ): void {
f59e1027 700 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
701 ++workerUsage.tasks.executing
702 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 703 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
704 task.name as string
705 ) as WorkerUsage
eb8afc8a
JB
706 ++taskWorkerUsage.tasks.executing
707 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
708 }
709
c01733f1 710 /**
2e81254d 711 * Hook executed after the worker task execution.
bf9549ae 712 * Can be overridden.
c01733f1 713 *
c923ce56 714 * @param worker - The worker.
38e795c1 715 * @param message - The received message.
c01733f1 716 */
2e81254d 717 protected afterTaskExecutionHook (
c923ce56 718 worker: Worker,
2740a743 719 message: MessageValue<Response>
bf9549ae 720 ): void {
ff128cc9
JB
721 const workerNodeKey = this.getWorkerNodeKey(worker)
722 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
723 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
724 this.updateRunTimeWorkerUsage(workerUsage, message)
725 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 726 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 727 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 728 ) as WorkerUsage
eb8afc8a
JB
729 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
730 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
731 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
732 }
733
734 private updateTaskStatisticsWorkerUsage (
735 workerUsage: WorkerUsage,
736 message: MessageValue<Response>
737 ): void {
a4e07f72
JB
738 const workerTaskStatistics = workerUsage.tasks
739 --workerTaskStatistics.executing
98e72cda
JB
740 if (message.taskError == null) {
741 ++workerTaskStatistics.executed
742 } else {
a4e07f72 743 ++workerTaskStatistics.failed
2740a743 744 }
f8eb0a2a
JB
745 }
746
a4e07f72
JB
747 private updateRunTimeWorkerUsage (
748 workerUsage: WorkerUsage,
f8eb0a2a
JB
749 message: MessageValue<Response>
750 ): void {
e4f20deb
JB
751 updateMeasurementStatistics(
752 workerUsage.runTime,
753 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
754 message.taskPerformance?.runTime ?? 0,
755 workerUsage.tasks.executed
756 )
f8eb0a2a
JB
757 }
758
a4e07f72
JB
759 private updateWaitTimeWorkerUsage (
760 workerUsage: WorkerUsage,
1c6fe997 761 task: Task<Data>
f8eb0a2a 762 ): void {
1c6fe997
JB
763 const timestamp = performance.now()
764 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
765 updateMeasurementStatistics(
766 workerUsage.waitTime,
767 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
768 taskWaitTime,
769 workerUsage.tasks.executed
770 )
c01733f1 771 }
772
a4e07f72 773 private updateEluWorkerUsage (
5df69fab 774 workerUsage: WorkerUsage,
62c15a68
JB
775 message: MessageValue<Response>
776 ): void {
e4f20deb
JB
777 updateMeasurementStatistics(
778 workerUsage.elu.active,
779 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu,
780 message.taskPerformance?.elu?.active ?? 0,
781 workerUsage.tasks.executed
782 )
783 updateMeasurementStatistics(
784 workerUsage.elu.idle,
785 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu,
786 message.taskPerformance?.elu?.idle ?? 0,
787 workerUsage.tasks.executed
788 )
5df69fab
JB
789 if (
790 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
791 .aggregate
792 ) {
f7510105 793 if (message.taskPerformance?.elu != null) {
f7510105
JB
794 if (workerUsage.elu.utilization != null) {
795 workerUsage.elu.utilization =
796 (workerUsage.elu.utilization +
797 message.taskPerformance.elu.utilization) /
798 2
799 } else {
800 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
801 }
62c15a68
JB
802 }
803 }
804 }
805
280c2a77 806 /**
f06e48d8 807 * Chooses a worker node for the next task.
280c2a77 808 *
6c6afb84 809 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 810 *
20dcad1a 811 * @returns The worker node key
280c2a77 812 */
6c6afb84 813 private chooseWorkerNode (): number {
930dcf12 814 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
815 const worker = this.createAndSetupDynamicWorker()
816 if (
817 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
818 ) {
819 return this.getWorkerNodeKey(worker)
820 }
17393ac8 821 }
930dcf12
JB
822 return this.workerChoiceStrategyContext.execute()
823 }
824
6c6afb84
JB
825 /**
826 * Conditions for dynamic worker creation.
827 *
828 * @returns Whether to create a dynamic worker or not.
829 */
830 private shallCreateDynamicWorker (): boolean {
930dcf12 831 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
832 }
833
280c2a77 834 /**
675bb809 835 * Sends a message to the given worker.
280c2a77 836 *
38e795c1
JB
837 * @param worker - The worker which should receive the message.
838 * @param message - The message.
280c2a77
S
839 */
840 protected abstract sendToWorker (
841 worker: Worker,
842 message: MessageValue<Data>
843 ): void
844
729c563d 845 /**
41344292 846 * Creates a new worker.
6c6afb84
JB
847 *
848 * @returns Newly created worker.
729c563d 849 */
280c2a77 850 protected abstract createWorker (): Worker
c97c7edb 851
4a6952ff 852 /**
f06e48d8 853 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
854 *
855 * @returns New, completely set up worker.
856 */
857 protected createAndSetupWorker (): Worker {
bdacc2d2 858 const worker = this.createWorker()
280c2a77 859
35cf1c03 860 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 861 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 862 worker.on('error', error => {
9b106837
JB
863 const workerNodeKey = this.getWorkerNodeKey(worker)
864 const workerInfo = this.getWorkerInfo(workerNodeKey)
865 workerInfo.ready = false
2a69b8c5 866 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 867 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 868 if (workerInfo.dynamic) {
8a1260a3
JB
869 this.createAndSetupDynamicWorker()
870 } else {
871 this.createAndSetupWorker()
872 }
5baee0d7 873 }
19dbc45b 874 if (this.opts.enableTasksQueue === true) {
9b106837 875 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 876 }
5baee0d7 877 })
a35560ba
S
878 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
879 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 880 worker.once('exit', () => {
f06e48d8 881 this.removeWorkerNode(worker)
a974afa6 882 })
280c2a77 883
b0a4db63 884 this.addWorkerNode(worker)
280c2a77
S
885
886 this.afterWorkerSetup(worker)
887
c97c7edb
S
888 return worker
889 }
be0676b3 890
930dcf12
JB
891 /**
892 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
893 *
894 * @returns New, completely set up dynamic worker.
895 */
896 protected createAndSetupDynamicWorker (): Worker {
897 const worker = this.createAndSetupWorker()
898 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 899 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
900 if (
901 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
902 (message.kill != null &&
903 ((this.opts.enableTasksQueue === false &&
f59e1027 904 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 905 (this.opts.enableTasksQueue === true &&
f59e1027 906 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 907 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
908 ) {
909 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
910 void (this.destroyWorker(worker) as Promise<void>)
911 }
912 })
21f710aa 913 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
b0a4db63 914 workerInfo.dynamic = true
b97d82d8
JB
915 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
916 workerInfo.ready = true
917 }
21f710aa 918 this.sendToWorker(worker, {
b0a4db63 919 checkActive: true,
21f710aa
JB
920 workerId: workerInfo.id as number
921 })
930dcf12
JB
922 return worker
923 }
924
a2ed5053
JB
925 /**
926 * Registers a listener callback on the given worker.
927 *
928 * @param worker - The worker which should register a listener.
929 * @param listener - The message listener callback.
930 */
931 private registerWorkerMessageListener<Message extends Data | Response>(
932 worker: Worker,
933 listener: (message: MessageValue<Message>) => void
934 ): void {
935 worker.on('message', listener as MessageHandler<Worker>)
936 }
937
938 /**
939 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
940 * Can be overridden.
941 *
942 * @param worker - The newly created worker.
943 */
944 protected afterWorkerSetup (worker: Worker): void {
945 // Listen to worker messages.
946 this.registerWorkerMessageListener(worker, this.workerListener())
947 // Send startup message to worker.
d2c73f82
JB
948 this.sendWorkerStartupMessage(worker)
949 // Setup worker task statistics computation.
950 this.setWorkerStatistics(worker)
951 }
952
953 private sendWorkerStartupMessage (worker: Worker): void {
a2ed5053
JB
954 this.sendToWorker(worker, {
955 ready: false,
956 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
957 })
a2ed5053
JB
958 }
959
960 private redistributeQueuedTasks (workerNodeKey: number): void {
961 while (this.tasksQueueSize(workerNodeKey) > 0) {
962 let targetWorkerNodeKey: number = workerNodeKey
963 let minQueuedTasks = Infinity
964 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
965 const workerInfo = this.getWorkerInfo(workerNodeId)
966 if (
967 workerNodeId !== workerNodeKey &&
968 workerInfo.ready &&
969 workerNode.usage.tasks.queued === 0
970 ) {
971 targetWorkerNodeKey = workerNodeId
972 break
973 }
974 if (
975 workerNodeId !== workerNodeKey &&
976 workerInfo.ready &&
977 workerNode.usage.tasks.queued < minQueuedTasks
978 ) {
979 minQueuedTasks = workerNode.usage.tasks.queued
980 targetWorkerNodeKey = workerNodeId
981 }
982 }
983 this.enqueueTask(
984 targetWorkerNodeKey,
985 this.dequeueTask(workerNodeKey) as Task<Data>
986 )
987 }
988 }
989
be0676b3 990 /**
ff733df7 991 * This function is the listener registered for each worker message.
be0676b3 992 *
bdacc2d2 993 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
994 */
995 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 996 return message => {
21f710aa 997 this.checkMessageWorkerId(message)
d2c73f82 998 if (message.ready != null) {
10e2aa7e
JB
999 // Worker ready response received
1000 this.handleWorkerReadyResponse(message)
f59e1027 1001 } else if (message.id != null) {
a3445496 1002 // Task execution response received
6b272951
JB
1003 this.handleTaskExecutionResponse(message)
1004 }
1005 }
1006 }
1007
10e2aa7e 1008 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
21f710aa
JB
1009 const worker = this.getWorkerById(message.workerId)
1010 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1011 message.ready as boolean
2431bdb4
JB
1012 if (this.emitter != null && this.ready) {
1013 this.emitter.emit(PoolEvents.ready, this.info)
1014 }
6b272951
JB
1015 }
1016
1017 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1018 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1019 if (promiseResponse != null) {
1020 if (message.taskError != null) {
2a69b8c5 1021 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1022 promiseResponse.reject(message.taskError.message)
1023 } else {
1024 promiseResponse.resolve(message.data as Response)
1025 }
1026 this.afterTaskExecutionHook(promiseResponse.worker, message)
1027 this.promiseResponseMap.delete(message.id as string)
1028 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1029 if (
1030 this.opts.enableTasksQueue === true &&
1031 this.tasksQueueSize(workerNodeKey) > 0
1032 ) {
1033 this.executeTask(
1034 workerNodeKey,
1035 this.dequeueTask(workerNodeKey) as Task<Data>
1036 )
be0676b3 1037 }
6b272951 1038 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1039 }
be0676b3 1040 }
7c0ba920 1041
ff733df7 1042 private checkAndEmitEvents (): void {
1f68cede 1043 if (this.emitter != null) {
ff733df7 1044 if (this.busy) {
2845f2a5 1045 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1046 }
6b27d407 1047 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1048 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1049 }
164d950a
JB
1050 }
1051 }
1052
8a1260a3
JB
1053 /**
1054 * Gets the worker information.
1055 *
1056 * @param workerNodeKey - The worker node key.
1057 */
1058 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1059 return this.workerNodes[workerNodeKey].info
1060 }
1061
a05c10de 1062 /**
b0a4db63 1063 * Adds the given worker in the pool worker nodes.
ea7a90d3 1064 *
38e795c1 1065 * @param worker - The worker.
f06e48d8 1066 * @returns The worker nodes length.
ea7a90d3 1067 */
b0a4db63 1068 private addWorkerNode (worker: Worker): number {
cc3ab78b 1069 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1070 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1071 if (this.starting) {
1072 workerNode.info.ready = true
1073 }
cc3ab78b 1074 return this.workerNodes.push(workerNode)
ea7a90d3 1075 }
c923ce56 1076
51fe3d3c 1077 /**
f06e48d8 1078 * Removes the given worker from the pool worker nodes.
51fe3d3c 1079 *
f06e48d8 1080 * @param worker - The worker.
51fe3d3c 1081 */
416fd65c 1082 private removeWorkerNode (worker: Worker): void {
f06e48d8 1083 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1084 if (workerNodeKey !== -1) {
1085 this.workerNodes.splice(workerNodeKey, 1)
1086 this.workerChoiceStrategyContext.remove(workerNodeKey)
1087 }
51fe3d3c 1088 }
adc3c320 1089
b0a4db63
JB
1090 /**
1091 * Executes the given task on the given worker.
1092 *
1093 * @param worker - The worker.
1094 * @param task - The task to execute.
1095 */
2e81254d 1096 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1097 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1098 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1099 }
1100
f9f00b5f 1101 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1102 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1103 }
1104
416fd65c 1105 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1106 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1107 }
1108
416fd65c 1109 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1110 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1111 }
1112
416fd65c 1113 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1114 while (this.tasksQueueSize(workerNodeKey) > 0) {
1115 this.executeTask(
1116 workerNodeKey,
1117 this.dequeueTask(workerNodeKey) as Task<Data>
1118 )
ff733df7 1119 }
4b628b48 1120 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1121 }
1122
ef41a6e6
JB
1123 private flushTasksQueues (): void {
1124 for (const [workerNodeKey] of this.workerNodes.entries()) {
1125 this.flushTasksQueue(workerNodeKey)
1126 }
1127 }
b6b32453
JB
1128
1129 private setWorkerStatistics (worker: Worker): void {
1130 this.sendToWorker(worker, {
1131 statistics: {
87de9ff5
JB
1132 runTime:
1133 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1134 .runTime.aggregate,
87de9ff5 1135 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1136 .elu.aggregate
21f710aa
JB
1137 },
1138 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
b6b32453
JB
1139 })
1140 }
c97c7edb 1141}