feat: add version to pool information
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
23ccf9d7 3import { readFileSync } from 'node:fs'
2740a743 4import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
5import {
6 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
7 EMPTY_FUNCTION,
59317253 8 isKillBehavior,
0d80593b 9 isPlainObject,
afe0d5bf
JB
10 median,
11 round
bbeadd16 12} from '../utils'
59317253 13import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 14import { CircularArray } from '../circular-array'
29ee7e9a 15import { Queue } from '../queue'
c4855468 16import {
65d7a1c9 17 type IPool,
7c5a1080 18 PoolEmitter,
c4855468 19 PoolEvents,
6b27d407 20 type PoolInfo,
c4855468 21 type PoolOptions,
6b27d407
JB
22 type PoolType,
23 PoolTypes,
184855e6 24 type TasksQueueOptions,
83fa0a36
JB
25 type WorkerType,
26 WorkerTypes
c4855468 27} from './pool'
e102732c
JB
28import type {
29 IWorker,
30 MessageHandler,
31 Task,
32 WorkerNode,
33 WorkerUsage
34} from './worker'
a35560ba 35import {
f0d7f803 36 Measurements,
a35560ba 37 WorkerChoiceStrategies,
a20f0ba5
JB
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
bdaf31cd
JB
40} from './selection-strategies/selection-strategies-types'
41import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 42
23ccf9d7
JB
/**
 * Package version, read synchronously from the `package.json` file resolved
 * as `../../package.json` relative to this module URL.
 */
