refactor: abstract out measurement statistics
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import crypto from 'node:crypto'
2import { performance } from 'node:perf_hooks'
3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9} from '../utils'
10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11import { CircularArray } from '../circular-array'
12import { Queue } from '../queue'
13import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23} from './pool'
24import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
25import {
26 WorkerChoiceStrategies,
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
29} from './selection-strategies/selection-strategies-types'
30import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
31
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   *
   * Default to a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   */
  public constructor (
    protected readonly numberOfWorkers: number,
    protected readonly filePath: string,
    protected readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Ensures a worker file path has been given.
   *
   * @param filePath - Path to the worker file.
   * @throws Error if the path is `null`/`undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Validates the number of workers requested for the pool.
   *
   * @param numberOfWorkers - Number of workers to validate.
   * @throws Error if the number is missing, or is zero for a fixed pool.
   * @throws TypeError if the number is not a safe integer.
   * @throws RangeError if the number is negative.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Validates the pool options and fills in the defaults of the unset ones
   * directly on `this.opts`.
   *
   * @param opts - The pool options.
   * @throws TypeError if the options are not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (isPlainObject(opts)) {
      this.opts.workerChoiceStrategy =
        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
      this.opts.workerChoiceStrategyOptions =
        opts.workerChoiceStrategyOptions ??
        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
      this.checkValidWorkerChoiceStrategyOptions(
        this.opts.workerChoiceStrategyOptions
      )
      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
      this.opts.enableEvents = opts.enableEvents ?? true
      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
      if (this.opts.enableTasksQueue) {
        this.checkValidTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
      }
    } else {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
  }

  /**
   * Ensures the given strategy is one of the known `WorkerChoiceStrategies`.
   *
   * @param workerChoiceStrategy - The worker choice strategy to validate.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Validates the worker choice strategy options.
   *
   * @param workerChoiceStrategyOptions - The options to validate.
   * @throws TypeError if the options are not a plain object.
   * @throws Error if weights are given but their count differs from the pool maximum size.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }

  /**
   * Validates the tasks queue options. `null`/`undefined` options are accepted.
   *
   * @param tasksQueueOptions - The tasks queue options to validate.
   * @throws TypeError if the options are defined but not a plain object.
   * @throws Error if the concurrency is zero or negative.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    // An undefined concurrency compares false here and is defaulted later.
    if ((tasksQueueOptions?.concurrency as number) <= 0) {
      throw new Error(
        `Invalid worker tasks concurrency '${
          tasksQueueOptions.concurrency as number
        }'`
      )
    }
  }

  /** @inheritDoc */
  public get info (): PoolInfo {
    return {
      type: this.type,
      worker: this.worker,
      minSize: this.minSize,
      maxSize: this.maxSize,
      workerNodes: this.workerNodes.length,
      idleWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.workerUsage.tasks.executing === 0
            ? accumulator + 1
            : accumulator,
        0
      ),
      busyWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.workerUsage.tasks.executing > 0
            ? accumulator + 1
            : accumulator,
        0
      ),
      executedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.executed,
        0
      ),
      executingTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.executing,
        0
      ),
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.tasksQueue.maxSize,
        0
      ),
      failedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.workerUsage.tasks.failed,
        0
      )
    }
  }

  /**
   * Pool type.
   *
   * If it is `'dynamic'`, it provides the `max` property.
   */
  protected abstract get type (): PoolType

  /**
   * Gets the worker type.
   */
  protected abstract get worker (): WorkerType

  /**
   * Pool minimum size.
   */
  protected abstract get minSize (): number

  /**
   * Pool maximum size.
   */
  protected abstract get maxSize (): number

  /**
   * Gets the worker node key of the given worker.
   *
   * @param worker - The worker.
   * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
    // Usage gathered under the previous strategy is discarded; only the
    // current queue size is carried over when the tasks queue is enabled.
    for (const workerNode of this.workerNodes) {
      this.setWorkerNodeTasksUsage(
        workerNode,
        this.buildInitialWorkerUsage(
          this.opts.enableTasksQueue === true ? workerNode.tasksQueue.size : 0
        )
      )
      this.setWorkerStatistics(workerNode.worker)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /** @inheritDoc */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    // Disabling an enabled queue: hand the pending tasks over to the workers first.
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /** @inheritDoc */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the effective tasks queue options, defaulting the concurrency to `1`.
   *
   * @param tasksQueueOptions - The user provided tasks queue options, possibly `undefined`.
   * @returns The tasks queue options with every field set.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status.
   */
  protected get full (): boolean {
    return this.workerNodes.length >= this.maxSize
  }

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Default busyness check usable by subclasses.
   *
   * @returns `true` if no worker node is idle (i.e. none has zero executing tasks), `false` otherwise.
   */
  protected internalBusy (): boolean {
    return !this.workerNodes.some(
      workerNode => workerNode.workerUsage.tasks.executing === 0
    )
  }

  /** @inheritDoc */
  public async execute (data?: Data, name?: string): Promise<Response> {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      id: crypto.randomUUID()
    }
    const res = new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(submittedTask.id as string, {
        resolve,
        reject,
        worker: this.workerNodes[workerNodeKey].worker
      })
    })
    // Queue the task when the pool is busy or the chosen worker already runs
    // as many tasks as the configured concurrency; otherwise run it right away.
    if (
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    ) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
    this.checkAndEmitEvents()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        // FIXME: wait for tasks to be finished
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Shuts down the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to execute code before worker nodes are created in the abstract constructor.
   * Can be overridden.
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * @param workerNodeKey - The worker node key.
   */
  protected beforeTaskExecutionHook (workerNodeKey: number): void {
    ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
    if (this.opts.enableTasksQueue === true) {
      this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
        this.tasksQueueSize(workerNodeKey)
    }
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * @param worker - The worker.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerUsage =
      this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
    const workerTaskStatistics = workerUsage.tasks
    --workerTaskStatistics.executing
    ++workerTaskStatistics.executed
    if (message.taskError != null) {
      ++workerTaskStatistics.failed
    }

    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateWaitTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }

  /**
   * Updates the run time statistics of a worker usage from a task response,
   * limited to the statistics the current worker choice strategy requires.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message.
   */
  private updateRunTimeWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
      workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
      if (
        this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
        workerUsage.tasks.executed !== 0
      ) {
        workerUsage.runTime.average =
          workerUsage.runTime.aggregation / workerUsage.tasks.executed
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
        message.taskPerformance?.runTime != null
      ) {
        workerUsage.runTime.history.push(message.taskPerformance.runTime)
        workerUsage.runTime.median = median(workerUsage.runTime.history)
      }
    }
  }

  /**
   * Updates the wait time statistics of a worker usage from a task response,
   * limited to the statistics the current worker choice strategy requires.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message.
   */
  private updateWaitTimeWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
      workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
      if (
        this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
        workerUsage.tasks.executed !== 0
      ) {
        workerUsage.waitTime.average =
          workerUsage.waitTime.aggregation / workerUsage.tasks.executed
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
        message.taskPerformance?.waitTime != null
      ) {
        workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
        workerUsage.waitTime.median = median(workerUsage.waitTime.history)
      }
    }
  }

  /**
   * Updates the event loop utilization (ELU) of a worker usage from a task
   * response, if the current worker choice strategy requires it.
   *
   * `idle` and `active` are accumulated; `utilization` becomes the mean of the
   * previously stored value and the new sample.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message.
   */
  private updateEluWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
      if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
        workerUsage.elu = {
          idle: workerUsage.elu.idle + message.taskPerformance.elu.idle,
          active: workerUsage.elu.active + message.taskPerformance.elu.active,
          utilization:
            (workerUsage.elu.utilization +
              message.taskPerformance.elu.utilization) /
            2
        }
      } else if (message.taskPerformance?.elu != null) {
        // First ELU sample for this worker: store it as is.
        workerUsage.elu = message.taskPerformance.elu
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * The default worker choice strategy uses a round robin algorithm to distribute the load.
   *
   * @returns The worker node key
   */
  protected chooseWorkerNode (): number {
    let workerNodeKey: number
    // A dynamic pool that is busy but not yet full grows by one worker,
    // which then receives the task.
    if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
      const workerCreated = this.createAndSetupWorker()
      this.registerWorkerMessageListener(workerCreated, message => {
        const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
        if (
          isKillBehavior(KillBehaviors.HARD, message.kill) ||
          (message.kill != null &&
            this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
              .executing === 0)
        ) {
          // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
          this.flushTasksQueue(currentWorkerNodeKey)
          // FIXME: wait for tasks to be finished
          void (this.destroyWorker(workerCreated) as Promise<void>)
        }
      })
      workerNodeKey = this.getWorkerNodeKey(workerCreated)
    } else {
      workerNodeKey = this.workerChoiceStrategyContext.execute()
    }
    return workerNodeKey
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  >(worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
   *
   * @param worker - The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.error, error)
      }
    })
    worker.on('error', () => {
      if (this.opts.restartWorkerOnError === true) {
        this.createAndSetupWorker()
      }
    })
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.setWorkerStatistics(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker message.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id != null) {
        // Task execution response received
        const promiseResponse = this.promiseResponseMap.get(message.id)
        if (promiseResponse != null) {
          if (message.taskError != null) {
            promiseResponse.reject(message.taskError.message)
            if (this.emitter != null) {
              this.emitter.emit(PoolEvents.taskError, message.taskError)
            }
          } else {
            promiseResponse.resolve(message.data as Response)
          }
          this.afterTaskExecutionHook(promiseResponse.worker, message)
          this.promiseResponseMap.delete(message.id)
          // The worker just finished a task: feed it the next queued one, if any.
          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
          if (
            this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(workerNodeKey) > 0
          ) {
            this.executeTask(
              workerNodeKey,
              this.dequeueTask(workerNodeKey) as Task<Data>
            )
          }
        }
      }
    }
  }

  /**
   * Emits the `busy` and/or `full` pool events, with the pool information as
   * payload, when events are enabled and the matching condition holds.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter != null) {
      if (this.busy) {
        this.emitter.emit(PoolEvents.busy, this.info)
      }
      if (this.type === PoolTypes.dynamic && this.full) {
        this.emitter.emit(PoolEvents.full, this.info)
      }
    }
  }

  /**
   * Builds a zeroed worker usage, as assigned to a new worker node or to an
   * existing one whose statistics are reset.
   *
   * @param queued - The number of queued tasks to report. Defaults to `0`.
   * @returns A fresh worker usage with its own run/wait time histories.
   */
  private buildInitialWorkerUsage (queued = 0): WorkerUsage {
    return {
      tasks: {
        executed: 0,
        executing: 0,
        queued,
        failed: 0
      },
      runTime: {
        aggregation: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      waitTime: {
        aggregation: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      elu: undefined
    }
  }

  /**
   * Sets the worker usage of the given worker node.
   *
   * @param workerNode - The worker node.
   * @param workerUsage - The worker usage.
   */
  private setWorkerNodeTasksUsage (
    workerNode: WorkerNode<Worker, Data>,
    workerUsage: WorkerUsage
  ): void {
    workerNode.workerUsage = workerUsage
  }

  /**
   * Pushes the given worker in the pool worker nodes.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    return this.workerNodes.push({
      worker,
      workerUsage: this.buildInitialWorkerUsage(),
      tasksQueue: new Queue<Task<Data>>()
    })
  }

  /**
   * Sets the given worker in the pool worker nodes.
   *
   * @param workerNodeKey - The worker node key.
   * @param worker - The worker.
   * @param workerUsage - The worker usage.
   * @param tasksQueue - The worker task queue.
   */
  private setWorkerNode (
    workerNodeKey: number,
    worker: Worker,
    workerUsage: WorkerUsage,
    tasksQueue: Queue<Task<Data>>
  ): void {
    this.workerNodes[workerNodeKey] = {
      worker,
      workerUsage,
      tasksQueue
    }
  }

  /**
   * Removes the given worker from the pool worker nodes.
   * Does nothing if the worker is not part of the pool.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Runs the before-execution hook, then sends the task to the worker of the
   * given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey)
    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
  }

  /**
   * Appends a task to the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the queue `enqueue()` call.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
  }

  /**
   * Takes the next task out of the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, or `undefined` as returned by the queue.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
  }

  /**
   * Gets the tasks queue size of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The number of tasks currently queued.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.size
  }

  /**
   * Sends every queued task of the given worker node to its worker, leaving
   * the queue empty.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    // The queue shrinks on each dequeue, so loop on its live size: counting
    // an index up against that shrinking size would stop about halfway and
    // strand the remaining tasks.
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }

  /**
   * Flushes the tasks queue of every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }

  /**
   * Tells the given worker which statistics (run time, wait time, ELU) the
   * current worker choice strategy requires.
   *
   * @param worker - The worker to notify.
   */
  private setWorkerStatistics (worker: Worker): void {
    this.sendToWorker(worker, {
      statistics: {
        runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
        waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
        elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
      }
    })
  }
}