feat: add utilization to pool information
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
afe0d5bf
JB
8 median,
9 round
bbeadd16 10} from '../utils'
34a0cfab 11import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 12import { CircularArray } from '../circular-array'
29ee7e9a 13import { Queue } from '../queue'
c4855468 14import {
65d7a1c9 15 type IPool,
7c5a1080 16 PoolEmitter,
c4855468 17 PoolEvents,
6b27d407 18 type PoolInfo,
c4855468 19 type PoolOptions,
6b27d407
JB
20 type PoolType,
21 PoolTypes,
184855e6
JB
22 type TasksQueueOptions,
23 type WorkerType
c4855468 24} from './pool'
e102732c
JB
25import type {
26 IWorker,
27 MessageHandler,
28 Task,
29 WorkerNode,
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 39
729c563d 40/**
ea7a90d3 41 * Base class that implements some shared logic for all poolifier pools.
729c563d 42 *
38e795c1 43 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
44 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
45 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 46 */
c97c7edb 47export abstract class AbstractPool<
f06e48d8 48 Worker extends IWorker,
d3c8a1a8
S
49 Data = unknown,
50 Response = unknown
c4855468 51> implements IPool<Worker, Data, Response> {
afc003b2 52 /** @inheritDoc */
f06e48d8 53 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 54
afc003b2 55 /** @inheritDoc */
7c0ba920
JB
56 public readonly emitter?: PoolEmitter
57
be0676b3 58 /**
a3445496 59 * The execution response promise map.
be0676b3 60 *
2740a743 61 * - `key`: The message id of each submitted task.
a3445496 62 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 63 *
a3445496 64 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 65 */
c923ce56
JB
66 protected promiseResponseMap: Map<
67 string,
68 PromiseResponseWrapper<Worker, Response>
69 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 70
a35560ba 71 /**
51fe3d3c 72 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
73 */
74 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
75 Worker,
76 Data,
77 Response
a35560ba
S
78 >
79
afe0d5bf
JB
80 /**
81 * The start timestamp of the pool.
82 */
83 private readonly startTimestamp
84
729c563d
S
85 /**
86 * Constructs a new poolifier pool.
87 *
38e795c1 88 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 89 * @param filePath - Path to the worker file.
38e795c1 90 * @param opts - Options for the pool.
729c563d 91 */
c97c7edb 92 public constructor (
b4213b7f
JB
93 protected readonly numberOfWorkers: number,
94 protected readonly filePath: string,
95 protected readonly opts: PoolOptions<Worker>
c97c7edb 96 ) {
78cea37e 97 if (!this.isMain()) {
c97c7edb
S
98 throw new Error('Cannot start a pool from a worker!')
99 }
8d3782fa 100 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 101 this.checkFilePath(this.filePath)
7c0ba920 102 this.checkPoolOptions(this.opts)
1086026a 103
7254e419
JB
104 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
105 this.executeTask = this.executeTask.bind(this)
106 this.enqueueTask = this.enqueueTask.bind(this)
107 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 108
6bd72cd0 109 if (this.opts.enableEvents === true) {
7c0ba920
JB
110 this.emitter = new PoolEmitter()
111 }
d59df138
JB
112 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
113 Worker,
114 Data,
115 Response
da309861
JB
116 >(
117 this,
118 this.opts.workerChoiceStrategy,
119 this.opts.workerChoiceStrategyOptions
120 )
b6b32453
JB
121
122 this.setupHook()
123
afe0d5bf 124 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
125 this.createAndSetupWorker()
126 }
afe0d5bf
JB
127
128 this.startTimestamp = performance.now()
c97c7edb
S
129 }
130
a35560ba 131 private checkFilePath (filePath: string): void {
ffcbbad8
JB
132 if (
133 filePath == null ||
134 (typeof filePath === 'string' && filePath.trim().length === 0)
135 ) {
c510fea7
APA
136 throw new Error('Please specify a file with a worker implementation')
137 }
138 }
139
8d3782fa
JB
140 private checkNumberOfWorkers (numberOfWorkers: number): void {
141 if (numberOfWorkers == null) {
142 throw new Error(
143 'Cannot instantiate a pool without specifying the number of workers'
144 )
78cea37e 145 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 146 throw new TypeError(
0d80593b 147 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
148 )
149 } else if (numberOfWorkers < 0) {
473c717a 150 throw new RangeError(
8d3782fa
JB
151 'Cannot instantiate a pool with a negative number of workers'
152 )
6b27d407 153 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
154 throw new Error('Cannot instantiate a fixed pool with no worker')
155 }
156 }
157
7c0ba920 158 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
159 if (isPlainObject(opts)) {
160 this.opts.workerChoiceStrategy =
161 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
162 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
163 this.opts.workerChoiceStrategyOptions =
164 opts.workerChoiceStrategyOptions ??
165 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
166 this.checkValidWorkerChoiceStrategyOptions(
167 this.opts.workerChoiceStrategyOptions
168 )
1f68cede 169 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
170 this.opts.enableEvents = opts.enableEvents ?? true
171 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
172 if (this.opts.enableTasksQueue) {
173 this.checkValidTasksQueueOptions(
174 opts.tasksQueueOptions as TasksQueueOptions
175 )
176 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
177 opts.tasksQueueOptions as TasksQueueOptions
178 )
179 }
180 } else {
181 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 182 }
aee46736
JB
183 }
184
185 private checkValidWorkerChoiceStrategy (
186 workerChoiceStrategy: WorkerChoiceStrategy
187 ): void {
188 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 189 throw new Error(
aee46736 190 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
191 )
192 }
7c0ba920
JB
193 }
194
0d80593b
JB
195 private checkValidWorkerChoiceStrategyOptions (
196 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
197 ): void {
198 if (!isPlainObject(workerChoiceStrategyOptions)) {
199 throw new TypeError(
200 'Invalid worker choice strategy options: must be a plain object'
201 )
202 }
49be33fe
JB
203 if (
204 workerChoiceStrategyOptions.weights != null &&
6b27d407 205 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
206 ) {
207 throw new Error(
208 'Invalid worker choice strategy options: must have a weight for each worker node'
209 )
210 }
f0d7f803
JB
211 if (
212 workerChoiceStrategyOptions.measurement != null &&
213 !Object.values(Measurements).includes(
214 workerChoiceStrategyOptions.measurement
215 )
216 ) {
217 throw new Error(
218 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
219 )
220 }
0d80593b
JB
221 }
222
a20f0ba5
JB
223 private checkValidTasksQueueOptions (
224 tasksQueueOptions: TasksQueueOptions
225 ): void {
0d80593b
JB
226 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
227 throw new TypeError('Invalid tasks queue options: must be a plain object')
228 }
f0d7f803
JB
229 if (
230 tasksQueueOptions?.concurrency != null &&
231 !Number.isSafeInteger(tasksQueueOptions.concurrency)
232 ) {
233 throw new TypeError(
234 'Invalid worker tasks concurrency: must be an integer'
235 )
236 }
237 if (
238 tasksQueueOptions?.concurrency != null &&
239 tasksQueueOptions.concurrency <= 0
240 ) {
a20f0ba5 241 throw new Error(
f0d7f803 242 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
243 )
244 }
245 }
246
08f3f44c 247 /** @inheritDoc */
6b27d407
JB
248 public get info (): PoolInfo {
249 return {
250 type: this.type,
184855e6 251 worker: this.worker,
6b27d407
JB
252 minSize: this.minSize,
253 maxSize: this.maxSize,
afe0d5bf 254 utilization: round(this.utilization),
6b27d407
JB
255 workerNodes: this.workerNodes.length,
256 idleWorkerNodes: this.workerNodes.reduce(
257 (accumulator, workerNode) =>
465b2940 258 workerNode.usage.tasks.executing === 0
a4e07f72
JB
259 ? accumulator + 1
260 : accumulator,
6b27d407
JB
261 0
262 ),
263 busyWorkerNodes: this.workerNodes.reduce(
264 (accumulator, workerNode) =>
465b2940 265 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
266 0
267 ),
a4e07f72 268 executedTasks: this.workerNodes.reduce(
6b27d407 269 (accumulator, workerNode) =>
465b2940 270 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
271 0
272 ),
273 executingTasks: this.workerNodes.reduce(
274 (accumulator, workerNode) =>
465b2940 275 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
276 0
277 ),
278 queuedTasks: this.workerNodes.reduce(
df593701 279 (accumulator, workerNode) =>
465b2940 280 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
281 0
282 ),
283 maxQueuedTasks: this.workerNodes.reduce(
284 (accumulator, workerNode) =>
465b2940 285 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 286 0
a4e07f72
JB
287 ),
288 failedTasks: this.workerNodes.reduce(
289 (accumulator, workerNode) =>
465b2940 290 accumulator + workerNode.usage.tasks.failed,
a4e07f72 291 0
6b27d407
JB
292 )
293 }
294 }
08f3f44c 295
afe0d5bf
JB
296 /**
297 * Gets the pool run time.
298 *
299 * @returns The pool run time in milliseconds.
300 */
301 private get runTime (): number {
302 return performance.now() - this.startTimestamp
303 }
304
305 /**
306 * Gets the approximate pool utilization.
307 *
308 * @returns The pool utilization.
309 */
310 private get utilization (): number {
311 const poolRunTimeCapacity = this.runTime * this.maxSize
312 const totalTasksRunTime = this.workerNodes.reduce(
313 (accumulator, workerNode) =>
314 accumulator + workerNode.usage.runTime.aggregate,
315 0
316 )
317 const totalTasksWaitTime = this.workerNodes.reduce(
318 (accumulator, workerNode) =>
319 accumulator + workerNode.usage.waitTime.aggregate,
320 0
321 )
322 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
323 }
324
8881ae32
JB
325 /**
326 * Pool type.
327 *
328 * If it is `'dynamic'`, it provides the `max` property.
329 */
330 protected abstract get type (): PoolType
331
184855e6
JB
332 /**
333 * Gets the worker type.
334 */
335 protected abstract get worker (): WorkerType
336
c2ade475 337 /**
6b27d407 338 * Pool minimum size.
c2ade475 339 */
6b27d407 340 protected abstract get minSize (): number
ff733df7
JB
341
342 /**
6b27d407 343 * Pool maximum size.
ff733df7 344 */
6b27d407 345 protected abstract get maxSize (): number
a35560ba 346
ffcbbad8 347 /**
f06e48d8 348 * Gets the given worker its worker node key.
ffcbbad8
JB
349 *
350 * @param worker - The worker.
465b2940 351 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 352 */
f06e48d8
JB
353 private getWorkerNodeKey (worker: Worker): number {
354 return this.workerNodes.findIndex(
355 workerNode => workerNode.worker === worker
356 )
bf9549ae
JB
357 }
358
afc003b2 359 /** @inheritDoc */
a35560ba 360 public setWorkerChoiceStrategy (
59219cbb
JB
361 workerChoiceStrategy: WorkerChoiceStrategy,
362 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 363 ): void {
aee46736 364 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 365 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
366 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
367 this.opts.workerChoiceStrategy
368 )
369 if (workerChoiceStrategyOptions != null) {
370 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
371 }
9c16fb4b 372 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
373 this.setWorkerNodeTasksUsage(
374 workerNode,
9c16fb4b 375 this.getWorkerUsage(workerNodeKey)
8604aaab 376 )
b6b32453 377 this.setWorkerStatistics(workerNode.worker)
59219cbb 378 }
a20f0ba5
JB
379 }
380
381 /** @inheritDoc */
382 public setWorkerChoiceStrategyOptions (
383 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
384 ): void {
0d80593b 385 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
386 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
387 this.workerChoiceStrategyContext.setOptions(
388 this.opts.workerChoiceStrategyOptions
a35560ba
S
389 )
390 }
391
a20f0ba5 392 /** @inheritDoc */
8f52842f
JB
393 public enableTasksQueue (
394 enable: boolean,
395 tasksQueueOptions?: TasksQueueOptions
396 ): void {
a20f0ba5 397 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 398 this.flushTasksQueues()
a20f0ba5
JB
399 }
400 this.opts.enableTasksQueue = enable
8f52842f 401 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
402 }
403
404 /** @inheritDoc */
8f52842f 405 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 406 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
407 this.checkValidTasksQueueOptions(tasksQueueOptions)
408 this.opts.tasksQueueOptions =
409 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 410 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
411 delete this.opts.tasksQueueOptions
412 }
413 }
414
415 private buildTasksQueueOptions (
416 tasksQueueOptions: TasksQueueOptions
417 ): TasksQueueOptions {
418 return {
419 concurrency: tasksQueueOptions?.concurrency ?? 1
420 }
421 }
422
c319c66b
JB
423 /**
424 * Whether the pool is full or not.
425 *
426 * The pool filling boolean status.
427 */
dea903a8
JB
428 protected get full (): boolean {
429 return this.workerNodes.length >= this.maxSize
430 }
c2ade475 431
c319c66b
JB
432 /**
433 * Whether the pool is busy or not.
434 *
435 * The pool busyness boolean status.
436 */
437 protected abstract get busy (): boolean
7c0ba920 438
6c6afb84
JB
439 /**
440 * Whether worker nodes are executing at least one task.
441 *
442 * @returns Worker nodes busyness boolean status.
443 */
c2ade475 444 protected internalBusy (): boolean {
e0ae6100
JB
445 return (
446 this.workerNodes.findIndex(workerNode => {
465b2940 447 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
448 }) === -1
449 )
cb70b19d
JB
450 }
451
afc003b2 452 /** @inheritDoc */
a86b6df1 453 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 454 const timestamp = performance.now()
20dcad1a 455 const workerNodeKey = this.chooseWorkerNode()
adc3c320 456 const submittedTask: Task<Data> = {
a86b6df1 457 name,
e5a5c0fc
JB
458 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
459 data: data ?? ({} as Data),
b6b32453 460 timestamp,
adc3c320
JB
461 id: crypto.randomUUID()
462 }
2e81254d 463 const res = new Promise<Response>((resolve, reject) => {
02706357 464 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
465 resolve,
466 reject,
20dcad1a 467 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
468 })
469 })
ff733df7
JB
470 if (
471 this.opts.enableTasksQueue === true &&
7171d33f 472 (this.busy ||
465b2940 473 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 474 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 475 .concurrency as number))
ff733df7 476 ) {
26a929d7
JB
477 this.enqueueTask(workerNodeKey, submittedTask)
478 } else {
2e81254d 479 this.executeTask(workerNodeKey, submittedTask)
adc3c320 480 }
ff733df7 481 this.checkAndEmitEvents()
78cea37e 482 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
483 return res
484 }
c97c7edb 485
afc003b2 486 /** @inheritDoc */
c97c7edb 487 public async destroy (): Promise<void> {
1fbcaa7c 488 await Promise.all(
875a7c37
JB
489 this.workerNodes.map(async (workerNode, workerNodeKey) => {
490 this.flushTasksQueue(workerNodeKey)
47aacbaa 491 // FIXME: wait for tasks to be finished
f06e48d8 492 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
493 })
494 )
c97c7edb
S
495 }
496
4a6952ff 497 /**
6c6afb84 498 * Terminates the given worker.
4a6952ff 499 *
f06e48d8 500 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
501 */
502 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 503
729c563d 504 /**
6677a3d3
JB
505 * Setup hook to execute code before worker nodes are created in the abstract constructor.
506 * Can be overridden.
afc003b2
JB
507 *
508 * @virtual
729c563d 509 */
280c2a77 510 protected setupHook (): void {
d99ba5a8 511 // Intentionally empty
280c2a77 512 }
c97c7edb 513
729c563d 514 /**
280c2a77
S
515 * Should return whether the worker is the main worker or not.
516 */
517 protected abstract isMain (): boolean
518
519 /**
2e81254d 520 * Hook executed before the worker task execution.
bf9549ae 521 * Can be overridden.
729c563d 522 *
f06e48d8 523 * @param workerNodeKey - The worker node key.
1c6fe997 524 * @param task - The task to execute.
729c563d 525 */
1c6fe997
JB
526 protected beforeTaskExecutionHook (
527 workerNodeKey: number,
528 task: Task<Data>
529 ): void {
465b2940 530 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
531 ++workerUsage.tasks.executing
532 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
533 }
534
c01733f1 535 /**
2e81254d 536 * Hook executed after the worker task execution.
bf9549ae 537 * Can be overridden.
c01733f1 538 *
c923ce56 539 * @param worker - The worker.
38e795c1 540 * @param message - The received message.
c01733f1 541 */
2e81254d 542 protected afterTaskExecutionHook (
c923ce56 543 worker: Worker,
2740a743 544 message: MessageValue<Response>
bf9549ae 545 ): void {
465b2940 546 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
547 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
548 this.updateRunTimeWorkerUsage(workerUsage, message)
549 this.updateEluWorkerUsage(workerUsage, message)
550 }
551
552 private updateTaskStatisticsWorkerUsage (
553 workerUsage: WorkerUsage,
554 message: MessageValue<Response>
555 ): void {
a4e07f72
JB
556 const workerTaskStatistics = workerUsage.tasks
557 --workerTaskStatistics.executing
558 ++workerTaskStatistics.executed
82f36766 559 if (message.taskError != null) {
a4e07f72 560 ++workerTaskStatistics.failed
2740a743 561 }
f8eb0a2a
JB
562 }
563
a4e07f72
JB
564 private updateRunTimeWorkerUsage (
565 workerUsage: WorkerUsage,
f8eb0a2a
JB
566 message: MessageValue<Response>
567 ): void {
87de9ff5
JB
568 if (
569 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 570 .aggregate
87de9ff5 571 ) {
932fc8be 572 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 573 if (
932fc8be
JB
574 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
575 .average &&
a4e07f72 576 workerUsage.tasks.executed !== 0
c6bd2650 577 ) {
a4e07f72 578 workerUsage.runTime.average =
f1c06930
JB
579 workerUsage.runTime.aggregate /
580 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 581 }
3fa4cdd2 582 if (
932fc8be
JB
583 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
584 .median &&
d715b7bc 585 message.taskPerformance?.runTime != null
3fa4cdd2 586 ) {
a4e07f72
JB
587 workerUsage.runTime.history.push(message.taskPerformance.runTime)
588 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 589 }
3032893a 590 }
f8eb0a2a
JB
591 }
592
a4e07f72
JB
593 private updateWaitTimeWorkerUsage (
594 workerUsage: WorkerUsage,
1c6fe997 595 task: Task<Data>
f8eb0a2a 596 ): void {
1c6fe997
JB
597 const timestamp = performance.now()
598 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
599 if (
600 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 601 .aggregate
87de9ff5 602 ) {
932fc8be 603 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 604 if (
87de9ff5 605 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 606 .waitTime.average &&
a4e07f72 607 workerUsage.tasks.executed !== 0
09a6305f 608 ) {
a4e07f72 609 workerUsage.waitTime.average =
f1c06930
JB
610 workerUsage.waitTime.aggregate /
611 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
612 }
613 if (
87de9ff5 614 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 615 .waitTime.median &&
1c6fe997 616 taskWaitTime != null
09a6305f 617 ) {
1c6fe997 618 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 619 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 620 }
0567595a 621 }
c01733f1 622 }
623
a4e07f72 624 private updateEluWorkerUsage (
5df69fab 625 workerUsage: WorkerUsage,
62c15a68
JB
626 message: MessageValue<Response>
627 ): void {
5df69fab
JB
628 if (
629 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
630 .aggregate
631 ) {
632 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
633 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
634 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
635 workerUsage.elu.utilization =
636 (workerUsage.elu.utilization +
637 message.taskPerformance.elu.utilization) /
638 2
639 } else if (message.taskPerformance?.elu != null) {
640 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
641 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
642 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
643 }
d715b7bc 644 if (
5df69fab
JB
645 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
646 .average &&
647 workerUsage.tasks.executed !== 0
648 ) {
f1c06930
JB
649 const executedTasks =
650 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 651 workerUsage.elu.idle.average =
f1c06930 652 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 653 workerUsage.elu.active.average =
f1c06930 654 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
655 }
656 if (
657 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
658 .median &&
d715b7bc
JB
659 message.taskPerformance?.elu != null
660 ) {
5df69fab
JB
661 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
662 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
663 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
664 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
665 }
666 }
667 }
668
280c2a77 669 /**
f06e48d8 670 * Chooses a worker node for the next task.
280c2a77 671 *
6c6afb84 672 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 673 *
20dcad1a 674 * @returns The worker node key
280c2a77 675 */
6c6afb84 676 private chooseWorkerNode (): number {
930dcf12 677 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
678 const worker = this.createAndSetupDynamicWorker()
679 if (
680 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
681 ) {
682 return this.getWorkerNodeKey(worker)
683 }
17393ac8 684 }
930dcf12
JB
685 return this.workerChoiceStrategyContext.execute()
686 }
687
6c6afb84
JB
688 /**
689 * Conditions for dynamic worker creation.
690 *
691 * @returns Whether to create a dynamic worker or not.
692 */
693 private shallCreateDynamicWorker (): boolean {
930dcf12 694 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
695 }
696
280c2a77 697 /**
675bb809 698 * Sends a message to the given worker.
280c2a77 699 *
38e795c1
JB
700 * @param worker - The worker which should receive the message.
701 * @param message - The message.
280c2a77
S
702 */
703 protected abstract sendToWorker (
704 worker: Worker,
705 message: MessageValue<Data>
706 ): void
707
4a6952ff 708 /**
f06e48d8 709 * Registers a listener callback on the given worker.
4a6952ff 710 *
38e795c1
JB
711 * @param worker - The worker which should register a listener.
712 * @param listener - The message listener callback.
4a6952ff 713 */
e102732c
JB
714 private registerWorkerMessageListener<Message extends Data | Response>(
715 worker: Worker,
716 listener: (message: MessageValue<Message>) => void
717 ): void {
718 worker.on('message', listener as MessageHandler<Worker>)
719 }
c97c7edb 720
729c563d 721 /**
41344292 722 * Creates a new worker.
6c6afb84
JB
723 *
724 * @returns Newly created worker.
729c563d 725 */
280c2a77 726 protected abstract createWorker (): Worker
c97c7edb 727
729c563d 728 /**
f06e48d8 729 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 730 * Can be overridden.
729c563d 731 *
38e795c1 732 * @param worker - The newly created worker.
729c563d 733 */
6677a3d3 734 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
735 // Listen to worker messages.
736 this.registerWorkerMessageListener(worker, this.workerListener())
737 }
c97c7edb 738
4a6952ff 739 /**
f06e48d8 740 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
741 *
742 * @returns New, completely set up worker.
743 */
744 protected createAndSetupWorker (): Worker {
bdacc2d2 745 const worker = this.createWorker()
280c2a77 746
35cf1c03 747 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 748 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
749 worker.on('error', error => {
750 if (this.emitter != null) {
751 this.emitter.emit(PoolEvents.error, error)
752 }
5baee0d7 753 if (this.opts.restartWorkerOnError === true) {
1f68cede 754 this.createAndSetupWorker()
5baee0d7
JB
755 }
756 })
a35560ba
S
757 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
758 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 759 worker.once('exit', () => {
f06e48d8 760 this.removeWorkerNode(worker)
a974afa6 761 })
280c2a77 762
f06e48d8 763 this.pushWorkerNode(worker)
280c2a77 764
b6b32453
JB
765 this.setWorkerStatistics(worker)
766
280c2a77
S
767 this.afterWorkerSetup(worker)
768
c97c7edb
S
769 return worker
770 }
be0676b3 771
930dcf12
JB
772 /**
773 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
774 *
775 * @returns New, completely set up dynamic worker.
776 */
777 protected createAndSetupDynamicWorker (): Worker {
778 const worker = this.createAndSetupWorker()
779 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 780 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
781 if (
782 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
783 (message.kill != null &&
784 ((this.opts.enableTasksQueue === false &&
465b2940 785 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 786 (this.opts.enableTasksQueue === true &&
465b2940 787 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 788 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
789 ) {
790 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
791 void (this.destroyWorker(worker) as Promise<void>)
792 }
793 })
794 return worker
795 }
796
be0676b3 797 /**
ff733df7 798 * This function is the listener registered for each worker message.
be0676b3 799 *
bdacc2d2 800 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
801 */
802 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 803 return message => {
b1989cfd 804 if (message.id != null) {
a3445496 805 // Task execution response received
2740a743 806 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 807 if (promiseResponse != null) {
82f36766 808 if (message.taskError != null) {
91ee39ed 809 if (this.emitter != null) {
82f36766 810 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 811 }
cd9580e7 812 promiseResponse.reject(message.taskError.message)
a05c10de 813 } else {
2740a743 814 promiseResponse.resolve(message.data as Response)
a05c10de 815 }
2e81254d 816 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 817 this.promiseResponseMap.delete(message.id)
ff733df7
JB
818 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
819 if (
820 this.opts.enableTasksQueue === true &&
416fd65c 821 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 822 ) {
2e81254d
JB
823 this.executeTask(
824 workerNodeKey,
ff733df7
JB
825 this.dequeueTask(workerNodeKey) as Task<Data>
826 )
827 }
e5536a06 828 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
829 }
830 }
831 }
be0676b3 832 }
7c0ba920 833
ff733df7 834 private checkAndEmitEvents (): void {
1f68cede 835 if (this.emitter != null) {
ff733df7 836 if (this.busy) {
6b27d407 837 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 838 }
6b27d407
JB
839 if (this.type === PoolTypes.dynamic && this.full) {
840 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 841 }
164d950a
JB
842 }
843 }
844
0ebe2a9f
JB
845 /**
846 * Sets the given worker node its tasks usage in the pool.
847 *
848 * @param workerNode - The worker node.
a4e07f72 849 * @param workerUsage - The worker usage.
0ebe2a9f
JB
850 */
851 private setWorkerNodeTasksUsage (
852 workerNode: WorkerNode<Worker, Data>,
a4e07f72 853 workerUsage: WorkerUsage
0ebe2a9f 854 ): void {
465b2940 855 workerNode.usage = workerUsage
0ebe2a9f
JB
856 }
857
a05c10de 858 /**
f06e48d8 859 * Pushes the given worker in the pool worker nodes.
ea7a90d3 860 *
38e795c1 861 * @param worker - The worker.
f06e48d8 862 * @returns The worker nodes length.
ea7a90d3 863 */
f06e48d8 864 private pushWorkerNode (worker: Worker): number {
9c16fb4b 865 this.workerNodes.push({
ffcbbad8 866 worker,
465b2940 867 usage: this.getWorkerUsage(),
29ee7e9a 868 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 869 })
9c16fb4b
JB
870 const workerNodeKey = this.getWorkerNodeKey(worker)
871 this.setWorkerNodeTasksUsage(
872 this.workerNodes[workerNodeKey],
873 this.getWorkerUsage(workerNodeKey)
874 )
875 return this.workerNodes.length
ea7a90d3 876 }
c923ce56 877
8604aaab
JB
878 // /**
879 // * Sets the given worker in the pool worker nodes.
880 // *
881 // * @param workerNodeKey - The worker node key.
882 // * @param worker - The worker.
883 // * @param workerUsage - The worker usage.
884 // * @param tasksQueue - The worker task queue.
885 // */
886 // private setWorkerNode (
887 // workerNodeKey: number,
888 // worker: Worker,
889 // workerUsage: WorkerUsage,
890 // tasksQueue: Queue<Task<Data>>
891 // ): void {
892 // this.workerNodes[workerNodeKey] = {
893 // worker,
465b2940 894 // usage: workerUsage,
8604aaab
JB
895 // tasksQueue
896 // }
897 // }
51fe3d3c
JB
898
899 /**
f06e48d8 900 * Removes the given worker from the pool worker nodes.
51fe3d3c 901 *
f06e48d8 902 * @param worker - The worker.
51fe3d3c 903 */
416fd65c 904 private removeWorkerNode (worker: Worker): void {
f06e48d8 905 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
906 if (workerNodeKey !== -1) {
907 this.workerNodes.splice(workerNodeKey, 1)
908 this.workerChoiceStrategyContext.remove(workerNodeKey)
909 }
51fe3d3c 910 }
adc3c320 911
2e81254d 912 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 913 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
914 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
915 }
916
f9f00b5f 917 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 918 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
919 }
920
416fd65c 921 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 922 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
923 }
924
416fd65c 925 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 926 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 927 }
ff733df7 928
df593701
JB
929 private tasksMaxQueueSize (workerNodeKey: number): number {
930 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
931 }
932
416fd65c
JB
933 private flushTasksQueue (workerNodeKey: number): void {
934 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
935 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
936 this.executeTask(
937 workerNodeKey,
938 this.dequeueTask(workerNodeKey) as Task<Data>
939 )
ff733df7 940 }
ff733df7 941 }
df593701 942 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
943 }
944
ef41a6e6
JB
945 private flushTasksQueues (): void {
946 for (const [workerNodeKey] of this.workerNodes.entries()) {
947 this.flushTasksQueue(workerNodeKey)
948 }
949 }
b6b32453
JB
950
951 private setWorkerStatistics (worker: Worker): void {
952 this.sendToWorker(worker, {
953 statistics: {
87de9ff5
JB
954 runTime:
955 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 956 .runTime.aggregate,
87de9ff5 957 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 958 .elu.aggregate
b6b32453
JB
959 }
960 })
961 }
8604aaab 962
9c16fb4b 963 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
964 const getTasksQueueSize = (workerNodeKey?: number): number => {
965 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 966 }
df593701
JB
967 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
968 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
969 }
8604aaab 970 return {
9c16fb4b
JB
971 tasks: {
972 executed: 0,
973 executing: 0,
974 get queued (): number {
e3347a5c 975 return getTasksQueueSize(workerNodeKey)
9c16fb4b 976 },
df593701
JB
977 get maxQueued (): number {
978 return getTasksMaxQueueSize(workerNodeKey)
979 },
9c16fb4b
JB
980 failed: 0
981 },
8604aaab 982 runTime: {
932fc8be 983 aggregate: 0,
8604aaab
JB
984 average: 0,
985 median: 0,
986 history: new CircularArray()
987 },
988 waitTime: {
932fc8be 989 aggregate: 0,
8604aaab
JB
990 average: 0,
991 median: 0,
992 history: new CircularArray()
993 },
5df69fab
JB
994 elu: {
995 idle: {
996 aggregate: 0,
997 average: 0,
998 median: 0,
999 history: new CircularArray()
1000 },
1001 active: {
1002 aggregate: 0,
1003 average: 0,
1004 median: 0,
1005 history: new CircularArray()
1006 },
1007 utilization: 0
1008 }
8604aaab
JB
1009 }
1010 }
c97c7edb 1011}