Merge branch 'master' of github.com:poolifier/poolifier
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf 15 median,
e4f20deb
JB
16 round,
17 updateMeasurementStatistics
bbeadd16 18} from '../utils'
59317253 19import { KillBehaviors } from '../worker/worker-options'
c4855468 20import {
65d7a1c9 21 type IPool,
7c5a1080 22 PoolEmitter,
c4855468 23 PoolEvents,
6b27d407 24 type PoolInfo,
c4855468 25 type PoolOptions,
6b27d407
JB
26 type PoolType,
27 PoolTypes,
4b628b48 28 type TasksQueueOptions
c4855468 29} from './pool'
e102732c
JB
30import type {
31 IWorker,
4b628b48 32 IWorkerNode,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
008512c7 38 type MeasurementStatisticsRequirements,
f0d7f803 39 Measurements,
a35560ba 40 WorkerChoiceStrategies,
a20f0ba5
JB
41 type WorkerChoiceStrategy,
42 type WorkerChoiceStrategyOptions
bdaf31cd
JB
43} from './selection-strategies/selection-strategies-types'
44import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 45import { version } from './version'
4b628b48 46import { WorkerNode } from './worker-node'
23ccf9d7 47
729c563d 48/**
ea7a90d3 49 * Base class that implements some shared logic for all poolifier pools.
729c563d 50 *
38e795c1 51 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
52 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
53 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 54 */
c97c7edb 55export abstract class AbstractPool<
f06e48d8 56 Worker extends IWorker,
d3c8a1a8
S
57 Data = unknown,
58 Response = unknown
c4855468 59> implements IPool<Worker, Data, Response> {
afc003b2 60 /** @inheritDoc */
4b628b48 61 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 62
afc003b2 63 /** @inheritDoc */
7c0ba920
JB
64 public readonly emitter?: PoolEmitter
65
be0676b3 66 /**
52b71763 67 * The task execution response promise map.
be0676b3 68 *
2740a743 69 * - `key`: The message id of each submitted task.
a3445496 70 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 71 *
a3445496 72 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 73 */
c923ce56
JB
74 protected promiseResponseMap: Map<
75 string,
76 PromiseResponseWrapper<Worker, Response>
77 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 78
a35560ba 79 /**
51fe3d3c 80 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
81 */
82 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
83 Worker,
84 Data,
85 Response
a35560ba
S
86 >
87
075e51d1 88 /**
adc9cc64 89 * Whether the pool is starting or not.
075e51d1
JB
90 */
91 private readonly starting: boolean
afe0d5bf
JB
92 /**
93 * The start timestamp of the pool.
94 */
95 private readonly startTimestamp
96
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker choice
 * strategy context, runs the setup hook and starts the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool may only be instantiated from the main thread/process.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (checkPoolOptions also fills in the option defaults).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed around as detached callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // The starting flag is raised only while the initial worker nodes are created.
  this.starting = true
  this.startPool()
  this.starting = false

  this.startTimestamp = performance.now()
}
142
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  // A nullish value is never of type 'string', so the typeof test covers it too.
  const isUsablePath =
    typeof filePath === 'string' && filePath.trim().length !== 0
  if (!isUsablePath) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileExists = existsSync(filePath)
  if (!fileExists) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
155
8d3782fa
JB
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  // Each guard throws, so they are evaluated strictly in sequence.
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
173
/**
 * Validates the size boundaries of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min > max`, `max === 0` or `min === max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
191
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are validated and normalized only when queueing is on.
  if (this.opts.enableTasksQueue) {
    const rawTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(rawTasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(rawTasksQueueOptions)
  }
}
218
/**
 * Ensures the given strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const isKnownStrategy = Object.values(WorkerChoiceStrategies).includes(
    workerChoiceStrategy
  )
  if (isKnownStrategy) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
228
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if
 * `measurement` is not one of the values of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node, i.e. `maxSize` of them.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
256
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are defined but not a plain object, or if
 * `concurrency` is defined but not a safe integer.
 * @throws Error if `concurrency` is defined and lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
280
e761c033
JB
/**
 * Creates worker nodes until the number of non dynamic ones reaches `numberOfWorkers`.
 */
