test: improve pool options coverage
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30 } from './worker'
31 import {
32 Measurements,
33 WorkerChoiceStrategies,
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
36 } from './selection-strategies/selection-strategies-types'
37 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
38
39 /**
40 * Base class that implements some shared logic for all poolifier pools.
41 *
42 * @typeParam Worker - Type of worker which manages this pool.
43 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
44 * @typeParam Response - Type of execution response. This can only be serializable data.
45 */
46 export abstract class AbstractPool<
47 Worker extends IWorker,
48 Data = unknown,
49 Response = unknown
50 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = new Array<
WorkerNode<Worker, Data>
>()
53
/** @inheritDoc */
// Only assigned by the constructor when `opts.enableEvents` is `true`.
public readonly emitter?: PoolEmitter
56
/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * When a worker answers, the entry matching the message id is looked up to settle that promise.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Worker, Response>
>()
69
/**
 * Context delegating to the worker choice strategy currently in use.
 *
 * Instantiated in the constructor from the pool options.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>
78
/**
 * Builds a new poolifier pool: validates the arguments, sets up the worker choice strategy context and creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation. `checkPoolOptions()` also fills in the options default values.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // The hook runs before any worker node is created.
  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Ensures a worker file path has been specified.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
/**
 * Ensures the requested number of workers is usable.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and stores the default value of each unset option in `this.opts`.
 *
 * @param opts - The pool options.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // The tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
176
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
186
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
214
/**
 * Validates the tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
/** @inheritDoc */
public get info (): PoolInfo {
  const { type, worker, minSize, maxSize } = this
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  // Single pass over the worker nodes to accumulate every counter.
  for (const { workerUsage, tasksQueue } of this.workerNodes) {
    const { executing, executed, failed } = workerUsage.tasks
    if (executing === 0) {
      idleWorkerNodes += 1
    }
    if (executing > 0) {
      busyWorkerNodes += 1
    }
    executedTasks += executed
    executingTasks += executing
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
    failedTasks += failed
  }
  return {
    type,
    worker,
    minSize,
    maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
287
/**
 * Type of the pool, compared by this class against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
294
/**
 * Type of the workers managed by the pool, as reported in the pool information.
 */
protected abstract get worker (): WorkerType
299
/**
 * Minimum size of the pool, as reported in the pool information.
 */
protected abstract get minSize (): number
304
/**
 * Maximum size of the pool. The pool is considered full once it holds that many worker nodes.
 */
protected abstract get maxSize (): number
309
/**
 * Looks up the worker node key, i.e. the index in the pool worker nodes, of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
321
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and resend the statistics requirements to its worker.
  for (const workerNode of this.workerNodes) {
    const freshWorkerUsage = this.getWorkerUsage(workerNode.worker)
    this.setWorkerNodeTasksUsage(workerNode, freshWorkerUsage)
    this.setWorkerStatistics(workerNode.worker)
  }
}
343
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Keep the pool options and the strategy context in sync.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
354
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Tasks still queued are flushed when the queue gets disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
366
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue not enabled: drop previously stored options, if any.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
377
/**
 * Builds the tasks queue options with default values applied.
 *
 * @param tasksQueueOptions - The user provided tasks queue options, possibly nullish.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
385
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the pool maximum size.
 */
protected get full (): boolean {
  const { length: currentSize } = this.workerNodes
  return currentSize >= this.maxSize
}
394
/**
 * Whether the pool is busy or not.
 *
 * Read when deciding whether a submitted task must be queued and whether the `busy` event must be emitted.
 */
protected abstract get busy (): boolean
401
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is idle (including when there are no worker nodes), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.workerUsage.tasks.executing !== 0
  )
}
414
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // The promise is settled by the worker listener once the response bound to the task id is received.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task if queueing is enabled and either the pool is busy or the chosen worker node reached its concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
448
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Submit the tasks still queued before terminating the worker.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
459
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 * @returns Nothing, or a promise that is awaited by `destroy()`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
466
/**
 * Hook invoked by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
476
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws if this returns `false`.
 */
