feat: add utilization to pool information
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median,
9 round
10 } from '../utils'
11 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
12 import { CircularArray } from '../circular-array'
13 import { Queue } from '../queue'
14 import {
15 type IPool,
16 PoolEmitter,
17 PoolEvents,
18 type PoolInfo,
19 type PoolOptions,
20 type PoolType,
21 PoolTypes,
22 type TasksQueueOptions,
23 type WorkerType
24 } from './pool'
25 import type {
26 IWorker,
27 MessageHandler,
28 Task,
29 WorkerNode,
30 WorkerUsage
31 } from './worker'
32 import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37 } from './selection-strategies/selection-strategies-types'
38 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39
40 /**
41 * Base class that implements some shared logic for all poolifier pools.
42 *
43 * @typeParam Worker - Type of worker which manages this pool.
44 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
45 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
46 */
47 export abstract class AbstractPool<
48 Worker extends IWorker,
49 Data = unknown,
50 Response = unknown
51 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry matching its message id is looked up
 * to settle the promise returned to the task submitter.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The start timestamp of the pool, as returned by `performance.now()`.
 *
 * It is assigned at the very end of the constructor, once the initial worker nodes are created.
 */
private readonly startTimestamp: number
84
/**
 * Builds a pool: validates the arguments, wires the worker choice strategy,
 * then creates the initial set of worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if the pool is instantiated from a context where `isMain()` is `false`.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; checkPoolOptions() also fills in the option defaults
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods once so they keep their `this` when passed around
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool up to the requested number of workers
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
130
/**
 * Validates the worker file path given to the pool.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined`, not a string, or blank.
 */
private checkFilePath (filePath: string): void {
  // The `string` annotation is erased at run time: a JavaScript caller may pass anything,
  // so the type is verified here as well (`typeof null` and `typeof undefined` are not 'string').
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
139
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
157
/**
 * Validates the pool options and stores the effective values (defaults applied) in `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and normalized when the queue is enabled
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
184
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
194
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size,
 * or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  if (weights != null) {
    const weightsCount = Object.keys(weights).length
    if (weightsCount !== this.maxSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  const { measurement } = workerChoiceStrategyOptions
  if (measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${measurement}'`
      )
    }
  }
}
222
/**
 * Validates the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error if the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const optionsProvided = tasksQueueOptions != null
  if (optionsProvided && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
246
/** @inheritDoc */
public get info (): PoolInfo {
  // Sums one numeric usage figure over all the worker nodes
  const sumOver = (pick: (usage: WorkerUsage) => number): number =>
    this.workerNodes.reduce((total, { usage }) => total + pick(usage), 0)
  // Counts the worker nodes whose usage matches a condition
  const countWhere = (matches: (usage: WorkerUsage) => boolean): number =>
    this.workerNodes.filter(({ usage }) => matches(usage)).length
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    utilization: round(this.utilization),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWhere(usage => usage.tasks.executing === 0),
    busyWorkerNodes: countWhere(usage => usage.tasks.executing > 0),
    executedTasks: sumOver(usage => usage.tasks.executed),
    executingTasks: sumOver(usage => usage.tasks.executing),
    queuedTasks: sumOver(usage => usage.tasks.queued),
    maxQueuedTasks: sumOver(usage => usage.tasks.maxQueued),
    failedTasks: sumOver(usage => usage.tasks.failed)
  }
}
295
/**
 * Time elapsed since the pool start timestamp was recorded.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
304
/**
 * Gets the approximate pool utilization: the tasks run time plus wait time aggregated over
 * the worker nodes, divided by the pool run time capacity (pool run time × pool maximum size).
 *
 * The figure is approximate: it only reflects what the worker usage aggregates hold.
 *
 * @returns The pool utilization, or `0` when the pool has no run time capacity yet.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  // No elapsed time, a maximum size of zero, or a start timestamp that is not set yet
  // would make the division below yield NaN or Infinity.
  if (!(poolRunTimeCapacity > 0)) {
    return 0
  }
  const totalTasksRunTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + workerNode.usage.runTime.aggregate,
    0
  )
  const totalTasksWaitTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + workerNode.usage.waitTime.aggregate,
    0
  )
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
324
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Within this class it is compared against `PoolTypes.fixed` and `PoolTypes.dynamic`
 * and reported in the pool information.
 */
protected abstract get type (): PoolType

/**
 * Gets the worker type.
 *
 * Reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * Pool minimum size.
 *
 * Reported as `minSize` in the pool information.
 */
protected abstract get minSize (): number

/**
 * Pool maximum size.
 *
 * Within this class it bounds the pool filling status, scales the utilization capacity and
 * is the expected number of worker choice strategy weights.
 */
protected abstract get maxSize (): number
346
/**
 * Looks up the position of the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  const { workerNodes } = this
  return workerNodes.findIndex(({ worker: candidate }) => candidate === worker)
}
358
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and tell each worker which statistics to collect
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
380
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Keep the pool options and the strategy context in sync
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
391
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Turning the queue off: hand the pending tasks over before the switch
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
403
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any tasks queue options left in the pool options
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
414
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `null`/`undefined`.
 * @returns The tasks queue options with the concurrency set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
422
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
431
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Within this class it is read when a task is submitted: a busy pool enqueues the task when
 * the tasks queue is enabled, and triggers the `busy` pool event when events are enabled.
 */