private startPool (): void {
  // Recounted on every iteration since createAndSetupWorker() is what changes it.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
}
292
/**
 * Pool information snapshot, aggregated over the worker nodes usage.
 *
 * The `utilization`, `runTime` and `waitTime` entries are only present when the
 * task statistics requirements ask for the matching aggregates.
 * The run/wait time `average` is `0` (instead of `NaN`) while no task has been executed yet.
 *
 * @inheritDoc
 */
public get info (): PoolInfo {
  // Read once instead of at every conditional spread below.
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const sum = (values: number[]): number =>
    values.reduce((accumulator, value) => accumulator + value, 0)
  const executedTasks = sum(
    this.workerNodes.map(workerNode => workerNode.usage.tasks.executed)
  )
  // Guards the 0 / 0 division that used to yield NaN before the first executed task.
  const averageOverExecutedTasks = (aggregate: number): number =>
    executedTasks > 0 ? round(aggregate / executedTasks) : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: this.workerNodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks,
    executingTasks: sum(
      this.workerNodes.map(workerNode => workerNode.usage.tasks.executing)
    ),
    queuedTasks: sum(
      this.workerNodes.map(workerNode => workerNode.usage.tasks.queued)
    ),
    maxQueuedTasks: sum(
      this.workerNodes.map(
        workerNode => workerNode.usage.tasks?.maxQueued ?? 0
      )
    ),
    failedTasks: sum(
      this.workerNodes.map(workerNode => workerNode.usage.tasks.failed)
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averageOverExecutedTasks(
          sum(
            this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averageOverExecutedTasks(
          sum(
            this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.aggregate ?? 0
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 429
2431bdb4
JB
/**
 * Whether the pool is ready: at least `minSize` non dynamic worker nodes are flagged ready.
 */
private get ready (): boolean {
  const readyNonDynamicWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyNonDynamicWorkerNodes.length >= this.minSize
}
441
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the summed run and wait time aggregates
 * of the worker nodes divided by the time elapsed since the pool start multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
462
8881ae32
JB
463 /**
464 * Pool type.
465 *
466 * If it is `'dynamic'`, it provides the `max` property.
467 */
468 protected abstract get type (): PoolType
469
184855e6
JB
470 /**
471 * Gets the worker type.
472 */
473 protected abstract get worker (): WorkerType
474
c2ade475 475 /**
6b27d407 476 * Pool minimum size.
c2ade475 477 */
6b27d407 478 protected abstract get minSize (): number
ff733df7
JB
479
480 /**
6b27d407 481 * Pool maximum size.
ff733df7 482 */
6b27d407 483 protected abstract get maxSize (): number
a35560ba 484
f59e1027
JB
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first matching worker node, `undefined` if none matches.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
495
6b813701
JB
/**
 * Verifies that the worker id carried by a received message, if any, belongs to
 * a worker of this pool. Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  const isKnownWorker = this.getWorkerById(workerId) != null
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
512
/**
 * Gets the worker node key (its index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
protected getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
524
/**
 * Switches the worker choice strategy, optionally with new strategy options,
 * then resets the usage of every worker node and re-applies its statistics setup.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  const hasNewOptions = workerChoiceStrategyOptions != null
  if (hasNewOptions) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Usage gathered under the previous strategy is discarded.
  for (const workerNode of this.workerNodes) {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  }
}
543
/**
 * Validates, stores and forwards the options to the worker choice strategy context.
 *
 * @inheritDoc
 */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
554
/**
 * Turns the tasks queue on or off. Queues are flushed when it gets switched off.
 *
 * @inheritDoc
 */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  // setTasksQueueOptions() handles undefined options (defaults or removal).
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
566
/**
 * Applies the tasks queue options when the queue is enabled, otherwise drops any stored ones.
 *
 * @inheritDoc
 */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const normalizedOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = normalizedOptions
}
577
/**
 * Builds normalized tasks queue options from possibly nullish or partial ones.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options.
 * @returns Options holding `concurrency`, which defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
585
c319c66b
JB
/**
 * Whether the pool is full or not, i.e. the number of worker nodes has reached `maxSize`.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 594
c319c66b
JB
595 /**
596 * Whether the pool is busy or not.
597 *
598 * The pool busyness boolean status.
599 */
600 protected abstract get busy (): boolean
7c0ba920 601
6c6afb84
JB
/**
 * Whether all worker nodes are executing at least one task, i.e. no worker
 * node has an `executing` tasks count equal to zero.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
614
/**
 * Submits a task: picks a worker node, registers the response promise callbacks
 * under a fresh task id, then either queues or executes the task.
 *
 * @inheritDoc
 */
public async execute (data?: Data, name?: string): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      workerId: this.getWorkerInfo(workerNodeKey).id as number,
      id: randomUUID()
    }
    const workerNode = this.workerNodes[workerNodeKey]
    // The worker message carrying this id settles the promise later on.
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: workerNode.worker
    })
    // With the tasks queue enabled, queue when the pool is busy or when the
    // chosen worker node already runs `concurrency` tasks.
    const shallEnqueue =
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        workerNode.usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    if (shallEnqueue) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.checkAndEmitEvents()
  })
}
c97c7edb 647
/**
 * Destroys every worker node: flushes its tasks queue, terminates its worker
 * and waits for the worker `exit` event.
 *
 * @inheritDoc
 */
