fix: fix type namespace collision
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
25 import {
26 WorkerChoiceStrategies,
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
29 } from './selection-strategies/selection-strategies-types'
30 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
31
32 /**
33 * Base class that implements some shared logic for all poolifier pools.
34 *
35 * @typeParam Worker - Type of worker which manages this pool.
36 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
37 * @typeParam Response - Type of execution response. This can only be serializable data.
38 */
39 export abstract class AbstractPool<
40 Worker extends IWorker,
41 Data = unknown,
42 Response = unknown
43 > implements IPool<Worker, Data, Response> {
44 /** @inheritDoc */
45 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
46
47 /** @inheritDoc */
48 public readonly emitter?: PoolEmitter
49
50 /**
51 * The execution response promise map.
52 *
53 * - `key`: The message id of each submitted task.
54 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
55 *
56 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
57 */
58 protected promiseResponseMap: Map<
59 string,
60 PromiseResponseWrapper<Worker, Response>
61 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
62
63 /**
64 * Worker choice strategy context referencing a worker choice algorithm implementation.
65 *
66 * Default to a round robin algorithm.
67 */
68 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
69 Worker,
70 Data,
71 Response
72 >
73
74 /**
75 * Constructs a new poolifier pool.
76 *
77 * @param numberOfWorkers - Number of workers that this pool should manage.
78 * @param filePath - Path to the worker file.
79 * @param opts - Options for the pool.
80 */
81 public constructor (
82 protected readonly numberOfWorkers: number,
83 protected readonly filePath: string,
84 protected readonly opts: PoolOptions<Worker>
85 ) {
86 if (!this.isMain()) {
87 throw new Error('Cannot start a pool from a worker!')
88 }
89 this.checkNumberOfWorkers(this.numberOfWorkers)
90 this.checkFilePath(this.filePath)
91 this.checkPoolOptions(this.opts)
92
93 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
94 this.executeTask = this.executeTask.bind(this)
95 this.enqueueTask = this.enqueueTask.bind(this)
96 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
97
98 if (this.opts.enableEvents === true) {
99 this.emitter = new PoolEmitter()
100 }
101 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
102 Worker,
103 Data,
104 Response
105 >(
106 this,
107 this.opts.workerChoiceStrategy,
108 this.opts.workerChoiceStrategyOptions
109 )
110
111 this.setupHook()
112
113 for (let i = 1; i <= this.numberOfWorkers; i++) {
114 this.createAndSetupWorker()
115 }
116 }
117
118 private checkFilePath (filePath: string): void {
119 if (
120 filePath == null ||
121 (typeof filePath === 'string' && filePath.trim().length === 0)
122 ) {
123 throw new Error('Please specify a file with a worker implementation')
124 }
125 }
126
127 private checkNumberOfWorkers (numberOfWorkers: number): void {
128 if (numberOfWorkers == null) {
129 throw new Error(
130 'Cannot instantiate a pool without specifying the number of workers'
131 )
132 } else if (!Number.isSafeInteger(numberOfWorkers)) {
133 throw new TypeError(
134 'Cannot instantiate a pool with a non safe integer number of workers'
135 )
136 } else if (numberOfWorkers < 0) {
137 throw new RangeError(
138 'Cannot instantiate a pool with a negative number of workers'
139 )
140 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
141 throw new Error('Cannot instantiate a fixed pool with no worker')
142 }
143 }
144
145 private checkPoolOptions (opts: PoolOptions<Worker>): void {
146 if (isPlainObject(opts)) {
147 this.opts.workerChoiceStrategy =
148 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
149 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
150 this.opts.workerChoiceStrategyOptions =
151 opts.workerChoiceStrategyOptions ??
152 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
153 this.checkValidWorkerChoiceStrategyOptions(
154 this.opts.workerChoiceStrategyOptions
155 )
156 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
157 this.opts.enableEvents = opts.enableEvents ?? true
158 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
159 if (this.opts.enableTasksQueue) {
160 this.checkValidTasksQueueOptions(
161 opts.tasksQueueOptions as TasksQueueOptions
162 )
163 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
164 opts.tasksQueueOptions as TasksQueueOptions
165 )
166 }
167 } else {
168 throw new TypeError('Invalid pool options: must be a plain object')
169 }
170 }
171
172 private checkValidWorkerChoiceStrategy (
173 workerChoiceStrategy: WorkerChoiceStrategy
174 ): void {
175 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
176 throw new Error(
177 `Invalid worker choice strategy '${workerChoiceStrategy}'`
178 )
179 }
180 }
181
182 private checkValidWorkerChoiceStrategyOptions (
183 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
184 ): void {
185 if (!isPlainObject(workerChoiceStrategyOptions)) {
186 throw new TypeError(
187 'Invalid worker choice strategy options: must be a plain object'
188 )
189 }
190 if (
191 workerChoiceStrategyOptions.weights != null &&
192 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
193 ) {
194 throw new Error(
195 'Invalid worker choice strategy options: must have a weight for each worker node'
196 )
197 }
198 }
199
200 private checkValidTasksQueueOptions (
201 tasksQueueOptions: TasksQueueOptions
202 ): void {
203 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
204 throw new TypeError('Invalid tasks queue options: must be a plain object')
205 }
206 if ((tasksQueueOptions?.concurrency as number) <= 0) {
207 throw new Error(
208 `Invalid worker tasks concurrency '${
209 tasksQueueOptions.concurrency as number
210 }'`
211 )
212 }
213 }
214
215 /** @inheritDoc */
216 public get info (): PoolInfo {
217 return {
218 type: this.type,
219 worker: this.worker,
220 minSize: this.minSize,
221 maxSize: this.maxSize,
222 workerNodes: this.workerNodes.length,
223 idleWorkerNodes: this.workerNodes.reduce(
224 (accumulator, workerNode) =>
225 workerNode.workerUsage.tasks.executing === 0
226 ? accumulator + 1
227 : accumulator,
228 0
229 ),
230 busyWorkerNodes: this.workerNodes.reduce(
231 (accumulator, workerNode) =>
232 workerNode.workerUsage.tasks.executing > 0
233 ? accumulator + 1
234 : accumulator,
235 0
236 ),
237 executedTasks: this.workerNodes.reduce(
238 (accumulator, workerNode) =>
239 accumulator + workerNode.workerUsage.tasks.executed,
240 0
241 ),
242 executingTasks: this.workerNodes.reduce(
243 (accumulator, workerNode) =>
244 accumulator + workerNode.workerUsage.tasks.executing,
245 0
246 ),
247 queuedTasks: this.workerNodes.reduce(
248 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
249 0
250 ),
251 maxQueuedTasks: this.workerNodes.reduce(
252 (accumulator, workerNode) =>
253 accumulator + workerNode.tasksQueue.maxSize,
254 0
255 ),
256 failedTasks: this.workerNodes.reduce(
257 (accumulator, workerNode) =>
258 accumulator + workerNode.workerUsage.tasks.failed,
259 0
260 )
261 }
262 }
263
264 /**
265 * Pool type.
266 *
267 * If it is `'dynamic'`, it provides the `max` property.
268 */
269 protected abstract get type (): PoolType
270
271 /**
272 * Gets the worker type.
273 */
274 protected abstract get worker (): WorkerType
275
276 /**
277 * Pool minimum size.
278 */
279 protected abstract get minSize (): number
280
281 /**
282 * Pool maximum size.
283 */
284 protected abstract get maxSize (): number
285
286 /**
287 * Gets the given worker its worker node key.
288 *
289 * @param worker - The worker.
290 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
291 */
292 private getWorkerNodeKey (worker: Worker): number {
293 return this.workerNodes.findIndex(
294 workerNode => workerNode.worker === worker
295 )
296 }
297
298 /** @inheritDoc */
299 public setWorkerChoiceStrategy (
300 workerChoiceStrategy: WorkerChoiceStrategy,
301 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
302 ): void {
303 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
304 this.opts.workerChoiceStrategy = workerChoiceStrategy
305 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
306 this.opts.workerChoiceStrategy
307 )
308 if (workerChoiceStrategyOptions != null) {
309 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
310 }
311 for (const workerNode of this.workerNodes) {
312 this.setWorkerNodeTasksUsage(workerNode, {
313 tasks: {
314 executed: 0,
315 executing: 0,
316 queued:
317 this.opts.enableTasksQueue === true
318 ? workerNode.tasksQueue.size
319 : 0,
320 failed: 0
321 },
322 runTime: {
323 aggregation: 0,
324 average: 0,
325 median: 0,
326 history: new CircularArray()
327 },
328 waitTime: {
329 aggregation: 0,
330 average: 0,
331 median: 0,
332 history: new CircularArray()
333 },
334 elu: undefined
335 })
336 this.setWorkerStatistics(workerNode.worker)
337 }
338 }
339
340 /** @inheritDoc */
341 public setWorkerChoiceStrategyOptions (
342 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
343 ): void {
344 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
345 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
346 this.workerChoiceStrategyContext.setOptions(
347 this.opts.workerChoiceStrategyOptions
348 )
349 }
350
351 /** @inheritDoc */
352 public enableTasksQueue (
353 enable: boolean,
354 tasksQueueOptions?: TasksQueueOptions
355 ): void {
356 if (this.opts.enableTasksQueue === true && !enable) {
357 this.flushTasksQueues()
358 }
359 this.opts.enableTasksQueue = enable
360 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
361 }
362
363 /** @inheritDoc */
364 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
365 if (this.opts.enableTasksQueue === true) {
366 this.checkValidTasksQueueOptions(tasksQueueOptions)
367 this.opts.tasksQueueOptions =
368 this.buildTasksQueueOptions(tasksQueueOptions)
369 } else if (this.opts.tasksQueueOptions != null) {
370 delete this.opts.tasksQueueOptions
371 }
372 }
373
374 private buildTasksQueueOptions (
375 tasksQueueOptions: TasksQueueOptions
376 ): TasksQueueOptions {
377 return {
378 concurrency: tasksQueueOptions?.concurrency ?? 1
379 }
380 }
381
382 /**
383 * Whether the pool is full or not.
384 *
385 * The pool filling boolean status.
386 */
387 protected get full (): boolean {
388 return this.workerNodes.length >= this.maxSize
389 }
390
391 /**
392 * Whether the pool is busy or not.
393 *
394 * The pool busyness boolean status.
395 */
396 protected abstract get busy (): boolean
397
398 protected internalBusy (): boolean {
399 return (
400 this.workerNodes.findIndex(workerNode => {
401 return workerNode.workerUsage.tasks.executing === 0
402 }) === -1
403 )
404 }
405
406 /** @inheritDoc */
407 public async execute (data?: Data, name?: string): Promise<Response> {
408 const timestamp = performance.now()
409 const workerNodeKey = this.chooseWorkerNode()
410 const submittedTask: Task<Data> = {
411 name,
412 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
413 data: data ?? ({} as Data),
414 timestamp,
415 id: crypto.randomUUID()
416 }
417 const res = new Promise<Response>((resolve, reject) => {
418 this.promiseResponseMap.set(submittedTask.id as string, {
419 resolve,
420 reject,
421 worker: this.workerNodes[workerNodeKey].worker
422 })
423 })
424 if (
425 this.opts.enableTasksQueue === true &&
426 (this.busy ||
427 this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
428 ((this.opts.tasksQueueOptions as TasksQueueOptions)
429 .concurrency as number))
430 ) {
431 this.enqueueTask(workerNodeKey, submittedTask)
432 } else {
433 this.executeTask(workerNodeKey, submittedTask)
434 }
435 this.workerChoiceStrategyContext.update(workerNodeKey)
436 this.checkAndEmitEvents()
437 // eslint-disable-next-line @typescript-eslint/return-await
438 return res
439 }
440
441 /** @inheritDoc */
442 public async destroy (): Promise<void> {
443 await Promise.all(
444 this.workerNodes.map(async (workerNode, workerNodeKey) => {
445 this.flushTasksQueue(workerNodeKey)
446 // FIXME: wait for tasks to be finished
447 await this.destroyWorker(workerNode.worker)
448 })
449 )
450 }
451
452 /**
453 * Shutdowns the given worker.
454 *
455 * @param worker - A worker within `workerNodes`.
456 */
457 protected abstract destroyWorker (worker: Worker): void | Promise<void>
458
459 /**
460 * Setup hook to execute code before worker node are created in the abstract constructor.
461 * Can be overridden
462 *
463 * @virtual
464 */
465 protected setupHook (): void {
466 // Intentionally empty
467 }
468
469 /**
470 * Should return whether the worker is the main worker or not.
471 */
472 protected abstract isMain (): boolean
473
474 /**
475 * Hook executed before the worker task execution.
476 * Can be overridden.
477 *
478 * @param workerNodeKey - The worker node key.
479 */
480 protected beforeTaskExecutionHook (workerNodeKey: number): void {
481 ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
482 if (this.opts.enableTasksQueue === true) {
483 this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
484 this.tasksQueueSize(workerNodeKey)
485 }
486 }
487
488 /**
489 * Hook executed after the worker task execution.
490 * Can be overridden.
491 *
492 * @param worker - The worker.
493 * @param message - The received message.
494 */
495 protected afterTaskExecutionHook (
496 worker: Worker,
497 message: MessageValue<Response>
498 ): void {
499 const workerUsage =
500 this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
501 const workerTaskStatistics = workerUsage.tasks
502 --workerTaskStatistics.executing
503 ++workerTaskStatistics.executed
504 if (message.taskError != null) {
505 ++workerTaskStatistics.failed
506 }
507
508 this.updateRunTimeWorkerUsage(workerUsage, message)
509 this.updateWaitTimeWorkerUsage(workerUsage, message)
510 this.updateEluWorkerUsage(workerUsage, message)
511 }
512
513 private updateRunTimeWorkerUsage (
514 workerUsage: WorkerUsage,
515 message: MessageValue<Response>
516 ): void {
517 if (
518 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
519 ) {
520 workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
521 if (
522 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
523 .avgRunTime &&
524 workerUsage.tasks.executed !== 0
525 ) {
526 workerUsage.runTime.average =
527 workerUsage.runTime.aggregation / workerUsage.tasks.executed
528 }
529 if (
530 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
531 .medRunTime &&
532 message.taskPerformance?.runTime != null
533 ) {
534 workerUsage.runTime.history.push(message.taskPerformance.runTime)
535 workerUsage.runTime.median = median(workerUsage.runTime.history)
536 }
537 }
538 }
539
540 private updateWaitTimeWorkerUsage (
541 workerUsage: WorkerUsage,
542 message: MessageValue<Response>
543 ): void {
544 if (
545 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
546 ) {
547 workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
548 if (
549 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
550 .avgWaitTime &&
551 workerUsage.tasks.executed !== 0
552 ) {
553 workerUsage.waitTime.average =
554 workerUsage.waitTime.aggregation / workerUsage.tasks.executed
555 }
556 if (
557 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
558 .medWaitTime &&
559 message.taskPerformance?.waitTime != null
560 ) {
561 workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
562 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
563 }
564 }
565 }
566
567 private updateEluWorkerUsage (
568 workerTasksUsage: WorkerUsage,
569 message: MessageValue<Response>
570 ): void {
571 if (this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
572 if (
573 workerTasksUsage.elu != null &&
574 message.taskPerformance?.elu != null
575 ) {
576 workerTasksUsage.elu = {
577 idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
578 active:
579 workerTasksUsage.elu.active + message.taskPerformance.elu.active,
580 utilization:
581 (workerTasksUsage.elu.utilization +
582 message.taskPerformance.elu.utilization) /
583 2
584 }
585 } else if (message.taskPerformance?.elu != null) {
586 workerTasksUsage.elu = message.taskPerformance.elu
587 }
588 }
589 }
590
591 /**
592 * Chooses a worker node for the next task.
593 *
594 * The default worker choice strategy uses a round robin algorithm to distribute the load.
595 *
596 * @returns The worker node key
597 */
598 protected chooseWorkerNode (): number {
599 let workerNodeKey: number
600 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
601 const workerCreated = this.createAndSetupWorker()
602 this.registerWorkerMessageListener(workerCreated, message => {
603 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
604 if (
605 isKillBehavior(KillBehaviors.HARD, message.kill) ||
606 (message.kill != null &&
607 this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
608 .executing === 0)
609 ) {
610 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
611 this.flushTasksQueue(currentWorkerNodeKey)
612 // FIXME: wait for tasks to be finished
613 void (this.destroyWorker(workerCreated) as Promise<void>)
614 }
615 })
616 workerNodeKey = this.getWorkerNodeKey(workerCreated)
617 } else {
618 workerNodeKey = this.workerChoiceStrategyContext.execute()
619 }
620 return workerNodeKey
621 }
622
623 /**
624 * Sends a message to the given worker.
625 *
626 * @param worker - The worker which should receive the message.
627 * @param message - The message.
628 */
629 protected abstract sendToWorker (
630 worker: Worker,
631 message: MessageValue<Data>
632 ): void
633
634 /**
635 * Registers a listener callback on the given worker.
636 *
637 * @param worker - The worker which should register a listener.
638 * @param listener - The message listener callback.
639 */
640 protected abstract registerWorkerMessageListener<
641 Message extends Data | Response
642 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
643
644 /**
645 * Returns a newly created worker.
646 */
647 protected abstract createWorker (): Worker
648
649 /**
650 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
651 *
652 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
653 *
654 * @param worker - The newly created worker.
655 */
656 protected abstract afterWorkerSetup (worker: Worker): void
657
658 /**
659 * Creates a new worker and sets it up completely in the pool worker nodes.
660 *
661 * @returns New, completely set up worker.
662 */
663 protected createAndSetupWorker (): Worker {
664 const worker = this.createWorker()
665
666 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
667 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
668 worker.on('error', error => {
669 if (this.emitter != null) {
670 this.emitter.emit(PoolEvents.error, error)
671 }
672 })
673 worker.on('error', () => {
674 if (this.opts.restartWorkerOnError === true) {
675 this.createAndSetupWorker()
676 }
677 })
678 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
679 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
680 worker.once('exit', () => {
681 this.removeWorkerNode(worker)
682 })
683
684 this.pushWorkerNode(worker)
685
686 this.setWorkerStatistics(worker)
687
688 this.afterWorkerSetup(worker)
689
690 return worker
691 }
692
693 /**
694 * This function is the listener registered for each worker message.
695 *
696 * @returns The listener function to execute when a message is received from a worker.
697 */
698 protected workerListener (): (message: MessageValue<Response>) => void {
699 return message => {
700 if (message.id != null) {
701 // Task execution response received
702 const promiseResponse = this.promiseResponseMap.get(message.id)
703 if (promiseResponse != null) {
704 if (message.taskError != null) {
705 promiseResponse.reject(message.taskError.message)
706 if (this.emitter != null) {
707 this.emitter.emit(PoolEvents.taskError, message.taskError)
708 }
709 } else {
710 promiseResponse.resolve(message.data as Response)
711 }
712 this.afterTaskExecutionHook(promiseResponse.worker, message)
713 this.promiseResponseMap.delete(message.id)
714 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
715 if (
716 this.opts.enableTasksQueue === true &&
717 this.tasksQueueSize(workerNodeKey) > 0
718 ) {
719 this.executeTask(
720 workerNodeKey,
721 this.dequeueTask(workerNodeKey) as Task<Data>
722 )
723 }
724 }
725 }
726 }
727 }
728
729 private checkAndEmitEvents (): void {
730 if (this.emitter != null) {
731 if (this.busy) {
732 this.emitter?.emit(PoolEvents.busy, this.info)
733 }
734 if (this.type === PoolTypes.dynamic && this.full) {
735 this.emitter?.emit(PoolEvents.full, this.info)
736 }
737 }
738 }
739
740 /**
741 * Sets the given worker node its tasks usage in the pool.
742 *
743 * @param workerNode - The worker node.
744 * @param workerUsage - The worker usage.
745 */
746 private setWorkerNodeTasksUsage (
747 workerNode: WorkerNode<Worker, Data>,
748 workerUsage: WorkerUsage
749 ): void {
750 workerNode.workerUsage = workerUsage
751 }
752
753 /**
754 * Pushes the given worker in the pool worker nodes.
755 *
756 * @param worker - The worker.
757 * @returns The worker nodes length.
758 */
759 private pushWorkerNode (worker: Worker): number {
760 return this.workerNodes.push({
761 worker,
762 workerUsage: {
763 tasks: {
764 executed: 0,
765 executing: 0,
766 queued: 0,
767 failed: 0
768 },
769 runTime: {
770 aggregation: 0,
771 average: 0,
772 median: 0,
773 history: new CircularArray()
774 },
775
776 waitTime: {
777 aggregation: 0,
778 average: 0,
779 median: 0,
780 history: new CircularArray()
781 },
782 elu: undefined
783 },
784 tasksQueue: new Queue<Task<Data>>()
785 })
786 }
787
788 /**
789 * Sets the given worker in the pool worker nodes.
790 *
791 * @param workerNodeKey - The worker node key.
792 * @param worker - The worker.
793 * @param workerUsage - The worker usage.
794 * @param tasksQueue - The worker task queue.
795 */
796 private setWorkerNode (
797 workerNodeKey: number,
798 worker: Worker,
799 workerUsage: WorkerUsage,
800 tasksQueue: Queue<Task<Data>>
801 ): void {
802 this.workerNodes[workerNodeKey] = {
803 worker,
804 workerUsage,
805 tasksQueue
806 }
807 }
808
809 /**
810 * Removes the given worker from the pool worker nodes.
811 *
812 * @param worker - The worker.
813 */
814 private removeWorkerNode (worker: Worker): void {
815 const workerNodeKey = this.getWorkerNodeKey(worker)
816 if (workerNodeKey !== -1) {
817 this.workerNodes.splice(workerNodeKey, 1)
818 this.workerChoiceStrategyContext.remove(workerNodeKey)
819 }
820 }
821
822 private executeTask (workerNodeKey: number, task: Task<Data>): void {
823 this.beforeTaskExecutionHook(workerNodeKey)
824 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
825 }
826
827 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
828 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
829 }
830
831 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
832 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
833 }
834
835 private tasksQueueSize (workerNodeKey: number): number {
836 return this.workerNodes[workerNodeKey].tasksQueue.size
837 }
838
839 private flushTasksQueue (workerNodeKey: number): void {
840 if (this.tasksQueueSize(workerNodeKey) > 0) {
841 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
842 this.executeTask(
843 workerNodeKey,
844 this.dequeueTask(workerNodeKey) as Task<Data>
845 )
846 }
847 }
848 }
849
850 private flushTasksQueues (): void {
851 for (const [workerNodeKey] of this.workerNodes.entries()) {
852 this.flushTasksQueue(workerNodeKey)
853 }
854 }
855
856 private setWorkerStatistics (worker: Worker): void {
857 this.sendToWorker(worker, {
858 statistics: {
859 runTime:
860 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
861 .runTime,
862 waitTime:
863 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
864 .waitTime,
865 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
866 .elu
867 }
868 })
869 }
870 }