feat: add pool and worker readiness tracking infrastructure
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
4b628b48 21 type TasksQueueOptions
c4855468 22} from './pool'
e102732c
JB
23import type {
24 IWorker,
4b628b48 25 IWorkerNode,
e102732c
JB
26 MessageHandler,
27 Task,
8a1260a3 28 WorkerInfo,
4b628b48 29 WorkerType,
e102732c
JB
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 39import { version } from './version'
4b628b48 40import { WorkerNode } from './worker-node'
23ccf9d7 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker chosen for the task together with the `resolve` and `reject` callbacks of the promise returned to the caller.
 *
 * An entry is registered for every task submitted through `execute()`.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context wrapping the worker choice strategy currently used by the pool.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>

/**
 * Pool start timestamp, read from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
86
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, creates the optional
 * event emitter and the worker choice strategy context, runs the setup hook and
 * then creates worker nodes until `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` returns `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation: each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Create the initial worker nodes.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
132
/**
 * Validates the worker file path.
 *
 * Anything that is not a non-blank string is rejected. This includes `null`
 * and `undefined`, and also non-string values that an untyped caller may pass
 * (previously those slipped through the check).
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is not a string or is empty or whitespace-only.
 */
private checkFilePath (filePath: string): void {
  // `typeof` is checked at runtime on purpose: JavaScript callers are not bound by the `string` type.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
8d3782fa
JB
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is `null` or `undefined`.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are rejected when empty.
  if (numberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
159
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws RangeError if the pool is dynamic and `min` is greater than or equal to `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
171
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  // Worker choice strategy: round robin unless specified.
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  // Boolean flags defaults.
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
198
/**
 * Ensures that the given worker choice strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
208
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries or if `measurement` is not a value of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When given, weights must be defined for each worker node of a full pool.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
236
a20f0ba5
JB
/**
 * Validates the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
260
/** @inheritDoc */
public get info (): PoolInfo {
  type MeasurementKey = 'runTime' | 'waitTime'
  // The statistics required by the worker choice strategy decide which
  // optional sections (utilization, runTime, waitTime) are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node value over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (workerNode: IWorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(workerNode => matches(workerNode)).length
  // Pool wide minimum of a measurement, `Infinity` when no worker node has one.
  const minimumOf = (measurement: MeasurementKey): number =>
    round(
      Math.min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
        )
      )
    )
  // Pool wide maximum of a measurement, `-Infinity` when no worker node has one.
  const maximumOf = (measurement: MeasurementKey): number =>
    round(
      Math.max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
        )
      )
    )
  // Aggregated measurement divided by the number of executed tasks.
  const averageOf = (measurement: MeasurementKey): number =>
    round(
      sumOverWorkerNodes(
        workerNode => workerNode.usage[measurement]?.aggregate ?? 0
      ) /
        sumOverWorkerNodes(workerNode => workerNode.usage.tasks?.executed ?? 0)
    )
  // Median of the worker nodes measurement medians.
  const medianOf = (measurement: MeasurementKey): number =>
    round(
      median(
        this.workerNodes.map(
          workerNode => workerNode.usage[measurement]?.median ?? 0
        )
      )
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: minimumOf('runTime'),
        maximum: maximumOf('runTime'),
        average: averageOf('runTime'),
        ...(runTimeRequirements.median && { median: medianOf('runTime') })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: minimumOf('waitTime'),
        maximum: maximumOf('waitTime'),
        average: averageOf('waitTime'),
        ...(waitTimeRequirements.median && { median: medianOf('waitTime') })
      }
    })
  }
}
08f3f44c 397
2431bdb4
JB
/**
 * Whether the pool is still starting: it is not full yet, or at least one of
 * its worker nodes is not flagged as ready.
 */
private get starting (): boolean {
  if (!this.full) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
404
/**
 * Whether the pool is ready: it is full and all of its worker nodes are
 * flagged as ready.
 */
private get ready (): boolean {
  if (!this.full) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
410
afe0d5bf
JB
/**
 * Approximate pool utilization: the tasks run time plus the tasks wait time
 * aggregated over the worker nodes, divided by the time elapsed since the pool
 * start multiplied by the maximum pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  // Run time and wait time are accumulated separately before being added.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
431
8881ae32
JB
/**
 * The pool type.
 *
 * It is compared against `PoolTypes.fixed` and `PoolTypes.dynamic` by the pool
 * validation logic and reported in the pool information.
 */
protected abstract get type (): PoolType
438
184855e6
JB
/**
 * The worker type, reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType
443
/**
 * The minimum pool size, reported as `minSize` in the pool information.
 */
protected abstract get minSize (): number
ff733df7
JB
448
/**
 * The maximum pool size.
 *
 * The pool is considered full once it holds at least this many worker nodes.
 */
protected abstract get maxSize (): number
a35560ba 453
f59e1027
JB
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
464
/**
 * Gets the worker node key of the given worker, which is its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
476
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Switching strategy resets the usage of every worker node and sets up
  // its worker statistics again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
495
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: invalid options are neither stored nor applied.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
506
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue gets disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
518
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the tasks queue options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
529
/**
 * Builds the tasks queue options with their default values filled in.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `null` or `undefined`.
 * @returns The tasks queue options, with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
537
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 546
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, a busy pool makes `execute()` enqueue the
 * submitted task instead of executing it right away.
 */
protected abstract get busy (): boolean
7c0ba920 553
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task, i.e. no worker
 * node has an executing tasks count equal to zero.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
566
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    id: randomUUID()
  }
  // Register the promise callbacks under the task id, together with the chosen worker.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already runs `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 600
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodesTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen for the worker exit before asking for its termination.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(workerNodesTerminations)
}
617
/**
 * Terminates the given worker.
 *
 * `destroy()` awaits the returned value and then waits for the worker `exit` event.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 624
/**
 * Hook called by the abstract constructor once the worker choice strategy
 * context exists and before any worker node is created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 634
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to start a pool when this returns `false`.
 */
protected abstract isMain (): boolean
639
/**
 * Hook executed before the worker task execution: counts the task as
 * executing on the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing++
  this.updateWaitTimeWorkerUsage(usage, task)
}
655
/**
 * Hook executed after the worker task execution: updates the tasks
 * statistics, run time and ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // Tasks statistics go first: the updates below read the executed tasks count.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
672
/**
 * Updates the tasks counters of a worker usage once a task is done: one less
 * executing task, and one more executed or failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
685
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the task run time
 * carried by the message, according to the statistics required by the worker
 * choice strategy. Does nothing when the run time aggregate is not required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const { runTime, tasks } = workerUsage
  // A message without run time accounts for a zero run time.
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  if (runTimeRequirements.average && tasks.executed !== 0) {
    runTime.average = runTime.aggregate / tasks.executed
  }
  // The median is only fed with run times actually reported by the worker.
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    runTime.history.push(message.taskPerformance.runTime)
    runTime.median = median(runTime.history)
  }
}
723
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, according to the statistics required by the worker
 * choice strategy. Does nothing when the wait time aggregate is not required.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp accounts for a zero wait time.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime, tasks } = workerUsage
  waitTime.aggregate = (waitTime.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  if (waitTimeRequirements.average && tasks.executed !== 0) {
    waitTime.average = waitTime.aggregate / tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
761
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU carried by the message, according to the statistics required by the
 * worker choice strategy. Does nothing when the ELU aggregate is not required
 * or when the message carries no ELU.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const { elu, tasks } = workerUsage
  elu.idle.aggregate = (elu.idle.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active.aggregate ?? 0) + taskElu.active
  // The utilization is the mean of its previous value and the task one.
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active.maximum ?? -Infinity
  )
  if (eluRequirements.average && tasks.executed !== 0) {
    elu.idle.average = elu.idle.aggregate / tasks.executed
    elu.active.average = elu.active.aggregate / tasks.executed
  }
  if (eluRequirements.median) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
825
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when needed. The new worker is chosen
 * directly when the strategy policy uses dynamic workers, otherwise the
 * choice is left to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
844
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
853
/**
 * Sends a message to the given worker.
 *
 * Used, among others, to send the startup message once a worker is set up.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
864
/**
 * Registers a listener callback for the `message` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 877
/**
 * Creates a new worker.
 *
 * The returned worker is not part of the pool worker nodes yet: registering
 * it is done by `createAndSetupWorker()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 884
/**
 * Function called once a newly created worker has been added to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(worker, this.workerListener())
  // Send startup message to worker: it carries the worker id and a `ready` flag set to `false`.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.sendToWorker(worker, {
    ready: false,
    workerId: this.getWorkerInfo(workerNodeKey).id
  })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
c97c7edb 902
/**
 * Creates a new worker, registers its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // User defined handlers default to a no-op.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // Hand the queued tasks of the worker in error over to the other worker nodes.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    // No worker replacement when it is disabled or while the pool is starting.
    if (this.opts.restartWorkerOnError !== true || this.starting) {
      return
    }
    const { dynamic } = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    if (dynamic) {
      this.createAndSetupDynamicWorker()
    } else {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 940
/**
 * Moves every task queued on the given worker to the queues of the other worker nodes.
 * Each task goes to the first other worker node found with an empty queue or,
 * failing that, to the other worker node with the fewest queued tasks.
 *
 * Nothing is moved when the worker is not (or no longer) part of the pool worker nodes,
 * or when there is no other worker node to receive the tasks: re-enqueueing a task on
 * its own source queue would never drain that queue and the loop would spin forever.
 *
 * @param worker - The worker whose queued tasks must be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // -1 means the worker has no node in the pool; a single node means no possible target.
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // The target is recomputed for each task since queue sizes change as tasks are moved.
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No eligible target was found: never fall back to the source worker node.
      return
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
968
/**
 * Builds a dynamic worker: a fully set up worker that also listens for kill
 * messages coming from the worker, so that it can be destroyed.
 *
 * @returns The new dynamic worker, fully set up.
 */
protected createAndSetupDynamicWorker (): Worker {
  const dynamicWorker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(dynamicWorker, message => {
    const workerNodeKey = this.getWorkerNodeKey(dynamicWorker)
    // Evaluated lazily: only consulted when the kill behavior is not HARD and a kill was requested.
    const hasNoPendingWork = (): boolean => {
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue !== true && enableTasksQueue !== false) {
        return false
      }
      if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
        return false
      }
      return enableTasksQueue === false || this.tasksQueueSize(workerNodeKey) === 0
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingWork())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(dynamicWorker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(dynamicWorker)
  )
  dynamicWorkerInfo.dynamic = true
  this.sendToWorker(dynamicWorker, { checkAlive: true })
  return dynamicWorker
}
995
/**
 * Builds the listener registered for each worker message.
 *
 * @returns The function to run whenever a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    const isReadyMessage = message.ready != null && message.workerId != null
    if (isReadyMessage) {
      // The worker reports its ready status
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // The worker reports the outcome of a task execution
      this.handleTaskExecutionResponse(message)
    }
  }
}
1012
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId as number)
  // A ready message must come from a worker known to the pool.
  if (readyWorker == null) {
    throw new Error(
      `Worker ready message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  // Store the ready status reported by the worker.
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(readyWorker)
  )
  readyWorkerInfo.ready = message.ready as boolean
  // Emit the pool ready event when the pool as a whole reports ready.
  if (this.emitter != null && this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1029
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  // Responses without a matching pending promise are ignored.
  if (pendingResponse == null) {
    return
  }
  // Settle the promise handed out for this task.
  const { taskError } = message
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  const { worker: respondingWorker } = pendingResponse
  this.afterTaskExecutionHook(respondingWorker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(respondingWorker)
  // With tasks queueing enabled, feed the worker its next queued task, if any.
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1056
private checkAndEmitEvents (): void {
  const poolEmitter = this.emitter
  // Without an emitter there is nothing to emit.
  if (poolEmitter == null) {
    return
  }
  if (this.busy) {
    poolEmitter.emit(PoolEvents.busy, this.info)
  }
  // The full event is only emitted for dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    poolEmitter.emit(PoolEvents.full, this.info)
  }
}
1067
/**
 * Reads the information attached to a worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The `info` object of the worker node stored at the given key.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1076
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after the insertion.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
c923ce56 1086
/**
 * Deletes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // -1: the worker has no node in the pool, nothing to remove.
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1099
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The pre-execution hook runs before the task is handed to the worker.
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker: targetWorker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(targetWorker, task)
}
1104
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  // Delegates to the worker node and forwards its return value.
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  return targetWorkerNode.enqueueTask(task)
}
1108
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  // Delegates to the worker node; the result may be undefined.
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1112
private tasksQueueSize (workerNodeKey: number): number {
  // Delegates to the worker node stored at the given key.
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1116
private flushTasksQueue (workerNodeKey: number): void {
  // Execute the queued tasks one by one until the queue reports no task left.
  for (
    let pendingTasks = this.tasksQueueSize(workerNodeKey);
    pendingTasks > 0;
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1126
private flushTasksQueues (): void {
  // Flush the tasks queue of every worker node, in worker node key order.
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1132
private setWorkerStatistics (worker: Worker): void {
  // The `aggregate` requirements of the worker choice strategy context are forwarded to the worker.
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: { runTime, elu }
  })
}
c97c7edb 1144}