public async destroy (): Promise<void> {
  const workerTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The exit listener is registered before termination is requested.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(workerTerminations)
}
664
4a6952ff 665 /**
6c6afb84 666 * Terminates the given worker.
4a6952ff 667 *
f06e48d8 668 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
669 */
670 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 671
/**
 * Extension point invoked by the abstract constructor right before the worker
 * nodes are created. The default implementation does nothing.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* No default setup */
}
c97c7edb 681
729c563d 682 /**
280c2a77
S
683 * Should return whether the worker is the main worker or not.
684 */
685 protected abstract isMain (): boolean
686
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks count and records the task wait time, first on the worker node usage,
 * then on its usage for the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const accountForTaskStart = (usage: WorkerUsage): void => {
    ++usage.tasks.executing
    this.updateWaitTimeWorkerUsage(usage, task)
  }
  accountForTaskStart(workerNode.usage)
  accountForTaskStart(
    workerNode.getTaskWorkerUsage(task.name as string) as WorkerUsage
  )
}
707
/**
 * Hook executed after the worker task execution: updates the tasks, run time
 * and ELU statistics, first on the worker node usage, then on its usage for the
 * task name (the default task name when the message carries none).
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const accountForTaskEnd = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  accountForTaskEnd(workerNode.usage)
  accountForTaskEnd(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
731
/**
 * Updates the tasks counters of the given usage once a task response is received:
 * one task less executing, one more executed or failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a non nullish `taskError` marks a failed task.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const workerTaskStatistics = workerUsage.tasks
  // The usage can be reset while tasks are still in flight (setWorkerChoiceStrategy()
  // calls resetUsage() on every worker node), so never let the counter go negative:
  // a negative value is neither idle (=== 0) nor busy (> 0) for the pool accounting.
  if (workerTaskStatistics.executing > 0) {
    --workerTaskStatistics.executing
  }
  if (message.taskError == null) {
    ++workerTaskStatistics.executed
  } else {
    ++workerTaskStatistics.failed
  }
}
744
a4e07f72
JB
/**
 * Feeds the task run time carried by the message (`0` when absent) into the
 * run time measurement statistics of the given usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  updateMeasurementStatistics(
    workerUsage.runTime,
    runTimeRequirements,
    taskRunTime,
    workerUsage.tasks.executed
  )
}
756
a4e07f72
JB
/**
 * Feeds the task wait time (now minus the task timestamp, `0` when the task has
 * no timestamp) into the wait time measurement statistics of the given usage.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const submittedAt = task.timestamp ?? now
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  updateMeasurementStatistics(
    workerUsage.waitTime,
    waitTimeRequirements,
    now - submittedAt,
    workerUsage.tasks.executed
  )
}
770
/**
 * Feeds the event loop utilization (ELU) figures carried by the message into the
 * given usage: active and idle measurement statistics, plus the utilization
 * when the ELU aggregate is required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  updateMeasurementStatistics(
    workerUsage.elu.active,
    eluTaskStatisticsRequirements,
    taskElu?.active ?? 0,
    workerUsage.tasks.executed
  )
  updateMeasurementStatistics(
    workerUsage.elu.idle,
    eluTaskStatisticsRequirements,
    taskElu?.idle ?? 0,
    workerUsage.tasks.executed
  )
  if (!eluTaskStatisticsRequirements.aggregate || taskElu == null) {
    return
  }
  // The stored utilization is the mean of the previous value and the new sample.
  const previousUtilization = workerUsage.elu.utilization
  workerUsage.elu.utilization =
    previousUtilization != null
      ? (previousUtilization + taskElu.utilization) / 2
      : taskElu.utilization
}
802
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen
 * right away when the strategy policy asks to use dynamic workers, otherwise the
 * worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
821
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
830
280c2a77 831 /**
675bb809 832 * Sends a message to the given worker.
280c2a77 833 *
38e795c1
JB
834 * @param worker - The worker which should receive the message.
835 * @param message - The message.
280c2a77
S
836 */
837 protected abstract sendToWorker (
838 worker: Worker,
839 message: MessageValue<Data>
840 ): void
841
729c563d 842 /**
41344292 843 * Creates a new worker.
6c6afb84
JB
844 *
845 * @returns Newly created worker.
729c563d 846 */
280c2a77 847 protected abstract createWorker (): Worker
c97c7edb 848
/**
 * Creates a new worker, attaches the pool and user supplied event handlers,
 * registers it in the pool worker nodes and runs the after setup hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler = EMPTY_FUNCTION,
    errorHandler = EMPTY_FUNCTION,
    onlineHandler = EMPTY_FUNCTION,
    exitHandler = EMPTY_FUNCTION
  } = {
    messageHandler: this.opts.messageHandler ?? undefined,
    errorHandler: this.opts.errorHandler ?? undefined,
    onlineHandler: this.opts.onlineHandler ?? undefined,
    exitHandler: this.opts.exitHandler ?? undefined
  }

  worker.on('message', messageHandler)
  worker.on('error', errorHandler)
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    // The failed worker is flagged not ready and its channel is closed.
    workerInfo.ready = false
    this.workerNodes[workerNodeKey].closeChannel()
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement of the same kind is created, except while the pool is starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart && workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else if (shallRestart) {
      this.createAndSetupWorker()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler)
  worker.on('exit', exitHandler)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 888
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes:
 * a kill message listener is registered, the worker node is flagged dynamic and
 * the worker is asked to check its activity.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerNode = this.workerNodes[workerNodeKey]
    const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    // Without a hard kill, the worker is destroyed only when it has nothing
    // executing and, with the tasks queue enabled, nothing queued either.
    const shallDestroy =
      hardKill ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false &&
          workerNode.usage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            workerNode.usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0)))
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    // Flagged ready right away when the strategy policy uses dynamic workers.
    workerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: workerInfo.id as number
  })
  return worker
}
922
a2ed5053
JB
923 /**
924 * Registers a listener callback on the given worker.
925 *
926 * @param worker - The worker which should register a listener.
927 * @param listener - The message listener callback.
928 */
85aeb3f3
JB
929 protected abstract registerWorkerMessageListener<
930 Message extends Data | Response
931 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
a2ed5053
JB
932
/**
 * Hook run once a worker has been newly created and added to the pool worker nodes.
 * Can be overridden.
 *
 * In order, it registers the pool message listener on the worker, sends the
 * startup message to it and sends the task statistics settings to it.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Route the worker messages to the pool listener.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Send the startup message.
  this.sendStartupMessageToWorker(worker)
  // 3. Send the task statistics settings.
  this.setWorkerStatistics(worker)
}
947
85aeb3f3
JB
/**
 * Sends the startup message to the given worker.
 * Must be implemented by concrete pools.
 *
 * @param worker - The worker the startup message is sent to.
 */
