chore: generate documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
f06e48d8 55 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
87 /**
88 * Constructs a new poolifier pool.
89 *
38e795c1 90 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 91 * @param filePath - Path to the worker file.
38e795c1 92 * @param opts - Options for the pool.
729c563d 93 */
c97c7edb 94 public constructor (
b4213b7f
JB
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
c97c7edb 98 ) {
78cea37e 99 if (!this.isMain()) {
c97c7edb
S
100 throw new Error('Cannot start a pool from a worker!')
101 }
8d3782fa 102 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 103 this.checkFilePath(this.filePath)
7c0ba920 104 this.checkPoolOptions(this.opts)
1086026a 105
7254e419
JB
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 110
6bd72cd0 111 if (this.opts.enableEvents === true) {
7c0ba920
JB
112 this.emitter = new PoolEmitter()
113 }
d59df138
JB
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
da309861
JB
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
b6b32453
JB
123
124 this.setupHook()
125
afe0d5bf 126 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
127 this.createAndSetupWorker()
128 }
afe0d5bf
JB
129
130 this.startTimestamp = performance.now()
c97c7edb
S
131 }
132
a35560ba 133 private checkFilePath (filePath: string): void {
ffcbbad8
JB
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
c510fea7
APA
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
8d3782fa
JB
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
78cea37e 147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 148 throw new TypeError(
0d80593b 149 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
150 )
151 } else if (numberOfWorkers < 0) {
473c717a 152 throw new RangeError(
8d3782fa
JB
153 'Cannot instantiate a pool with a negative number of workers'
154 )
6b27d407 155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
156 throw new Error('Cannot instantiate a fixed pool with no worker')
157 }
158 }
159
7c0ba920 160 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
161 if (isPlainObject(opts)) {
162 this.opts.workerChoiceStrategy =
163 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
164 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
165 this.opts.workerChoiceStrategyOptions =
166 opts.workerChoiceStrategyOptions ??
167 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
168 this.checkValidWorkerChoiceStrategyOptions(
169 this.opts.workerChoiceStrategyOptions
170 )
1f68cede 171 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
172 this.opts.enableEvents = opts.enableEvents ?? true
173 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
174 if (this.opts.enableTasksQueue) {
175 this.checkValidTasksQueueOptions(
176 opts.tasksQueueOptions as TasksQueueOptions
177 )
178 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
179 opts.tasksQueueOptions as TasksQueueOptions
180 )
181 }
182 } else {
183 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 184 }
aee46736
JB
185 }
186
187 private checkValidWorkerChoiceStrategy (
188 workerChoiceStrategy: WorkerChoiceStrategy
189 ): void {
190 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 191 throw new Error(
aee46736 192 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
193 )
194 }
7c0ba920
JB
195 }
196
0d80593b
JB
197 private checkValidWorkerChoiceStrategyOptions (
198 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
199 ): void {
200 if (!isPlainObject(workerChoiceStrategyOptions)) {
201 throw new TypeError(
202 'Invalid worker choice strategy options: must be a plain object'
203 )
204 }
49be33fe
JB
205 if (
206 workerChoiceStrategyOptions.weights != null &&
6b27d407 207 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
208 ) {
209 throw new Error(
210 'Invalid worker choice strategy options: must have a weight for each worker node'
211 )
212 }
f0d7f803
JB
213 if (
214 workerChoiceStrategyOptions.measurement != null &&
215 !Object.values(Measurements).includes(
216 workerChoiceStrategyOptions.measurement
217 )
218 ) {
219 throw new Error(
220 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
221 )
222 }
0d80593b
JB
223 }
224
a20f0ba5
JB
225 private checkValidTasksQueueOptions (
226 tasksQueueOptions: TasksQueueOptions
227 ): void {
0d80593b
JB
228 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
229 throw new TypeError('Invalid tasks queue options: must be a plain object')
230 }
f0d7f803
JB
231 if (
232 tasksQueueOptions?.concurrency != null &&
233 !Number.isSafeInteger(tasksQueueOptions.concurrency)
234 ) {
235 throw new TypeError(
236 'Invalid worker tasks concurrency: must be an integer'
237 )
238 }
239 if (
240 tasksQueueOptions?.concurrency != null &&
241 tasksQueueOptions.concurrency <= 0
242 ) {
a20f0ba5 243 throw new Error(
f0d7f803 244 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
245 )
246 }
247 }
248
08f3f44c 249 /** @inheritDoc */
6b27d407
JB
250 public get info (): PoolInfo {
251 return {
252 type: this.type,
184855e6 253 worker: this.worker,
6b27d407
JB
254 minSize: this.minSize,
255 maxSize: this.maxSize,
afe0d5bf 256 utilization: round(this.utilization),
6b27d407
JB
257 workerNodes: this.workerNodes.length,
258 idleWorkerNodes: this.workerNodes.reduce(
259 (accumulator, workerNode) =>
f59e1027 260 workerNode.usage.tasks.executing === 0
a4e07f72
JB
261 ? accumulator + 1
262 : accumulator,
6b27d407
JB
263 0
264 ),
265 busyWorkerNodes: this.workerNodes.reduce(
266 (accumulator, workerNode) =>
f59e1027 267 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
268 0
269 ),
a4e07f72 270 executedTasks: this.workerNodes.reduce(
6b27d407 271 (accumulator, workerNode) =>
f59e1027 272 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
273 0
274 ),
275 executingTasks: this.workerNodes.reduce(
276 (accumulator, workerNode) =>
f59e1027 277 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
278 0
279 ),
280 queuedTasks: this.workerNodes.reduce(
df593701 281 (accumulator, workerNode) =>
f59e1027 282 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
283 0
284 ),
285 maxQueuedTasks: this.workerNodes.reduce(
286 (accumulator, workerNode) =>
f59e1027 287 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 288 0
a4e07f72
JB
289 ),
290 failedTasks: this.workerNodes.reduce(
291 (accumulator, workerNode) =>
f59e1027 292 accumulator + workerNode.usage.tasks.failed,
a4e07f72 293 0
6b27d407
JB
294 )
295 }
296 }
08f3f44c 297
afe0d5bf
JB
298 /**
299 * Gets the pool run time.
300 *
301 * @returns The pool run time in milliseconds.
302 */
303 private get runTime (): number {
304 return performance.now() - this.startTimestamp
305 }
306
307 /**
308 * Gets the approximate pool utilization.
309 *
310 * @returns The pool utilization.
311 */
312 private get utilization (): number {
313 const poolRunTimeCapacity = this.runTime * this.maxSize
314 const totalTasksRunTime = this.workerNodes.reduce(
315 (accumulator, workerNode) =>
316 accumulator + workerNode.usage.runTime.aggregate,
317 0
318 )
319 const totalTasksWaitTime = this.workerNodes.reduce(
320 (accumulator, workerNode) =>
321 accumulator + workerNode.usage.waitTime.aggregate,
322 0
323 )
324 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
325 }
326
8881ae32
JB
327 /**
328 * Pool type.
329 *
330 * If it is `'dynamic'`, it provides the `max` property.
331 */
332 protected abstract get type (): PoolType
333
184855e6
JB
334 /**
335 * Gets the worker type.
336 */
337 protected abstract get worker (): WorkerType
338
c2ade475 339 /**
6b27d407 340 * Pool minimum size.
c2ade475 341 */
6b27d407 342 protected abstract get minSize (): number
ff733df7
JB
343
344 /**
6b27d407 345 * Pool maximum size.
ff733df7 346 */
6b27d407 347 protected abstract get maxSize (): number
a35560ba 348
f59e1027
JB
349 /**
350 * Get the worker given its id.
351 *
352 * @param workerId - The worker id.
353 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
354 */
355 private getWorkerById (workerId: number): Worker | undefined {
356 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
357 ?.worker
358 }
359
ffcbbad8 360 /**
f06e48d8 361 * Gets the given worker its worker node key.
ffcbbad8
JB
362 *
363 * @param worker - The worker.
f59e1027 364 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 365 */
f06e48d8
JB
366 private getWorkerNodeKey (worker: Worker): number {
367 return this.workerNodes.findIndex(
368 workerNode => workerNode.worker === worker
369 )
bf9549ae
JB
370 }
371
afc003b2 372 /** @inheritDoc */
a35560ba 373 public setWorkerChoiceStrategy (
59219cbb
JB
374 workerChoiceStrategy: WorkerChoiceStrategy,
375 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 376 ): void {
aee46736 377 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 378 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
379 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
380 this.opts.workerChoiceStrategy
381 )
382 if (workerChoiceStrategyOptions != null) {
383 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
384 }
9c16fb4b 385 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
386 this.setWorkerNodeTasksUsage(
387 workerNode,
9c16fb4b 388 this.getWorkerUsage(workerNodeKey)
8604aaab 389 )
b6b32453 390 this.setWorkerStatistics(workerNode.worker)
59219cbb 391 }
a20f0ba5
JB
392 }
393
394 /** @inheritDoc */
395 public setWorkerChoiceStrategyOptions (
396 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
397 ): void {
0d80593b 398 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
399 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
400 this.workerChoiceStrategyContext.setOptions(
401 this.opts.workerChoiceStrategyOptions
a35560ba
S
402 )
403 }
404
a20f0ba5 405 /** @inheritDoc */
8f52842f
JB
406 public enableTasksQueue (
407 enable: boolean,
408 tasksQueueOptions?: TasksQueueOptions
409 ): void {
a20f0ba5 410 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 411 this.flushTasksQueues()
a20f0ba5
JB
412 }
413 this.opts.enableTasksQueue = enable
8f52842f 414 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
415 }
416
417 /** @inheritDoc */
8f52842f 418 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 419 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
420 this.checkValidTasksQueueOptions(tasksQueueOptions)
421 this.opts.tasksQueueOptions =
422 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 423 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
424 delete this.opts.tasksQueueOptions
425 }
426 }
427
428 private buildTasksQueueOptions (
429 tasksQueueOptions: TasksQueueOptions
430 ): TasksQueueOptions {
431 return {
432 concurrency: tasksQueueOptions?.concurrency ?? 1
433 }
434 }
435
c319c66b
JB
436 /**
437 * Whether the pool is full or not.
438 *
439 * The pool filling boolean status.
440 */
dea903a8
JB
441 protected get full (): boolean {
442 return this.workerNodes.length >= this.maxSize
443 }
c2ade475 444
c319c66b
JB
445 /**
446 * Whether the pool is busy or not.
447 *
448 * The pool busyness boolean status.
449 */
450 protected abstract get busy (): boolean
7c0ba920 451
6c6afb84
JB
452 /**
453 * Whether worker nodes are executing at least one task.
454 *
455 * @returns Worker nodes busyness boolean status.
456 */
c2ade475 457 protected internalBusy (): boolean {
e0ae6100
JB
458 return (
459 this.workerNodes.findIndex(workerNode => {
f59e1027 460 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
461 }) === -1
462 )
cb70b19d
JB
463 }
464
afc003b2 465 /** @inheritDoc */
a86b6df1 466 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 467 const timestamp = performance.now()
20dcad1a 468 const workerNodeKey = this.chooseWorkerNode()
adc3c320 469 const submittedTask: Task<Data> = {
a86b6df1 470 name,
e5a5c0fc
JB
471 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
472 data: data ?? ({} as Data),
b6b32453 473 timestamp,
adc3c320
JB
474 id: crypto.randomUUID()
475 }
2e81254d 476 const res = new Promise<Response>((resolve, reject) => {
02706357 477 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
478 resolve,
479 reject,
20dcad1a 480 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
481 })
482 })
ff733df7
JB
483 if (
484 this.opts.enableTasksQueue === true &&
7171d33f 485 (this.busy ||
f59e1027 486 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 487 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 488 .concurrency as number))
ff733df7 489 ) {
26a929d7
JB
490 this.enqueueTask(workerNodeKey, submittedTask)
491 } else {
2e81254d 492 this.executeTask(workerNodeKey, submittedTask)
adc3c320 493 }
ff733df7 494 this.checkAndEmitEvents()
78cea37e 495 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
496 return res
497 }
c97c7edb 498
afc003b2 499 /** @inheritDoc */
c97c7edb 500 public async destroy (): Promise<void> {
1fbcaa7c 501 await Promise.all(
875a7c37
JB
502 this.workerNodes.map(async (workerNode, workerNodeKey) => {
503 this.flushTasksQueue(workerNodeKey)
47aacbaa 504 // FIXME: wait for tasks to be finished
f06e48d8 505 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
506 })
507 )
c97c7edb
S
508 }
509
4a6952ff 510 /**
6c6afb84 511 * Terminates the given worker.
4a6952ff 512 *
f06e48d8 513 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
514 */
515 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 516
729c563d 517 /**
6677a3d3
JB
518 * Setup hook to execute code before worker nodes are created in the abstract constructor.
519 * Can be overridden.
afc003b2
JB
520 *
521 * @virtual
729c563d 522 */
280c2a77 523 protected setupHook (): void {
d99ba5a8 524 // Intentionally empty
280c2a77 525 }
c97c7edb 526
729c563d 527 /**
280c2a77
S
528 * Should return whether the worker is the main worker or not.
529 */
530 protected abstract isMain (): boolean
531
532 /**
2e81254d 533 * Hook executed before the worker task execution.
bf9549ae 534 * Can be overridden.
729c563d 535 *
f06e48d8 536 * @param workerNodeKey - The worker node key.
1c6fe997 537 * @param task - The task to execute.
729c563d 538 */
1c6fe997
JB
539 protected beforeTaskExecutionHook (
540 workerNodeKey: number,
541 task: Task<Data>
542 ): void {
f59e1027 543 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
544 ++workerUsage.tasks.executing
545 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
546 }
547
c01733f1 548 /**
2e81254d 549 * Hook executed after the worker task execution.
bf9549ae 550 * Can be overridden.
c01733f1 551 *
c923ce56 552 * @param worker - The worker.
38e795c1 553 * @param message - The received message.
c01733f1 554 */
2e81254d 555 protected afterTaskExecutionHook (
c923ce56 556 worker: Worker,
2740a743 557 message: MessageValue<Response>
bf9549ae 558 ): void {
f59e1027 559 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
560 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
561 this.updateRunTimeWorkerUsage(workerUsage, message)
562 this.updateEluWorkerUsage(workerUsage, message)
563 }
564
565 private updateTaskStatisticsWorkerUsage (
566 workerUsage: WorkerUsage,
567 message: MessageValue<Response>
568 ): void {
a4e07f72
JB
569 const workerTaskStatistics = workerUsage.tasks
570 --workerTaskStatistics.executing
571 ++workerTaskStatistics.executed
82f36766 572 if (message.taskError != null) {
a4e07f72 573 ++workerTaskStatistics.failed
2740a743 574 }
f8eb0a2a
JB
575 }
576
a4e07f72
JB
577 private updateRunTimeWorkerUsage (
578 workerUsage: WorkerUsage,
f8eb0a2a
JB
579 message: MessageValue<Response>
580 ): void {
87de9ff5
JB
581 if (
582 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 583 .aggregate
87de9ff5 584 ) {
932fc8be 585 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 586 if (
932fc8be
JB
587 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
588 .average &&
a4e07f72 589 workerUsage.tasks.executed !== 0
c6bd2650 590 ) {
a4e07f72 591 workerUsage.runTime.average =
f1c06930
JB
592 workerUsage.runTime.aggregate /
593 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 594 }
3fa4cdd2 595 if (
932fc8be
JB
596 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
597 .median &&
d715b7bc 598 message.taskPerformance?.runTime != null
3fa4cdd2 599 ) {
a4e07f72
JB
600 workerUsage.runTime.history.push(message.taskPerformance.runTime)
601 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 602 }
3032893a 603 }
f8eb0a2a
JB
604 }
605
a4e07f72
JB
606 private updateWaitTimeWorkerUsage (
607 workerUsage: WorkerUsage,
1c6fe997 608 task: Task<Data>
f8eb0a2a 609 ): void {
1c6fe997
JB
610 const timestamp = performance.now()
611 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
612 if (
613 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 614 .aggregate
87de9ff5 615 ) {
932fc8be 616 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 617 if (
87de9ff5 618 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 619 .waitTime.average &&
a4e07f72 620 workerUsage.tasks.executed !== 0
09a6305f 621 ) {
a4e07f72 622 workerUsage.waitTime.average =
f1c06930
JB
623 workerUsage.waitTime.aggregate /
624 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
625 }
626 if (
87de9ff5 627 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 628 .waitTime.median &&
1c6fe997 629 taskWaitTime != null
09a6305f 630 ) {
1c6fe997 631 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 632 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 633 }
0567595a 634 }
c01733f1 635 }
636
a4e07f72 637 private updateEluWorkerUsage (
5df69fab 638 workerUsage: WorkerUsage,
62c15a68
JB
639 message: MessageValue<Response>
640 ): void {
5df69fab
JB
641 if (
642 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
643 .aggregate
644 ) {
645 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
646 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
647 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
648 workerUsage.elu.utilization =
649 (workerUsage.elu.utilization +
650 message.taskPerformance.elu.utilization) /
651 2
652 } else if (message.taskPerformance?.elu != null) {
653 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
654 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
655 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
656 }
d715b7bc 657 if (
5df69fab
JB
658 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
659 .average &&
660 workerUsage.tasks.executed !== 0
661 ) {
f1c06930
JB
662 const executedTasks =
663 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 664 workerUsage.elu.idle.average =
f1c06930 665 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 666 workerUsage.elu.active.average =
f1c06930 667 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
668 }
669 if (
670 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
671 .median &&
d715b7bc
JB
672 message.taskPerformance?.elu != null
673 ) {
5df69fab
JB
674 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
675 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
676 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
677 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
678 }
679 }
680 }
681
280c2a77 682 /**
f06e48d8 683 * Chooses a worker node for the next task.
280c2a77 684 *
6c6afb84 685 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 686 *
20dcad1a 687 * @returns The worker node key
280c2a77 688 */
6c6afb84 689 private chooseWorkerNode (): number {
930dcf12 690 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
691 const worker = this.createAndSetupDynamicWorker()
692 if (
693 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
694 ) {
695 return this.getWorkerNodeKey(worker)
696 }
17393ac8 697 }
930dcf12
JB
698 return this.workerChoiceStrategyContext.execute()
699 }
700
6c6afb84
JB
701 /**
702 * Conditions for dynamic worker creation.
703 *
704 * @returns Whether to create a dynamic worker or not.
705 */
706 private shallCreateDynamicWorker (): boolean {
930dcf12 707 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
708 }
709
280c2a77 710 /**
675bb809 711 * Sends a message to the given worker.
280c2a77 712 *
38e795c1
JB
713 * @param worker - The worker which should receive the message.
714 * @param message - The message.
280c2a77
S
715 */
716 protected abstract sendToWorker (
717 worker: Worker,
718 message: MessageValue<Data>
719 ): void
720
4a6952ff 721 /**
f06e48d8 722 * Registers a listener callback on the given worker.
4a6952ff 723 *
38e795c1
JB
724 * @param worker - The worker which should register a listener.
725 * @param listener - The message listener callback.
4a6952ff 726 */
e102732c
JB
727 private registerWorkerMessageListener<Message extends Data | Response>(
728 worker: Worker,
729 listener: (message: MessageValue<Message>) => void
730 ): void {
731 worker.on('message', listener as MessageHandler<Worker>)
732 }
c97c7edb 733
729c563d 734 /**
41344292 735 * Creates a new worker.
6c6afb84
JB
736 *
737 * @returns Newly created worker.
729c563d 738 */
280c2a77 739 protected abstract createWorker (): Worker
c97c7edb 740
729c563d 741 /**
f06e48d8 742 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 743 * Can be overridden.
729c563d 744 *
38e795c1 745 * @param worker - The newly created worker.
729c563d 746 */
6677a3d3 747 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
748 // Listen to worker messages.
749 this.registerWorkerMessageListener(worker, this.workerListener())
750 }
c97c7edb 751
4a6952ff 752 /**
f06e48d8 753 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
754 *
755 * @returns New, completely set up worker.
756 */
757 protected createAndSetupWorker (): Worker {
bdacc2d2 758 const worker = this.createWorker()
280c2a77 759
35cf1c03 760 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 761 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
762 worker.on('error', error => {
763 if (this.emitter != null) {
764 this.emitter.emit(PoolEvents.error, error)
765 }
e8a4c3ea 766 if (this.opts.restartWorkerOnError === true) {
1f68cede 767 this.createAndSetupWorker()
5baee0d7
JB
768 }
769 })
a35560ba
S
770 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
771 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 772 worker.once('exit', () => {
f06e48d8 773 this.removeWorkerNode(worker)
a974afa6 774 })
280c2a77 775
f06e48d8 776 this.pushWorkerNode(worker)
280c2a77 777
b6b32453
JB
778 this.setWorkerStatistics(worker)
779
280c2a77
S
780 this.afterWorkerSetup(worker)
781
c97c7edb
S
782 return worker
783 }
be0676b3 784
930dcf12
JB
785 /**
786 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
787 *
788 * @returns New, completely set up dynamic worker.
789 */
790 protected createAndSetupDynamicWorker (): Worker {
791 const worker = this.createAndSetupWorker()
792 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 793 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
794 if (
795 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
796 (message.kill != null &&
797 ((this.opts.enableTasksQueue === false &&
f59e1027 798 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 799 (this.opts.enableTasksQueue === true &&
f59e1027 800 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 801 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
802 ) {
803 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
804 void (this.destroyWorker(worker) as Promise<void>)
805 }
806 })
807 return worker
808 }
809
be0676b3 810 /**
ff733df7 811 * This function is the listener registered for each worker message.
be0676b3 812 *
bdacc2d2 813 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
814 */
815 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 816 return message => {
f59e1027
JB
817 if (message.workerId != null && message.started != null) {
818 // Worker started message received
6b272951 819 this.handleWorkerStartedMessage(message)
f59e1027 820 } else if (message.id != null) {
a3445496 821 // Task execution response received
6b272951
JB
822 this.handleTaskExecutionResponse(message)
823 }
824 }
825 }
826
827 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
828 // Worker started message received
829 const worker = this.getWorkerById(message.workerId as number)
830 if (worker != null) {
831 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
832 message.started as boolean
833 } else {
834 throw new Error(
835 `Worker started message received from unknown worker '${
836 message.workerId as number
837 }'`
838 )
839 }
840 }
841
842 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
843 const promiseResponse = this.promiseResponseMap.get(message.id as string)
844 if (promiseResponse != null) {
845 if (message.taskError != null) {
846 if (this.emitter != null) {
847 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 848 }
6b272951
JB
849 promiseResponse.reject(message.taskError.message)
850 } else {
851 promiseResponse.resolve(message.data as Response)
852 }
853 this.afterTaskExecutionHook(promiseResponse.worker, message)
854 this.promiseResponseMap.delete(message.id as string)
855 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
856 if (
857 this.opts.enableTasksQueue === true &&
858 this.tasksQueueSize(workerNodeKey) > 0
859 ) {
860 this.executeTask(
861 workerNodeKey,
862 this.dequeueTask(workerNodeKey) as Task<Data>
863 )
be0676b3 864 }
6b272951 865 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 866 }
be0676b3 867 }
7c0ba920 868
ff733df7 869 private checkAndEmitEvents (): void {
1f68cede 870 if (this.emitter != null) {
ff733df7 871 if (this.busy) {
6b27d407 872 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 873 }
6b27d407
JB
874 if (this.type === PoolTypes.dynamic && this.full) {
875 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 876 }
164d950a
JB
877 }
878 }
879
0ebe2a9f
JB
880 /**
881 * Sets the given worker node its tasks usage in the pool.
882 *
883 * @param workerNode - The worker node.
a4e07f72 884 * @param workerUsage - The worker usage.
0ebe2a9f
JB
885 */
886 private setWorkerNodeTasksUsage (
887 workerNode: WorkerNode<Worker, Data>,
a4e07f72 888 workerUsage: WorkerUsage
0ebe2a9f 889 ): void {
f59e1027 890 workerNode.usage = workerUsage
0ebe2a9f
JB
891 }
892
a05c10de 893 /**
f06e48d8 894 * Pushes the given worker in the pool worker nodes.
ea7a90d3 895 *
38e795c1 896 * @param worker - The worker.
f06e48d8 897 * @returns The worker nodes length.
ea7a90d3 898 */
f06e48d8 899 private pushWorkerNode (worker: Worker): number {
9c16fb4b 900 this.workerNodes.push({
ffcbbad8 901 worker,
7ae6fb74 902 info: { id: this.getWorkerId(worker), started: true },
f59e1027 903 usage: this.getWorkerUsage(),
29ee7e9a 904 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 905 })
9c16fb4b
JB
906 const workerNodeKey = this.getWorkerNodeKey(worker)
907 this.setWorkerNodeTasksUsage(
908 this.workerNodes[workerNodeKey],
909 this.getWorkerUsage(workerNodeKey)
910 )
911 return this.workerNodes.length
ea7a90d3 912 }
c923ce56 913
83fa0a36
JB
914 /**
915 * Gets the worker id.
916 *
917 * @param worker - The worker.
918 * @returns The worker id.
919 */
920 private getWorkerId (worker: Worker): number | undefined {
921 if (this.worker === WorkerTypes.thread) {
922 return worker.threadId
923 } else if (this.worker === WorkerTypes.cluster) {
924 return worker.id
925 }
926 }
927
8604aaab
JB
928 // /**
929 // * Sets the given worker in the pool worker nodes.
930 // *
931 // * @param workerNodeKey - The worker node key.
932 // * @param worker - The worker.
f59e1027 933 // * @param workerInfo - The worker info.
8604aaab
JB
934 // * @param workerUsage - The worker usage.
935 // * @param tasksQueue - The worker task queue.
936 // */
937 // private setWorkerNode (
938 // workerNodeKey: number,
939 // worker: Worker,
f59e1027 940 // workerInfo: WorkerInfo,
8604aaab
JB
941 // workerUsage: WorkerUsage,
942 // tasksQueue: Queue<Task<Data>>
943 // ): void {
944 // this.workerNodes[workerNodeKey] = {
945 // worker,
f59e1027
JB
946 // info: workerInfo,
947 // usage: workerUsage,
8604aaab
JB
948 // tasksQueue
949 // }
950 // }
51fe3d3c
JB
951
952 /**
f06e48d8 953 * Removes the given worker from the pool worker nodes.
51fe3d3c 954 *
f06e48d8 955 * @param worker - The worker.
51fe3d3c 956 */
416fd65c 957 private removeWorkerNode (worker: Worker): void {
f06e48d8 958 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
959 if (workerNodeKey !== -1) {
960 this.workerNodes.splice(workerNodeKey, 1)
961 this.workerChoiceStrategyContext.remove(workerNodeKey)
962 }
51fe3d3c 963 }
adc3c320 964
2e81254d 965 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 966 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
967 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
968 }
969
f9f00b5f 970 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 971 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
972 }
973
416fd65c 974 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 975 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
976 }
977
416fd65c 978 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 979 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 980 }
ff733df7 981
df593701
JB
982 private tasksMaxQueueSize (workerNodeKey: number): number {
983 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
984 }
985
416fd65c
JB
986 private flushTasksQueue (workerNodeKey: number): void {
987 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
988 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
989 this.executeTask(
990 workerNodeKey,
991 this.dequeueTask(workerNodeKey) as Task<Data>
992 )
ff733df7 993 }
ff733df7 994 }
df593701 995 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
996 }
997
ef41a6e6
JB
998 private flushTasksQueues (): void {
999 for (const [workerNodeKey] of this.workerNodes.entries()) {
1000 this.flushTasksQueue(workerNodeKey)
1001 }
1002 }
b6b32453
JB
1003
1004 private setWorkerStatistics (worker: Worker): void {
1005 this.sendToWorker(worker, {
1006 statistics: {
87de9ff5
JB
1007 runTime:
1008 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1009 .runTime.aggregate,
87de9ff5 1010 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1011 .elu.aggregate
b6b32453
JB
1012 }
1013 })
1014 }
8604aaab 1015
9c16fb4b 1016 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1017 const getTasksQueueSize = (workerNodeKey?: number): number => {
1018 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1019 }
df593701
JB
1020 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1021 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1022 }
8604aaab 1023 return {
9c16fb4b
JB
1024 tasks: {
1025 executed: 0,
1026 executing: 0,
1027 get queued (): number {
e3347a5c 1028 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1029 },
df593701
JB
1030 get maxQueued (): number {
1031 return getTasksMaxQueueSize(workerNodeKey)
1032 },
9c16fb4b
JB
1033 failed: 0
1034 },
8604aaab 1035 runTime: {
932fc8be 1036 aggregate: 0,
8604aaab
JB
1037 average: 0,
1038 median: 0,
1039 history: new CircularArray()
1040 },
1041 waitTime: {
932fc8be 1042 aggregate: 0,
8604aaab
JB
1043 average: 0,
1044 median: 0,
1045 history: new CircularArray()
1046 },
5df69fab
JB
1047 elu: {
1048 idle: {
1049 aggregate: 0,
1050 average: 0,
1051 median: 0,
1052 history: new CircularArray()
1053 },
1054 active: {
1055 aggregate: 0,
1056 average: 0,
1057 median: 0,
1058 history: new CircularArray()
1059 },
1060 utilization: 0
1061 }
8604aaab
JB
1062 }
1063 }
c97c7edb 1064}