refactor: add sanity check at getting worker info
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
a3445496 67 * The execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
c923ce56
JB
74 protected promiseResponseMap: Map<
75 string,
76 PromiseResponseWrapper<Worker, Response>
77 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
075e51d1 88 /**
adc9cc64 89 * Whether the pool is starting or not.
075e51d1
JB
90 */
91 private readonly starting: boolean
afe0d5bf
JB
92 /**
93 * The start timestamp of the pool.
94 */
95 private readonly startTimestamp
96
729c563d
S
97 /**
98 * Constructs a new poolifier pool.
99 *
38e795c1 100 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 101 * @param filePath - Path to the worker file.
38e795c1 102 * @param opts - Options for the pool.
729c563d 103 */
c97c7edb 104 public constructor (
b4213b7f
JB
105 protected readonly numberOfWorkers: number,
106 protected readonly filePath: string,
107 protected readonly opts: PoolOptions<Worker>
c97c7edb 108 ) {
78cea37e 109 if (!this.isMain()) {
c97c7edb
S
110 throw new Error('Cannot start a pool from a worker!')
111 }
8d3782fa 112 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 113 this.checkFilePath(this.filePath)
7c0ba920 114 this.checkPoolOptions(this.opts)
1086026a 115
7254e419
JB
116 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
117 this.executeTask = this.executeTask.bind(this)
118 this.enqueueTask = this.enqueueTask.bind(this)
119 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 120
6bd72cd0 121 if (this.opts.enableEvents === true) {
7c0ba920
JB
122 this.emitter = new PoolEmitter()
123 }
d59df138
JB
124 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
125 Worker,
126 Data,
127 Response
da309861
JB
128 >(
129 this,
130 this.opts.workerChoiceStrategy,
131 this.opts.workerChoiceStrategyOptions
132 )
b6b32453
JB
133
134 this.setupHook()
135
075e51d1 136 this.starting = true
e761c033 137 this.startPool()
075e51d1 138 this.starting = false
afe0d5bf
JB
139
140 this.startTimestamp = performance.now()
c97c7edb
S
141 }
142
a35560ba 143 private checkFilePath (filePath: string): void {
ffcbbad8
JB
144 if (
145 filePath == null ||
3d6dd312 146 typeof filePath !== 'string' ||
ffcbbad8
JB
147 (typeof filePath === 'string' && filePath.trim().length === 0)
148 ) {
c510fea7
APA
149 throw new Error('Please specify a file with a worker implementation')
150 }
3d6dd312
JB
151 if (!existsSync(filePath)) {
152 throw new Error(`Cannot find the worker file '${filePath}'`)
153 }
c510fea7
APA
154 }
155
8d3782fa
JB
156 private checkNumberOfWorkers (numberOfWorkers: number): void {
157 if (numberOfWorkers == null) {
158 throw new Error(
159 'Cannot instantiate a pool without specifying the number of workers'
160 )
78cea37e 161 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 162 throw new TypeError(
0d80593b 163 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
164 )
165 } else if (numberOfWorkers < 0) {
473c717a 166 throw new RangeError(
8d3782fa
JB
167 'Cannot instantiate a pool with a negative number of workers'
168 )
6b27d407 169 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
170 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
171 }
172 }
173
174 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
175 if (this.type === PoolTypes.dynamic) {
176 if (min > max) {
177 throw new RangeError(
178 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
179 )
b97d82d8 180 } else if (max === 0) {
079de991 181 throw new RangeError(
b97d82d8 182 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
183 )
184 } else if (min === max) {
185 throw new RangeError(
186 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
187 )
188 }
8d3782fa
JB
189 }
190 }
191
7c0ba920 192 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
193 if (isPlainObject(opts)) {
194 this.opts.workerChoiceStrategy =
195 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
196 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
197 this.opts.workerChoiceStrategyOptions =
198 opts.workerChoiceStrategyOptions ??
199 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
200 this.checkValidWorkerChoiceStrategyOptions(
201 this.opts.workerChoiceStrategyOptions
202 )
1f68cede 203 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
204 this.opts.enableEvents = opts.enableEvents ?? true
205 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
206 if (this.opts.enableTasksQueue) {
207 this.checkValidTasksQueueOptions(
208 opts.tasksQueueOptions as TasksQueueOptions
209 )
210 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
211 opts.tasksQueueOptions as TasksQueueOptions
212 )
213 }
214 } else {
215 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 216 }
aee46736
JB
217 }
218
219 private checkValidWorkerChoiceStrategy (
220 workerChoiceStrategy: WorkerChoiceStrategy
221 ): void {
222 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 223 throw new Error(
aee46736 224 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
225 )
226 }
7c0ba920
JB
227 }
228
0d80593b
JB
229 private checkValidWorkerChoiceStrategyOptions (
230 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
231 ): void {
232 if (!isPlainObject(workerChoiceStrategyOptions)) {
233 throw new TypeError(
234 'Invalid worker choice strategy options: must be a plain object'
235 )
236 }
49be33fe
JB
237 if (
238 workerChoiceStrategyOptions.weights != null &&
6b27d407 239 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
240 ) {
241 throw new Error(
242 'Invalid worker choice strategy options: must have a weight for each worker node'
243 )
244 }
f0d7f803
JB
245 if (
246 workerChoiceStrategyOptions.measurement != null &&
247 !Object.values(Measurements).includes(
248 workerChoiceStrategyOptions.measurement
249 )
250 ) {
251 throw new Error(
252 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
253 )
254 }
0d80593b
JB
255 }
256
a20f0ba5
JB
257 private checkValidTasksQueueOptions (
258 tasksQueueOptions: TasksQueueOptions
259 ): void {
0d80593b
JB
260 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
261 throw new TypeError('Invalid tasks queue options: must be a plain object')
262 }
f0d7f803
JB
263 if (
264 tasksQueueOptions?.concurrency != null &&
265 !Number.isSafeInteger(tasksQueueOptions.concurrency)
266 ) {
267 throw new TypeError(
268 'Invalid worker tasks concurrency: must be an integer'
269 )
270 }
271 if (
272 tasksQueueOptions?.concurrency != null &&
273 tasksQueueOptions.concurrency <= 0
274 ) {
a20f0ba5 275 throw new Error(
f0d7f803 276 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
277 )
278 }
279 }
280
e761c033
JB
281 private startPool (): void {
282 while (
283 this.workerNodes.reduce(
284 (accumulator, workerNode) =>
285 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
286 0
287 ) < this.numberOfWorkers
288 ) {
289 this.createAndSetupWorker()
290 }
291 }
292
08f3f44c 293 /** @inheritDoc */
6b27d407
JB
294 public get info (): PoolInfo {
295 return {
23ccf9d7 296 version,
6b27d407 297 type: this.type,
184855e6 298 worker: this.worker,
2431bdb4
JB
299 ready: this.ready,
300 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
301 minSize: this.minSize,
302 maxSize: this.maxSize,
c05f0d50
JB
303 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 .runTime.aggregate &&
1305e9a8
JB
305 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
306 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
307 workerNodes: this.workerNodes.length,
308 idleWorkerNodes: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
f59e1027 310 workerNode.usage.tasks.executing === 0
a4e07f72
JB
311 ? accumulator + 1
312 : accumulator,
6b27d407
JB
313 0
314 ),
315 busyWorkerNodes: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
f59e1027 317 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
318 0
319 ),
a4e07f72 320 executedTasks: this.workerNodes.reduce(
6b27d407 321 (accumulator, workerNode) =>
f59e1027 322 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
323 0
324 ),
325 executingTasks: this.workerNodes.reduce(
326 (accumulator, workerNode) =>
f59e1027 327 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
328 0
329 ),
330 queuedTasks: this.workerNodes.reduce(
df593701 331 (accumulator, workerNode) =>
f59e1027 332 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
333 0
334 ),
335 maxQueuedTasks: this.workerNodes.reduce(
336 (accumulator, workerNode) =>
b25a42cd 337 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 338 0
a4e07f72
JB
339 ),
340 failedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
f59e1027 342 accumulator + workerNode.usage.tasks.failed,
a4e07f72 343 0
1dcf8b7b
JB
344 ),
345 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
346 .runTime.aggregate && {
347 runTime: {
98e72cda
JB
348 minimum: round(
349 Math.min(
350 ...this.workerNodes.map(
351 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
352 )
1dcf8b7b
JB
353 )
354 ),
98e72cda
JB
355 maximum: round(
356 Math.max(
357 ...this.workerNodes.map(
358 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
359 )
1dcf8b7b 360 )
98e72cda
JB
361 ),
362 average: round(
363 this.workerNodes.reduce(
364 (accumulator, workerNode) =>
365 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
366 0
367 ) /
368 this.workerNodes.reduce(
369 (accumulator, workerNode) =>
370 accumulator + (workerNode.usage.tasks?.executed ?? 0),
371 0
372 )
373 ),
374 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
375 .runTime.median && {
376 median: round(
377 median(
378 this.workerNodes.map(
379 workerNode => workerNode.usage.runTime?.median ?? 0
380 )
381 )
382 )
383 })
1dcf8b7b
JB
384 }
385 }),
386 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
387 .waitTime.aggregate && {
388 waitTime: {
98e72cda
JB
389 minimum: round(
390 Math.min(
391 ...this.workerNodes.map(
392 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
393 )
1dcf8b7b
JB
394 )
395 ),
98e72cda
JB
396 maximum: round(
397 Math.max(
398 ...this.workerNodes.map(
399 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
400 )
1dcf8b7b 401 )
98e72cda
JB
402 ),
403 average: round(
404 this.workerNodes.reduce(
405 (accumulator, workerNode) =>
406 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
407 0
408 ) /
409 this.workerNodes.reduce(
410 (accumulator, workerNode) =>
411 accumulator + (workerNode.usage.tasks?.executed ?? 0),
412 0
413 )
414 ),
415 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
416 .waitTime.median && {
417 median: round(
418 median(
419 this.workerNodes.map(
420 workerNode => workerNode.usage.waitTime?.median ?? 0
421 )
422 )
423 )
424 })
1dcf8b7b
JB
425 }
426 })
6b27d407
JB
427 }
428 }
08f3f44c 429
2431bdb4
JB
430 private get ready (): boolean {
431 return (
b97d82d8
JB
432 this.workerNodes.reduce(
433 (accumulator, workerNode) =>
434 !workerNode.info.dynamic && workerNode.info.ready
435 ? accumulator + 1
436 : accumulator,
437 0
438 ) >= this.minSize
2431bdb4
JB
439 )
440 }
441
afe0d5bf
JB
442 /**
443 * Gets the approximate pool utilization.
444 *
445 * @returns The pool utilization.
446 */
447 private get utilization (): number {
8e5ca040 448 const poolTimeCapacity =
fe7d90db 449 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
450 const totalTasksRunTime = this.workerNodes.reduce(
451 (accumulator, workerNode) =>
71514351 452 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
453 0
454 )
455 const totalTasksWaitTime = this.workerNodes.reduce(
456 (accumulator, workerNode) =>
71514351 457 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
458 0
459 )
8e5ca040 460 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
461 }
462
8881ae32
JB
463 /**
464 * Pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
184855e6
JB
470 /**
471 * Gets the worker type.
472 */
473 protected abstract get worker (): WorkerType
474
c2ade475 475 /**
6b27d407 476 * Pool minimum size.
c2ade475 477 */
6b27d407 478 protected abstract get minSize (): number
ff733df7
JB
479
480 /**
6b27d407 481 * Pool maximum size.
ff733df7 482 */
6b27d407 483 protected abstract get maxSize (): number
a35560ba 484
f59e1027
JB
485 /**
486 * Get the worker given its id.
487 *
488 * @param workerId - The worker id.
489 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
490 */
491 private getWorkerById (workerId: number): Worker | undefined {
492 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
493 ?.worker
494 }
495
6b813701
JB
496 /**
497 * Checks if the worker id sent in the received message from a worker is valid.
498 *
499 * @param message - The received message.
500 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
501 */
21f710aa
JB
502 private checkMessageWorkerId (message: MessageValue<Response>): void {
503 if (
504 message.workerId != null &&
505 this.getWorkerById(message.workerId) == null
506 ) {
507 throw new Error(
508 `Worker message received from unknown worker '${message.workerId}'`
509 )
510 }
511 }
512
ffcbbad8 513 /**
f06e48d8 514 * Gets the given worker its worker node key.
ffcbbad8
JB
515 *
516 * @param worker - The worker.
f59e1027 517 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 518 */
f06e48d8
JB
519 private getWorkerNodeKey (worker: Worker): number {
520 return this.workerNodes.findIndex(
521 workerNode => workerNode.worker === worker
522 )
bf9549ae
JB
523 }
524
afc003b2 525 /** @inheritDoc */
a35560ba 526 public setWorkerChoiceStrategy (
59219cbb
JB
527 workerChoiceStrategy: WorkerChoiceStrategy,
528 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 529 ): void {
aee46736 530 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 531 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
532 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
533 this.opts.workerChoiceStrategy
534 )
535 if (workerChoiceStrategyOptions != null) {
536 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
537 }
8a1260a3 538 for (const workerNode of this.workerNodes) {
4b628b48 539 workerNode.resetUsage()
b6b32453 540 this.setWorkerStatistics(workerNode.worker)
59219cbb 541 }
a20f0ba5
JB
542 }
543
544 /** @inheritDoc */
545 public setWorkerChoiceStrategyOptions (
546 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
547 ): void {
0d80593b 548 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
549 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
550 this.workerChoiceStrategyContext.setOptions(
551 this.opts.workerChoiceStrategyOptions
a35560ba
S
552 )
553 }
554
a20f0ba5 555 /** @inheritDoc */
8f52842f
JB
556 public enableTasksQueue (
557 enable: boolean,
558 tasksQueueOptions?: TasksQueueOptions
559 ): void {
a20f0ba5 560 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 561 this.flushTasksQueues()
a20f0ba5
JB
562 }
563 this.opts.enableTasksQueue = enable
8f52842f 564 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
565 }
566
567 /** @inheritDoc */
8f52842f 568 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 569 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
570 this.checkValidTasksQueueOptions(tasksQueueOptions)
571 this.opts.tasksQueueOptions =
572 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 573 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
574 delete this.opts.tasksQueueOptions
575 }
576 }
577
578 private buildTasksQueueOptions (
579 tasksQueueOptions: TasksQueueOptions
580 ): TasksQueueOptions {
581 return {
582 concurrency: tasksQueueOptions?.concurrency ?? 1
583 }
584 }
585
c319c66b
JB
586 /**
587 * Whether the pool is full or not.
588 *
589 * The pool filling boolean status.
590 */
dea903a8
JB
591 protected get full (): boolean {
592 return this.workerNodes.length >= this.maxSize
593 }
c2ade475 594
c319c66b
JB
595 /**
596 * Whether the pool is busy or not.
597 *
598 * The pool busyness boolean status.
599 */
600 protected abstract get busy (): boolean
7c0ba920 601
6c6afb84
JB
602 /**
603 * Whether worker nodes are executing at least one task.
604 *
605 * @returns Worker nodes busyness boolean status.
606 */
c2ade475 607 protected internalBusy (): boolean {
e0ae6100
JB
608 return (
609 this.workerNodes.findIndex(workerNode => {
f59e1027 610 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
611 }) === -1
612 )
cb70b19d
JB
613 }
614
afc003b2 615 /** @inheritDoc */
a86b6df1 616 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 617 const timestamp = performance.now()
20dcad1a 618 const workerNodeKey = this.chooseWorkerNode()
adc3c320 619 const submittedTask: Task<Data> = {
ff128cc9 620 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
621 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
622 data: data ?? ({} as Data),
b6b32453 623 timestamp,
21f710aa 624 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 625 id: randomUUID()
adc3c320 626 }
2e81254d 627 const res = new Promise<Response>((resolve, reject) => {
02706357 628 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
629 resolve,
630 reject,
20dcad1a 631 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
632 })
633 })
ff733df7
JB
634 if (
635 this.opts.enableTasksQueue === true &&
7171d33f 636 (this.busy ||
f59e1027 637 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 638 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 639 .concurrency as number))
ff733df7 640 ) {
26a929d7
JB
641 this.enqueueTask(workerNodeKey, submittedTask)
642 } else {
2e81254d 643 this.executeTask(workerNodeKey, submittedTask)
adc3c320 644 }
ff733df7 645 this.checkAndEmitEvents()
78cea37e 646 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
647 return res
648 }
c97c7edb 649
afc003b2 650 /** @inheritDoc */
c97c7edb 651 public async destroy (): Promise<void> {
1fbcaa7c 652 await Promise.all(
875a7c37
JB
653 this.workerNodes.map(async (workerNode, workerNodeKey) => {
654 this.flushTasksQueue(workerNodeKey)
47aacbaa 655 // FIXME: wait for tasks to be finished
920278a2
JB
656 const workerExitPromise = new Promise<void>(resolve => {
657 workerNode.worker.on('exit', () => {
658 resolve()
659 })
660 })
f06e48d8 661 await this.destroyWorker(workerNode.worker)
920278a2 662 await workerExitPromise
1fbcaa7c
JB
663 })
664 )
c97c7edb
S
665 }
666
4a6952ff 667 /**
6c6afb84 668 * Terminates the given worker.
4a6952ff 669 *
f06e48d8 670 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
671 */
672 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 673
729c563d 674 /**
6677a3d3
JB
675 * Setup hook to execute code before worker nodes are created in the abstract constructor.
676 * Can be overridden.
afc003b2
JB
677 *
678 * @virtual
729c563d 679 */
280c2a77 680 protected setupHook (): void {
d99ba5a8 681 // Intentionally empty
280c2a77 682 }
c97c7edb 683
729c563d 684 /**
280c2a77
S
685 * Should return whether the worker is the main worker or not.
686 */
687 protected abstract isMain (): boolean
688
689 /**
2e81254d 690 * Hook executed before the worker task execution.
bf9549ae 691 * Can be overridden.
729c563d 692 *
f06e48d8 693 * @param workerNodeKey - The worker node key.
1c6fe997 694 * @param task - The task to execute.
729c563d 695 */
1c6fe997
JB
696 protected beforeTaskExecutionHook (
697 workerNodeKey: number,
698 task: Task<Data>
699 ): void {
f59e1027 700 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
701 ++workerUsage.tasks.executing
702 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 703 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
704 task.name as string
705 ) as WorkerUsage
eb8afc8a
JB
706 ++taskWorkerUsage.tasks.executing
707 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
708 }
709
c01733f1 710 /**
2e81254d 711 * Hook executed after the worker task execution.
bf9549ae 712 * Can be overridden.
c01733f1 713 *
c923ce56 714 * @param worker - The worker.
38e795c1 715 * @param message - The received message.
c01733f1 716 */
2e81254d 717 protected afterTaskExecutionHook (
c923ce56 718 worker: Worker,
2740a743 719 message: MessageValue<Response>
bf9549ae 720 ): void {
ff128cc9
JB
721 const workerNodeKey = this.getWorkerNodeKey(worker)
722 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
723 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
724 this.updateRunTimeWorkerUsage(workerUsage, message)
725 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 726 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 727 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 728 ) as WorkerUsage
eb8afc8a
JB
729 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
730 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
731 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
732 }
733
734 private updateTaskStatisticsWorkerUsage (
735 workerUsage: WorkerUsage,
736 message: MessageValue<Response>
737 ): void {
a4e07f72
JB
738 const workerTaskStatistics = workerUsage.tasks
739 --workerTaskStatistics.executing
98e72cda
JB
740 if (message.taskError == null) {
741 ++workerTaskStatistics.executed
742 } else {
a4e07f72 743 ++workerTaskStatistics.failed
2740a743 744 }
f8eb0a2a
JB
745 }
746
a4e07f72
JB
747 private updateRunTimeWorkerUsage (
748 workerUsage: WorkerUsage,
f8eb0a2a
JB
749 message: MessageValue<Response>
750 ): void {
e4f20deb
JB
751 updateMeasurementStatistics(
752 workerUsage.runTime,
753 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
754 message.taskPerformance?.runTime ?? 0,
755 workerUsage.tasks.executed
756 )
f8eb0a2a
JB
757 }
758
a4e07f72
JB
759 private updateWaitTimeWorkerUsage (
760 workerUsage: WorkerUsage,
1c6fe997 761 task: Task<Data>
f8eb0a2a 762 ): void {
1c6fe997
JB
763 const timestamp = performance.now()
764 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
765 updateMeasurementStatistics(
766 workerUsage.waitTime,
767 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
768 taskWaitTime,
769 workerUsage.tasks.executed
770 )
c01733f1 771 }
772
a4e07f72 773 private updateEluWorkerUsage (
5df69fab 774 workerUsage: WorkerUsage,
62c15a68
JB
775 message: MessageValue<Response>
776 ): void {
008512c7
JB
777 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
778 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
779 updateMeasurementStatistics(
780 workerUsage.elu.active,
008512c7 781 eluTaskStatisticsRequirements,
e4f20deb
JB
782 message.taskPerformance?.elu?.active ?? 0,
783 workerUsage.tasks.executed
784 )
785 updateMeasurementStatistics(
786 workerUsage.elu.idle,
008512c7 787 eluTaskStatisticsRequirements,
e4f20deb
JB
788 message.taskPerformance?.elu?.idle ?? 0,
789 workerUsage.tasks.executed
790 )
008512c7 791 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 792 if (message.taskPerformance?.elu != null) {
f7510105
JB
793 if (workerUsage.elu.utilization != null) {
794 workerUsage.elu.utilization =
795 (workerUsage.elu.utilization +
796 message.taskPerformance.elu.utilization) /
797 2
798 } else {
799 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
800 }
62c15a68
JB
801 }
802 }
803 }
804
280c2a77 805 /**
f06e48d8 806 * Chooses a worker node for the next task.
280c2a77 807 *
6c6afb84 808 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 809 *
20dcad1a 810 * @returns The worker node key
280c2a77 811 */
6c6afb84 812 private chooseWorkerNode (): number {
930dcf12 813 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
814 const worker = this.createAndSetupDynamicWorker()
815 if (
816 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
817 ) {
818 return this.getWorkerNodeKey(worker)
819 }
17393ac8 820 }
930dcf12
JB
821 return this.workerChoiceStrategyContext.execute()
822 }
823
6c6afb84
JB
824 /**
825 * Conditions for dynamic worker creation.
826 *
827 * @returns Whether to create a dynamic worker or not.
828 */
829 private shallCreateDynamicWorker (): boolean {
930dcf12 830 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
831 }
832
280c2a77 833 /**
675bb809 834 * Sends a message to the given worker.
280c2a77 835 *
38e795c1
JB
836 * @param worker - The worker which should receive the message.
837 * @param message - The message.
280c2a77
S
838 */
839 protected abstract sendToWorker (
840 worker: Worker,
841 message: MessageValue<Data>
842 ): void
843
729c563d 844 /**
41344292 845 * Creates a new worker.
6c6afb84
JB
846 *
847 * @returns Newly created worker.
729c563d 848 */
280c2a77 849 protected abstract createWorker (): Worker
c97c7edb 850
4a6952ff 851 /**
f06e48d8 852 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
853 *
854 * @returns New, completely set up worker.
855 */
856 protected createAndSetupWorker (): Worker {
bdacc2d2 857 const worker = this.createWorker()
280c2a77 858
35cf1c03 859 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 860 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 861 worker.on('error', error => {
9b106837
JB
862 const workerNodeKey = this.getWorkerNodeKey(worker)
863 const workerInfo = this.getWorkerInfo(workerNodeKey)
864 workerInfo.ready = false
2a69b8c5 865 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 866 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 867 if (workerInfo.dynamic) {
8a1260a3
JB
868 this.createAndSetupDynamicWorker()
869 } else {
870 this.createAndSetupWorker()
871 }
5baee0d7 872 }
19dbc45b 873 if (this.opts.enableTasksQueue === true) {
9b106837 874 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 875 }
5baee0d7 876 })
a35560ba
S
877 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
878 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 879 worker.once('exit', () => {
85aeb3f3
JB
880 const workerInfo = this.getWorkerInfoByWorker(worker)
881 if (workerInfo.messageChannel != null) {
882 workerInfo.messageChannel?.port1.close()
883 workerInfo.messageChannel?.port1.close()
884 }
f06e48d8 885 this.removeWorkerNode(worker)
a974afa6 886 })
280c2a77 887
b0a4db63 888 this.addWorkerNode(worker)
280c2a77
S
889
890 this.afterWorkerSetup(worker)
891
c97c7edb
S
892 return worker
893 }
be0676b3 894
930dcf12
JB
895 /**
896 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
897 *
898 * @returns New, completely set up dynamic worker.
899 */
900 protected createAndSetupDynamicWorker (): Worker {
901 const worker = this.createAndSetupWorker()
902 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 903 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
904 if (
905 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
906 (message.kill != null &&
907 ((this.opts.enableTasksQueue === false &&
f59e1027 908 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 909 (this.opts.enableTasksQueue === true &&
f59e1027 910 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 911 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
912 ) {
913 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
914 void (this.destroyWorker(worker) as Promise<void>)
915 }
916 })
e221309a 917 const workerInfo = this.getWorkerInfoByWorker(worker)
b0a4db63 918 workerInfo.dynamic = true
b97d82d8
JB
919 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
920 workerInfo.ready = true
921 }
21f710aa 922 this.sendToWorker(worker, {
b0a4db63 923 checkActive: true,
21f710aa
JB
924 workerId: workerInfo.id as number
925 })
930dcf12
JB
926 return worker
927 }
928
a2ed5053
JB
929 /**
930 * Registers a listener callback on the given worker.
931 *
932 * @param worker - The worker which should register a listener.
933 * @param listener - The message listener callback.
934 */
85aeb3f3
JB
935 protected abstract registerWorkerMessageListener<
936 Message extends Data | Response
937 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
a2ed5053
JB
938
939 /**
940 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
941 * Can be overridden.
942 *
943 * @param worker - The newly created worker.
944 */
945 protected afterWorkerSetup (worker: Worker): void {
946 // Listen to worker messages.
947 this.registerWorkerMessageListener(worker, this.workerListener())
85aeb3f3
JB
948 // Send the startup message to worker.
949 this.sendStartupMessageToWorker(worker)
d2c73f82
JB
950 // Setup worker task statistics computation.
951 this.setWorkerStatistics(worker)
952 }
953
85aeb3f3
JB
954 /**
955 * Sends the startup message to the given worker.
956 *
957 * @param worker - The worker which should receive the startup message.
958 */
959 protected abstract sendStartupMessageToWorker (worker: Worker): void
a2ed5053
JB
960
961 private redistributeQueuedTasks (workerNodeKey: number): void {
962 while (this.tasksQueueSize(workerNodeKey) > 0) {
963 let targetWorkerNodeKey: number = workerNodeKey
964 let minQueuedTasks = Infinity
965 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
966 const workerInfo = this.getWorkerInfo(workerNodeId)
967 if (
968 workerNodeId !== workerNodeKey &&
969 workerInfo.ready &&
970 workerNode.usage.tasks.queued === 0
971 ) {
972 targetWorkerNodeKey = workerNodeId
973 break
974 }
975 if (
976 workerNodeId !== workerNodeKey &&
977 workerInfo.ready &&
978 workerNode.usage.tasks.queued < minQueuedTasks
979 ) {
980 minQueuedTasks = workerNode.usage.tasks.queued
981 targetWorkerNodeKey = workerNodeId
982 }
983 }
984 this.enqueueTask(
985 targetWorkerNodeKey,
986 this.dequeueTask(workerNodeKey) as Task<Data>
987 )
988 }
989 }
990
be0676b3 991 /**
ff733df7 992 * This function is the listener registered for each worker message.
be0676b3 993 *
bdacc2d2 994 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
995 */
996 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 997 return message => {
21f710aa 998 this.checkMessageWorkerId(message)
d2c73f82 999 if (message.ready != null) {
10e2aa7e
JB
1000 // Worker ready response received
1001 this.handleWorkerReadyResponse(message)
f59e1027 1002 } else if (message.id != null) {
a3445496 1003 // Task execution response received
6b272951
JB
1004 this.handleTaskExecutionResponse(message)
1005 }
1006 }
1007 }
1008
10e2aa7e 1009 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
e221309a
JB
1010 this.getWorkerInfoByWorker(
1011 this.getWorkerById(message.workerId) as Worker
1012 ).ready = message.ready as boolean
2431bdb4
JB
1013 if (this.emitter != null && this.ready) {
1014 this.emitter.emit(PoolEvents.ready, this.info)
1015 }
6b272951
JB
1016 }
1017
1018 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1019 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1020 if (promiseResponse != null) {
1021 if (message.taskError != null) {
2a69b8c5 1022 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1023 promiseResponse.reject(message.taskError.message)
1024 } else {
1025 promiseResponse.resolve(message.data as Response)
1026 }
1027 this.afterTaskExecutionHook(promiseResponse.worker, message)
1028 this.promiseResponseMap.delete(message.id as string)
1029 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1030 if (
1031 this.opts.enableTasksQueue === true &&
1032 this.tasksQueueSize(workerNodeKey) > 0
1033 ) {
1034 this.executeTask(
1035 workerNodeKey,
1036 this.dequeueTask(workerNodeKey) as Task<Data>
1037 )
be0676b3 1038 }
6b272951 1039 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1040 }
be0676b3 1041 }
7c0ba920 1042
ff733df7 1043 private checkAndEmitEvents (): void {
1f68cede 1044 if (this.emitter != null) {
ff733df7 1045 if (this.busy) {
2845f2a5 1046 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1047 }
6b27d407 1048 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1049 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1050 }
164d950a
JB
1051 }
1052 }
1053
8a1260a3 1054 /**
e221309a 1055 * Gets the worker information from the given worker node key.
8a1260a3
JB
1056 *
1057 * @param workerNodeKey - The worker node key.
1058 */
1059 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1060 return this.workerNodes[workerNodeKey].info
1061 }
1062
e221309a
JB
1063 /**
1064 * Gets the worker information from the given worker.
1065 *
1066 * @param worker - The worker.
1067 */
85aeb3f3 1068 protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
dc02fc29
JB
1069 const workerNodeKey = this.getWorkerNodeKey(worker)
1070 if (workerNodeKey === -1) {
1071 throw new Error('Worker not found')
1072 }
1073 return this.workerNodes[workerNodeKey].info
e221309a
JB
1074 }
1075
a05c10de 1076 /**
b0a4db63 1077 * Adds the given worker in the pool worker nodes.
ea7a90d3 1078 *
38e795c1 1079 * @param worker - The worker.
f06e48d8 1080 * @returns The worker nodes length.
ea7a90d3 1081 */
b0a4db63 1082 private addWorkerNode (worker: Worker): number {
cc3ab78b 1083 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1084 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1085 if (this.starting) {
1086 workerNode.info.ready = true
1087 }
cc3ab78b 1088 return this.workerNodes.push(workerNode)
ea7a90d3 1089 }
c923ce56 1090
51fe3d3c 1091 /**
f06e48d8 1092 * Removes the given worker from the pool worker nodes.
51fe3d3c 1093 *
f06e48d8 1094 * @param worker - The worker.
51fe3d3c 1095 */
416fd65c 1096 private removeWorkerNode (worker: Worker): void {
f06e48d8 1097 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1098 if (workerNodeKey !== -1) {
1099 this.workerNodes.splice(workerNodeKey, 1)
1100 this.workerChoiceStrategyContext.remove(workerNodeKey)
1101 }
51fe3d3c 1102 }
adc3c320 1103
b0a4db63
JB
1104 /**
1105 * Executes the given task on the given worker.
1106 *
1107 * @param worker - The worker.
1108 * @param task - The task to execute.
1109 */
2e81254d 1110 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1111 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1112 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1113 }
1114
f9f00b5f 1115 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1116 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1117 }
1118
416fd65c 1119 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1120 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1121 }
1122
416fd65c 1123 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1124 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1125 }
1126
416fd65c 1127 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1128 while (this.tasksQueueSize(workerNodeKey) > 0) {
1129 this.executeTask(
1130 workerNodeKey,
1131 this.dequeueTask(workerNodeKey) as Task<Data>
1132 )
ff733df7 1133 }
4b628b48 1134 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1135 }
1136
ef41a6e6
JB
1137 private flushTasksQueues (): void {
1138 for (const [workerNodeKey] of this.workerNodes.entries()) {
1139 this.flushTasksQueue(workerNodeKey)
1140 }
1141 }
b6b32453
JB
1142
1143 private setWorkerStatistics (worker: Worker): void {
1144 this.sendToWorker(worker, {
1145 statistics: {
87de9ff5
JB
1146 runTime:
1147 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1148 .runTime.aggregate,
87de9ff5 1149 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1150 .elu.aggregate
21f710aa 1151 },
e221309a 1152 workerId: this.getWorkerInfoByWorker(worker).id as number
b6b32453
JB
1153 })
1154 }
c97c7edb 1155}