feat: add minimum and maximum to internal measurements
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 41import { version } from './version'
23ccf9d7 42
729c563d 43/**
ea7a90d3 44 * Base class that implements some shared logic for all poolifier pools.
729c563d 45 *
38e795c1 46 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 49 */
c97c7edb 50export abstract class AbstractPool<
f06e48d8 51 Worker extends IWorker,
d3c8a1a8
S
52 Data = unknown,
53 Response = unknown
c4855468 54> implements IPool<Worker, Data, Response> {
afc003b2 55 /** @inheritDoc */
f06e48d8 56 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 57
afc003b2 58 /** @inheritDoc */
7c0ba920
JB
59 public readonly emitter?: PoolEmitter
60
be0676b3 61 /**
a3445496 62 * The execution response promise map.
be0676b3 63 *
2740a743 64 * - `key`: The message id of each submitted task.
a3445496 65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 66 *
a3445496 67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 68 */
c923ce56
JB
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 73
a35560ba 74 /**
51fe3d3c 75 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
78 Worker,
79 Data,
80 Response
a35560ba
S
81 >
82
afe0d5bf
JB
83 /**
84 * The start timestamp of the pool.
85 */
86 private readonly startTimestamp
87
729c563d
S
88 /**
89 * Constructs a new poolifier pool.
90 *
38e795c1 91 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 92 * @param filePath - Path to the worker file.
38e795c1 93 * @param opts - Options for the pool.
729c563d 94 */
c97c7edb 95 public constructor (
b4213b7f
JB
96 protected readonly numberOfWorkers: number,
97 protected readonly filePath: string,
98 protected readonly opts: PoolOptions<Worker>
c97c7edb 99 ) {
78cea37e 100 if (!this.isMain()) {
c97c7edb
S
101 throw new Error('Cannot start a pool from a worker!')
102 }
8d3782fa 103 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 104 this.checkFilePath(this.filePath)
7c0ba920 105 this.checkPoolOptions(this.opts)
1086026a 106
7254e419
JB
107 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
108 this.executeTask = this.executeTask.bind(this)
109 this.enqueueTask = this.enqueueTask.bind(this)
110 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 111
6bd72cd0 112 if (this.opts.enableEvents === true) {
7c0ba920
JB
113 this.emitter = new PoolEmitter()
114 }
d59df138
JB
115 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
116 Worker,
117 Data,
118 Response
da309861
JB
119 >(
120 this,
121 this.opts.workerChoiceStrategy,
122 this.opts.workerChoiceStrategyOptions
123 )
b6b32453
JB
124
125 this.setupHook()
126
afe0d5bf 127 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
128 this.createAndSetupWorker()
129 }
afe0d5bf
JB
130
131 this.startTimestamp = performance.now()
c97c7edb
S
132 }
133
a35560ba 134 private checkFilePath (filePath: string): void {
ffcbbad8
JB
135 if (
136 filePath == null ||
137 (typeof filePath === 'string' && filePath.trim().length === 0)
138 ) {
c510fea7
APA
139 throw new Error('Please specify a file with a worker implementation')
140 }
141 }
142
8d3782fa
JB
143 private checkNumberOfWorkers (numberOfWorkers: number): void {
144 if (numberOfWorkers == null) {
145 throw new Error(
146 'Cannot instantiate a pool without specifying the number of workers'
147 )
78cea37e 148 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 149 throw new TypeError(
0d80593b 150 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
151 )
152 } else if (numberOfWorkers < 0) {
473c717a 153 throw new RangeError(
8d3782fa
JB
154 'Cannot instantiate a pool with a negative number of workers'
155 )
6b27d407 156 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
157 throw new Error('Cannot instantiate a fixed pool with no worker')
158 }
159 }
160
7c0ba920 161 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
162 if (isPlainObject(opts)) {
163 this.opts.workerChoiceStrategy =
164 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
165 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
166 this.opts.workerChoiceStrategyOptions =
167 opts.workerChoiceStrategyOptions ??
168 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
169 this.checkValidWorkerChoiceStrategyOptions(
170 this.opts.workerChoiceStrategyOptions
171 )
1f68cede 172 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
173 this.opts.enableEvents = opts.enableEvents ?? true
174 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
175 if (this.opts.enableTasksQueue) {
176 this.checkValidTasksQueueOptions(
177 opts.tasksQueueOptions as TasksQueueOptions
178 )
179 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
180 opts.tasksQueueOptions as TasksQueueOptions
181 )
182 }
183 } else {
184 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 185 }
aee46736
JB
186 }
187
188 private checkValidWorkerChoiceStrategy (
189 workerChoiceStrategy: WorkerChoiceStrategy
190 ): void {
191 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 192 throw new Error(
aee46736 193 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
194 )
195 }
7c0ba920
JB
196 }
197
0d80593b
JB
198 private checkValidWorkerChoiceStrategyOptions (
199 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
200 ): void {
201 if (!isPlainObject(workerChoiceStrategyOptions)) {
202 throw new TypeError(
203 'Invalid worker choice strategy options: must be a plain object'
204 )
205 }
49be33fe
JB
206 if (
207 workerChoiceStrategyOptions.weights != null &&
6b27d407 208 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
209 ) {
210 throw new Error(
211 'Invalid worker choice strategy options: must have a weight for each worker node'
212 )
213 }
f0d7f803
JB
214 if (
215 workerChoiceStrategyOptions.measurement != null &&
216 !Object.values(Measurements).includes(
217 workerChoiceStrategyOptions.measurement
218 )
219 ) {
220 throw new Error(
221 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
222 )
223 }
0d80593b
JB
224 }
225
a20f0ba5
JB
226 private checkValidTasksQueueOptions (
227 tasksQueueOptions: TasksQueueOptions
228 ): void {
0d80593b
JB
229 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
230 throw new TypeError('Invalid tasks queue options: must be a plain object')
231 }
f0d7f803
JB
232 if (
233 tasksQueueOptions?.concurrency != null &&
234 !Number.isSafeInteger(tasksQueueOptions.concurrency)
235 ) {
236 throw new TypeError(
237 'Invalid worker tasks concurrency: must be an integer'
238 )
239 }
240 if (
241 tasksQueueOptions?.concurrency != null &&
242 tasksQueueOptions.concurrency <= 0
243 ) {
a20f0ba5 244 throw new Error(
f0d7f803 245 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
246 )
247 }
248 }
249
08f3f44c 250 /** @inheritDoc */
6b27d407
JB
251 public get info (): PoolInfo {
252 return {
23ccf9d7 253 version,
6b27d407 254 type: this.type,
184855e6 255 worker: this.worker,
6b27d407
JB
256 minSize: this.minSize,
257 maxSize: this.maxSize,
c05f0d50
JB
258 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
259 .runTime.aggregate &&
1305e9a8
JB
260 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
261 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
262 workerNodes: this.workerNodes.length,
263 idleWorkerNodes: this.workerNodes.reduce(
264 (accumulator, workerNode) =>
f59e1027 265 workerNode.usage.tasks.executing === 0
a4e07f72
JB
266 ? accumulator + 1
267 : accumulator,
6b27d407
JB
268 0
269 ),
270 busyWorkerNodes: this.workerNodes.reduce(
271 (accumulator, workerNode) =>
f59e1027 272 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
273 0
274 ),
a4e07f72 275 executedTasks: this.workerNodes.reduce(
6b27d407 276 (accumulator, workerNode) =>
f59e1027 277 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
278 0
279 ),
280 executingTasks: this.workerNodes.reduce(
281 (accumulator, workerNode) =>
f59e1027 282 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
283 0
284 ),
285 queuedTasks: this.workerNodes.reduce(
df593701 286 (accumulator, workerNode) =>
f59e1027 287 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
288 0
289 ),
290 maxQueuedTasks: this.workerNodes.reduce(
291 (accumulator, workerNode) =>
f59e1027 292 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 293 0
a4e07f72
JB
294 ),
295 failedTasks: this.workerNodes.reduce(
296 (accumulator, workerNode) =>
f59e1027 297 accumulator + workerNode.usage.tasks.failed,
a4e07f72 298 0
6b27d407
JB
299 )
300 }
301 }
08f3f44c 302
afe0d5bf
JB
303 /**
304 * Gets the approximate pool utilization.
305 *
306 * @returns The pool utilization.
307 */
308 private get utilization (): number {
fe7d90db
JB
309 const poolRunTimeCapacity =
310 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
311 const totalTasksRunTime = this.workerNodes.reduce(
312 (accumulator, workerNode) =>
313 accumulator + workerNode.usage.runTime.aggregate,
314 0
315 )
316 const totalTasksWaitTime = this.workerNodes.reduce(
317 (accumulator, workerNode) =>
318 accumulator + workerNode.usage.waitTime.aggregate,
319 0
320 )
321 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
322 }
323
8881ae32
JB
324 /**
325 * Pool type.
326 *
327 * If it is `'dynamic'`, it provides the `max` property.
328 */
329 protected abstract get type (): PoolType
330
184855e6
JB
331 /**
332 * Gets the worker type.
333 */
334 protected abstract get worker (): WorkerType
335
c2ade475 336 /**
6b27d407 337 * Pool minimum size.
c2ade475 338 */
6b27d407 339 protected abstract get minSize (): number
ff733df7
JB
340
341 /**
6b27d407 342 * Pool maximum size.
ff733df7 343 */
6b27d407 344 protected abstract get maxSize (): number
a35560ba 345
f59e1027
JB
346 /**
347 * Get the worker given its id.
348 *
349 * @param workerId - The worker id.
350 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
351 */
352 private getWorkerById (workerId: number): Worker | undefined {
353 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
354 ?.worker
355 }
356
ffcbbad8 357 /**
f06e48d8 358 * Gets the given worker its worker node key.
ffcbbad8
JB
359 *
360 * @param worker - The worker.
f59e1027 361 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 362 */
f06e48d8
JB
363 private getWorkerNodeKey (worker: Worker): number {
364 return this.workerNodes.findIndex(
365 workerNode => workerNode.worker === worker
366 )
bf9549ae
JB
367 }
368
afc003b2 369 /** @inheritDoc */
a35560ba 370 public setWorkerChoiceStrategy (
59219cbb
JB
371 workerChoiceStrategy: WorkerChoiceStrategy,
372 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 373 ): void {
aee46736 374 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 375 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
376 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
377 this.opts.workerChoiceStrategy
378 )
379 if (workerChoiceStrategyOptions != null) {
380 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
381 }
9c16fb4b 382 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
383 this.setWorkerNodeTasksUsage(
384 workerNode,
9c16fb4b 385 this.getWorkerUsage(workerNodeKey)
8604aaab 386 )
b6b32453 387 this.setWorkerStatistics(workerNode.worker)
59219cbb 388 }
a20f0ba5
JB
389 }
390
391 /** @inheritDoc */
392 public setWorkerChoiceStrategyOptions (
393 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
394 ): void {
0d80593b 395 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
396 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
397 this.workerChoiceStrategyContext.setOptions(
398 this.opts.workerChoiceStrategyOptions
a35560ba
S
399 )
400 }
401
a20f0ba5 402 /** @inheritDoc */
8f52842f
JB
403 public enableTasksQueue (
404 enable: boolean,
405 tasksQueueOptions?: TasksQueueOptions
406 ): void {
a20f0ba5 407 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 408 this.flushTasksQueues()
a20f0ba5
JB
409 }
410 this.opts.enableTasksQueue = enable
8f52842f 411 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
412 }
413
414 /** @inheritDoc */
8f52842f 415 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 416 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
417 this.checkValidTasksQueueOptions(tasksQueueOptions)
418 this.opts.tasksQueueOptions =
419 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 420 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
421 delete this.opts.tasksQueueOptions
422 }
423 }
424
425 private buildTasksQueueOptions (
426 tasksQueueOptions: TasksQueueOptions
427 ): TasksQueueOptions {
428 return {
429 concurrency: tasksQueueOptions?.concurrency ?? 1
430 }
431 }
432
c319c66b
JB
433 /**
434 * Whether the pool is full or not.
435 *
436 * The pool filling boolean status.
437 */
dea903a8
JB
438 protected get full (): boolean {
439 return this.workerNodes.length >= this.maxSize
440 }
c2ade475 441
c319c66b
JB
442 /**
443 * Whether the pool is busy or not.
444 *
445 * The pool busyness boolean status.
446 */
447 protected abstract get busy (): boolean
7c0ba920 448
6c6afb84
JB
449 /**
450 * Whether worker nodes are executing at least one task.
451 *
452 * @returns Worker nodes busyness boolean status.
453 */
c2ade475 454 protected internalBusy (): boolean {
e0ae6100
JB
455 return (
456 this.workerNodes.findIndex(workerNode => {
f59e1027 457 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
458 }) === -1
459 )
cb70b19d
JB
460 }
461
afc003b2 462 /** @inheritDoc */
a86b6df1 463 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 464 const timestamp = performance.now()
20dcad1a 465 const workerNodeKey = this.chooseWorkerNode()
adc3c320 466 const submittedTask: Task<Data> = {
a86b6df1 467 name,
e5a5c0fc
JB
468 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
469 data: data ?? ({} as Data),
b6b32453 470 timestamp,
adc3c320
JB
471 id: crypto.randomUUID()
472 }
2e81254d 473 const res = new Promise<Response>((resolve, reject) => {
02706357 474 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
475 resolve,
476 reject,
20dcad1a 477 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
478 })
479 })
ff733df7
JB
480 if (
481 this.opts.enableTasksQueue === true &&
7171d33f 482 (this.busy ||
f59e1027 483 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 484 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 485 .concurrency as number))
ff733df7 486 ) {
26a929d7
JB
487 this.enqueueTask(workerNodeKey, submittedTask)
488 } else {
2e81254d 489 this.executeTask(workerNodeKey, submittedTask)
adc3c320 490 }
ff733df7 491 this.checkAndEmitEvents()
78cea37e 492 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
493 return res
494 }
c97c7edb 495
afc003b2 496 /** @inheritDoc */
c97c7edb 497 public async destroy (): Promise<void> {
1fbcaa7c 498 await Promise.all(
875a7c37
JB
499 this.workerNodes.map(async (workerNode, workerNodeKey) => {
500 this.flushTasksQueue(workerNodeKey)
47aacbaa 501 // FIXME: wait for tasks to be finished
920278a2
JB
502 const workerExitPromise = new Promise<void>(resolve => {
503 workerNode.worker.on('exit', () => {
504 resolve()
505 })
506 })
f06e48d8 507 await this.destroyWorker(workerNode.worker)
920278a2 508 await workerExitPromise
1fbcaa7c
JB
509 })
510 )
c97c7edb
S
511 }
512
4a6952ff 513 /**
6c6afb84 514 * Terminates the given worker.
4a6952ff 515 *
f06e48d8 516 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
517 */
518 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 519
729c563d 520 /**
6677a3d3
JB
521 * Setup hook to execute code before worker nodes are created in the abstract constructor.
522 * Can be overridden.
afc003b2
JB
523 *
524 * @virtual
729c563d 525 */
280c2a77 526 protected setupHook (): void {
d99ba5a8 527 // Intentionally empty
280c2a77 528 }
c97c7edb 529
729c563d 530 /**
280c2a77
S
531 * Should return whether the worker is the main worker or not.
532 */
533 protected abstract isMain (): boolean
534
535 /**
2e81254d 536 * Hook executed before the worker task execution.
bf9549ae 537 * Can be overridden.
729c563d 538 *
f06e48d8 539 * @param workerNodeKey - The worker node key.
1c6fe997 540 * @param task - The task to execute.
729c563d 541 */
1c6fe997
JB
542 protected beforeTaskExecutionHook (
543 workerNodeKey: number,
544 task: Task<Data>
545 ): void {
f59e1027 546 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
547 ++workerUsage.tasks.executing
548 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
549 }
550
c01733f1 551 /**
2e81254d 552 * Hook executed after the worker task execution.
bf9549ae 553 * Can be overridden.
c01733f1 554 *
c923ce56 555 * @param worker - The worker.
38e795c1 556 * @param message - The received message.
c01733f1 557 */
2e81254d 558 protected afterTaskExecutionHook (
c923ce56 559 worker: Worker,
2740a743 560 message: MessageValue<Response>
bf9549ae 561 ): void {
f59e1027 562 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
563 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
564 this.updateRunTimeWorkerUsage(workerUsage, message)
565 this.updateEluWorkerUsage(workerUsage, message)
566 }
567
568 private updateTaskStatisticsWorkerUsage (
569 workerUsage: WorkerUsage,
570 message: MessageValue<Response>
571 ): void {
a4e07f72
JB
572 const workerTaskStatistics = workerUsage.tasks
573 --workerTaskStatistics.executing
574 ++workerTaskStatistics.executed
82f36766 575 if (message.taskError != null) {
a4e07f72 576 ++workerTaskStatistics.failed
2740a743 577 }
f8eb0a2a
JB
578 }
579
a4e07f72
JB
580 private updateRunTimeWorkerUsage (
581 workerUsage: WorkerUsage,
f8eb0a2a
JB
582 message: MessageValue<Response>
583 ): void {
87de9ff5
JB
584 if (
585 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 586 .aggregate
87de9ff5 587 ) {
f7510105
JB
588 const taskRunTime = message.taskPerformance?.runTime ?? 0
589 workerUsage.runTime.aggregate += taskRunTime
590 workerUsage.runTime.minimum = Math.min(
591 taskRunTime,
592 workerUsage.runTime.minimum
593 )
594 workerUsage.runTime.maximum = Math.max(
595 taskRunTime,
596 workerUsage.runTime.maximum
597 )
c6bd2650 598 if (
932fc8be
JB
599 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
600 .average &&
a4e07f72 601 workerUsage.tasks.executed !== 0
c6bd2650 602 ) {
a4e07f72 603 workerUsage.runTime.average =
f1c06930
JB
604 workerUsage.runTime.aggregate /
605 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 606 }
3fa4cdd2 607 if (
932fc8be
JB
608 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
609 .median &&
d715b7bc 610 message.taskPerformance?.runTime != null
3fa4cdd2 611 ) {
a4e07f72
JB
612 workerUsage.runTime.history.push(message.taskPerformance.runTime)
613 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 614 }
3032893a 615 }
f8eb0a2a
JB
616 }
617
a4e07f72
JB
618 private updateWaitTimeWorkerUsage (
619 workerUsage: WorkerUsage,
1c6fe997 620 task: Task<Data>
f8eb0a2a 621 ): void {
1c6fe997
JB
622 const timestamp = performance.now()
623 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
624 if (
625 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 626 .aggregate
87de9ff5 627 ) {
f7510105
JB
628 workerUsage.waitTime.aggregate += taskWaitTime
629 workerUsage.waitTime.minimum = Math.min(
630 taskWaitTime,
631 workerUsage.waitTime.minimum
632 )
633 workerUsage.waitTime.maximum = Math.max(
634 taskWaitTime,
635 workerUsage.waitTime.maximum
636 )
09a6305f 637 if (
87de9ff5 638 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 639 .waitTime.average &&
a4e07f72 640 workerUsage.tasks.executed !== 0
09a6305f 641 ) {
a4e07f72 642 workerUsage.waitTime.average =
f1c06930
JB
643 workerUsage.waitTime.aggregate /
644 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
645 }
646 if (
87de9ff5 647 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 648 .waitTime.median &&
1c6fe997 649 taskWaitTime != null
09a6305f 650 ) {
1c6fe997 651 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 652 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 653 }
0567595a 654 }
c01733f1 655 }
656
a4e07f72 657 private updateEluWorkerUsage (
5df69fab 658 workerUsage: WorkerUsage,
62c15a68
JB
659 message: MessageValue<Response>
660 ): void {
5df69fab
JB
661 if (
662 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
663 .aggregate
664 ) {
f7510105 665 if (message.taskPerformance?.elu != null) {
9adcefab
JB
666 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
667 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
f7510105
JB
668 if (workerUsage.elu.utilization != null) {
669 workerUsage.elu.utilization =
670 (workerUsage.elu.utilization +
671 message.taskPerformance.elu.utilization) /
672 2
673 } else {
674 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
675 }
676 workerUsage.elu.idle.minimum = Math.min(
677 message.taskPerformance.elu.idle,
678 workerUsage.elu.idle.minimum
679 )
680 workerUsage.elu.idle.maximum = Math.max(
681 message.taskPerformance.elu.idle,
682 workerUsage.elu.idle.maximum
683 )
684 workerUsage.elu.active.minimum = Math.min(
685 message.taskPerformance.elu.active,
686 workerUsage.elu.active.minimum
687 )
688 workerUsage.elu.active.maximum = Math.max(
689 message.taskPerformance.elu.active,
690 workerUsage.elu.active.maximum
691 )
5df69fab 692 }
d715b7bc 693 if (
5df69fab
JB
694 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
695 .average &&
696 workerUsage.tasks.executed !== 0
697 ) {
f1c06930
JB
698 const executedTasks =
699 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 700 workerUsage.elu.idle.average =
f1c06930 701 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 702 workerUsage.elu.active.average =
f1c06930 703 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
704 }
705 if (
706 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
707 .median &&
d715b7bc
JB
708 message.taskPerformance?.elu != null
709 ) {
5df69fab
JB
710 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
711 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
712 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
713 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
714 }
715 }
716 }
717
280c2a77 718 /**
f06e48d8 719 * Chooses a worker node for the next task.
280c2a77 720 *
6c6afb84 721 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 722 *
20dcad1a 723 * @returns The worker node key
280c2a77 724 */
6c6afb84 725 private chooseWorkerNode (): number {
930dcf12 726 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
727 const worker = this.createAndSetupDynamicWorker()
728 if (
729 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
730 ) {
731 return this.getWorkerNodeKey(worker)
732 }
17393ac8 733 }
930dcf12
JB
734 return this.workerChoiceStrategyContext.execute()
735 }
736
6c6afb84
JB
737 /**
738 * Conditions for dynamic worker creation.
739 *
740 * @returns Whether to create a dynamic worker or not.
741 */
742 private shallCreateDynamicWorker (): boolean {
930dcf12 743 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
744 }
745
280c2a77 746 /**
675bb809 747 * Sends a message to the given worker.
280c2a77 748 *
38e795c1
JB
749 * @param worker - The worker which should receive the message.
750 * @param message - The message.
280c2a77
S
751 */
752 protected abstract sendToWorker (
753 worker: Worker,
754 message: MessageValue<Data>
755 ): void
756
4a6952ff 757 /**
f06e48d8 758 * Registers a listener callback on the given worker.
4a6952ff 759 *
38e795c1
JB
760 * @param worker - The worker which should register a listener.
761 * @param listener - The message listener callback.
4a6952ff 762 */
e102732c
JB
763 private registerWorkerMessageListener<Message extends Data | Response>(
764 worker: Worker,
765 listener: (message: MessageValue<Message>) => void
766 ): void {
767 worker.on('message', listener as MessageHandler<Worker>)
768 }
c97c7edb 769
729c563d 770 /**
41344292 771 * Creates a new worker.
6c6afb84
JB
772 *
773 * @returns Newly created worker.
729c563d 774 */
280c2a77 775 protected abstract createWorker (): Worker
c97c7edb 776
729c563d 777 /**
f06e48d8 778 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 779 * Can be overridden.
729c563d 780 *
38e795c1 781 * @param worker - The newly created worker.
729c563d 782 */
6677a3d3 783 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
784 // Listen to worker messages.
785 this.registerWorkerMessageListener(worker, this.workerListener())
786 }
c97c7edb 787
4a6952ff 788 /**
f06e48d8 789 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
790 *
791 * @returns New, completely set up worker.
792 */
793 protected createAndSetupWorker (): Worker {
bdacc2d2 794 const worker = this.createWorker()
280c2a77 795
35cf1c03 796 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 797 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
798 worker.on('error', error => {
799 if (this.emitter != null) {
800 this.emitter.emit(PoolEvents.error, error)
801 }
5bc91f3e
JB
802 if (this.opts.enableTasksQueue === true) {
803 const workerNodeKey = this.getWorkerNodeKey(worker)
804 while (this.tasksQueueSize(workerNodeKey) > 0) {
805 let targetWorkerNodeKey: number = workerNodeKey
806 let minQueuedTasks = Infinity
807 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
808 if (
809 workerNodeId !== workerNodeKey &&
810 workerNode.usage.tasks.queued === 0
811 ) {
812 targetWorkerNodeKey = workerNodeId
813 break
814 }
815 if (
816 workerNodeId !== workerNodeKey &&
817 workerNode.usage.tasks.queued < minQueuedTasks
818 ) {
819 minQueuedTasks = workerNode.usage.tasks.queued
820 targetWorkerNodeKey = workerNodeId
821 }
822 }
823 this.enqueueTask(
824 targetWorkerNodeKey,
825 this.dequeueTask(workerNodeKey) as Task<Data>
826 )
827 }
828 }
e8a4c3ea 829 if (this.opts.restartWorkerOnError === true) {
1f68cede 830 this.createAndSetupWorker()
5baee0d7
JB
831 }
832 })
a35560ba
S
833 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
834 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 835 worker.once('exit', () => {
f06e48d8 836 this.removeWorkerNode(worker)
a974afa6 837 })
280c2a77 838
f06e48d8 839 this.pushWorkerNode(worker)
280c2a77 840
b6b32453
JB
841 this.setWorkerStatistics(worker)
842
280c2a77
S
843 this.afterWorkerSetup(worker)
844
c97c7edb
S
845 return worker
846 }
be0676b3 847
930dcf12
JB
848 /**
849 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
850 *
851 * @returns New, completely set up dynamic worker.
852 */
853 protected createAndSetupDynamicWorker (): Worker {
854 const worker = this.createAndSetupWorker()
855 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 856 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
857 if (
858 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
859 (message.kill != null &&
860 ((this.opts.enableTasksQueue === false &&
f59e1027 861 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 862 (this.opts.enableTasksQueue === true &&
f59e1027 863 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 864 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
865 ) {
866 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
867 void (this.destroyWorker(worker) as Promise<void>)
868 }
869 })
870 return worker
871 }
872
be0676b3 873 /**
ff733df7 874 * This function is the listener registered for each worker message.
be0676b3 875 *
bdacc2d2 876 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
877 */
878 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 879 return message => {
f59e1027
JB
880 if (message.workerId != null && message.started != null) {
881 // Worker started message received
6b272951 882 this.handleWorkerStartedMessage(message)
f59e1027 883 } else if (message.id != null) {
a3445496 884 // Task execution response received
6b272951
JB
885 this.handleTaskExecutionResponse(message)
886 }
887 }
888 }
889
890 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
891 // Worker started message received
892 const worker = this.getWorkerById(message.workerId as number)
893 if (worker != null) {
894 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
895 message.started as boolean
896 } else {
897 throw new Error(
898 `Worker started message received from unknown worker '${
899 message.workerId as number
900 }'`
901 )
902 }
903 }
904
905 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
906 const promiseResponse = this.promiseResponseMap.get(message.id as string)
907 if (promiseResponse != null) {
908 if (message.taskError != null) {
909 if (this.emitter != null) {
910 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 911 }
6b272951
JB
912 promiseResponse.reject(message.taskError.message)
913 } else {
914 promiseResponse.resolve(message.data as Response)
915 }
916 this.afterTaskExecutionHook(promiseResponse.worker, message)
917 this.promiseResponseMap.delete(message.id as string)
918 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
919 if (
920 this.opts.enableTasksQueue === true &&
921 this.tasksQueueSize(workerNodeKey) > 0
922 ) {
923 this.executeTask(
924 workerNodeKey,
925 this.dequeueTask(workerNodeKey) as Task<Data>
926 )
be0676b3 927 }
6b272951 928 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 929 }
be0676b3 930 }
7c0ba920 931
ff733df7 932 private checkAndEmitEvents (): void {
1f68cede 933 if (this.emitter != null) {
ff733df7 934 if (this.busy) {
6b27d407 935 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 936 }
6b27d407
JB
937 if (this.type === PoolTypes.dynamic && this.full) {
938 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 939 }
164d950a
JB
940 }
941 }
942
0ebe2a9f
JB
943 /**
944 * Sets the given worker node its tasks usage in the pool.
945 *
946 * @param workerNode - The worker node.
a4e07f72 947 * @param workerUsage - The worker usage.
0ebe2a9f
JB
948 */
949 private setWorkerNodeTasksUsage (
950 workerNode: WorkerNode<Worker, Data>,
a4e07f72 951 workerUsage: WorkerUsage
0ebe2a9f 952 ): void {
f59e1027 953 workerNode.usage = workerUsage
0ebe2a9f
JB
954 }
955
a05c10de 956 /**
f06e48d8 957 * Pushes the given worker in the pool worker nodes.
ea7a90d3 958 *
38e795c1 959 * @param worker - The worker.
f06e48d8 960 * @returns The worker nodes length.
ea7a90d3 961 */
f06e48d8 962 private pushWorkerNode (worker: Worker): number {
9c16fb4b 963 this.workerNodes.push({
ffcbbad8 964 worker,
7ae6fb74 965 info: { id: this.getWorkerId(worker), started: true },
f59e1027 966 usage: this.getWorkerUsage(),
29ee7e9a 967 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 968 })
9c16fb4b
JB
969 const workerNodeKey = this.getWorkerNodeKey(worker)
970 this.setWorkerNodeTasksUsage(
971 this.workerNodes[workerNodeKey],
972 this.getWorkerUsage(workerNodeKey)
973 )
974 return this.workerNodes.length
ea7a90d3 975 }
c923ce56 976
83fa0a36
JB
977 /**
978 * Gets the worker id.
979 *
980 * @param worker - The worker.
981 * @returns The worker id.
982 */
983 private getWorkerId (worker: Worker): number | undefined {
984 if (this.worker === WorkerTypes.thread) {
985 return worker.threadId
986 } else if (this.worker === WorkerTypes.cluster) {
987 return worker.id
988 }
989 }
990
8604aaab
JB
991 // /**
992 // * Sets the given worker in the pool worker nodes.
993 // *
994 // * @param workerNodeKey - The worker node key.
995 // * @param worker - The worker.
f59e1027 996 // * @param workerInfo - The worker info.
8604aaab
JB
997 // * @param workerUsage - The worker usage.
998 // * @param tasksQueue - The worker task queue.
999 // */
1000 // private setWorkerNode (
1001 // workerNodeKey: number,
1002 // worker: Worker,
f59e1027 1003 // workerInfo: WorkerInfo,
8604aaab
JB
1004 // workerUsage: WorkerUsage,
1005 // tasksQueue: Queue<Task<Data>>
1006 // ): void {
1007 // this.workerNodes[workerNodeKey] = {
1008 // worker,
f59e1027
JB
1009 // info: workerInfo,
1010 // usage: workerUsage,
8604aaab
JB
1011 // tasksQueue
1012 // }
1013 // }
51fe3d3c
JB
1014
1015 /**
f06e48d8 1016 * Removes the given worker from the pool worker nodes.
51fe3d3c 1017 *
f06e48d8 1018 * @param worker - The worker.
51fe3d3c 1019 */
416fd65c 1020 private removeWorkerNode (worker: Worker): void {
f06e48d8 1021 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1022 if (workerNodeKey !== -1) {
1023 this.workerNodes.splice(workerNodeKey, 1)
1024 this.workerChoiceStrategyContext.remove(workerNodeKey)
1025 }
51fe3d3c 1026 }
adc3c320 1027
2e81254d 1028 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1029 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1030 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1031 }
1032
f9f00b5f 1033 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 1034 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
1035 }
1036
416fd65c 1037 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 1038 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
1039 }
1040
416fd65c 1041 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 1042 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 1043 }
ff733df7 1044
df593701
JB
1045 private tasksMaxQueueSize (workerNodeKey: number): number {
1046 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1047 }
1048
416fd65c 1049 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1050 while (this.tasksQueueSize(workerNodeKey) > 0) {
1051 this.executeTask(
1052 workerNodeKey,
1053 this.dequeueTask(workerNodeKey) as Task<Data>
1054 )
ff733df7 1055 }
df593701 1056 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
1057 }
1058
ef41a6e6
JB
1059 private flushTasksQueues (): void {
1060 for (const [workerNodeKey] of this.workerNodes.entries()) {
1061 this.flushTasksQueue(workerNodeKey)
1062 }
1063 }
b6b32453
JB
1064
1065 private setWorkerStatistics (worker: Worker): void {
1066 this.sendToWorker(worker, {
1067 statistics: {
87de9ff5
JB
1068 runTime:
1069 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1070 .runTime.aggregate,
87de9ff5 1071 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1072 .elu.aggregate
b6b32453
JB
1073 }
1074 })
1075 }
8604aaab 1076
9c16fb4b 1077 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1078 const getTasksQueueSize = (workerNodeKey?: number): number => {
1079 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1080 }
df593701
JB
1081 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1082 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1083 }
8604aaab 1084 return {
9c16fb4b
JB
1085 tasks: {
1086 executed: 0,
1087 executing: 0,
1088 get queued (): number {
e3347a5c 1089 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1090 },
df593701
JB
1091 get maxQueued (): number {
1092 return getTasksMaxQueueSize(workerNodeKey)
1093 },
9c16fb4b
JB
1094 failed: 0
1095 },
8604aaab 1096 runTime: {
932fc8be 1097 aggregate: 0,
f7510105
JB
1098 maximum: 0,
1099 minimum: 0,
8604aaab
JB
1100 average: 0,
1101 median: 0,
1102 history: new CircularArray()
1103 },
1104 waitTime: {
932fc8be 1105 aggregate: 0,
f7510105
JB
1106 maximum: 0,
1107 minimum: 0,
8604aaab
JB
1108 average: 0,
1109 median: 0,
1110 history: new CircularArray()
1111 },
5df69fab
JB
1112 elu: {
1113 idle: {
1114 aggregate: 0,
f7510105
JB
1115 maximum: 0,
1116 minimum: 0,
5df69fab
JB
1117 average: 0,
1118 median: 0,
1119 history: new CircularArray()
1120 },
1121 active: {
1122 aggregate: 0,
f7510105
JB
1123 maximum: 0,
1124 minimum: 0,
5df69fab
JB
1125 average: 0,
1126 median: 0,
1127 history: new CircularArray()
f7510105 1128 }
5df69fab 1129 }
8604aaab
JB
1130 }
1131 }
c97c7edb 1132}