refactor: safer Math.{min,max} usage
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
8a1260a3 31 WorkerInfo,
e102732c
JB
32 WorkerNode,
33 WorkerUsage
34} from './worker'
a35560ba 35import {
f0d7f803 36 Measurements,
a35560ba 37 WorkerChoiceStrategies,
a20f0ba5
JB
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
bdaf31cd
JB
40} from './selection-strategies/selection-strategies-types'
41import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 42import { version } from './version'
23ccf9d7 43
729c563d 44/**
ea7a90d3 45 * Base class that implements some shared logic for all poolifier pools.
729c563d 46 *
38e795c1 47 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 50 */
c97c7edb 51export abstract class AbstractPool<
f06e48d8 52 Worker extends IWorker,
d3c8a1a8
S
53 Data = unknown,
54 Response = unknown
c4855468 55> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, plus the `resolve` and `reject` callbacks of the promise returned to the caller.
 *
 * When a worker answers, the entry matching the message id is looked up to settle the promise.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Worker choice strategy context: delegates to the worker choice algorithm currently in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>
83
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
88
/**
 * Builds the pool: validates its arguments, wires the worker choice strategy
 * context and creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error when called from a context that is not the main one.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation (pool options are also given their defaults here)
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node exists
  this.setupHook()

  // Fill the pool up to the requested number of workers
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
134
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error when `filePath` is missing, is not a string or is blank.
 */
private checkFilePath (filePath: string): void {
  // The `typeof` test also covers `null`/`undefined` and rejects non-string
  // values that untyped (JavaScript) callers may pass.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
143
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error when the number is missing, or is zero for a fixed pool.
 * @throws TypeError when the number is not a safe integer.
 * @throws RangeError when the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
161
/**
 * Validates the pool options and stores their defaulted values in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError when `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless specified
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches and their defaults
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled
  if (this.opts.enableTasksQueue) {
    const requestedTasksQueueOptions =
      opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
188
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error when the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const isKnownStrategy = Object.values(WorkerChoiceStrategies).includes(
    workerChoiceStrategy
  )
  if (isKnownStrategy) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
198
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError when the options are not a plain object.
 * @throws Error when `weights` does not hold exactly `maxSize` entries, or when `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  // When weights are given, their count must equal the pool maximum size
  const hasWrongWeightsCount =
    workerChoiceStrategyOptions.weights != null &&
    Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const hasUnknownMeasurement =
    workerChoiceStrategyOptions.measurement != null &&
    !Object.values(Measurements).includes(
      workerChoiceStrategyOptions.measurement
    )
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
226
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError when the options are not a plain object, or when `concurrency` is not a safe integer.
 * @throws Error when `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
250
/** @inheritDoc */
public get info (): PoolInfo {
  // Adds up one number per worker node over the whole pool.
  const sumOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    )
  }
}
08f3f44c 303
/**
 * Approximate pool utilization: aggregated tasks run time plus wait time of
 * all worker nodes, divided by the time elapsed since the pool start
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
324
/**
 * The type of this pool.
 *
 * Within this class it is compared against `PoolTypes.fixed` and
 * `PoolTypes.dynamic`; if it is `'dynamic'`, the pool provides the `max` property.
 */
protected abstract get type (): PoolType
331
/**
 * The type of worker used by this pool, as reported in the pool `info`.
 */
protected abstract get worker (): WorkerType
336
/**
 * The minimum size of the pool, as reported in the pool `info`.
 */
protected abstract get minSize (): number
ff733df7
JB
341
/**
 * The maximum size of the pool.
 *
 * The pool is `full` once it holds this many worker nodes.
 */
protected abstract get maxSize (): number
a35560ba 346
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
357
/**
 * Finds the worker node key (its index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
369
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and set its statistics again
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getInitialWorkerUsage(worker)
    )
    this.setWorkerStatistics(worker)
  }
}
391
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: nothing is stored when the options are invalid
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
402
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when the queue is being switched off
  const isSwitchingOff = this.opts.enableTasksQueue === true && !enable
  if (isSwitchingOff) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
414
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Queue disabled: drop any previously stored tasks queue options
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
425
/**
 * Builds the effective tasks queue options, filling in defaults.
 *
 * @param tasksQueueOptions - The requested tasks queue options, possibly missing.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
433
/**
 * Whether the pool is full or not.
 *
 * `true` once the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 442
/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, `execute()` queues the submitted task
 * instead of running it while this is `true`.
 */
protected abstract get busy (): boolean
7c0ba920 449
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
462
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the promise callbacks before the task is handed over, so the
  // worker response can be matched by task id.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task when queueing is enabled and either the pool is busy or
  // the chosen worker node already runs `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 496
