Revert "fix: use version from package.json"
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
a35560ba 34import {
f0d7f803 35 Measurements,
a35560ba 36 WorkerChoiceStrategies,
a20f0ba5
JB
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
bdaf31cd
JB
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 41import { version } from './version'
23ccf9d7 42
729c563d 43/**
ea7a90d3 44 * Base class that implements some shared logic for all poolifier pools.
729c563d 45 *
38e795c1 46 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 49 */
c97c7edb 50export abstract class AbstractPool<
f06e48d8 51 Worker extends IWorker,
d3c8a1a8
S
52 Data = unknown,
53 Response = unknown
c4855468 54> implements IPool<Worker, Data, Response> {
afc003b2 55 /** @inheritDoc */
f06e48d8 56 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 57
afc003b2 58 /** @inheritDoc */
7c0ba920
JB
59 public readonly emitter?: PoolEmitter
60
be0676b3 61 /**
a3445496 62 * The execution response promise map.
be0676b3 63 *
2740a743 64 * - `key`: The message id of each submitted task.
a3445496 65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 66 *
a3445496 67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 68 */
c923ce56
JB
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 73
a35560ba 74 /**
51fe3d3c 75 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
78 Worker,
79 Data,
80 Response
a35560ba
S
81 >
82
afe0d5bf
JB
83 /**
84 * The start timestamp of the pool.
85 */
86 private readonly startTimestamp
87
729c563d
S
88 /**
89 * Constructs a new poolifier pool.
90 *
38e795c1 91 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 92 * @param filePath - Path to the worker file.
38e795c1 93 * @param opts - Options for the pool.
729c563d 94 */
c97c7edb 95 public constructor (
b4213b7f
JB
96 protected readonly numberOfWorkers: number,
97 protected readonly filePath: string,
98 protected readonly opts: PoolOptions<Worker>
c97c7edb 99 ) {
78cea37e 100 if (!this.isMain()) {
c97c7edb
S
101 throw new Error('Cannot start a pool from a worker!')
102 }
8d3782fa 103 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 104 this.checkFilePath(this.filePath)
7c0ba920 105 this.checkPoolOptions(this.opts)
1086026a 106
7254e419
JB
107 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
108 this.executeTask = this.executeTask.bind(this)
109 this.enqueueTask = this.enqueueTask.bind(this)
110 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 111
6bd72cd0 112 if (this.opts.enableEvents === true) {
7c0ba920
JB
113 this.emitter = new PoolEmitter()
114 }
d59df138
JB
115 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
116 Worker,
117 Data,
118 Response
da309861
JB
119 >(
120 this,
121 this.opts.workerChoiceStrategy,
122 this.opts.workerChoiceStrategyOptions
123 )
b6b32453
JB
124
125 this.setupHook()
126
afe0d5bf 127 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
128 this.createAndSetupWorker()
129 }
afe0d5bf
JB
130
131 this.startTimestamp = performance.now()
c97c7edb
S
132 }
133
a35560ba 134 private checkFilePath (filePath: string): void {
ffcbbad8
JB
135 if (
136 filePath == null ||
137 (typeof filePath === 'string' && filePath.trim().length === 0)
138 ) {
c510fea7
APA
139 throw new Error('Please specify a file with a worker implementation')
140 }
141 }
142
8d3782fa
JB
143 private checkNumberOfWorkers (numberOfWorkers: number): void {
144 if (numberOfWorkers == null) {
145 throw new Error(
146 'Cannot instantiate a pool without specifying the number of workers'
147 )
78cea37e 148 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 149 throw new TypeError(
0d80593b 150 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
151 )
152 } else if (numberOfWorkers < 0) {
473c717a 153 throw new RangeError(
8d3782fa
JB
154 'Cannot instantiate a pool with a negative number of workers'
155 )
6b27d407 156 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
157 throw new Error('Cannot instantiate a fixed pool with no worker')
158 }
159 }
160
7c0ba920 161 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
162 if (isPlainObject(opts)) {
163 this.opts.workerChoiceStrategy =
164 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
165 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
166 this.opts.workerChoiceStrategyOptions =
167 opts.workerChoiceStrategyOptions ??
168 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
169 this.checkValidWorkerChoiceStrategyOptions(
170 this.opts.workerChoiceStrategyOptions
171 )
1f68cede 172 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
173 this.opts.enableEvents = opts.enableEvents ?? true
174 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
175 if (this.opts.enableTasksQueue) {
176 this.checkValidTasksQueueOptions(
177 opts.tasksQueueOptions as TasksQueueOptions
178 )
179 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
180 opts.tasksQueueOptions as TasksQueueOptions
181 )
182 }
183 } else {
184 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 185 }
aee46736
JB
186 }
187
188 private checkValidWorkerChoiceStrategy (
189 workerChoiceStrategy: WorkerChoiceStrategy
190 ): void {
191 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 192 throw new Error(
aee46736 193 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
194 )
195 }
7c0ba920
JB
196 }
197
0d80593b
JB
198 private checkValidWorkerChoiceStrategyOptions (
199 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
200 ): void {
201 if (!isPlainObject(workerChoiceStrategyOptions)) {
202 throw new TypeError(
203 'Invalid worker choice strategy options: must be a plain object'
204 )
205 }
49be33fe
JB
206 if (
207 workerChoiceStrategyOptions.weights != null &&
6b27d407 208 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
209 ) {
210 throw new Error(
211 'Invalid worker choice strategy options: must have a weight for each worker node'
212 )
213 }
f0d7f803
JB
214 if (
215 workerChoiceStrategyOptions.measurement != null &&
216 !Object.values(Measurements).includes(
217 workerChoiceStrategyOptions.measurement
218 )
219 ) {
220 throw new Error(
221 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
222 )
223 }
0d80593b
JB
224 }
225
a20f0ba5
JB
226 private checkValidTasksQueueOptions (
227 tasksQueueOptions: TasksQueueOptions
228 ): void {
0d80593b
JB
229 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
230 throw new TypeError('Invalid tasks queue options: must be a plain object')
231 }
f0d7f803
JB
232 if (
233 tasksQueueOptions?.concurrency != null &&
234 !Number.isSafeInteger(tasksQueueOptions.concurrency)
235 ) {
236 throw new TypeError(
237 'Invalid worker tasks concurrency: must be an integer'
238 )
239 }
240 if (
241 tasksQueueOptions?.concurrency != null &&
242 tasksQueueOptions.concurrency <= 0
243 ) {
a20f0ba5 244 throw new Error(
f0d7f803 245 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
246 )
247 }
248 }
249
08f3f44c 250 /** @inheritDoc */
6b27d407
JB
251 public get info (): PoolInfo {
252 return {
23ccf9d7 253 version,
6b27d407 254 type: this.type,
184855e6 255 worker: this.worker,
6b27d407
JB
256 minSize: this.minSize,
257 maxSize: this.maxSize,
c05f0d50
JB
258 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
259 .runTime.aggregate &&
1305e9a8
JB
260 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
261 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
262 workerNodes: this.workerNodes.length,
263 idleWorkerNodes: this.workerNodes.reduce(
264 (accumulator, workerNode) =>
f59e1027 265 workerNode.usage.tasks.executing === 0
a4e07f72
JB
266 ? accumulator + 1
267 : accumulator,
6b27d407
JB
268 0
269 ),
270 busyWorkerNodes: this.workerNodes.reduce(
271 (accumulator, workerNode) =>
f59e1027 272 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
273 0
274 ),
a4e07f72 275 executedTasks: this.workerNodes.reduce(
6b27d407 276 (accumulator, workerNode) =>
f59e1027 277 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
278 0
279 ),
280 executingTasks: this.workerNodes.reduce(
281 (accumulator, workerNode) =>
f59e1027 282 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
283 0
284 ),
285 queuedTasks: this.workerNodes.reduce(
df593701 286 (accumulator, workerNode) =>
f59e1027 287 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
288 0
289 ),
290 maxQueuedTasks: this.workerNodes.reduce(
291 (accumulator, workerNode) =>
f59e1027 292 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 293 0
a4e07f72
JB
294 ),
295 failedTasks: this.workerNodes.reduce(
296 (accumulator, workerNode) =>
f59e1027 297 accumulator + workerNode.usage.tasks.failed,
a4e07f72 298 0
6b27d407
JB
299 )
300 }
301 }
08f3f44c 302
afe0d5bf
JB
303 /**
304 * Gets the pool run time.
305 *
306 * @returns The pool run time in milliseconds.
307 */
308 private get runTime (): number {
309 return performance.now() - this.startTimestamp
310 }
311
312 /**
313 * Gets the approximate pool utilization.
314 *
315 * @returns The pool utilization.
316 */
317 private get utilization (): number {
318 const poolRunTimeCapacity = this.runTime * this.maxSize
319 const totalTasksRunTime = this.workerNodes.reduce(
320 (accumulator, workerNode) =>
321 accumulator + workerNode.usage.runTime.aggregate,
322 0
323 )
324 const totalTasksWaitTime = this.workerNodes.reduce(
325 (accumulator, workerNode) =>
326 accumulator + workerNode.usage.waitTime.aggregate,
327 0
328 )
329 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
330 }
331
8881ae32
JB
332 /**
333 * Pool type.
334 *
335 * If it is `'dynamic'`, it provides the `max` property.
336 */
337 protected abstract get type (): PoolType
338
184855e6
JB
339 /**
340 * Gets the worker type.
341 */
342 protected abstract get worker (): WorkerType
343
c2ade475 344 /**
6b27d407 345 * Pool minimum size.
c2ade475 346 */
6b27d407 347 protected abstract get minSize (): number
ff733df7
JB
348
349 /**
6b27d407 350 * Pool maximum size.
ff733df7 351 */
6b27d407 352 protected abstract get maxSize (): number
a35560ba 353
f59e1027
JB
354 /**
355 * Get the worker given its id.
356 *
357 * @param workerId - The worker id.
358 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
359 */
360 private getWorkerById (workerId: number): Worker | undefined {
361 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
362 ?.worker
363 }
364
ffcbbad8 365 /**
f06e48d8 366 * Gets the given worker its worker node key.
ffcbbad8
JB
367 *
368 * @param worker - The worker.
f59e1027 369 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 370 */
f06e48d8
JB
371 private getWorkerNodeKey (worker: Worker): number {
372 return this.workerNodes.findIndex(
373 workerNode => workerNode.worker === worker
374 )
bf9549ae
JB
375 }
376
afc003b2 377 /** @inheritDoc */
a35560ba 378 public setWorkerChoiceStrategy (
59219cbb
JB
379 workerChoiceStrategy: WorkerChoiceStrategy,
380 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 381 ): void {
aee46736 382 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 383 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
384 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
385 this.opts.workerChoiceStrategy
386 )
387 if (workerChoiceStrategyOptions != null) {
388 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
389 }
9c16fb4b 390 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
391 this.setWorkerNodeTasksUsage(
392 workerNode,
9c16fb4b 393 this.getWorkerUsage(workerNodeKey)
8604aaab 394 )
b6b32453 395 this.setWorkerStatistics(workerNode.worker)
59219cbb 396 }
a20f0ba5
JB
397 }
398
399 /** @inheritDoc */
400 public setWorkerChoiceStrategyOptions (
401 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
402 ): void {
0d80593b 403 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
404 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
405 this.workerChoiceStrategyContext.setOptions(
406 this.opts.workerChoiceStrategyOptions
a35560ba
S
407 )
408 }
409
a20f0ba5 410 /** @inheritDoc */
8f52842f
JB
411 public enableTasksQueue (
412 enable: boolean,
413 tasksQueueOptions?: TasksQueueOptions
414 ): void {
a20f0ba5 415 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 416 this.flushTasksQueues()
a20f0ba5
JB
417 }
418 this.opts.enableTasksQueue = enable
8f52842f 419 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
420 }
421
422 /** @inheritDoc */
8f52842f 423 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 424 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
425 this.checkValidTasksQueueOptions(tasksQueueOptions)
426 this.opts.tasksQueueOptions =
427 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 428 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
429 delete this.opts.tasksQueueOptions
430 }
431 }
432
433 private buildTasksQueueOptions (
434 tasksQueueOptions: TasksQueueOptions
435 ): TasksQueueOptions {
436 return {
437 concurrency: tasksQueueOptions?.concurrency ?? 1
438 }
439 }
440
c319c66b
JB
441 /**
442 * Whether the pool is full or not.
443 *
444 * The pool filling boolean status.
445 */
dea903a8
JB
446 protected get full (): boolean {
447 return this.workerNodes.length >= this.maxSize
448 }
c2ade475 449
c319c66b
JB
450 /**
451 * Whether the pool is busy or not.
452 *
453 * The pool busyness boolean status.
454 */
455 protected abstract get busy (): boolean
7c0ba920 456
6c6afb84
JB
457 /**
458 * Whether worker nodes are executing at least one task.
459 *
460 * @returns Worker nodes busyness boolean status.
461 */
c2ade475 462 protected internalBusy (): boolean {
e0ae6100
JB
463 return (
464 this.workerNodes.findIndex(workerNode => {
f59e1027 465 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
466 }) === -1
467 )
cb70b19d
JB
468 }
469
afc003b2 470 /** @inheritDoc */
a86b6df1 471 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 472 const timestamp = performance.now()
20dcad1a 473 const workerNodeKey = this.chooseWorkerNode()
adc3c320 474 const submittedTask: Task<Data> = {
a86b6df1 475 name,
e5a5c0fc
JB
476 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
477 data: data ?? ({} as Data),
b6b32453 478 timestamp,
adc3c320
JB
479 id: crypto.randomUUID()
480 }
2e81254d 481 const res = new Promise<Response>((resolve, reject) => {
02706357 482 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
483 resolve,
484 reject,
20dcad1a 485 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
486 })
487 })
ff733df7
JB
488 if (
489 this.opts.enableTasksQueue === true &&
7171d33f 490 (this.busy ||
f59e1027 491 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 492 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 493 .concurrency as number))
ff733df7 494 ) {
26a929d7
JB
495 this.enqueueTask(workerNodeKey, submittedTask)
496 } else {
2e81254d 497 this.executeTask(workerNodeKey, submittedTask)
adc3c320 498 }
ff733df7 499 this.checkAndEmitEvents()
78cea37e 500 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
501 return res
502 }
c97c7edb 503
afc003b2 504 /** @inheritDoc */
c97c7edb 505 public async destroy (): Promise<void> {
1fbcaa7c 506 await Promise.all(
875a7c37
JB
507 this.workerNodes.map(async (workerNode, workerNodeKey) => {
508 this.flushTasksQueue(workerNodeKey)
47aacbaa 509 // FIXME: wait for tasks to be finished
920278a2
JB
510 const workerExitPromise = new Promise<void>(resolve => {
511 workerNode.worker.on('exit', () => {
512 resolve()
513 })
514 })
f06e48d8 515 await this.destroyWorker(workerNode.worker)
920278a2 516 await workerExitPromise
1fbcaa7c
JB
517 })
518 )
c97c7edb
S
519 }
520
4a6952ff 521 /**
6c6afb84 522 * Terminates the given worker.
4a6952ff 523 *
f06e48d8 524 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
525 */
526 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 527
729c563d 528 /**
6677a3d3
JB
529 * Setup hook to execute code before worker nodes are created in the abstract constructor.
530 * Can be overridden.
afc003b2
JB
531 *
532 * @virtual
729c563d 533 */
280c2a77 534 protected setupHook (): void {
d99ba5a8 535 // Intentionally empty
280c2a77 536 }
c97c7edb 537
729c563d 538 /**
280c2a77
S
539 * Should return whether the worker is the main worker or not.
540 */
541 protected abstract isMain (): boolean
542
543 /**
2e81254d 544 * Hook executed before the worker task execution.
bf9549ae 545 * Can be overridden.
729c563d 546 *
f06e48d8 547 * @param workerNodeKey - The worker node key.
1c6fe997 548 * @param task - The task to execute.
729c563d 549 */
1c6fe997
JB
550 protected beforeTaskExecutionHook (
551 workerNodeKey: number,
552 task: Task<Data>
553 ): void {
f59e1027 554 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
555 ++workerUsage.tasks.executing
556 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
557 }
558
c01733f1 559 /**
2e81254d 560 * Hook executed after the worker task execution.
bf9549ae 561 * Can be overridden.
c01733f1 562 *
c923ce56 563 * @param worker - The worker.
38e795c1 564 * @param message - The received message.
c01733f1 565 */
2e81254d 566 protected afterTaskExecutionHook (
c923ce56 567 worker: Worker,
2740a743 568 message: MessageValue<Response>
bf9549ae 569 ): void {
f59e1027 570 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
571 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
572 this.updateRunTimeWorkerUsage(workerUsage, message)
573 this.updateEluWorkerUsage(workerUsage, message)
574 }
575
576 private updateTaskStatisticsWorkerUsage (
577 workerUsage: WorkerUsage,
578 message: MessageValue<Response>
579 ): void {
a4e07f72
JB
580 const workerTaskStatistics = workerUsage.tasks
581 --workerTaskStatistics.executing
582 ++workerTaskStatistics.executed
82f36766 583 if (message.taskError != null) {
a4e07f72 584 ++workerTaskStatistics.failed
2740a743 585 }
f8eb0a2a
JB
586 }
587
a4e07f72
JB
588 private updateRunTimeWorkerUsage (
589 workerUsage: WorkerUsage,
f8eb0a2a
JB
590 message: MessageValue<Response>
591 ): void {
87de9ff5
JB
592 if (
593 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 594 .aggregate
87de9ff5 595 ) {
932fc8be 596 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 597 if (
932fc8be
JB
598 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
599 .average &&
a4e07f72 600 workerUsage.tasks.executed !== 0
c6bd2650 601 ) {
a4e07f72 602 workerUsage.runTime.average =
f1c06930
JB
603 workerUsage.runTime.aggregate /
604 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 605 }
3fa4cdd2 606 if (
932fc8be
JB
607 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
608 .median &&
d715b7bc 609 message.taskPerformance?.runTime != null
3fa4cdd2 610 ) {
a4e07f72
JB
611 workerUsage.runTime.history.push(message.taskPerformance.runTime)
612 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 613 }
3032893a 614 }
f8eb0a2a
JB
615 }
616
a4e07f72
JB
617 private updateWaitTimeWorkerUsage (
618 workerUsage: WorkerUsage,
1c6fe997 619 task: Task<Data>
f8eb0a2a 620 ): void {
1c6fe997
JB
621 const timestamp = performance.now()
622 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
623 if (
624 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 625 .aggregate
87de9ff5 626 ) {
932fc8be 627 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 628 if (
87de9ff5 629 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 630 .waitTime.average &&
a4e07f72 631 workerUsage.tasks.executed !== 0
09a6305f 632 ) {
a4e07f72 633 workerUsage.waitTime.average =
f1c06930
JB
634 workerUsage.waitTime.aggregate /
635 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
636 }
637 if (
87de9ff5 638 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 639 .waitTime.median &&
1c6fe997 640 taskWaitTime != null
09a6305f 641 ) {
1c6fe997 642 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 643 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 644 }
0567595a 645 }
c01733f1 646 }
647
a4e07f72 648 private updateEluWorkerUsage (
5df69fab 649 workerUsage: WorkerUsage,
62c15a68
JB
650 message: MessageValue<Response>
651 ): void {
5df69fab
JB
652 if (
653 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
654 .aggregate
655 ) {
656 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
657 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
658 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
659 workerUsage.elu.utilization =
660 (workerUsage.elu.utilization +
661 message.taskPerformance.elu.utilization) /
662 2
663 } else if (message.taskPerformance?.elu != null) {
664 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
665 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
666 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
667 }
d715b7bc 668 if (
5df69fab
JB
669 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
670 .average &&
671 workerUsage.tasks.executed !== 0
672 ) {
f1c06930
JB
673 const executedTasks =
674 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 675 workerUsage.elu.idle.average =
f1c06930 676 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 677 workerUsage.elu.active.average =
f1c06930 678 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
679 }
680 if (
681 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
682 .median &&
d715b7bc
JB
683 message.taskPerformance?.elu != null
684 ) {
5df69fab
JB
685 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
686 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
687 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
688 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
689 }
690 }
691 }
692
280c2a77 693 /**
f06e48d8 694 * Chooses a worker node for the next task.
280c2a77 695 *
6c6afb84 696 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 697 *
20dcad1a 698 * @returns The worker node key
280c2a77 699 */
6c6afb84 700 private chooseWorkerNode (): number {
930dcf12 701 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
702 const worker = this.createAndSetupDynamicWorker()
703 if (
704 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
705 ) {
706 return this.getWorkerNodeKey(worker)
707 }
17393ac8 708 }
930dcf12
JB
709 return this.workerChoiceStrategyContext.execute()
710 }
711
6c6afb84
JB
712 /**
713 * Conditions for dynamic worker creation.
714 *
715 * @returns Whether to create a dynamic worker or not.
716 */
717 private shallCreateDynamicWorker (): boolean {
930dcf12 718 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
719 }
720
280c2a77 721 /**
675bb809 722 * Sends a message to the given worker.
280c2a77 723 *
38e795c1
JB
724 * @param worker - The worker which should receive the message.
725 * @param message - The message.
280c2a77
S
726 */
727 protected abstract sendToWorker (
728 worker: Worker,
729 message: MessageValue<Data>
730 ): void
731
4a6952ff 732 /**
f06e48d8 733 * Registers a listener callback on the given worker.
4a6952ff 734 *
38e795c1
JB
735 * @param worker - The worker which should register a listener.
736 * @param listener - The message listener callback.
4a6952ff 737 */
e102732c
JB
738 private registerWorkerMessageListener<Message extends Data | Response>(
739 worker: Worker,
740 listener: (message: MessageValue<Message>) => void
741 ): void {
742 worker.on('message', listener as MessageHandler<Worker>)
743 }
c97c7edb 744
729c563d 745 /**
41344292 746 * Creates a new worker.
6c6afb84
JB
747 *
748 * @returns Newly created worker.
729c563d 749 */
280c2a77 750 protected abstract createWorker (): Worker
c97c7edb 751
729c563d 752 /**
f06e48d8 753 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 754 * Can be overridden.
729c563d 755 *
38e795c1 756 * @param worker - The newly created worker.
729c563d 757 */
6677a3d3 758 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
759 // Listen to worker messages.
760 this.registerWorkerMessageListener(worker, this.workerListener())
761 }
c97c7edb 762
4a6952ff 763 /**
f06e48d8 764 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
765 *
766 * @returns New, completely set up worker.
767 */
768 protected createAndSetupWorker (): Worker {
bdacc2d2 769 const worker = this.createWorker()
280c2a77 770
35cf1c03 771 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 772 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
773 worker.on('error', error => {
774 if (this.emitter != null) {
775 this.emitter.emit(PoolEvents.error, error)
776 }
5bc91f3e
JB
777 if (this.opts.enableTasksQueue === true) {
778 const workerNodeKey = this.getWorkerNodeKey(worker)
779 while (this.tasksQueueSize(workerNodeKey) > 0) {
780 let targetWorkerNodeKey: number = workerNodeKey
781 let minQueuedTasks = Infinity
782 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
783 if (
784 workerNodeId !== workerNodeKey &&
785 workerNode.usage.tasks.queued === 0
786 ) {
787 targetWorkerNodeKey = workerNodeId
788 break
789 }
790 if (
791 workerNodeId !== workerNodeKey &&
792 workerNode.usage.tasks.queued < minQueuedTasks
793 ) {
794 minQueuedTasks = workerNode.usage.tasks.queued
795 targetWorkerNodeKey = workerNodeId
796 }
797 }
798 this.enqueueTask(
799 targetWorkerNodeKey,
800 this.dequeueTask(workerNodeKey) as Task<Data>
801 )
802 }
803 }
e8a4c3ea 804 if (this.opts.restartWorkerOnError === true) {
1f68cede 805 this.createAndSetupWorker()
5baee0d7
JB
806 }
807 })
a35560ba
S
808 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
809 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 810 worker.once('exit', () => {
f06e48d8 811 this.removeWorkerNode(worker)
a974afa6 812 })
280c2a77 813
f06e48d8 814 this.pushWorkerNode(worker)
280c2a77 815
b6b32453
JB
816 this.setWorkerStatistics(worker)
817
280c2a77
S
818 this.afterWorkerSetup(worker)
819
c97c7edb
S
820 return worker
821 }
be0676b3 822
930dcf12
JB
823 /**
824 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
825 *
826 * @returns New, completely set up dynamic worker.
827 */
828 protected createAndSetupDynamicWorker (): Worker {
829 const worker = this.createAndSetupWorker()
830 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 831 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
832 if (
833 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
834 (message.kill != null &&
835 ((this.opts.enableTasksQueue === false &&
f59e1027 836 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 837 (this.opts.enableTasksQueue === true &&
f59e1027 838 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 839 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
840 ) {
841 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
842 void (this.destroyWorker(worker) as Promise<void>)
843 }
844 })
845 return worker
846 }
847
be0676b3 848 /**
ff733df7 849 * This function is the listener registered for each worker message.
be0676b3 850 *
bdacc2d2 851 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
852 */
853 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 854 return message => {
f59e1027
JB
855 if (message.workerId != null && message.started != null) {
856 // Worker started message received
6b272951 857 this.handleWorkerStartedMessage(message)
f59e1027 858 } else if (message.id != null) {
a3445496 859 // Task execution response received
6b272951
JB
860 this.handleTaskExecutionResponse(message)
861 }
862 }
863 }
864
865 private handleWorkerStartedMessage (message: MessageValue<Response>): void {
866 // Worker started message received
867 const worker = this.getWorkerById(message.workerId as number)
868 if (worker != null) {
869 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
870 message.started as boolean
871 } else {
872 throw new Error(
873 `Worker started message received from unknown worker '${
874 message.workerId as number
875 }'`
876 )
877 }
878 }
879
880 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
881 const promiseResponse = this.promiseResponseMap.get(message.id as string)
882 if (promiseResponse != null) {
883 if (message.taskError != null) {
884 if (this.emitter != null) {
885 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 886 }
6b272951
JB
887 promiseResponse.reject(message.taskError.message)
888 } else {
889 promiseResponse.resolve(message.data as Response)
890 }
891 this.afterTaskExecutionHook(promiseResponse.worker, message)
892 this.promiseResponseMap.delete(message.id as string)
893 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
894 if (
895 this.opts.enableTasksQueue === true &&
896 this.tasksQueueSize(workerNodeKey) > 0
897 ) {
898 this.executeTask(
899 workerNodeKey,
900 this.dequeueTask(workerNodeKey) as Task<Data>
901 )
be0676b3 902 }
6b272951 903 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 904 }
be0676b3 905 }
7c0ba920 906
ff733df7 907 private checkAndEmitEvents (): void {
1f68cede 908 if (this.emitter != null) {
ff733df7 909 if (this.busy) {
6b27d407 910 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 911 }
6b27d407
JB
912 if (this.type === PoolTypes.dynamic && this.full) {
913 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 914 }
164d950a
JB
915 }
916 }
917
0ebe2a9f
JB
918 /**
919 * Sets the given worker node its tasks usage in the pool.
920 *
921 * @param workerNode - The worker node.
a4e07f72 922 * @param workerUsage - The worker usage.
0ebe2a9f
JB
923 */
924 private setWorkerNodeTasksUsage (
925 workerNode: WorkerNode<Worker, Data>,
a4e07f72 926 workerUsage: WorkerUsage
0ebe2a9f 927 ): void {
f59e1027 928 workerNode.usage = workerUsage
0ebe2a9f
JB
929 }
930
a05c10de 931 /**
f06e48d8 932 * Pushes the given worker in the pool worker nodes.
ea7a90d3 933 *
38e795c1 934 * @param worker - The worker.
f06e48d8 935 * @returns The worker nodes length.
ea7a90d3 936 */
f06e48d8 937 private pushWorkerNode (worker: Worker): number {
9c16fb4b 938 this.workerNodes.push({
ffcbbad8 939 worker,
7ae6fb74 940 info: { id: this.getWorkerId(worker), started: true },
f59e1027 941 usage: this.getWorkerUsage(),
29ee7e9a 942 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 943 })
9c16fb4b
JB
944 const workerNodeKey = this.getWorkerNodeKey(worker)
945 this.setWorkerNodeTasksUsage(
946 this.workerNodes[workerNodeKey],
947 this.getWorkerUsage(workerNodeKey)
948 )
949 return this.workerNodes.length
ea7a90d3 950 }
c923ce56 951
83fa0a36
JB
952 /**
953 * Gets the worker id.
954 *
955 * @param worker - The worker.
956 * @returns The worker id.
957 */
958 private getWorkerId (worker: Worker): number | undefined {
959 if (this.worker === WorkerTypes.thread) {
960 return worker.threadId
961 } else if (this.worker === WorkerTypes.cluster) {
962 return worker.id
963 }
964 }
965
8604aaab
JB
966 // /**
967 // * Sets the given worker in the pool worker nodes.
968 // *
969 // * @param workerNodeKey - The worker node key.
970 // * @param worker - The worker.
f59e1027 971 // * @param workerInfo - The worker info.
8604aaab
JB
972 // * @param workerUsage - The worker usage.
973 // * @param tasksQueue - The worker task queue.
974 // */
975 // private setWorkerNode (
976 // workerNodeKey: number,
977 // worker: Worker,
f59e1027 978 // workerInfo: WorkerInfo,
8604aaab
JB
979 // workerUsage: WorkerUsage,
980 // tasksQueue: Queue<Task<Data>>
981 // ): void {
982 // this.workerNodes[workerNodeKey] = {
983 // worker,
f59e1027
JB
984 // info: workerInfo,
985 // usage: workerUsage,
8604aaab
JB
986 // tasksQueue
987 // }
988 // }
51fe3d3c
JB
989
990 /**
f06e48d8 991 * Removes the given worker from the pool worker nodes.
51fe3d3c 992 *
f06e48d8 993 * @param worker - The worker.
51fe3d3c 994 */
416fd65c 995 private removeWorkerNode (worker: Worker): void {
f06e48d8 996 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
997 if (workerNodeKey !== -1) {
998 this.workerNodes.splice(workerNodeKey, 1)
999 this.workerChoiceStrategyContext.remove(workerNodeKey)
1000 }
51fe3d3c 1001 }
adc3c320 1002
2e81254d 1003 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1004 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1005 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1006 }
1007
f9f00b5f 1008 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 1009 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
1010 }
1011
416fd65c 1012 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 1013 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
1014 }
1015
416fd65c 1016 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 1017 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 1018 }
ff733df7 1019
df593701
JB
1020 private tasksMaxQueueSize (workerNodeKey: number): number {
1021 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
1022 }
1023
416fd65c 1024 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1025 while (this.tasksQueueSize(workerNodeKey) > 0) {
1026 this.executeTask(
1027 workerNodeKey,
1028 this.dequeueTask(workerNodeKey) as Task<Data>
1029 )
ff733df7 1030 }
df593701 1031 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
1032 }
1033
ef41a6e6
JB
1034 private flushTasksQueues (): void {
1035 for (const [workerNodeKey] of this.workerNodes.entries()) {
1036 this.flushTasksQueue(workerNodeKey)
1037 }
1038 }
b6b32453
JB
1039
1040 private setWorkerStatistics (worker: Worker): void {
1041 this.sendToWorker(worker, {
1042 statistics: {
87de9ff5
JB
1043 runTime:
1044 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1045 .runTime.aggregate,
87de9ff5 1046 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1047 .elu.aggregate
b6b32453
JB
1048 }
1049 })
1050 }
8604aaab 1051
9c16fb4b 1052 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
1053 const getTasksQueueSize = (workerNodeKey?: number): number => {
1054 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 1055 }
df593701
JB
1056 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
1057 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
1058 }
8604aaab 1059 return {
9c16fb4b
JB
1060 tasks: {
1061 executed: 0,
1062 executing: 0,
1063 get queued (): number {
e3347a5c 1064 return getTasksQueueSize(workerNodeKey)
9c16fb4b 1065 },
df593701
JB
1066 get maxQueued (): number {
1067 return getTasksMaxQueueSize(workerNodeKey)
1068 },
9c16fb4b
JB
1069 failed: 0
1070 },
8604aaab 1071 runTime: {
932fc8be 1072 aggregate: 0,
8604aaab
JB
1073 average: 0,
1074 median: 0,
1075 history: new CircularArray()
1076 },
1077 waitTime: {
932fc8be 1078 aggregate: 0,
8604aaab
JB
1079 average: 0,
1080 median: 0,
1081 history: new CircularArray()
1082 },
5df69fab
JB
1083 elu: {
1084 idle: {
1085 aggregate: 0,
1086 average: 0,
1087 median: 0,
1088 history: new CircularArray()
1089 },
1090 active: {
1091 aggregate: 0,
1092 average: 0,
1093 median: 0,
1094 history: new CircularArray()
1095 },
1096 utilization: 0
1097 }
8604aaab
JB
1098 }
1099 }
c97c7edb 1100}