const packageManifest = JSON.parse(
  readFileSync(new URL('../../package.json', import.meta.url), 'utf8')
) as Record<string, unknown>
const version = packageManifest.version as string
48
729c563d 49/**
ea7a90d3 50 * Base class that implements some shared logic for all poolifier pools.
729c563d 51 *
38e795c1 52 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
53 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
54 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 55 */
c97c7edb 56export abstract class AbstractPool<
f06e48d8 57 Worker extends IWorker,
d3c8a1a8
S
58 Data = unknown,
59 Response = unknown
c4855468 60> implements IPool<Worker, Data, Response> {
afc003b2 61 /** @inheritDoc */
f06e48d8 62 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 63
afc003b2 64 /** @inheritDoc */
7c0ba920
JB
65 public readonly emitter?: PoolEmitter
66
be0676b3 67 /**
a3445496 68 * The execution response promise map.
be0676b3 69 *
2740a743 70 * - `key`: The message id of each submitted task.
a3445496 71 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 72 *
a3445496 73 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 74 */
c923ce56
JB
75 protected promiseResponseMap: Map<
76 string,
77 PromiseResponseWrapper<Worker, Response>
78 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
afe0d5bf
JB
/**
 * The start timestamp of the pool, taken from `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
93
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates its arguments, binds the internal task methods to the instance, creates the
 * event emitter when events are enabled, builds the worker choice strategy context, runs
 * the setup hook and finally creates worker nodes until `numberOfWorkers` of them exist.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached to the instance when these methods are passed around
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool with its initial worker nodes
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
139
/**
 * Checks that a usable worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the file path is `null`, `undefined`, not a string, or blank once trimmed.
 */
private checkFilePath (filePath: string): void {
  // The parameter is typed as a string, but untyped callers can pass anything:
  // reject every value that is not a non-blank string (this also covers null and undefined).
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
148
8d3782fa
JB
/**
 * Checks that the given number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number of workers is `null` or `undefined`.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative.
 * @throws Error if the pool is fixed and the number of workers is zero.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
166
/**
 * Validates the given pool options and stores them, with defaults applied, in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the tasks queue is enabled
  if (poolOptions.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
193
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the worker choice strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
203
0d80593b
JB
/**
 * Checks the given worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given and their count differs from the pool maximum size.
 * @throws Error if a measurement is given and it is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWeightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWeightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
231
a20f0ba5
JB
/**
 * Checks the given tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are given and are not a plain object.
 * @throws TypeError if the concurrency is given and is not a safe integer.
 * @throws Error if the concurrency is given and is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to check: the concurrency is optional
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
255
/** @inheritDoc */
public get info (): PoolInfo {
  // Gather every per worker node counter in a single pass
  const counters = {
    idle: 0,
    busy: 0,
    executed: 0,
    executing: 0,
    queued: 0,
    maxQueued: 0,
    failed: 0
  }
  for (const workerNode of this.workerNodes) {
    const tasks = workerNode.usage.tasks
    if (tasks.executing === 0) {
      counters.idle += 1
    }
    if (tasks.executing > 0) {
      counters.busy += 1
    }
    counters.executed += tasks.executed
    counters.executing += tasks.executing
    counters.queued += tasks.queued
    counters.maxQueued += tasks.maxQueued
    counters.failed += tasks.failed
  }
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // The utilization is only reported when both run time and wait time are aggregated
  const reportUtilization =
    requirements.runTime.aggregate && requirements.waitTime.aggregate
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(reportUtilization && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: counters.idle,
    busyWorkerNodes: counters.busy,
    executedTasks: counters.executed,
    executingTasks: counters.executing,
    queuedTasks: counters.queued,
    maxQueuedTasks: counters.maxQueued,
    failedTasks: counters.failed
  }
}
08f3f44c 308
afe0d5bf
JB
/**
 * Gets the time elapsed since the pool start timestamp.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
317
/**
 * Gets the approximate pool utilization: the sum of the aggregated tasks run time and
 * wait time of all worker nodes, divided by the pool run time multiplied by its maximum size.
 *
 * @returns The pool utilization, or `0` when the pool run time capacity is not a strictly positive finite number.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  // Avoid reporting NaN or Infinity when there is no capacity to divide by
  if (!Number.isFinite(poolRunTimeCapacity) || poolRunTimeCapacity <= 0) {
    return 0
  }
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
337
8881ae32
JB
338 /**
339 * Pool type.
340 *
341 * If it is `'dynamic'`, it provides the `max` property.
342 */
343 protected abstract get type (): PoolType
344
184855e6
JB
345 /**
346 * Gets the worker type.
347 */
348 protected abstract get worker (): WorkerType
349
c2ade475 350 /**
6b27d407 351 * Pool minimum size.
c2ade475 352 */
6b27d407 353 protected abstract get minSize (): number
ff733df7
JB
354
355 /**
6b27d407 356 * Pool maximum size.
ff733df7 357 */
6b27d407 358 protected abstract get maxSize (): number
a35560ba 359
f59e1027
JB
/**
 * Looks up a worker by the id stored in its worker node information.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    ({ info }) => info.id === workerId
  )
  return matchingWorkerNode?.worker
}
370
/**
 * Gets the worker node key of the given worker, i.e. its index in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
382
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Give every worker node a fresh usage and set its statistics again
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
404
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the strategy context
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
415
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue gets disabled
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
427
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop previously stored tasks queue options, if any
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
438
/**
 * Builds the tasks queue options, falling back to a concurrency of `1` when none is given.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller.
 * @returns New tasks queue options holding the concurrency only.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
446
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 455
c319c66b
JB
456 /**
457 * Whether the pool is busy or not.
458 *
459 * The pool busyness boolean status.
460 */
461 protected abstract get busy (): boolean
7c0ba920 462
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task, i.e. no worker node
 * has an executing tasks counter equal to zero.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
475
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the promise callbacks under the task id so that the worker response can settle it
  const responsePromise = new Promise<Response>((resolve, reject) => {
    const worker = this.workerNodes[workerNodeKey].worker
    this.promiseResponseMap.set(taskId, { resolve, reject, worker })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already executes as many tasks as its concurrency
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 509
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Listen for the exit event before asking for the worker destruction
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(workerTerminations)
}
526
4a6952ff 527 /**
6c6afb84 528 * Terminates the given worker.
4a6952ff 529 *
f06e48d8 530 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
531 */
532 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 533
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default
}
c97c7edb 543
729c563d 544 /**
280c2a77
S
545 * Should return whether the worker is the main worker or not.
546 */
547 protected abstract isMain (): boolean
548
/**
 * Hook executed before the worker task execution: increments the executing tasks
 * counter of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
564
/**
 * Hook executed after the worker task execution: updates the tasks counters, the
 * run time usage and the ELU usage of the worker node owning the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
581
/**
 * Updates the tasks counters of the given worker usage once a task response is received:
 * one task less executing, one more executed and, on task error, one more failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
593
a4e07f72
JB
/**
 * Updates the run time statistics of the given worker usage from a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 * The average and the median are only updated when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average divides by the tasks executed without failure: guard on that
  // very divisor so that it is never computed as NaN or Infinity when all tasks failed
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
622
a4e07f72
JB
/**
 * Updates the wait time statistics of the given worker usage before a task execution.
 *
 * The task wait time is the time elapsed since the task timestamp, or `0` when the
 * task has no timestamp. Nothing is updated unless the worker choice strategy requires
 * the wait time aggregate. The average and the median are only updated when required.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average divides by the tasks executed without failure: guard on that
  // very divisor so that it is never computed as NaN or Infinity when all tasks failed
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
653
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage from a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU aggregate.
 * The averages and the medians are only updated when the strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The utilization is smoothed with the previously stored value
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages divide by the tasks executed without failure: guard on that
  // very divisor so that they are never computed as NaN or Infinity when all tasks failed
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
698
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met. Its worker node
 * key is returned when the strategy policy uses dynamic workers, otherwise the worker
 * choice strategy picks the worker node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
717
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and
 * all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
726
280c2a77 727 /**
675bb809 728 * Sends a message to the given worker.
280c2a77 729 *
38e795c1
JB
730 * @param worker - The worker which should receive the message.
731 * @param message - The message.
280c2a77
S
732 */
733 protected abstract sendToWorker (
734 worker: Worker,
735 message: MessageValue<Data>
736 ): void
737
/**
 * Registers a listener callback for the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 750
729c563d 751 /**
41344292 752 * Creates a new worker.
6c6afb84
JB
753 *
754 * @returns Newly created worker.
729c563d 755 */
280c2a77 756 protected abstract createWorker (): Worker
c97c7edb 757
/**
 * Function called once a worker has been newly created and pushed to the pool worker nodes.
 * The default implementation registers the pool worker listener on the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const onWorkerMessage = this.workerListener()
  this.registerWorkerMessageListener(worker, onWorkerMessage)
}
c97c7edb 768
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user defined handlers are registered first, then the pool own handlers:
 * - on error: the error is emitted if events are enabled, the queued tasks of the worker
 *   node are moved to other worker nodes when the tasks queue is enabled, and a new worker
 *   is created if `restartWorkerOnError` is enabled;
 * - on exit: the worker node is removed from the pool.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      const workerNodeKey = this.getWorkerNodeKey(worker)
      while (this.tasksQueueSize(workerNodeKey) > 0) {
        // Pick another worker node: the first one without queued tasks,
        // otherwise the one with the fewest queued tasks
        let targetWorkerNodeKey: number = workerNodeKey
        let minQueuedTasks = Infinity
        for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
          if (
            workerNodeId !== workerNodeKey &&
            workerNode.usage.tasks.queued === 0
          ) {
            targetWorkerNodeKey = workerNodeId
            break
          }
          if (
            workerNodeId !== workerNodeKey &&
            workerNode.usage.tasks.queued < minQueuedTasks
          ) {
            minQueuedTasks = workerNode.usage.tasks.queued
            targetWorkerNodeKey = workerNodeId
          }
        }
        if (targetWorkerNodeKey === workerNodeKey) {
          // No other worker node can take the queued tasks: moving a task back
          // to its own queue would never shrink it, so stop instead of looping forever
          break
        }
        this.enqueueTask(
          targetWorkerNodeKey,
          this.dequeueTask(workerNodeKey) as Task<Data>
        )
      }
    }
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 828
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when a hard kill
 * message is received, or when any kill message is received while the worker node is
 * executing no task (and has no queued task when the tasks queue is enabled).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    let shallDestroy: boolean = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (!shallDestroy && message.kill != null) {
      if (this.opts.enableTasksQueue === false) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      } else if (this.opts.enableTasksQueue === true) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
      }
    }
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
853
/**
 * Builds the listener registered for each worker message. The listener dispatches
 * worker started messages and task execution responses to their handler.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
870
/**
 * Handles a worker started message by storing the started status in the worker node information.
 *
 * @param message - The received message.
 * @throws Error if the message worker id matches no worker in the pool worker nodes.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
885
/**
 * Handles a task execution response: settles the promise bound to the message id,
 * updates the worker usage, then executes the next queued task of the worker node, if any.
 * Messages whose id matches no pending promise are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, taskError)
    }
    promiseResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 912
/**
 * Emits the pool state events, provided an emitter is set.
 *
 * `PoolEvents.busy` is emitted when the pool is busy, and `PoolEvents.full`
 * is emitted when the pool is dynamic and full. Both events carry the pool
 * information.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
923
0ebe2a9f
JB
/**
 * Replaces the usage held by the given worker node.
 *
 * @param workerNode - The worker node to update.
 * @param workerUsage - The worker usage to assign to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
936
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The node is first pushed with a usage created without a worker node key.
 * Once the node is in the pool and its key can be looked up, its usage is
 * replaced by one created for that key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.setWorkerNodeTasksUsage(
    this.workerNodes[workerNodeKey],
    this.getWorkerUsage(workerNodeKey)
  )
  return this.workerNodes.length
}
c923ce56 957
83fa0a36
JB
/**
 * Gets the identifier of the given worker according to the pool worker type.
 *
 * @param worker - The worker.
 * @returns The worker `threadId` for a thread worker, the worker `id` for a cluster worker, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
971
8604aaab
JB
972 // /**
973 // * Sets the given worker in the pool worker nodes.
974 // *
975 // * @param workerNodeKey - The worker node key.
976 // * @param worker - The worker.
f59e1027 977 // * @param workerInfo - The worker info.
8604aaab
JB
978 // * @param workerUsage - The worker usage.
979 // * @param tasksQueue - The worker task queue.
980 // */
981 // private setWorkerNode (
982 // workerNodeKey: number,
983 // worker: Worker,
f59e1027 984 // workerInfo: WorkerInfo,
8604aaab
JB
985 // workerUsage: WorkerUsage,
986 // tasksQueue: Queue<Task<Data>>
987 // ): void {
988 // this.workerNodes[workerNodeKey] = {
989 // worker,
f59e1027
JB
990 // info: workerInfo,
991 // usage: workerUsage,
8604aaab
JB
992 // tasksQueue
993 // }
994 // }
51fe3d3c
JB
995
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 *
 * Nothing happens when the worker node key lookup yields `-1`.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1008
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // The worker is read after the hook has run, as the hook comes first.
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1013
/**
 * Enqueues the given task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` (presumably the new queue size - to be confirmed against `Queue`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1017
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` (presumably when the queue is empty - to be confirmed against `Queue`).
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1021
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 1025
df593701
JB
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1029
/**
 * Flushes the tasks queue of the given worker node: executes every queued
 * task on that worker node, then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1039
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1045
/**
 * Sends to the given worker the statistics settings taken from the worker
 * choice strategy context task statistics requirements: the `aggregate`
 * value of the `runTime` and of the `elu` requirements.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: { runTime, elu }
  })
}
8604aaab 1057
/**
 * Creates a worker usage with every counter and statistic zeroed.
 *
 * The `queued` and `maxQueued` tasks counters are getters: they read the
 * size and the maximum size of the tasks queue of the given worker node each
 * time they are accessed.
 *
 * @param workerNodeKey - The worker node key whose tasks queue backs the `queued` and `maxQueued` counters. When omitted, both counters read `0`.
 * @returns The worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions keep `this` bound to the pool: inside the getters below,
  // `this` refers to the `tasks` object instead.
  const queuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksQueueSize(workerNodeKey)
  const maxQueuedTasks = (): number =>
    workerNodeKey == null ? 0 : this.tasksMaxQueueSize(workerNodeKey)
  // Each call yields a distinct object holding its own history.
  const zeroedStatistics = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: zeroedStatistics(),
    waitTime: zeroedStatistics(),
    elu: {
      idle: zeroedStatistics(),
      active: zeroedStatistics(),
      utilization: 0
    }
  }
}
c97c7edb 1106}