perf: use worker node key instead of heavy worker reference in hot code path
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
52b71763 67 * The task execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
501aea93
JB
74 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
75 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 76
a35560ba 77 /**
51fe3d3c 78 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
81 Worker,
82 Data,
83 Response
a35560ba
S
84 >
85
075e51d1 86 /**
adc9cc64 87 * Whether the pool is starting or not.
075e51d1
JB
88 */
89 private readonly starting: boolean
afe0d5bf
JB
90 /**
91 * The start timestamp of the pool.
92 */
93 private readonly startTimestamp
94
729c563d
S
95 /**
96 * Constructs a new poolifier pool.
97 *
38e795c1 98 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 99 * @param filePath - Path to the worker file.
38e795c1 100 * @param opts - Options for the pool.
729c563d 101 */
c97c7edb 102 public constructor (
b4213b7f
JB
103 protected readonly numberOfWorkers: number,
104 protected readonly filePath: string,
105 protected readonly opts: PoolOptions<Worker>
c97c7edb 106 ) {
78cea37e 107 if (!this.isMain()) {
c97c7edb
S
108 throw new Error('Cannot start a pool from a worker!')
109 }
8d3782fa 110 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 111 this.checkFilePath(this.filePath)
7c0ba920 112 this.checkPoolOptions(this.opts)
1086026a 113
7254e419
JB
114 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
115 this.executeTask = this.executeTask.bind(this)
116 this.enqueueTask = this.enqueueTask.bind(this)
117 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 118
6bd72cd0 119 if (this.opts.enableEvents === true) {
7c0ba920
JB
120 this.emitter = new PoolEmitter()
121 }
d59df138
JB
122 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
123 Worker,
124 Data,
125 Response
da309861
JB
126 >(
127 this,
128 this.opts.workerChoiceStrategy,
129 this.opts.workerChoiceStrategyOptions
130 )
b6b32453
JB
131
132 this.setupHook()
133
075e51d1 134 this.starting = true
e761c033 135 this.startPool()
075e51d1 136 this.starting = false
afe0d5bf
JB
137
138 this.startTimestamp = performance.now()
c97c7edb
S
139 }
140
a35560ba 141 private checkFilePath (filePath: string): void {
ffcbbad8
JB
142 if (
143 filePath == null ||
3d6dd312 144 typeof filePath !== 'string' ||
ffcbbad8
JB
145 (typeof filePath === 'string' && filePath.trim().length === 0)
146 ) {
c510fea7
APA
147 throw new Error('Please specify a file with a worker implementation')
148 }
3d6dd312
JB
149 if (!existsSync(filePath)) {
150 throw new Error(`Cannot find the worker file '${filePath}'`)
151 }
c510fea7
APA
152 }
153
8d3782fa
JB
154 private checkNumberOfWorkers (numberOfWorkers: number): void {
155 if (numberOfWorkers == null) {
156 throw new Error(
157 'Cannot instantiate a pool without specifying the number of workers'
158 )
78cea37e 159 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 160 throw new TypeError(
0d80593b 161 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
162 )
163 } else if (numberOfWorkers < 0) {
473c717a 164 throw new RangeError(
8d3782fa
JB
165 'Cannot instantiate a pool with a negative number of workers'
166 )
6b27d407 167 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
168 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
169 }
170 }
171
172 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
173 if (this.type === PoolTypes.dynamic) {
174 if (min > max) {
175 throw new RangeError(
176 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
177 )
b97d82d8 178 } else if (max === 0) {
079de991 179 throw new RangeError(
b97d82d8 180 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
181 )
182 } else if (min === max) {
183 throw new RangeError(
184 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
185 )
186 }
8d3782fa
JB
187 }
188 }
189
7c0ba920 190 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
191 if (isPlainObject(opts)) {
192 this.opts.workerChoiceStrategy =
193 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
194 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
195 this.opts.workerChoiceStrategyOptions =
196 opts.workerChoiceStrategyOptions ??
197 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
198 this.checkValidWorkerChoiceStrategyOptions(
199 this.opts.workerChoiceStrategyOptions
200 )
1f68cede 201 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
202 this.opts.enableEvents = opts.enableEvents ?? true
203 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
204 if (this.opts.enableTasksQueue) {
205 this.checkValidTasksQueueOptions(
206 opts.tasksQueueOptions as TasksQueueOptions
207 )
208 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 }
212 } else {
213 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 214 }
aee46736
JB
215 }
216
217 private checkValidWorkerChoiceStrategy (
218 workerChoiceStrategy: WorkerChoiceStrategy
219 ): void {
220 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 221 throw new Error(
aee46736 222 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
223 )
224 }
7c0ba920
JB
225 }
226
0d80593b
JB
227 private checkValidWorkerChoiceStrategyOptions (
228 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
229 ): void {
230 if (!isPlainObject(workerChoiceStrategyOptions)) {
231 throw new TypeError(
232 'Invalid worker choice strategy options: must be a plain object'
233 )
234 }
49be33fe
JB
235 if (
236 workerChoiceStrategyOptions.weights != null &&
6b27d407 237 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
238 ) {
239 throw new Error(
240 'Invalid worker choice strategy options: must have a weight for each worker node'
241 )
242 }
f0d7f803
JB
243 if (
244 workerChoiceStrategyOptions.measurement != null &&
245 !Object.values(Measurements).includes(
246 workerChoiceStrategyOptions.measurement
247 )
248 ) {
249 throw new Error(
250 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
251 )
252 }
0d80593b
JB
253 }
254
a20f0ba5
JB
255 private checkValidTasksQueueOptions (
256 tasksQueueOptions: TasksQueueOptions
257 ): void {
0d80593b
JB
258 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
259 throw new TypeError('Invalid tasks queue options: must be a plain object')
260 }
f0d7f803
JB
261 if (
262 tasksQueueOptions?.concurrency != null &&
263 !Number.isSafeInteger(tasksQueueOptions.concurrency)
264 ) {
265 throw new TypeError(
266 'Invalid worker tasks concurrency: must be an integer'
267 )
268 }
269 if (
270 tasksQueueOptions?.concurrency != null &&
271 tasksQueueOptions.concurrency <= 0
272 ) {
a20f0ba5 273 throw new Error(
f0d7f803 274 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
275 )
276 }
277 }
278
e761c033
JB
279 private startPool (): void {
280 while (
281 this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
284 0
285 ) < this.numberOfWorkers
286 ) {
287 this.createAndSetupWorker()
288 }
289 }
290
08f3f44c 291 /** @inheritDoc */
6b27d407
JB
292 public get info (): PoolInfo {
293 return {
23ccf9d7 294 version,
6b27d407 295 type: this.type,
184855e6 296 worker: this.worker,
2431bdb4
JB
297 ready: this.ready,
298 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
299 minSize: this.minSize,
300 maxSize: this.maxSize,
c05f0d50
JB
301 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
302 .runTime.aggregate &&
1305e9a8
JB
303 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
304 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
305 workerNodes: this.workerNodes.length,
306 idleWorkerNodes: this.workerNodes.reduce(
307 (accumulator, workerNode) =>
f59e1027 308 workerNode.usage.tasks.executing === 0
a4e07f72
JB
309 ? accumulator + 1
310 : accumulator,
6b27d407
JB
311 0
312 ),
313 busyWorkerNodes: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
f59e1027 315 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
316 0
317 ),
a4e07f72 318 executedTasks: this.workerNodes.reduce(
6b27d407 319 (accumulator, workerNode) =>
f59e1027 320 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
321 0
322 ),
323 executingTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
f59e1027 325 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
326 0
327 ),
328 queuedTasks: this.workerNodes.reduce(
df593701 329 (accumulator, workerNode) =>
f59e1027 330 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
331 0
332 ),
333 maxQueuedTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
b25a42cd 335 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 336 0
a4e07f72
JB
337 ),
338 failedTasks: this.workerNodes.reduce(
339 (accumulator, workerNode) =>
f59e1027 340 accumulator + workerNode.usage.tasks.failed,
a4e07f72 341 0
1dcf8b7b
JB
342 ),
343 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
344 .runTime.aggregate && {
345 runTime: {
98e72cda
JB
346 minimum: round(
347 Math.min(
348 ...this.workerNodes.map(
349 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
350 )
1dcf8b7b
JB
351 )
352 ),
98e72cda
JB
353 maximum: round(
354 Math.max(
355 ...this.workerNodes.map(
356 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
357 )
1dcf8b7b 358 )
98e72cda
JB
359 ),
360 average: round(
361 this.workerNodes.reduce(
362 (accumulator, workerNode) =>
363 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
364 0
365 ) /
366 this.workerNodes.reduce(
367 (accumulator, workerNode) =>
368 accumulator + (workerNode.usage.tasks?.executed ?? 0),
369 0
370 )
371 ),
372 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
373 .runTime.median && {
374 median: round(
375 median(
376 this.workerNodes.map(
377 workerNode => workerNode.usage.runTime?.median ?? 0
378 )
379 )
380 )
381 })
1dcf8b7b
JB
382 }
383 }),
384 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
385 .waitTime.aggregate && {
386 waitTime: {
98e72cda
JB
387 minimum: round(
388 Math.min(
389 ...this.workerNodes.map(
390 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
391 )
1dcf8b7b
JB
392 )
393 ),
98e72cda
JB
394 maximum: round(
395 Math.max(
396 ...this.workerNodes.map(
397 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
398 )
1dcf8b7b 399 )
98e72cda
JB
400 ),
401 average: round(
402 this.workerNodes.reduce(
403 (accumulator, workerNode) =>
404 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
405 0
406 ) /
407 this.workerNodes.reduce(
408 (accumulator, workerNode) =>
409 accumulator + (workerNode.usage.tasks?.executed ?? 0),
410 0
411 )
412 ),
413 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
414 .waitTime.median && {
415 median: round(
416 median(
417 this.workerNodes.map(
418 workerNode => workerNode.usage.waitTime?.median ?? 0
419 )
420 )
421 )
422 })
1dcf8b7b
JB
423 }
424 })
6b27d407
JB
425 }
426 }
08f3f44c 427
2431bdb4
JB
428 private get ready (): boolean {
429 return (
b97d82d8
JB
430 this.workerNodes.reduce(
431 (accumulator, workerNode) =>
432 !workerNode.info.dynamic && workerNode.info.ready
433 ? accumulator + 1
434 : accumulator,
435 0
436 ) >= this.minSize
2431bdb4
JB
437 )
438 }
439
afe0d5bf
JB
440 /**
441 * Gets the approximate pool utilization.
442 *
443 * @returns The pool utilization.
444 */
445 private get utilization (): number {
8e5ca040 446 const poolTimeCapacity =
fe7d90db 447 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
448 const totalTasksRunTime = this.workerNodes.reduce(
449 (accumulator, workerNode) =>
71514351 450 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
451 0
452 )
453 const totalTasksWaitTime = this.workerNodes.reduce(
454 (accumulator, workerNode) =>
71514351 455 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
456 0
457 )
8e5ca040 458 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
459 }
460
8881ae32
JB
461 /**
462 * Pool type.
463 *
464 * If it is `'dynamic'`, it provides the `max` property.
465 */
466 protected abstract get type (): PoolType
467
184855e6
JB
468 /**
469 * Gets the worker type.
470 */
471 protected abstract get worker (): WorkerType
472
c2ade475 473 /**
6b27d407 474 * Pool minimum size.
c2ade475 475 */
6b27d407 476 protected abstract get minSize (): number
ff733df7
JB
477
478 /**
6b27d407 479 * Pool maximum size.
ff733df7 480 */
6b27d407 481 protected abstract get maxSize (): number
a35560ba 482
f59e1027
JB
483 /**
484 * Get the worker given its id.
485 *
486 * @param workerId - The worker id.
487 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
488 */
489 private getWorkerById (workerId: number): Worker | undefined {
490 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
491 ?.worker
492 }
493
6b813701
JB
494 /**
495 * Checks if the worker id sent in the received message from a worker is valid.
496 *
497 * @param message - The received message.
498 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
499 */
21f710aa
JB
500 private checkMessageWorkerId (message: MessageValue<Response>): void {
501 if (
502 message.workerId != null &&
503 this.getWorkerById(message.workerId) == null
504 ) {
505 throw new Error(
506 `Worker message received from unknown worker '${message.workerId}'`
507 )
508 }
509 }
510
ffcbbad8 511 /**
f06e48d8 512 * Gets the given worker its worker node key.
ffcbbad8
JB
513 *
514 * @param worker - The worker.
f59e1027 515 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 516 */
3f09ed9f 517 protected getWorkerNodeKey (worker: Worker): number {
f06e48d8
JB
518 return this.workerNodes.findIndex(
519 workerNode => workerNode.worker === worker
520 )
bf9549ae
JB
521 }
522
afc003b2 523 /** @inheritDoc */
a35560ba 524 public setWorkerChoiceStrategy (
59219cbb
JB
525 workerChoiceStrategy: WorkerChoiceStrategy,
526 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 527 ): void {
aee46736 528 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 529 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
530 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
531 this.opts.workerChoiceStrategy
532 )
533 if (workerChoiceStrategyOptions != null) {
534 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
535 }
8a1260a3 536 for (const workerNode of this.workerNodes) {
4b628b48 537 workerNode.resetUsage()
b6b32453 538 this.setWorkerStatistics(workerNode.worker)
59219cbb 539 }
a20f0ba5
JB
540 }
541
542 /** @inheritDoc */
543 public setWorkerChoiceStrategyOptions (
544 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
545 ): void {
0d80593b 546 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
547 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
548 this.workerChoiceStrategyContext.setOptions(
549 this.opts.workerChoiceStrategyOptions
a35560ba
S
550 )
551 }
552
a20f0ba5 553 /** @inheritDoc */
8f52842f
JB
554 public enableTasksQueue (
555 enable: boolean,
556 tasksQueueOptions?: TasksQueueOptions
557 ): void {
a20f0ba5 558 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 559 this.flushTasksQueues()
a20f0ba5
JB
560 }
561 this.opts.enableTasksQueue = enable
8f52842f 562 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
563 }
564
565 /** @inheritDoc */
8f52842f 566 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 567 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
568 this.checkValidTasksQueueOptions(tasksQueueOptions)
569 this.opts.tasksQueueOptions =
570 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 571 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
572 delete this.opts.tasksQueueOptions
573 }
574 }
575
576 private buildTasksQueueOptions (
577 tasksQueueOptions: TasksQueueOptions
578 ): TasksQueueOptions {
579 return {
580 concurrency: tasksQueueOptions?.concurrency ?? 1
581 }
582 }
583
c319c66b
JB
584 /**
585 * Whether the pool is full or not.
586 *
587 * The pool filling boolean status.
588 */
dea903a8
JB
589 protected get full (): boolean {
590 return this.workerNodes.length >= this.maxSize
591 }
c2ade475 592
c319c66b
JB
593 /**
594 * Whether the pool is busy or not.
595 *
596 * The pool busyness boolean status.
597 */
598 protected abstract get busy (): boolean
7c0ba920 599
6c6afb84
JB
600 /**
601 * Whether worker nodes are executing at least one task.
602 *
603 * @returns Worker nodes busyness boolean status.
604 */
c2ade475 605 protected internalBusy (): boolean {
e0ae6100
JB
606 return (
607 this.workerNodes.findIndex(workerNode => {
f59e1027 608 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
609 }) === -1
610 )
cb70b19d
JB
611 }
612
afc003b2 613 /** @inheritDoc */
a86b6df1 614 public async execute (data?: Data, name?: string): Promise<Response> {
52b71763
JB
615 return await new Promise<Response>((resolve, reject) => {
616 const timestamp = performance.now()
617 const workerNodeKey = this.chooseWorkerNode()
501aea93 618 const task: Task<Data> = {
52b71763
JB
619 name: name ?? DEFAULT_TASK_NAME,
620 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
621 data: data ?? ({} as Data),
622 timestamp,
623 workerId: this.getWorkerInfo(workerNodeKey).id as number,
624 id: randomUUID()
625 }
501aea93 626 this.promiseResponseMap.set(task.id as string, {
2e81254d
JB
627 resolve,
628 reject,
501aea93 629 workerNodeKey
2e81254d 630 })
52b71763
JB
631 if (
632 this.opts.enableTasksQueue === true &&
633 (this.busy ||
634 this.workerNodes[workerNodeKey].usage.tasks.executing >=
635 ((this.opts.tasksQueueOptions as TasksQueueOptions)
636 .concurrency as number))
637 ) {
501aea93 638 this.enqueueTask(workerNodeKey, task)
52b71763 639 } else {
501aea93 640 this.executeTask(workerNodeKey, task)
52b71763
JB
641 }
642 this.checkAndEmitEvents()
2e81254d 643 })
280c2a77 644 }
c97c7edb 645
afc003b2 646 /** @inheritDoc */
c97c7edb 647 public async destroy (): Promise<void> {
1fbcaa7c 648 await Promise.all(
875a7c37
JB
649 this.workerNodes.map(async (workerNode, workerNodeKey) => {
650 this.flushTasksQueue(workerNodeKey)
47aacbaa 651 // FIXME: wait for tasks to be finished
920278a2
JB
652 const workerExitPromise = new Promise<void>(resolve => {
653 workerNode.worker.on('exit', () => {
654 resolve()
655 })
656 })
f06e48d8 657 await this.destroyWorker(workerNode.worker)
920278a2 658 await workerExitPromise
1fbcaa7c
JB
659 })
660 )
c97c7edb
S
661 }
662
4a6952ff 663 /**
6c6afb84 664 * Terminates the given worker.
4a6952ff 665 *
f06e48d8 666 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
667 */
668 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 669
729c563d 670 /**
6677a3d3
JB
671 * Setup hook to execute code before worker nodes are created in the abstract constructor.
672 * Can be overridden.
afc003b2
JB
673 *
674 * @virtual
729c563d 675 */
280c2a77 676 protected setupHook (): void {
d99ba5a8 677 // Intentionally empty
280c2a77 678 }
c97c7edb 679
729c563d 680 /**
280c2a77
S
681 * Should return whether the worker is the main worker or not.
682 */
683 protected abstract isMain (): boolean
684
685 /**
2e81254d 686 * Hook executed before the worker task execution.
bf9549ae 687 * Can be overridden.
729c563d 688 *
f06e48d8 689 * @param workerNodeKey - The worker node key.
1c6fe997 690 * @param task - The task to execute.
729c563d 691 */
1c6fe997
JB
692 protected beforeTaskExecutionHook (
693 workerNodeKey: number,
694 task: Task<Data>
695 ): void {
f59e1027 696 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
697 ++workerUsage.tasks.executing
698 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 699 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
700 task.name as string
701 ) as WorkerUsage
eb8afc8a
JB
702 ++taskWorkerUsage.tasks.executing
703 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
704 }
705
c01733f1 706 /**
2e81254d 707 * Hook executed after the worker task execution.
bf9549ae 708 * Can be overridden.
c01733f1 709 *
501aea93 710 * @param workerNodeKey - The worker node key.
38e795c1 711 * @param message - The received message.
c01733f1 712 */
2e81254d 713 protected afterTaskExecutionHook (
501aea93 714 workerNodeKey: number,
2740a743 715 message: MessageValue<Response>
bf9549ae 716 ): void {
ff128cc9 717 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
718 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
719 this.updateRunTimeWorkerUsage(workerUsage, message)
720 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 721 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 722 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 723 ) as WorkerUsage
eb8afc8a
JB
724 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
725 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
726 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
727 }
728
729 private updateTaskStatisticsWorkerUsage (
730 workerUsage: WorkerUsage,
731 message: MessageValue<Response>
732 ): void {
a4e07f72
JB
733 const workerTaskStatistics = workerUsage.tasks
734 --workerTaskStatistics.executing
98e72cda
JB
735 if (message.taskError == null) {
736 ++workerTaskStatistics.executed
737 } else {
a4e07f72 738 ++workerTaskStatistics.failed
2740a743 739 }
f8eb0a2a
JB
740 }
741
a4e07f72
JB
742 private updateRunTimeWorkerUsage (
743 workerUsage: WorkerUsage,
f8eb0a2a
JB
744 message: MessageValue<Response>
745 ): void {
e4f20deb
JB
746 updateMeasurementStatistics(
747 workerUsage.runTime,
748 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
749 message.taskPerformance?.runTime ?? 0,
750 workerUsage.tasks.executed
751 )
f8eb0a2a
JB
752 }
753
a4e07f72
JB
754 private updateWaitTimeWorkerUsage (
755 workerUsage: WorkerUsage,
1c6fe997 756 task: Task<Data>
f8eb0a2a 757 ): void {
1c6fe997
JB
758 const timestamp = performance.now()
759 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
760 updateMeasurementStatistics(
761 workerUsage.waitTime,
762 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
763 taskWaitTime,
764 workerUsage.tasks.executed
765 )
c01733f1 766 }
767
a4e07f72 768 private updateEluWorkerUsage (
5df69fab 769 workerUsage: WorkerUsage,
62c15a68
JB
770 message: MessageValue<Response>
771 ): void {
008512c7
JB
772 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
773 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
774 updateMeasurementStatistics(
775 workerUsage.elu.active,
008512c7 776 eluTaskStatisticsRequirements,
e4f20deb
JB
777 message.taskPerformance?.elu?.active ?? 0,
778 workerUsage.tasks.executed
779 )
780 updateMeasurementStatistics(
781 workerUsage.elu.idle,
008512c7 782 eluTaskStatisticsRequirements,
e4f20deb
JB
783 message.taskPerformance?.elu?.idle ?? 0,
784 workerUsage.tasks.executed
785 )
008512c7 786 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 787 if (message.taskPerformance?.elu != null) {
f7510105
JB
788 if (workerUsage.elu.utilization != null) {
789 workerUsage.elu.utilization =
790 (workerUsage.elu.utilization +
791 message.taskPerformance.elu.utilization) /
792 2
793 } else {
794 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
795 }
62c15a68
JB
796 }
797 }
798 }
799
280c2a77 800 /**
f06e48d8 801 * Chooses a worker node for the next task.
280c2a77 802 *
6c6afb84 803 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 804 *
20dcad1a 805 * @returns The worker node key
280c2a77 806 */
6c6afb84 807 private chooseWorkerNode (): number {
930dcf12 808 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
809 const worker = this.createAndSetupDynamicWorker()
810 if (
811 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
812 ) {
813 return this.getWorkerNodeKey(worker)
814 }
17393ac8 815 }
930dcf12
JB
816 return this.workerChoiceStrategyContext.execute()
817 }
818
6c6afb84
JB
819 /**
820 * Conditions for dynamic worker creation.
821 *
822 * @returns Whether to create a dynamic worker or not.
823 */
824 private shallCreateDynamicWorker (): boolean {
930dcf12 825 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
826 }
827
280c2a77 828 /**
675bb809 829 * Sends a message to the given worker.
280c2a77 830 *
38e795c1
JB
831 * @param worker - The worker which should receive the message.
832 * @param message - The message.
280c2a77
S
833 */
834 protected abstract sendToWorker (
835 worker: Worker,
836 message: MessageValue<Data>
837 ): void
838
729c563d 839 /**
41344292 840 * Creates a new worker.
6c6afb84
JB
841 *
842 * @returns Newly created worker.
729c563d 843 */
280c2a77 844 protected abstract createWorker (): Worker
c97c7edb 845
4a6952ff 846 /**
f06e48d8 847 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
848 *
849 * @returns New, completely set up worker.
850 */
851 protected createAndSetupWorker (): Worker {
bdacc2d2 852 const worker = this.createWorker()
280c2a77 853
35cf1c03 854 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 855 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 856 worker.on('error', error => {
9b106837
JB
857 const workerNodeKey = this.getWorkerNodeKey(worker)
858 const workerInfo = this.getWorkerInfo(workerNodeKey)
859 workerInfo.ready = false
0dc838e3 860 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 861 this.emitter?.emit(PoolEvents.error, error)
2431bdb4 862 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 863 if (workerInfo.dynamic) {
8a1260a3
JB
864 this.createAndSetupDynamicWorker()
865 } else {
866 this.createAndSetupWorker()
867 }
5baee0d7 868 }
19dbc45b 869 if (this.opts.enableTasksQueue === true) {
9b106837 870 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 871 }
5baee0d7 872 })
a35560ba
S
873 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
874 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 875 worker.once('exit', () => {
f06e48d8 876 this.removeWorkerNode(worker)
a974afa6 877 })
280c2a77 878
b0a4db63 879 this.addWorkerNode(worker)
280c2a77
S
880
881 this.afterWorkerSetup(worker)
882
c97c7edb
S
883 return worker
884 }
be0676b3 885
930dcf12
JB
886 /**
887 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
888 *
889 * @returns New, completely set up dynamic worker.
890 */
891 protected createAndSetupDynamicWorker (): Worker {
892 const worker = this.createAndSetupWorker()
893 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 894 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
895 if (
896 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
897 (message.kill != null &&
898 ((this.opts.enableTasksQueue === false &&
f59e1027 899 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 900 (this.opts.enableTasksQueue === true &&
f59e1027 901 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 902 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
903 ) {
904 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
905 void (this.destroyWorker(worker) as Promise<void>)
906 }
907 })
e221309a 908 const workerInfo = this.getWorkerInfoByWorker(worker)
b0a4db63 909 workerInfo.dynamic = true
b97d82d8
JB
910 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
911 workerInfo.ready = true
912 }
21f710aa 913 this.sendToWorker(worker, {
b0a4db63 914 checkActive: true,
21f710aa
JB
915 workerId: workerInfo.id as number
916 })
930dcf12
JB
917 return worker
918 }
919
a2ed5053
JB
920 /**
921 * Registers a listener callback on the given worker.
922 *
923 * @param worker - The worker which should register a listener.
924 * @param listener - The message listener callback.
925 */
85aeb3f3
JB
926 protected abstract registerWorkerMessageListener<
927 Message extends Data | Response
928 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
a2ed5053
JB
929
930 /**
931 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
932 * Can be overridden.
933 *
934 * @param worker - The newly created worker.
935 */
936 protected afterWorkerSetup (worker: Worker): void {
937 // Listen to worker messages.
938 this.registerWorkerMessageListener(worker, this.workerListener())
85aeb3f3
JB
939 // Send the startup message to worker.
940 this.sendStartupMessageToWorker(worker)
d2c73f82
JB
941 // Setup worker task statistics computation.
942 this.setWorkerStatistics(worker)
943 }
944
85aeb3f3
JB
945 /**
946 * Sends the startup message to the given worker.
947 *
948 * @param worker - The worker which should receive the startup message.
949 */
950 protected abstract sendStartupMessageToWorker (worker: Worker): void
a2ed5053
JB
951
952 private redistributeQueuedTasks (workerNodeKey: number): void {
953 while (this.tasksQueueSize(workerNodeKey) > 0) {
954 let targetWorkerNodeKey: number = workerNodeKey
955 let minQueuedTasks = Infinity
956 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
957 const workerInfo = this.getWorkerInfo(workerNodeId)
958 if (
959 workerNodeId !== workerNodeKey &&
960 workerInfo.ready &&
961 workerNode.usage.tasks.queued === 0
962 ) {
963 targetWorkerNodeKey = workerNodeId
964 break
965 }
966 if (
967 workerNodeId !== workerNodeKey &&
968 workerInfo.ready &&
969 workerNode.usage.tasks.queued < minQueuedTasks
970 ) {
971 minQueuedTasks = workerNode.usage.tasks.queued
972 targetWorkerNodeKey = workerNodeId
973 }
974 }
975 this.enqueueTask(
976 targetWorkerNodeKey,
977 this.dequeueTask(workerNodeKey) as Task<Data>
978 )
979 }
980 }
981
be0676b3 982 /**
ff733df7 983 * This function is the listener registered for each worker message.
be0676b3 984 *
bdacc2d2 985 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
986 */
987 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 988 return message => {
21f710aa 989 this.checkMessageWorkerId(message)
d2c73f82 990 if (message.ready != null) {
10e2aa7e
JB
991 // Worker ready response received
992 this.handleWorkerReadyResponse(message)
f59e1027 993 } else if (message.id != null) {
a3445496 994 // Task execution response received
6b272951
JB
995 this.handleTaskExecutionResponse(message)
996 }
997 }
998 }
999
10e2aa7e 1000 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
e221309a
JB
1001 this.getWorkerInfoByWorker(
1002 this.getWorkerById(message.workerId) as Worker
1003 ).ready = message.ready as boolean
2431bdb4
JB
1004 if (this.emitter != null && this.ready) {
1005 this.emitter.emit(PoolEvents.ready, this.info)
1006 }
6b272951
JB
1007 }
1008
1009 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1010 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1011 if (promiseResponse != null) {
1012 if (message.taskError != null) {
2a69b8c5 1013 this.emitter?.emit(PoolEvents.taskError, message.taskError)
6b272951
JB
1014 promiseResponse.reject(message.taskError.message)
1015 } else {
1016 promiseResponse.resolve(message.data as Response)
1017 }
501aea93
JB
1018 const workerNodeKey = promiseResponse.workerNodeKey
1019 this.afterTaskExecutionHook(workerNodeKey, message)
6b272951 1020 this.promiseResponseMap.delete(message.id as string)
6b272951
JB
1021 if (
1022 this.opts.enableTasksQueue === true &&
1023 this.tasksQueueSize(workerNodeKey) > 0
1024 ) {
1025 this.executeTask(
1026 workerNodeKey,
1027 this.dequeueTask(workerNodeKey) as Task<Data>
1028 )
be0676b3 1029 }
6b272951 1030 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1031 }
be0676b3 1032 }
7c0ba920 1033
ff733df7 1034 private checkAndEmitEvents (): void {
1f68cede 1035 if (this.emitter != null) {
ff733df7 1036 if (this.busy) {
2845f2a5 1037 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1038 }
6b27d407 1039 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1040 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1041 }
164d950a
JB
1042 }
1043 }
1044
8a1260a3 1045 /**
e221309a 1046 * Gets the worker information from the given worker node key.
8a1260a3
JB
1047 *
1048 * @param workerNodeKey - The worker node key.
3f09ed9f 1049 * @returns The worker information.
8a1260a3
JB
1050 */
1051 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1052 return this.workerNodes[workerNodeKey].info
1053 }
1054
e221309a
JB
1055 /**
1056 * Gets the worker information from the given worker.
1057 *
1058 * @param worker - The worker.
3f09ed9f
JB
1059 * @returns The worker information.
1060 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
e221309a 1061 */
85aeb3f3 1062 protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
dc02fc29
JB
1063 const workerNodeKey = this.getWorkerNodeKey(worker)
1064 if (workerNodeKey === -1) {
1065 throw new Error('Worker not found')
1066 }
1067 return this.workerNodes[workerNodeKey].info
e221309a
JB
1068 }
1069
a05c10de 1070 /**
b0a4db63 1071 * Adds the given worker in the pool worker nodes.
ea7a90d3 1072 *
38e795c1 1073 * @param worker - The worker.
f06e48d8 1074 * @returns The worker nodes length.
ea7a90d3 1075 */
b0a4db63 1076 private addWorkerNode (worker: Worker): number {
cc3ab78b 1077 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1078 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1079 if (this.starting) {
1080 workerNode.info.ready = true
1081 }
cc3ab78b 1082 return this.workerNodes.push(workerNode)
ea7a90d3 1083 }
c923ce56 1084
51fe3d3c 1085 /**
f06e48d8 1086 * Removes the given worker from the pool worker nodes.
51fe3d3c 1087 *
f06e48d8 1088 * @param worker - The worker.
51fe3d3c 1089 */
416fd65c 1090 private removeWorkerNode (worker: Worker): void {
f06e48d8 1091 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1092 if (workerNodeKey !== -1) {
1093 this.workerNodes.splice(workerNodeKey, 1)
1094 this.workerChoiceStrategyContext.remove(workerNodeKey)
1095 }
51fe3d3c 1096 }
adc3c320 1097
b0a4db63
JB
1098 /**
1099 * Executes the given task on the given worker.
1100 *
1101 * @param worker - The worker.
1102 * @param task - The task to execute.
1103 */
2e81254d 1104 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1105 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1106 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1107 }
1108
f9f00b5f 1109 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1110 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1111 }
1112
416fd65c 1113 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1114 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1115 }
1116
416fd65c 1117 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1118 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1119 }
1120
416fd65c 1121 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1122 while (this.tasksQueueSize(workerNodeKey) > 0) {
1123 this.executeTask(
1124 workerNodeKey,
1125 this.dequeueTask(workerNodeKey) as Task<Data>
1126 )
ff733df7 1127 }
4b628b48 1128 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1129 }
1130
ef41a6e6
JB
1131 private flushTasksQueues (): void {
1132 for (const [workerNodeKey] of this.workerNodes.entries()) {
1133 this.flushTasksQueue(workerNodeKey)
1134 }
1135 }
b6b32453
JB
1136
1137 private setWorkerStatistics (worker: Worker): void {
1138 this.sendToWorker(worker, {
1139 statistics: {
87de9ff5
JB
1140 runTime:
1141 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1142 .runTime.aggregate,
87de9ff5 1143 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1144 .elu.aggregate
21f710aa 1145 },
e221309a 1146 workerId: this.getWorkerInfoByWorker(worker).id as number
b6b32453
JB
1147 })
1148 }
c97c7edb 1149}