protected abstract sendStartupMessageToWorker (worker: Worker): void
a2ed5053
JB
954
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first other ready worker node with an empty tasks
 * queue or, failing that, to the other ready worker node with the fewest
 * queued tasks.
 *
 * If no other ready worker node exists, redistribution stops and the
 * remaining tasks stay queued on the given worker node: moving a task back
 * onto its own queue would never empty it and would loop forever.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
        if (queuedTasks === 0) {
          // An empty queue cannot be beaten: stop searching.
          break
        }
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible destination: bail out instead of spinning forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
984
/**
 * Builds the listener registered for each worker message.
 *
 * The listener first checks the message worker id, then dispatches the
 * message: one carrying a `ready` field is handled as a worker ready
 * response, otherwise one carrying an `id` field is handled as a task
 * execution response. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1002
/**
 * Handles a worker ready response: stores the ready flag carried by the
 * message in the information of the worker identified by the message worker
 * id, then emits the pool `ready` event if an emitter exists and the pool is
 * ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfoByWorker(worker)
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1011
/**
 * Handles a task execution response.
 *
 * Settles the promise registered under the message id (rejecting with the
 * task error message, or resolving with the message data), runs the after
 * task execution hook, forgets the promise, executes the next queued task on
 * the same worker node when the tasks queue is enabled and not empty, and
 * finally updates the worker choice strategy context.
 *
 * Messages whose id matches no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker node is free again: feed it its next queued task.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1036
/**
 * Emits the pool state events, if an emitter exists: `busy` when the pool is
 * busy, and `full` when the pool is dynamic and full.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicAndFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1047
/**
 * Looks up the worker information stored on the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1057
e221309a
JB
/**
 * Looks up the worker information of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker information.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker is not found.
 */
