feat: add statistics accounting to ELU fields
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30 } from './worker'
31 import {
32 WorkerChoiceStrategies,
33 type WorkerChoiceStrategy,
34 type WorkerChoiceStrategyOptions
35 } from './selection-strategies/selection-strategies-types'
36 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
37
38 /**
39 * Base class that implements some shared logic for all poolifier pools.
40 *
41 * @typeParam Worker - Type of worker which manages this pool.
42 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
43 * @typeParam Response - Type of execution response. This can only be serializable data.
44 */
45 export abstract class AbstractPool<
46 Worker extends IWorker,
47 Data = unknown,
48 Response = unknown
49 > implements IPool<Worker, Data, Response> {
50 /** @inheritDoc */
51 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
52
53 /** @inheritDoc */
54 public readonly emitter?: PoolEmitter
55
56 /**
57 * The execution response promise map.
58 *
59 * - `key`: The message id of each submitted task.
60 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
61 *
62 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
63 */
64 protected promiseResponseMap: Map<
65 string,
66 PromiseResponseWrapper<Worker, Response>
67 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
68
69 /**
70 * Worker choice strategy context referencing a worker choice algorithm implementation.
71 *
72 * Default to a round robin algorithm.
73 */
74 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
75 Worker,
76 Data,
77 Response
78 >
79
80 /**
81 * Constructs a new poolifier pool.
82 *
83 * @param numberOfWorkers - Number of workers that this pool should manage.
84 * @param filePath - Path to the worker file.
85 * @param opts - Options for the pool.
86 */
87 public constructor (
88 protected readonly numberOfWorkers: number,
89 protected readonly filePath: string,
90 protected readonly opts: PoolOptions<Worker>
91 ) {
92 if (!this.isMain()) {
93 throw new Error('Cannot start a pool from a worker!')
94 }
95 this.checkNumberOfWorkers(this.numberOfWorkers)
96 this.checkFilePath(this.filePath)
97 this.checkPoolOptions(this.opts)
98
99 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
100 this.executeTask = this.executeTask.bind(this)
101 this.enqueueTask = this.enqueueTask.bind(this)
102 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
103
104 if (this.opts.enableEvents === true) {
105 this.emitter = new PoolEmitter()
106 }
107 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
108 Worker,
109 Data,
110 Response
111 >(
112 this,
113 this.opts.workerChoiceStrategy,
114 this.opts.workerChoiceStrategyOptions
115 )
116
117 this.setupHook()
118
119 for (let i = 1; i <= this.numberOfWorkers; i++) {
120 this.createAndSetupWorker()
121 }
122 }
123
124 private checkFilePath (filePath: string): void {
125 if (
126 filePath == null ||
127 (typeof filePath === 'string' && filePath.trim().length === 0)
128 ) {
129 throw new Error('Please specify a file with a worker implementation')
130 }
131 }
132
133 private checkNumberOfWorkers (numberOfWorkers: number): void {
134 if (numberOfWorkers == null) {
135 throw new Error(
136 'Cannot instantiate a pool without specifying the number of workers'
137 )
138 } else if (!Number.isSafeInteger(numberOfWorkers)) {
139 throw new TypeError(
140 'Cannot instantiate a pool with a non safe integer number of workers'
141 )
142 } else if (numberOfWorkers < 0) {
143 throw new RangeError(
144 'Cannot instantiate a pool with a negative number of workers'
145 )
146 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
147 throw new Error('Cannot instantiate a fixed pool with no worker')
148 }
149 }
150
151 private checkPoolOptions (opts: PoolOptions<Worker>): void {
152 if (isPlainObject(opts)) {
153 this.opts.workerChoiceStrategy =
154 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
155 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
156 this.opts.workerChoiceStrategyOptions =
157 opts.workerChoiceStrategyOptions ??
158 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
159 this.checkValidWorkerChoiceStrategyOptions(
160 this.opts.workerChoiceStrategyOptions
161 )
162 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
163 this.opts.enableEvents = opts.enableEvents ?? true
164 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
165 if (this.opts.enableTasksQueue) {
166 this.checkValidTasksQueueOptions(
167 opts.tasksQueueOptions as TasksQueueOptions
168 )
169 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
170 opts.tasksQueueOptions as TasksQueueOptions
171 )
172 }
173 } else {
174 throw new TypeError('Invalid pool options: must be a plain object')
175 }
176 }
177
178 private checkValidWorkerChoiceStrategy (
179 workerChoiceStrategy: WorkerChoiceStrategy
180 ): void {
181 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
182 throw new Error(
183 `Invalid worker choice strategy '${workerChoiceStrategy}'`
184 )
185 }
186 }
187
188 private checkValidWorkerChoiceStrategyOptions (
189 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
190 ): void {
191 if (!isPlainObject(workerChoiceStrategyOptions)) {
192 throw new TypeError(
193 'Invalid worker choice strategy options: must be a plain object'
194 )
195 }
196 if (
197 workerChoiceStrategyOptions.weights != null &&
198 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
199 ) {
200 throw new Error(
201 'Invalid worker choice strategy options: must have a weight for each worker node'
202 )
203 }
204 }
205
206 private checkValidTasksQueueOptions (
207 tasksQueueOptions: TasksQueueOptions
208 ): void {
209 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
210 throw new TypeError('Invalid tasks queue options: must be a plain object')
211 }
212 if ((tasksQueueOptions?.concurrency as number) <= 0) {
213 throw new Error(
214 `Invalid worker tasks concurrency '${
215 tasksQueueOptions.concurrency as number
216 }'`
217 )
218 }
219 }
220
221 /** @inheritDoc */
222 public get info (): PoolInfo {
223 return {
224 type: this.type,
225 worker: this.worker,
226 minSize: this.minSize,
227 maxSize: this.maxSize,
228 workerNodes: this.workerNodes.length,
229 idleWorkerNodes: this.workerNodes.reduce(
230 (accumulator, workerNode) =>
231 workerNode.workerUsage.tasks.executing === 0
232 ? accumulator + 1
233 : accumulator,
234 0
235 ),
236 busyWorkerNodes: this.workerNodes.reduce(
237 (accumulator, workerNode) =>
238 workerNode.workerUsage.tasks.executing > 0
239 ? accumulator + 1
240 : accumulator,
241 0
242 ),
243 executedTasks: this.workerNodes.reduce(
244 (accumulator, workerNode) =>
245 accumulator + workerNode.workerUsage.tasks.executed,
246 0
247 ),
248 executingTasks: this.workerNodes.reduce(
249 (accumulator, workerNode) =>
250 accumulator + workerNode.workerUsage.tasks.executing,
251 0
252 ),
253 queuedTasks: this.workerNodes.reduce(
254 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
255 0
256 ),
257 maxQueuedTasks: this.workerNodes.reduce(
258 (accumulator, workerNode) =>
259 accumulator + workerNode.tasksQueue.maxSize,
260 0
261 ),
262 failedTasks: this.workerNodes.reduce(
263 (accumulator, workerNode) =>
264 accumulator + workerNode.workerUsage.tasks.failed,
265 0
266 )
267 }
268 }
269
270 /**
271 * Pool type.
272 *
273 * If it is `'dynamic'`, it provides the `max` property.
274 */
275 protected abstract get type (): PoolType
276
277 /**
278 * Gets the worker type.
279 */
280 protected abstract get worker (): WorkerType
281
282 /**
283 * Pool minimum size.
284 */
285 protected abstract get minSize (): number
286
287 /**
288 * Pool maximum size.
289 */
290 protected abstract get maxSize (): number
291
292 /**
293 * Gets the given worker its worker node key.
294 *
295 * @param worker - The worker.
296 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
297 */
298 private getWorkerNodeKey (worker: Worker): number {
299 return this.workerNodes.findIndex(
300 workerNode => workerNode.worker === worker
301 )
302 }
303
304 /** @inheritDoc */
305 public setWorkerChoiceStrategy (
306 workerChoiceStrategy: WorkerChoiceStrategy,
307 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
308 ): void {
309 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
310 this.opts.workerChoiceStrategy = workerChoiceStrategy
311 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
312 this.opts.workerChoiceStrategy
313 )
314 if (workerChoiceStrategyOptions != null) {
315 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
316 }
317 for (const workerNode of this.workerNodes) {
318 this.setWorkerNodeTasksUsage(
319 workerNode,
320 this.getWorkerUsage(workerNode.worker)
321 )
322 this.setWorkerStatistics(workerNode.worker)
323 }
324 }
325
326 /** @inheritDoc */
327 public setWorkerChoiceStrategyOptions (
328 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
329 ): void {
330 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
331 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
332 this.workerChoiceStrategyContext.setOptions(
333 this.opts.workerChoiceStrategyOptions
334 )
335 }
336
337 /** @inheritDoc */
338 public enableTasksQueue (
339 enable: boolean,
340 tasksQueueOptions?: TasksQueueOptions
341 ): void {
342 if (this.opts.enableTasksQueue === true && !enable) {
343 this.flushTasksQueues()
344 }
345 this.opts.enableTasksQueue = enable
346 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
347 }
348
349 /** @inheritDoc */
350 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
351 if (this.opts.enableTasksQueue === true) {
352 this.checkValidTasksQueueOptions(tasksQueueOptions)
353 this.opts.tasksQueueOptions =
354 this.buildTasksQueueOptions(tasksQueueOptions)
355 } else if (this.opts.tasksQueueOptions != null) {
356 delete this.opts.tasksQueueOptions
357 }
358 }
359
360 private buildTasksQueueOptions (
361 tasksQueueOptions: TasksQueueOptions
362 ): TasksQueueOptions {
363 return {
364 concurrency: tasksQueueOptions?.concurrency ?? 1
365 }
366 }
367
368 /**
369 * Whether the pool is full or not.
370 *
371 * The pool filling boolean status.
372 */
373 protected get full (): boolean {
374 return this.workerNodes.length >= this.maxSize
375 }
376
377 /**
378 * Whether the pool is busy or not.
379 *
380 * The pool busyness boolean status.
381 */
382 protected abstract get busy (): boolean
383
384 protected internalBusy (): boolean {
385 return (
386 this.workerNodes.findIndex(workerNode => {
387 return workerNode.workerUsage.tasks.executing === 0
388 }) === -1
389 )
390 }
391
392 /** @inheritDoc */
393 public async execute (data?: Data, name?: string): Promise<Response> {
394 const timestamp = performance.now()
395 const workerNodeKey = this.chooseWorkerNode()
396 const submittedTask: Task<Data> = {
397 name,
398 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
399 data: data ?? ({} as Data),
400 timestamp,
401 id: crypto.randomUUID()
402 }
403 const res = new Promise<Response>((resolve, reject) => {
404 this.promiseResponseMap.set(submittedTask.id as string, {
405 resolve,
406 reject,
407 worker: this.workerNodes[workerNodeKey].worker
408 })
409 })
410 if (
411 this.opts.enableTasksQueue === true &&
412 (this.busy ||
413 this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
414 ((this.opts.tasksQueueOptions as TasksQueueOptions)
415 .concurrency as number))
416 ) {
417 this.enqueueTask(workerNodeKey, submittedTask)
418 } else {
419 this.executeTask(workerNodeKey, submittedTask)
420 }
421 this.workerChoiceStrategyContext.update(workerNodeKey)
422 this.checkAndEmitEvents()
423 // eslint-disable-next-line @typescript-eslint/return-await
424 return res
425 }
426
427 /** @inheritDoc */
428 public async destroy (): Promise<void> {
429 await Promise.all(
430 this.workerNodes.map(async (workerNode, workerNodeKey) => {
431 this.flushTasksQueue(workerNodeKey)
432 // FIXME: wait for tasks to be finished
433 await this.destroyWorker(workerNode.worker)
434 })
435 )
436 }
437
438 /**
439 * Shutdowns the given worker.
440 *
441 * @param worker - A worker within `workerNodes`.
442 */
443 protected abstract destroyWorker (worker: Worker): void | Promise<void>
444
445 /**
446 * Setup hook to execute code before worker node are created in the abstract constructor.
447 * Can be overridden
448 *
449 * @virtual
450 */
451 protected setupHook (): void {
452 // Intentionally empty
453 }
454
455 /**
456 * Should return whether the worker is the main worker or not.
457 */
458 protected abstract isMain (): boolean
459
460 /**
461 * Hook executed before the worker task execution.
462 * Can be overridden.
463 *
464 * @param workerNodeKey - The worker node key.
465 * @param task - The task to execute.
466 */
467 protected beforeTaskExecutionHook (
468 workerNodeKey: number,
469 task: Task<Data>
470 ): void {
471 const workerUsage = this.workerNodes[workerNodeKey].workerUsage
472 ++workerUsage.tasks.executing
473 this.updateWaitTimeWorkerUsage(workerUsage, task)
474 }
475
476 /**
477 * Hook executed after the worker task execution.
478 * Can be overridden.
479 *
480 * @param worker - The worker.
481 * @param message - The received message.
482 */
483 protected afterTaskExecutionHook (
484 worker: Worker,
485 message: MessageValue<Response>
486 ): void {
487 const workerUsage =
488 this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
489 const workerTaskStatistics = workerUsage.tasks
490 --workerTaskStatistics.executing
491 ++workerTaskStatistics.executed
492 if (message.taskError != null) {
493 ++workerTaskStatistics.failed
494 }
495 this.updateRunTimeWorkerUsage(workerUsage, message)
496 this.updateEluWorkerUsage(workerUsage, message)
497 }
498
499 private updateRunTimeWorkerUsage (
500 workerUsage: WorkerUsage,
501 message: MessageValue<Response>
502 ): void {
503 if (
504 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
505 .aggregate
506 ) {
507 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
508 if (
509 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
510 .average &&
511 workerUsage.tasks.executed !== 0
512 ) {
513 workerUsage.runTime.average =
514 workerUsage.runTime.aggregate / workerUsage.tasks.executed
515 }
516 if (
517 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
518 .median &&
519 message.taskPerformance?.runTime != null
520 ) {
521 workerUsage.runTime.history.push(message.taskPerformance.runTime)
522 workerUsage.runTime.median = median(workerUsage.runTime.history)
523 }
524 }
525 }
526
527 private updateWaitTimeWorkerUsage (
528 workerUsage: WorkerUsage,
529 task: Task<Data>
530 ): void {
531 const timestamp = performance.now()
532 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
533 if (
534 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
535 .aggregate
536 ) {
537 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
538 if (
539 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
540 .waitTime.average &&
541 workerUsage.tasks.executed !== 0
542 ) {
543 workerUsage.waitTime.average =
544 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
545 }
546 if (
547 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
548 .waitTime.median &&
549 taskWaitTime != null
550 ) {
551 workerUsage.waitTime.history.push(taskWaitTime)
552 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
553 }
554 }
555 }
556
557 private updateEluWorkerUsage (
558 workerUsage: WorkerUsage,
559 message: MessageValue<Response>
560 ): void {
561 if (
562 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
563 .aggregate
564 ) {
565 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
566 workerUsage.elu.idle.aggregate =
567 workerUsage.elu.idle.aggregate + message.taskPerformance.elu.idle
568 workerUsage.elu.active.aggregate =
569 workerUsage.elu.active.aggregate + message.taskPerformance.elu.active
570 workerUsage.elu.utilization =
571 (workerUsage.elu.utilization +
572 message.taskPerformance.elu.utilization) /
573 2
574 } else if (message.taskPerformance?.elu != null) {
575 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
576 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
577 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
578 }
579 if (
580 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
581 .average &&
582 workerUsage.tasks.executed !== 0
583 ) {
584 workerUsage.elu.idle.average =
585 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
586 workerUsage.elu.active.average =
587 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
588 }
589 if (
590 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
591 .median &&
592 message.taskPerformance?.elu != null
593 ) {
594 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
595 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
596 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
597 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
598 }
599 }
600 }
601
602 /**
603 * Chooses a worker node for the next task.
604 *
605 * The default worker choice strategy uses a round robin algorithm to distribute the load.
606 *
607 * @returns The worker node key
608 */
609 protected chooseWorkerNode (): number {
610 let workerNodeKey: number
611 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
612 const workerCreated = this.createAndSetupWorker()
613 this.registerWorkerMessageListener(workerCreated, message => {
614 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
615 if (
616 isKillBehavior(KillBehaviors.HARD, message.kill) ||
617 (message.kill != null &&
618 this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
619 .executing === 0)
620 ) {
621 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
622 this.flushTasksQueue(currentWorkerNodeKey)
623 // FIXME: wait for tasks to be finished
624 void (this.destroyWorker(workerCreated) as Promise<void>)
625 }
626 })
627 workerNodeKey = this.getWorkerNodeKey(workerCreated)
628 } else {
629 workerNodeKey = this.workerChoiceStrategyContext.execute()
630 }
631 return workerNodeKey
632 }
633
634 /**
635 * Sends a message to the given worker.
636 *
637 * @param worker - The worker which should receive the message.
638 * @param message - The message.
639 */
640 protected abstract sendToWorker (
641 worker: Worker,
642 message: MessageValue<Data>
643 ): void
644
645 /**
646 * Registers a listener callback on the given worker.
647 *
648 * @param worker - The worker which should register a listener.
649 * @param listener - The message listener callback.
650 */
651 protected abstract registerWorkerMessageListener<
652 Message extends Data | Response
653 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
654
655 /**
656 * Returns a newly created worker.
657 */
658 protected abstract createWorker (): Worker
659
660 /**
661 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
662 *
663 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
664 *
665 * @param worker - The newly created worker.
666 */
667 protected abstract afterWorkerSetup (worker: Worker): void
668
669 /**
670 * Creates a new worker and sets it up completely in the pool worker nodes.
671 *
672 * @returns New, completely set up worker.
673 */
674 protected createAndSetupWorker (): Worker {
675 const worker = this.createWorker()
676
677 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
678 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
679 worker.on('error', error => {
680 if (this.emitter != null) {
681 this.emitter.emit(PoolEvents.error, error)
682 }
683 })
684 worker.on('error', () => {
685 if (this.opts.restartWorkerOnError === true) {
686 this.createAndSetupWorker()
687 }
688 })
689 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
690 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
691 worker.once('exit', () => {
692 this.removeWorkerNode(worker)
693 })
694
695 this.pushWorkerNode(worker)
696
697 this.setWorkerStatistics(worker)
698
699 this.afterWorkerSetup(worker)
700
701 return worker
702 }
703
704 /**
705 * This function is the listener registered for each worker message.
706 *
707 * @returns The listener function to execute when a message is received from a worker.
708 */
709 protected workerListener (): (message: MessageValue<Response>) => void {
710 return message => {
711 if (message.id != null) {
712 // Task execution response received
713 const promiseResponse = this.promiseResponseMap.get(message.id)
714 if (promiseResponse != null) {
715 if (message.taskError != null) {
716 promiseResponse.reject(message.taskError.message)
717 if (this.emitter != null) {
718 this.emitter.emit(PoolEvents.taskError, message.taskError)
719 }
720 } else {
721 promiseResponse.resolve(message.data as Response)
722 }
723 this.afterTaskExecutionHook(promiseResponse.worker, message)
724 this.promiseResponseMap.delete(message.id)
725 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
726 if (
727 this.opts.enableTasksQueue === true &&
728 this.tasksQueueSize(workerNodeKey) > 0
729 ) {
730 this.executeTask(
731 workerNodeKey,
732 this.dequeueTask(workerNodeKey) as Task<Data>
733 )
734 }
735 }
736 }
737 }
738 }
739
740 private checkAndEmitEvents (): void {
741 if (this.emitter != null) {
742 if (this.busy) {
743 this.emitter?.emit(PoolEvents.busy, this.info)
744 }
745 if (this.type === PoolTypes.dynamic && this.full) {
746 this.emitter?.emit(PoolEvents.full, this.info)
747 }
748 }
749 }
750
751 /**
752 * Sets the given worker node its tasks usage in the pool.
753 *
754 * @param workerNode - The worker node.
755 * @param workerUsage - The worker usage.
756 */
757 private setWorkerNodeTasksUsage (
758 workerNode: WorkerNode<Worker, Data>,
759 workerUsage: WorkerUsage
760 ): void {
761 workerNode.workerUsage = workerUsage
762 }
763
764 /**
765 * Pushes the given worker in the pool worker nodes.
766 *
767 * @param worker - The worker.
768 * @returns The worker nodes length.
769 */
770 private pushWorkerNode (worker: Worker): number {
771 return this.workerNodes.push({
772 worker,
773 workerUsage: this.getWorkerUsage(worker),
774 tasksQueue: new Queue<Task<Data>>()
775 })
776 }
777
778 // /**
779 // * Sets the given worker in the pool worker nodes.
780 // *
781 // * @param workerNodeKey - The worker node key.
782 // * @param worker - The worker.
783 // * @param workerUsage - The worker usage.
784 // * @param tasksQueue - The worker task queue.
785 // */
786 // private setWorkerNode (
787 // workerNodeKey: number,
788 // worker: Worker,
789 // workerUsage: WorkerUsage,
790 // tasksQueue: Queue<Task<Data>>
791 // ): void {
792 // this.workerNodes[workerNodeKey] = {
793 // worker,
794 // workerUsage,
795 // tasksQueue
796 // }
797 // }
798
799 /**
800 * Removes the given worker from the pool worker nodes.
801 *
802 * @param worker - The worker.
803 */
804 private removeWorkerNode (worker: Worker): void {
805 const workerNodeKey = this.getWorkerNodeKey(worker)
806 if (workerNodeKey !== -1) {
807 this.workerNodes.splice(workerNodeKey, 1)
808 this.workerChoiceStrategyContext.remove(workerNodeKey)
809 }
810 }
811
812 private executeTask (workerNodeKey: number, task: Task<Data>): void {
813 this.beforeTaskExecutionHook(workerNodeKey, task)
814 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
815 }
816
817 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
818 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
819 }
820
821 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
822 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
823 }
824
825 private tasksQueueSize (workerNodeKey: number): number {
826 return this.workerNodes[workerNodeKey].tasksQueue.size
827 }
828
829 private flushTasksQueue (workerNodeKey: number): void {
830 if (this.tasksQueueSize(workerNodeKey) > 0) {
831 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
832 this.executeTask(
833 workerNodeKey,
834 this.dequeueTask(workerNodeKey) as Task<Data>
835 )
836 }
837 }
838 }
839
840 private flushTasksQueues (): void {
841 for (const [workerNodeKey] of this.workerNodes.entries()) {
842 this.flushTasksQueue(workerNodeKey)
843 }
844 }
845
846 private setWorkerStatistics (worker: Worker): void {
847 this.sendToWorker(worker, {
848 statistics: {
849 runTime:
850 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
851 .runTime.aggregate,
852 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
853 .elu.aggregate
854 }
855 })
856 }
857
858 private getWorkerUsage (worker: Worker): WorkerUsage {
859 return {
860 tasks: this.getTaskStatistics(worker),
861 runTime: {
862 aggregate: 0,
863 average: 0,
864 median: 0,
865 history: new CircularArray()
866 },
867 waitTime: {
868 aggregate: 0,
869 average: 0,
870 median: 0,
871 history: new CircularArray()
872 },
873 elu: {
874 idle: {
875 aggregate: 0,
876 average: 0,
877 median: 0,
878 history: new CircularArray()
879 },
880 active: {
881 aggregate: 0,
882 average: 0,
883 median: 0,
884 history: new CircularArray()
885 },
886 utilization: 0
887 }
888 }
889 }
890
891 private getTaskStatistics (worker: Worker): TaskStatistics {
892 const queueSize =
893 this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
894 return {
895 executed: 0,
896 executing: 0,
897 get queued (): number {
898 return queueSize ?? 0
899 },
900 failed: 0
901 }
902 }
903 }