/** @inheritDoc */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // Listen for 'exit' before asking the worker to terminate
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(this.workerNodes.map(destroyWorkerNode))
}
513
/**
 * Terminates the given worker.
 *
 * May return a promise; `destroy()` awaits the result.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 520
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default
}
c97c7edb 530
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
535
/**
 * Hook run before a task is executed on a worker node: counts the task as
 * executing and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
551
/**
 * Hook run after a task has been executed by a worker: updates the tasks
 * counters, run time and ELU statistics of its worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
568
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a non-null `taskError` marks the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const hasFailed = message.taskError != null
  if (hasFailed) {
    tasks.failed += 1
  }
}
580
/**
 * Updates the run time statistics of a worker usage with the run time of the
 * task that just ended. Does nothing unless the worker choice strategy
 * requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a missing `taskPerformance.runTime` counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  workerUsage.runTime.aggregate += taskRunTime
  // `?? ±Infinity` keeps Math.min/Math.max safe while no value is recorded yet
  workerUsage.runTime.minimum = Math.min(
    taskRunTime,
    workerUsage.runTime.minimum ?? Infinity
  )
  workerUsage.runTime.maximum = Math.max(
    taskRunTime,
    workerUsage.runTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Skip it when there
  // are none, otherwise the division by zero yields Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    workerUsage.runTime.history.push(message.taskPerformance.runTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
618
/**
 * Updates the wait time statistics of a worker usage with the wait time of
 * the task about to be executed, i.e. the time elapsed since `task.timestamp`
 * (`0` when the task has no timestamp). Does nothing unless the worker choice
 * strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // `?? ±Infinity` keeps Math.min/Math.max safe while no value is recorded yet
  workerUsage.waitTime.minimum = Math.min(
    taskWaitTime,
    workerUsage.waitTime.minimum ?? Infinity
  )
  workerUsage.waitTime.maximum = Math.max(
    taskWaitTime,
    workerUsage.waitTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Skip it when there
  // are none, otherwise the division by zero yields Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
657
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU reported for the task that just ended. Does nothing unless the
 * worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, possibly carrying `taskPerformance.elu`.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Utilization is the mean of the previous value and the new sample
    if (workerUsage.elu.utilization != null) {
      workerUsage.elu.utilization =
        (workerUsage.elu.utilization + taskElu.utilization) / 2
    } else {
      workerUsage.elu.utilization = taskElu.utilization
    }
    // `?? ±Infinity` keeps Math.min/Math.max safe while no value is recorded yet
    workerUsage.elu.idle.minimum = Math.min(
      taskElu.idle,
      workerUsage.elu.idle.minimum ?? Infinity
    )
    workerUsage.elu.idle.maximum = Math.max(
      taskElu.idle,
      workerUsage.elu.idle.maximum ?? -Infinity
    )
    workerUsage.elu.active.minimum = Math.min(
      taskElu.active,
      workerUsage.elu.active.minimum ?? Infinity
    )
    workerUsage.elu.active.maximum = Math.max(
      taskElu.active,
      workerUsage.elu.active.maximum ?? -Infinity
    )
  }
  // The averages are taken over the tasks that did not fail. Skip them when
  // there are none, otherwise the division by zero yields Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
718
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met, and
 * is chosen directly if the strategy policy uses dynamic workers. Otherwise
 * the choice is delegated to the worker choice strategy (round robin by default).
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    return this.getWorkerNodeKey(dynamicWorker)
  }
  return this.workerChoiceStrategyContext.execute()
}
737
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
746
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
757
/**
 * Subscribes a listener callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 770
/**
 * Creates a new worker.
 *
 * Called by `createAndSetupWorker()`, which then attaches the handlers and
 * adds the worker to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 777
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
c97c7edb 788
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * attaches the user handlers, the pool error and exit handling, adds the
 * worker to the worker nodes and sets its statistics.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      // Move the tasks queued on the failing worker node to other worker nodes
      const workerNodeKey = this.getWorkerNodeKey(worker)
      while (this.tasksQueueSize(workerNodeKey) > 0) {
        let targetWorkerNodeKey: number = workerNodeKey
        let minQueuedTasks = Infinity
        for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
          if (workerNodeId === workerNodeKey) {
            continue
          }
          // A worker node with an empty queue is the best possible target
          if (workerNode.usage.tasks.queued === 0) {
            targetWorkerNodeKey = workerNodeId
            break
          }
          // Otherwise keep the worker node with the fewest queued tasks
          if (workerNode.usage.tasks.queued < minQueuedTasks) {
            minQueuedTasks = workerNode.usage.tasks.queued
            targetWorkerNodeKey = workerNodeId
          }
        }
        if (targetWorkerNodeKey === workerNodeKey) {
          // No other worker node can take the tasks: re-enqueueing them on
          // the failing worker node would never empty its queue and would
          // loop forever, so leave them where they are.
          break
        }
        this.enqueueTask(
          targetWorkerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
    }
    if (this.opts.restartWorkerOnError === true) {
      // Replace the failing worker with one of the same kind
      if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 852
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * The worker is flagged as dynamic and gets an extra message listener that
 * destroys it on a hard kill message, or on any other kill message once it
 * has no executing task (and no queued task when the tasks queue is enabled).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily, only when the message is not a hard kill
    const isKillableWhenIdle = (): boolean => {
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isKillableWhenIdle()
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
878
/**
 * Builds the listener registered for each worker message. It dispatches
 * worker started messages and task execution responses to their handlers and
 * ignores any other message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
895
/**
 * Handles a worker started message: stores the `started` flag in the info of
 * the worker node the message comes from.
 *
 * @param message - The worker started message.
 * @throws Error when no worker in the pool matches `message.workerId`.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  // Worker started message received
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
910
/**
 * Handles a task execution response: settles the pending promise registered
 * under the message id, runs the after task execution hook, executes the next
 * queued task of the worker node if tasks queueing is enabled, then updates
 * the worker choice strategy context.
 *
 * Does nothing if no pending promise is registered under the message id.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const taskError = message.taskError
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    // Notify the listeners, if any, before rejecting the pending promise.
    this.emitter?.emit(PoolEvents.taskError, taskError)
    pendingResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 937
/**
 * Emits the `busy` event when the pool is busy and the `full` event when the
 * pool is dynamic and full, each with the pool information as payload.
 *
 * Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
948
0ebe2a9f
JB
/**
 * Replaces the usage of the given worker node with the given worker usage.
 *
 * @param workerNode - The worker node whose usage is replaced.
 * @param workerUsage - The worker usage to assign.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // The usage object is assigned by reference, not copied.
  workerNode.usage = workerUsage
}
961
8a1260a3
JB
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
970
/**
 * Appends a new worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: this.getInitialWorkerInfo(worker),
    usage: this.getInitialWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Once the node is pushed, its usage is replaced by one built with the
  // worker so that the queue related getters read the node tasks queue.
  const pushedWorkerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const workerUsage = this.getInitialWorkerUsage(worker)
  this.setWorkerNodeTasksUsage(pushedWorkerNode, workerUsage)
  return this.workerNodes.length
}
c923ce56 990
83fa0a36
JB
/**
 * Gets the id of the given worker: its `threadId` for a thread worker, its
 * `id` for a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, or `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
1004
8604aaab
JB
1005 // /**
1006 // * Sets the given worker in the pool worker nodes.
1007 // *
1008 // * @param workerNodeKey - The worker node key.
1009 // * @param worker - The worker.
f59e1027 1010 // * @param workerInfo - The worker info.
8604aaab
JB
1011 // * @param workerUsage - The worker usage.
1012 // * @param tasksQueue - The worker task queue.
1013 // */
1014 // private setWorkerNode (
1015 // workerNodeKey: number,
1016 // worker: Worker,
f59e1027 1017 // workerInfo: WorkerInfo,
8604aaab
JB
1018 // workerUsage: WorkerUsage,
1019 // tasksQueue: Queue<Task<Data>>
1020 // ): void {
1021 // this.workerNodes[workerNodeKey] = {
1022 // worker,
f59e1027
JB
1023 // info: workerInfo,
1024 // usage: workerUsage,
8604aaab
JB
1025 // tasksQueue
1026 // }
1027 // }
51fe3d3c
JB
1028
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 *
 * Does nothing if the worker node key lookup yields -1.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1041
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1046
/**
 * Enqueues the given task in the tasks queue of the worker node stored at
 * the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1050
/**
 * Dequeues a task from the tasks queue of the worker node stored at the
 * given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1054
/**
 * Gets the size of the tasks queue of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 1058
df593701
JB
/**
 * Gets the `maxSize` of the tasks queue of the worker node stored at the
 * given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1062
/**
 * Executes every task queued on the worker node stored at the given key,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  tasksQueue.clear()
}
1072
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1078
/**
 * Sends to the given worker the statistics flags (`runTime` and `elu`) taken
 * from the `aggregate` task statistics requirements of the worker choice
 * strategy context.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: { runTime, elu }
  })
}
8604aaab 1090
8a1260a3
JB
/**
 * Builds a zeroed worker usage.
 *
 * The `queued` and `maxQueued` task counters are getters evaluated on each
 * read: they report the size and maximum size of the tasks queue of the given
 * worker node, or 0 when no worker is given.
 *
 * @param worker - The worker whose tasks queue backs the queue counters.
 * @returns The initial worker usage.
 */
private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
  const queuedTasks = (): number =>
    worker == null ? 0 : this.tasksQueueSize(this.getWorkerNodeKey(worker))
  const maxQueuedTasks = (): number =>
    worker == null ? 0 : this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
  // Each call yields a distinct object with its own history array.
  const zeroedMeasurement = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    maximum: 0,
    minimum: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: zeroedMeasurement(),
    waitTime: zeroedMeasurement(),
    elu: {
      idle: zeroedMeasurement(),
      active: zeroedMeasurement()
    }
  }
}
8a1260a3
JB
1150
/**
 * Builds the initial information of the given worker: its id, not dynamic
 * and started.
 *
 * @param worker - The worker.
 * @returns The initial worker information.
 */
private getInitialWorkerInfo (worker: Worker): WorkerInfo {
  const id = this.getWorkerId(worker)
  return { id, dynamic: false, started: true }
}
c97c7edb 1154}