protected getWorkerInfoByWorker (worker: Worker): WorkerInfo {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey !== -1) {
    return this.workerNodes[nodeKey].info
  }
  throw new Error('Worker not found')
}
1072
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const node = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // While the pool is starting, worker nodes are flagged as ready up front.
    node.info.ready = true
  }
  const workerNodesLength = this.workerNodes.push(node)
  return workerNodesLength
}
c923ce56 1087
/**
 * Removes the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 1100
b0a4db63
JB
/**
 * Executes the given task on the worker of the given worker node:
 * runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1111
/**
 * Enqueues the given task on the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1115
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1119
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1123
/**
 * Flushes the tasks queue of the worker node at the given key: executes
 * every queued task on that worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1133
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (let nodeKey = 0; nodeKey < this.workerNodes.length; nodeKey++) {
    this.flushTasksQueue(nodeKey)
  }
}
b6b32453
JB
1139
/**
 * Sends to the given worker the task statistics settings: the `aggregate`
 * flags of the run time and ELU task statistics requirements of the worker
 * choice strategy context, along with the worker id.
 *
 * @param worker - The worker receiving the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  const workerId = this.getWorkerInfoByWorker(worker).id as number
  this.sendToWorker(worker, { statistics, workerId })
}
c97c7edb 1152}