refactor: cleanup exports
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
f06e48d8 55 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
87 /**
88 * Constructs a new poolifier pool.
89 *
38e795c1 90 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 91 * @param filePath - Path to the worker file.
38e795c1 92 * @param opts - Options for the pool.
729c563d 93 */
c97c7edb 94 public constructor (
b4213b7f
JB
95 protected readonly numberOfWorkers: number,
96 protected readonly filePath: string,
97 protected readonly opts: PoolOptions<Worker>
c97c7edb 98 ) {
78cea37e 99 if (!this.isMain()) {
c97c7edb
S
100 throw new Error('Cannot start a pool from a worker!')
101 }
8d3782fa 102 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 103 this.checkFilePath(this.filePath)
7c0ba920 104 this.checkPoolOptions(this.opts)
1086026a 105
7254e419
JB
106 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
107 this.executeTask = this.executeTask.bind(this)
108 this.enqueueTask = this.enqueueTask.bind(this)
109 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 110
6bd72cd0 111 if (this.opts.enableEvents === true) {
7c0ba920
JB
112 this.emitter = new PoolEmitter()
113 }
d59df138
JB
114 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
115 Worker,
116 Data,
117 Response
da309861
JB
118 >(
119 this,
120 this.opts.workerChoiceStrategy,
121 this.opts.workerChoiceStrategyOptions
122 )
b6b32453
JB
123
124 this.setupHook()
125
afe0d5bf 126 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
127 this.createAndSetupWorker()
128 }
afe0d5bf
JB
129
130 this.startTimestamp = performance.now()
c97c7edb
S
131 }
132
a35560ba 133 private checkFilePath (filePath: string): void {
ffcbbad8
JB
134 if (
135 filePath == null ||
136 (typeof filePath === 'string' && filePath.trim().length === 0)
137 ) {
c510fea7
APA
138 throw new Error('Please specify a file with a worker implementation')
139 }
140 }
141
8d3782fa
JB
142 private checkNumberOfWorkers (numberOfWorkers: number): void {
143 if (numberOfWorkers == null) {
144 throw new Error(
145 'Cannot instantiate a pool without specifying the number of workers'
146 )
78cea37e 147 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 148 throw new TypeError(
0d80593b 149 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
150 )
151 } else if (numberOfWorkers < 0) {
473c717a 152 throw new RangeError(
8d3782fa
JB
153 'Cannot instantiate a pool with a negative number of workers'
154 )
6b27d407 155 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
156 throw new Error('Cannot instantiate a fixed pool with no worker')
157 }
158 }
159
7c0ba920 160 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
161 if (isPlainObject(opts)) {
162 this.opts.workerChoiceStrategy =
163 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
164 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
165 this.opts.workerChoiceStrategyOptions =
166 opts.workerChoiceStrategyOptions ??
167 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
168 this.checkValidWorkerChoiceStrategyOptions(
169 this.opts.workerChoiceStrategyOptions
170 )
1f68cede 171 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
172 this.opts.enableEvents = opts.enableEvents ?? true
173 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
174 if (this.opts.enableTasksQueue) {
175 this.checkValidTasksQueueOptions(
176 opts.tasksQueueOptions as TasksQueueOptions
177 )
178 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
179 opts.tasksQueueOptions as TasksQueueOptions
180 )
181 }
182 } else {
183 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 184 }
aee46736
JB
185 }
186
187 private checkValidWorkerChoiceStrategy (
188 workerChoiceStrategy: WorkerChoiceStrategy
189 ): void {
190 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 191 throw new Error(
aee46736 192 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
193 )
194 }
7c0ba920
JB
195 }
196
0d80593b
JB
197 private checkValidWorkerChoiceStrategyOptions (
198 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
199 ): void {
200 if (!isPlainObject(workerChoiceStrategyOptions)) {
201 throw new TypeError(
202 'Invalid worker choice strategy options: must be a plain object'
203 )
204 }
49be33fe
JB
205 if (
206 workerChoiceStrategyOptions.weights != null &&
6b27d407 207 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
208 ) {
209 throw new Error(
210 'Invalid worker choice strategy options: must have a weight for each worker node'
211 )
212 }
f0d7f803
JB
213 if (
214 workerChoiceStrategyOptions.measurement != null &&
215 !Object.values(Measurements).includes(
216 workerChoiceStrategyOptions.measurement
217 )
218 ) {
219 throw new Error(
220 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
221 )
222 }
0d80593b
JB
223 }
224
a20f0ba5
JB
225 private checkValidTasksQueueOptions (
226 tasksQueueOptions: TasksQueueOptions
227 ): void {
0d80593b
JB
228 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
229 throw new TypeError('Invalid tasks queue options: must be a plain object')
230 }
f0d7f803
JB
231 if (
232 tasksQueueOptions?.concurrency != null &&
233 !Number.isSafeInteger(tasksQueueOptions.concurrency)
234 ) {
235 throw new TypeError(
236 'Invalid worker tasks concurrency: must be an integer'
237 )
238 }
239 if (
240 tasksQueueOptions?.concurrency != null &&
241 tasksQueueOptions.concurrency <= 0
242 ) {
a20f0ba5 243 throw new Error(
f0d7f803 244 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
245 )
246 }
247 }
248
08f3f44c 249 /** @inheritDoc */
6b27d407
JB
250 public get info (): PoolInfo {
251 return {
252 type: this.type,
184855e6 253 worker: this.worker,
6b27d407
JB
254 minSize: this.minSize,
255 maxSize: this.maxSize,
afe0d5bf 256 utilization: round(this.utilization),
6b27d407
JB
257 workerNodes: this.workerNodes.length,
258 idleWorkerNodes: this.workerNodes.reduce(
259 (accumulator, workerNode) =>
f59e1027 260 workerNode.usage.tasks.executing === 0
a4e07f72
JB
261 ? accumulator + 1
262 : accumulator,
6b27d407
JB
263 0
264 ),
265 busyWorkerNodes: this.workerNodes.reduce(
266 (accumulator, workerNode) =>
f59e1027 267 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
268 0
269 ),
a4e07f72 270 executedTasks: this.workerNodes.reduce(
6b27d407 271 (accumulator, workerNode) =>
f59e1027 272 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
273 0
274 ),
275 executingTasks: this.workerNodes.reduce(
276 (accumulator, workerNode) =>
f59e1027 277 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
278 0
279 ),
280 queuedTasks: this.workerNodes.reduce(
df593701 281 (accumulator, workerNode) =>
f59e1027 282 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
283 0
284 ),
285 maxQueuedTasks: this.workerNodes.reduce(
286 (accumulator, workerNode) =>
f59e1027 287 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 288 0
a4e07f72
JB
289 ),
290 failedTasks: this.workerNodes.reduce(
291 (accumulator, workerNode) =>
f59e1027 292 accumulator + workerNode.usage.tasks.failed,
a4e07f72 293 0
6b27d407
JB
294 )
295 }
296 }
08f3f44c 297
afe0d5bf
JB
298 /**
299 * Gets the pool run time.
300 *
301 * @returns The pool run time in milliseconds.
302 */
303 private get runTime (): number {
304 return performance.now() - this.startTimestamp
305 }
306
307 /**
308 * Gets the approximate pool utilization.
309 *
310 * @returns The pool utilization.
311 */
312 private get utilization (): number {
313 const poolRunTimeCapacity = this.runTime * this.maxSize
314 const totalTasksRunTime = this.workerNodes.reduce(
315 (accumulator, workerNode) =>
316 accumulator + workerNode.usage.runTime.aggregate,
317 0
318 )
319 const totalTasksWaitTime = this.workerNodes.reduce(
320 (accumulator, workerNode) =>
321 accumulator + workerNode.usage.waitTime.aggregate,
322 0
323 )
324 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
325 }
326
8881ae32
JB
327 /**
328 * Pool type.
329 *
330 * If it is `'dynamic'`, it provides the `max` property.
331 */
332 protected abstract get type (): PoolType
333
184855e6
JB
334 /**
335 * Gets the worker type.
336 */
337 protected abstract get worker (): WorkerType
338
c2ade475 339 /**
6b27d407 340 * Pool minimum size.
c2ade475 341 */
6b27d407 342 protected abstract get minSize (): number
ff733df7
JB
343
344 /**
6b27d407 345 * Pool maximum size.
ff733df7 346 */
6b27d407 347 protected abstract get maxSize (): number
a35560ba 348
f59e1027
JB
349 /**
350 * Get the worker given its id.
351 *
352 * @param workerId - The worker id.
353 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
354 */
355 private getWorkerById (workerId: number): Worker | undefined {
356 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
357 ?.worker
358 }
359
ffcbbad8 360 /**
f06e48d8 361 * Gets the given worker its worker node key.
ffcbbad8
JB
362 *
363 * @param worker - The worker.
f59e1027 364 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 365 */
f06e48d8
JB
366 private getWorkerNodeKey (worker: Worker): number {
367 return this.workerNodes.findIndex(
368 workerNode => workerNode.worker === worker
369 )
bf9549ae
JB
370 }
371
afc003b2 372 /** @inheritDoc */
a35560ba 373 public setWorkerChoiceStrategy (
59219cbb
JB
374 workerChoiceStrategy: WorkerChoiceStrategy,
375 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 376 ): void {
aee46736 377 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 378 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
379 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
380 this.opts.workerChoiceStrategy
381 )
382 if (workerChoiceStrategyOptions != null) {
383 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
384 }
9c16fb4b 385 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
386 this.setWorkerNodeTasksUsage(
387 workerNode,
9c16fb4b 388 this.getWorkerUsage(workerNodeKey)
8604aaab 389 )
b6b32453 390 this.setWorkerStatistics(workerNode.worker)
59219cbb 391 }
a20f0ba5
JB
392 }
393
394 /** @inheritDoc */
395 public setWorkerChoiceStrategyOptions (
396 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
397 ): void {
0d80593b 398 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
399 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
400 this.workerChoiceStrategyContext.setOptions(
401 this.opts.workerChoiceStrategyOptions
a35560ba
S
402 )
403 }
404
a20f0ba5 405 /** @inheritDoc */
8f52842f
JB
406 public enableTasksQueue (
407 enable: boolean,
408 tasksQueueOptions?: TasksQueueOptions
409 ): void {
a20f0ba5 410 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 411 this.flushTasksQueues()
a20f0ba5
JB
412 }
413 this.opts.enableTasksQueue = enable
8f52842f 414 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
415 }
416
417 /** @inheritDoc */
8f52842f 418 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 419 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
420 this.checkValidTasksQueueOptions(tasksQueueOptions)
421 this.opts.tasksQueueOptions =
422 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 423 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
424 delete this.opts.tasksQueueOptions
425 }
426 }
427
428 private buildTasksQueueOptions (
429 tasksQueueOptions: TasksQueueOptions
430 ): TasksQueueOptions {
431 return {
432 concurrency: tasksQueueOptions?.concurrency ?? 1
433 }
434 }
435
c319c66b
JB
436 /**
437 * Whether the pool is full or not.
438 *
439 * The pool filling boolean status.
440 */
dea903a8
JB
441 protected get full (): boolean {
442 return this.workerNodes.length >= this.maxSize
443 }
c2ade475 444
c319c66b
JB
445 /**
446 * Whether the pool is busy or not.
447 *
448 * The pool busyness boolean status.
449 */
450 protected abstract get busy (): boolean
7c0ba920 451
6c6afb84
JB
452 /**
453 * Whether worker nodes are executing at least one task.
454 *
455 * @returns Worker nodes busyness boolean status.
456 */
c2ade475 457 protected internalBusy (): boolean {
e0ae6100
JB
458 return (
459 this.workerNodes.findIndex(workerNode => {
f59e1027 460 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
461 }) === -1
462 )
cb70b19d
JB
463 }
464
afc003b2 465 /** @inheritDoc */
a86b6df1 466 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 467 const timestamp = performance.now()
20dcad1a 468 const workerNodeKey = this.chooseWorkerNode()
adc3c320 469 const submittedTask: Task<Data> = {
a86b6df1 470 name,
e5a5c0fc
JB
471 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
472 data: data ?? ({} as Data),
b6b32453 473 timestamp,
adc3c320
JB
474 id: crypto.randomUUID()
475 }
2e81254d 476 const res = new Promise<Response>((resolve, reject) => {
02706357 477 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
478 resolve,
479 reject,
20dcad1a 480 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
481 })
482 })
ff733df7
JB
483 if (
484 this.opts.enableTasksQueue === true &&
7171d33f 485 (this.busy ||
f59e1027 486 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 487 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 488 .concurrency as number))
ff733df7 489 ) {
26a929d7
JB
490 this.enqueueTask(workerNodeKey, submittedTask)
491 } else {
2e81254d 492 this.executeTask(workerNodeKey, submittedTask)
adc3c320 493 }
ff733df7 494 this.checkAndEmitEvents()
78cea37e 495 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
496 return res
497 }
c97c7edb 498
afc003b2 499 /** @inheritDoc */
c97c7edb 500 public async destroy (): Promise<void> {
1fbcaa7c 501 await Promise.all(
875a7c37
JB
502 this.workerNodes.map(async (workerNode, workerNodeKey) => {
503 this.flushTasksQueue(workerNodeKey)
47aacbaa 504 // FIXME: wait for tasks to be finished
f06e48d8 505 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
506 })
507 )
c97c7edb
S
508 }
509
4a6952ff 510 /**
6c6afb84 511 * Terminates the given worker.
4a6952ff 512 *
f06e48d8 513 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
514 */
515 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 516
729c563d 517 /**
6677a3d3
JB
518 * Setup hook to execute code before worker nodes are created in the abstract constructor.
519 * Can be overridden.
afc003b2
JB
520 *
521 * @virtual
729c563d 522 */
280c2a77 523 protected setupHook (): void {
d99ba5a8 524 // Intentionally empty
280c2a77 525 }
c97c7edb 526
729c563d 527 /**
280c2a77
S
528 * Should return whether the worker is the main worker or not.
529 */
530 protected abstract isMain (): boolean
531
532 /**
2e81254d 533 * Hook executed before the worker task execution.
bf9549ae 534 * Can be overridden.
729c563d 535 *
f06e48d8 536 * @param workerNodeKey - The worker node key.
1c6fe997 537 * @param task - The task to execute.
729c563d 538 */
1c6fe997
JB
539 protected beforeTaskExecutionHook (
540 workerNodeKey: number,
541 task: Task<Data>
542 ): void {
f59e1027 543 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
544 ++workerUsage.tasks.executing
545 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
546 }
547
c01733f1 548 /**
2e81254d 549 * Hook executed after the worker task execution.
bf9549ae 550 * Can be overridden.
c01733f1 551 *
c923ce56 552 * @param worker - The worker.
38e795c1 553 * @param message - The received message.
c01733f1 554 */
2e81254d 555 protected afterTaskExecutionHook (
c923ce56 556 worker: Worker,
2740a743 557 message: MessageValue<Response>
bf9549ae 558 ): void {
f59e1027 559 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
560 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
561 this.updateRunTimeWorkerUsage(workerUsage, message)
562 this.updateEluWorkerUsage(workerUsage, message)
563 }
564
565 private updateTaskStatisticsWorkerUsage (
566 workerUsage: WorkerUsage,
567 message: MessageValue<Response>
568 ): void {
a4e07f72
JB
569 const workerTaskStatistics = workerUsage.tasks
570 --workerTaskStatistics.executing
571 ++workerTaskStatistics.executed
82f36766 572 if (message.taskError != null) {
a4e07f72 573 ++workerTaskStatistics.failed
2740a743 574 }
f8eb0a2a
JB
575 }
576
a4e07f72
JB
577 private updateRunTimeWorkerUsage (
578 workerUsage: WorkerUsage,
f8eb0a2a
JB
579 message: MessageValue<Response>
580 ): void {
87de9ff5
JB
581 if (
582 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 583 .aggregate
87de9ff5 584 ) {
932fc8be 585 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 586 if (
932fc8be
JB
587 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
588 .average &&
a4e07f72 589 workerUsage.tasks.executed !== 0
c6bd2650 590 ) {
a4e07f72 591 workerUsage.runTime.average =
f1c06930
JB
592 workerUsage.runTime.aggregate /
593 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 594 }
3fa4cdd2 595 if (
932fc8be
JB
596 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
597 .median &&
d715b7bc 598 message.taskPerformance?.runTime != null
3fa4cdd2 599 ) {
a4e07f72
JB
600 workerUsage.runTime.history.push(message.taskPerformance.runTime)
601 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 602 }
3032893a 603 }
f8eb0a2a
JB
604 }
605
a4e07f72
JB
606 private updateWaitTimeWorkerUsage (
607 workerUsage: WorkerUsage,
1c6fe997 608 task: Task<Data>
f8eb0a2a 609 ): void {
1c6fe997
JB
610 const timestamp = performance.now()
611 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
612 if (
613 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 614 .aggregate
87de9ff5 615 ) {
932fc8be 616 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 617 if (
87de9ff5 618 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 619 .waitTime.average &&
a4e07f72 620 workerUsage.tasks.executed !== 0
09a6305f 621 ) {
a4e07f72 622 workerUsage.waitTime.average =
f1c06930
JB
623 workerUsage.waitTime.aggregate /
624 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
625 }
626 if (
87de9ff5 627 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 628 .waitTime.median &&
1c6fe997 629 taskWaitTime != null
09a6305f 630 ) {
1c6fe997 631 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 632 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 633 }
0567595a 634 }
c01733f1 635 }
636
a4e07f72 637 private updateEluWorkerUsage (
5df69fab 638 workerUsage: WorkerUsage,
62c15a68
JB
639 message: MessageValue<Response>
640 ): void {
5df69fab
JB
641 if (
642 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
643 .aggregate
644 ) {
645 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
646 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
647 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
648 workerUsage.elu.utilization =
649 (workerUsage.elu.utilization +
650 message.taskPerformance.elu.utilization) /
651 2
652 } else if (message.taskPerformance?.elu != null) {
653 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
654 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
655 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
656 }
d715b7bc 657 if (
5df69fab
JB
658 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
659 .average &&
660 workerUsage.tasks.executed !== 0
661 ) {
f1c06930
JB
662 const executedTasks =
663 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 664 workerUsage.elu.idle.average =
f1c06930 665 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 666 workerUsage.elu.active.average =
f1c06930 667 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
668 }
669 if (
670 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
671 .median &&
d715b7bc
JB
672 message.taskPerformance?.elu != null
673 ) {
5df69fab
JB
674 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
675 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
676 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
677 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
678 }
679 }
680 }
681
280c2a77 682 /**
f06e48d8 683 * Chooses a worker node for the next task.
280c2a77 684 *
6c6afb84 685 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 686 *
20dcad1a 687 * @returns The worker node key
280c2a77 688 */
6c6afb84 689 private chooseWorkerNode (): number {
930dcf12 690 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
691 const worker = this.createAndSetupDynamicWorker()
692 if (
693 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
694 ) {
695 return this.getWorkerNodeKey(worker)
696 }
17393ac8 697 }
930dcf12
JB
698 return this.workerChoiceStrategyContext.execute()
699 }
700
6c6afb84
JB
701 /**
702 * Conditions for dynamic worker creation.
703 *
704 * @returns Whether to create a dynamic worker or not.
705 */
706 private shallCreateDynamicWorker (): boolean {
930dcf12 707 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
708 }
709
280c2a77 710 /**
675bb809 711 * Sends a message to the given worker.
280c2a77 712 *
38e795c1
JB
713 * @param worker - The worker which should receive the message.
714 * @param message - The message.
280c2a77
S
715 */
716 protected abstract sendToWorker (
717 worker: Worker,
718 message: MessageValue<Data>
719 ): void
720
4a6952ff 721 /**
f06e48d8 722 * Registers a listener callback on the given worker.
4a6952ff 723 *
38e795c1
JB
724 * @param worker - The worker which should register a listener.
725 * @param listener - The message listener callback.
4a6952ff 726 */
e102732c
JB
727 private registerWorkerMessageListener<Message extends Data | Response>(
728 worker: Worker,
729 listener: (message: MessageValue<Message>) => void
730 ): void {
731 worker.on('message', listener as MessageHandler<Worker>)
732 }
c97c7edb 733
729c563d 734 /**
41344292 735 * Creates a new worker.
6c6afb84
JB
736 *
737 * @returns Newly created worker.
729c563d 738 */
280c2a77 739 protected abstract createWorker (): Worker
c97c7edb 740
729c563d 741 /**
f06e48d8 742 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 743 * Can be overridden.
729c563d 744 *
38e795c1 745 * @param worker - The newly created worker.
729c563d 746 */
6677a3d3 747 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
748 // Listen to worker messages.
749 this.registerWorkerMessageListener(worker, this.workerListener())
750 }
c97c7edb 751
4a6952ff 752 /**
f06e48d8 753 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
754 *
755 * @returns New, completely set up worker.
756 */
757 protected createAndSetupWorker (): Worker {
bdacc2d2 758 const worker = this.createWorker()
280c2a77 759
35cf1c03 760 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 761 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
762 worker.on('error', error => {
763 if (this.emitter != null) {
764 this.emitter.emit(PoolEvents.error, error)
765 }
e8a4c3ea 766 if (this.opts.restartWorkerOnError === true) {
1f68cede 767 this.createAndSetupWorker()
5baee0d7
JB
768 }
769 })
a35560ba
S
770 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
771 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 772 worker.once('exit', () => {
f06e48d8 773 this.removeWorkerNode(worker)
a974afa6 774 })
280c2a77 775
f06e48d8 776 this.pushWorkerNode(worker)
280c2a77 777
b6b32453
JB
778 this.setWorkerStatistics(worker)
779
280c2a77
S
780 this.afterWorkerSetup(worker)
781
c97c7edb
S
782 return worker
783 }
be0676b3 784
930dcf12
JB
785 /**
786 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
787 *
788 * @returns New, completely set up dynamic worker.
789 */
790 protected createAndSetupDynamicWorker (): Worker {
791 const worker = this.createAndSetupWorker()
792 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 793 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
794 if (
795 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
796 (message.kill != null &&
797 ((this.opts.enableTasksQueue === false &&
f59e1027 798 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 799 (this.opts.enableTasksQueue === true &&
f59e1027 800 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 801 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
802 ) {
803 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
804 void (this.destroyWorker(worker) as Promise<void>)
805 }
806 })
807 return worker
808 }
809
be0676b3 810 /**
ff733df7 811 * This function is the listener registered for each worker message.
be0676b3 812 *
bdacc2d2 813 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
814 */
815 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 816 return message => {
f59e1027
JB
817 if (message.workerId != null && message.started != null) {
818 // Worker started message received
83fa0a36
JB
819 const worker = this.getWorkerById(message.workerId)
820 if (worker != null) {
821 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
822 message.started
823 } else {
7ae6fb74
JB
824 throw new Error(
825 `Worker started message received from unknown worker '${message.workerId}'`
826 )
83fa0a36 827 }
f59e1027 828 } else if (message.id != null) {
a3445496 829 // Task execution response received
2740a743 830 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 831 if (promiseResponse != null) {
82f36766 832 if (message.taskError != null) {
91ee39ed 833 if (this.emitter != null) {
82f36766 834 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 835 }
985d0e79 836 promiseResponse.reject(message.taskError.message)
a05c10de 837 } else {
2740a743 838 promiseResponse.resolve(message.data as Response)
a05c10de 839 }
2e81254d 840 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 841 this.promiseResponseMap.delete(message.id)
ff733df7
JB
842 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
843 if (
844 this.opts.enableTasksQueue === true &&
416fd65c 845 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 846 ) {
2e81254d
JB
847 this.executeTask(
848 workerNodeKey,
ff733df7
JB
849 this.dequeueTask(workerNodeKey) as Task<Data>
850 )
851 }
e5536a06 852 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
853 }
854 }
855 }
be0676b3 856 }
7c0ba920 857
ff733df7 858 private checkAndEmitEvents (): void {
1f68cede 859 if (this.emitter != null) {
ff733df7 860 if (this.busy) {
6b27d407 861 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 862 }
6b27d407
JB
863 if (this.type === PoolTypes.dynamic && this.full) {
864 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 865 }
164d950a
JB
866 }
867 }
868
0ebe2a9f
JB
869 /**
870 * Sets the given worker node its tasks usage in the pool.
871 *
872 * @param workerNode - The worker node.
a4e07f72 873 * @param workerUsage - The worker usage.
0ebe2a9f
JB
874 */
875 private setWorkerNodeTasksUsage (
876 workerNode: WorkerNode<Worker, Data>,
a4e07f72 877 workerUsage: WorkerUsage
0ebe2a9f 878 ): void {
f59e1027 879 workerNode.usage = workerUsage
0ebe2a9f
JB
880 }
881
a05c10de 882 /**
f06e48d8 883 * Pushes the given worker in the pool worker nodes.
ea7a90d3 884 *
38e795c1 885 * @param worker - The worker.
f06e48d8 886 * @returns The worker nodes length.
ea7a90d3 887 */
f06e48d8 888 private pushWorkerNode (worker: Worker): number {
9c16fb4b 889 this.workerNodes.push({
ffcbbad8 890 worker,
7ae6fb74 891 info: { id: this.getWorkerId(worker), started: true },
f59e1027 892 usage: this.getWorkerUsage(),
29ee7e9a 893 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 894 })
9c16fb4b
JB
895 const workerNodeKey = this.getWorkerNodeKey(worker)
896 this.setWorkerNodeTasksUsage(
897 this.workerNodes[workerNodeKey],
898 this.getWorkerUsage(workerNodeKey)
899 )
900 return this.workerNodes.length
ea7a90d3 901 }
c923ce56 902
83fa0a36
JB
903 /**
904 * Gets the worker id.
905 *
906 * @param worker - The worker.
907 * @returns The worker id.
908 */
909 private getWorkerId (worker: Worker): number | undefined {
910 if (this.worker === WorkerTypes.thread) {
911 return worker.threadId
912 } else if (this.worker === WorkerTypes.cluster) {
913 return worker.id
914 }
915 }
916
8604aaab
JB
917 // /**
918 // * Sets the given worker in the pool worker nodes.
919 // *
920 // * @param workerNodeKey - The worker node key.
921 // * @param worker - The worker.
f59e1027 922 // * @param workerInfo - The worker info.
8604aaab
JB
923 // * @param workerUsage - The worker usage.
924 // * @param tasksQueue - The worker task queue.
925 // */
926 // private setWorkerNode (
927 // workerNodeKey: number,
928 // worker: Worker,
f59e1027 929 // workerInfo: WorkerInfo,
8604aaab
JB
930 // workerUsage: WorkerUsage,
931 // tasksQueue: Queue<Task<Data>>
932 // ): void {
933 // this.workerNodes[workerNodeKey] = {
934 // worker,
f59e1027
JB
935 // info: workerInfo,
936 // usage: workerUsage,
8604aaab
JB
937 // tasksQueue
938 // }
939 // }
51fe3d3c
JB
940
941 /**
f06e48d8 942 * Removes the given worker from the pool worker nodes.
51fe3d3c 943 *
f06e48d8 944 * @param worker - The worker.
51fe3d3c 945 */
416fd65c 946 private removeWorkerNode (worker: Worker): void {
f06e48d8 947 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
948 if (workerNodeKey !== -1) {
949 this.workerNodes.splice(workerNodeKey, 1)
950 this.workerChoiceStrategyContext.remove(workerNodeKey)
951 }
51fe3d3c 952 }
adc3c320 953
2e81254d 954 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 955 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
956 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
957 }
958
f9f00b5f 959 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 960 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
961 }
962
416fd65c 963 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 964 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
965 }
966
416fd65c 967 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 968 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 969 }
ff733df7 970
df593701
JB
971 private tasksMaxQueueSize (workerNodeKey: number): number {
972 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
973 }
974
416fd65c
JB
975 private flushTasksQueue (workerNodeKey: number): void {
976 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
977 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
978 this.executeTask(
979 workerNodeKey,
980 this.dequeueTask(workerNodeKey) as Task<Data>
981 )
ff733df7 982 }
ff733df7 983 }
df593701 984 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
985 }
986
ef41a6e6
JB
987 private flushTasksQueues (): void {
988 for (const [workerNodeKey] of this.workerNodes.entries()) {
989 this.flushTasksQueue(workerNodeKey)
990 }
991 }
b6b32453
JB
992
993 private setWorkerStatistics (worker: Worker): void {
994 this.sendToWorker(worker, {
995 statistics: {
87de9ff5
JB
996 runTime:
997 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 998 .runTime.aggregate,
87de9ff5 999 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1000 .elu.aggregate
b6b32453
JB
1001 }
1002 })
1003 }
8604aaab 1004
9c16fb4b 1005 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1006 const getTasksQueueSize = (workerNodeKey?: number): number => {
1007 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1008 }
df593701
JB
1009 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1010 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1011 }
8604aaab 1012 return {
9c16fb4b
JB
1013 tasks: {
1014 executed: 0,
1015 executing: 0,
1016 get queued (): number {
e3347a5c 1017 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1018 },
df593701
JB
1019 get maxQueued (): number {
1020 return getTasksMaxQueueSize(workerNodeKey)
1021 },
9c16fb4b
JB
1022 failed: 0
1023 },
8604aaab 1024 runTime: {
932fc8be 1025 aggregate: 0,
8604aaab
JB
1026 average: 0,
1027 median: 0,
1028 history: new CircularArray()
1029 },
1030 waitTime: {
932fc8be 1031 aggregate: 0,
8604aaab
JB
1032 average: 0,
1033 median: 0,
1034 history: new CircularArray()
1035 },
5df69fab
JB
1036 elu: {
1037 idle: {
1038 aggregate: 0,
1039 average: 0,
1040 median: 0,
1041 history: new CircularArray()
1042 },
1043 active: {
1044 aggregate: 0,
1045 average: 0,
1046 median: 0,
1047 history: new CircularArray()
1048 },
1049 utilization: 0
1050 }
8604aaab
JB
1051 }
1052 }
c97c7edb 1053}