protected abstract get busy (): boolean
438
/**
 * Whether every worker node is executing at least one task,
 * i.e. no worker node has zero executing tasks.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
451
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken first so that the wait time includes the worker node selection
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the promise callbacks under the task id: the worker listener settles them
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  let shallEnqueue = false
  if (this.opts.enableTasksQueue === true) {
    shallEnqueue =
      this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number)
  }
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
485
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Hand the queued tasks over to the worker before terminating it
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
496
/**
 * Terminates the given worker.
 *
 * May complete synchronously or return a promise; `destroy()` awaits the result.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
503
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is built.
 * The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
513
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`, so that a pool cannot be started from a worker.
 */
protected abstract isMain (): boolean
518
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Counts the task as executing on the worker node and records its wait time.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
534
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the worker node usage: tasks counters, run time and event loop utilization.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
551
/**
 * Moves one task from executing to executed in the worker usage,
 * and counts it as failed when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const taskFailed = message.taskError != null
  if (taskFailed) {
    tasks.failed += 1
  }
}
563
/**
 * Updates the run time statistics of the worker usage with the run time reported in the message.
 *
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is taken over the tasks that did not fail. When every executed task failed
  // that count is zero: keep the previous average instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
592
/**
 * Updates the wait time statistics of the worker usage with the time the task waited
 * between its submission timestamp and now.
 *
 * Nothing is updated unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  // A task without a timestamp is considered to have waited for no time
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is taken over the tasks that did not fail. When that count is zero,
  // keep the previous average instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
623
/**
 * Updates the event loop utilization (ELU) statistics of the worker usage with the ELU
 * reported in the message.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Mean of the previous utilization and the latest sample.
    // NOTE(review): the initial utilization is 0, so the first sample is halved - confirm this is intended.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are taken over the tasks that did not fail. When every executed task failed
  // that count is zero: keep the previous averages instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
668
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen directly
 * when the strategy policy says so. Otherwise the worker choice strategy decides.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
687
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full,
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
696
/**
 * Sends a message to the given worker.
 *
 * Within this class it is used to send tasks to execute and the statistics settings.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
707
/**
 * Subscribes a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
720
/**
 * Creates a new worker.
 *
 * Called by `createAndSetupWorker()`, which then registers the event handlers on the
 * returned worker and adds it to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
727
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * The default implementation subscribes the pool worker listener to the worker messages.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const listener = this.workerListener()
  // Listen to worker messages.
  this.registerWorkerMessageListener(worker, listener)
}
738
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * event handlers, worker node registration, statistics settings and message listener.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User handlers are registered before the pool handlers for the same event
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // The restart option is read when the error occurs, not when the handler is registered
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
771
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, the worker is destroyed when it sends a kill message and
 * either the kill behavior is hard or the worker has nothing left to execute.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    let shallDestroy = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (!shallDestroy && message.kill != null) {
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue === false) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      } else if (enableTasksQueue === true) {
        shallDestroy =
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
      }
    }
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
796
/**
 * Builds the listener registered for each worker message.
 *
 * The listener settles the promise of the task the message answers to, updates the worker
 * usage, runs the next queued task if any and notifies the worker choice strategy.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // Only task execution responses carry an id
    if (message.id == null) {
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    }
    const { worker } = promiseResponse
    this.afterTaskExecutionHook(worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
833
/**
 * Emits the `busy` and `full` pool events, with the pool information as payload,
 * when the pool is in the matching state. Does nothing when events are disabled.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
844
/**
 * Sets the given worker node its tasks usage in the pool.
 *
 * The previous usage object of the worker node is replaced, not merged.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
857
/**
 * Appends a worker node for the given worker to the pool worker nodes, with an empty
 * tasks queue and a fresh usage bound to its worker node key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // The key is only known once the node is in the list: rebuild the usage with it
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
877
878 // /**
879 // * Sets the given worker in the pool worker nodes.
880 // *
881 // * @param workerNodeKey - The worker node key.
882 // * @param worker - The worker.
883 // * @param workerUsage - The worker usage.
884 // * @param tasksQueue - The worker task queue.
885 // */
886 // private setWorkerNode (
887 // workerNodeKey: number,
888 // worker: Worker,
889 // workerUsage: WorkerUsage,
890 // tasksQueue: Queue<Task<Data>>
891 // ): void {
892 // this.workerNodes[workerNodeKey] = {
893 // worker,
894 // usage: workerUsage,
895 // tasksQueue
896 // }
897 // }
898
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
911
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
916
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
920
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` as returned by the queue.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
924
/**
 * Gets the current size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
928
/**
 * Gets the `maxSize` figure of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
932
/**
 * Executes every task waiting in the tasks queue of the given worker node, then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Each dequeue shrinks the queue, so the live size is the only loop condition: counting
  // with an index against it would stop half-way and the remaining tasks would be
  // discarded by clear() without ever being executed.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
944
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
950
/**
 * Tells the given worker which task statistics to collect, according to the
 * worker choice strategy requirements on the run time and ELU aggregates.
 *
 * @param worker - The worker to send the statistics settings to.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTimeRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluRequired =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTimeRequired,
      elu: eluRequired
    }
  })
}
962
/**
 * Builds a fresh worker usage with every counter and statistic reset.
 *
 * The `queued` and `maxQueued` figures are live: they read the tasks queue of the worker
 * node found at the given key when this method is called, or `0` without a worker node.
 *
 * @param workerNodeKey - The key of the worker node the usage is built for, if any.
 * @returns The worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Capture the worker node itself rather than its key: keys shift when a worker node is
  // removed from the list, which would make the getters below read another node queue
  // or fail on a key that no longer exists.
  const workerNode =
    workerNodeKey != null ? this.workerNodes[workerNodeKey] : undefined
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return workerNode?.tasksQueue.size ?? 0
      },
      get maxQueued (): number {
        return workerNode?.tasksQueue.maxSize ?? 0
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
1011 }