protected abstract isMain (): boolean
481
/**
 * Hook run right before a task is sent to a worker: increments the number of executing tasks and records the task wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
497
/**
 * Hook run once a task response has been received: updates the tasks, run time and ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
515
/**
 * Updates the tasks counters of a worker usage after a task response.
 *
 * A failed task is counted both as executed and as failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
527
/**
 * Updates the run time statistics of a worker usage after a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, optionally carrying the task run time.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the successfully executed tasks only.
  // Guard on that count to avoid a division by zero (NaN or Infinity)
  // when every executed task has failed.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
556
/**
 * Updates the wait time statistics of a worker usage right before a task execution.
 *
 * The wait time is the delay between the task timestamp and now; it is `0` if the task has no timestamp.
 * Nothing is updated unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const { waitTime: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the successfully executed tasks count to avoid a division by
  // zero (NaN or Infinity) when every executed task has failed.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
587
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage after a task response.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, optionally carrying the task ELU.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Utilization is the mean of the previous value and the task one.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the successfully executed tasks count to avoid a division by
  // zero (NaN or Infinity) when every executed task has failed.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
632
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen directly if the strategy policy says so.
 * Otherwise the choice is delegated to the worker choice strategy (round robin by default).
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
651
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
660
/**
 * Sends a message to the given worker. Used to submit tasks and to send the statistics requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
671
/**
 * Registers a message listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
681
/**
 * Creates a new worker. Event handlers are attached afterwards by `createAndSetupWorker()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
688
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
697
/**
 * Creates a new worker, attaches the event handlers and registers it in the pool worker nodes.
 *
 * On worker error, the `error` pool event is emitted (if events are enabled) and a new worker is created if `restartWorkerOnError` is `true`.
 * On worker exit, the worker node is removed from the pool.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
730
/**
 * Creates a new dynamic worker, sets it up in the pool worker nodes and registers the listener handling its kill messages.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is only read when actually needed.
    const isIdle = (): boolean =>
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
    const isKillable = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return isIdle()
      }
      if (this.opts.enableTasksQueue === true) {
        return isIdle() && this.tasksQueueSize(workerNodeKey) === 0
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isKillable())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
757
/**
 * Builds the listener handling the messages received from a worker.
 *
 * For a task response, the listener settles the matching promise, updates the worker usage,
 * executes the next queued task (if the tasks queue is enabled) and updates the worker choice strategy.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError != null) {
      // The promise is rejected with the error message.
      promiseResponse.reject(message.taskError.message)
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
794
/**
 * Emits the `busy` and `full` pool events, with the pool information as payload, when the matching conditions are met.
 * Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
805
/**
 * Replaces the worker usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
818
/**
 * Appends a worker node, with a fresh usage and an empty tasks queue, for the given worker.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(worker),
    tasksQueue: new Queue<Task<Data>>()
  }
  return this.workerNodes.push(workerNode)
}
832
833 // /**
834 // * Sets the given worker in the pool worker nodes.
835 // *
836 // * @param workerNodeKey - The worker node key.
837 // * @param worker - The worker.
838 // * @param workerUsage - The worker usage.
839 // * @param tasksQueue - The worker task queue.
840 // */
841 // private setWorkerNode (
842 // workerNodeKey: number,
843 // worker: Worker,
844 // workerUsage: WorkerUsage,
845 // tasksQueue: Queue<Task<Data>>
846 // ): void {
847 // this.workerNodes[workerNodeKey] = {
848 // worker,
849 // workerUsage,
850 // tasksQueue
851 // }
852 // }
853
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy context.
 * Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
866
/**
 * Runs the before task execution hook then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
871
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
875
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
879
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
883
/**
 * Flushes the tasks queue of the given worker node by executing every queued task.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size: each dequeue shrinks the queue, so a
  // counter compared against the size would stop after flushing only half
  // of the queued tasks.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
894
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
900
/**
 * Sends to the given worker which statistics (run time, ELU) the current worker choice strategy requires.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
912
/**
 * Builds a fresh worker usage with every statistic zeroed and empty histories.
 *
 * @param worker - The worker.
 * @returns The initial worker usage.
 */
private getWorkerUsage (worker: Worker): WorkerUsage {
  const tasks = this.getTaskStatistics(worker)
  return {
    tasks,
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
945
/**
 * Builds fresh task statistics for the given worker.
 *
 * @param worker - The worker.
 * @returns Zeroed task statistics whose `queued` property reflects the current size of the worker node tasks queue.
 */
private getTaskStatistics (worker: Worker): TaskStatistics {
  // Resolved on each access: the statistics are built before the worker
  // node is pushed to the pool, so a size captured here would always be 0
  // and would never follow the queue afterwards.
  const getTasksQueueSize = (): number =>
    this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size ?? 0
  return {
    executed: 0,
    executing: 0,
    get queued (): number {
      return getTasksQueueSize()
    },
    failed: 0
  }
}
958 }