fix: fix tasks queued count computation
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
25 import {
26 Measurements,
27 WorkerChoiceStrategies,
28 type WorkerChoiceStrategy,
29 type WorkerChoiceStrategyOptions
30 } from './selection-strategies/selection-strategies-types'
31 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
32
33 /**
34 * Base class that implements some shared logic for all poolifier pools.
35 *
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
38 * @typeParam Response - Type of execution response. This can only be serializable data.
39 */
40 export abstract class AbstractPool<
41 Worker extends IWorker,
42 Data = unknown,
43 Response = unknown
44 > implements IPool<Worker, Data, Response> {
45 /** @inheritDoc */
46 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
47
48 /** @inheritDoc */
49 public readonly emitter?: PoolEmitter
50
51 /**
52 * The execution response promise map.
53 *
54 * - `key`: The message id of each submitted task.
55 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
56 *
57 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
58 */
59 protected promiseResponseMap: Map<
60 string,
61 PromiseResponseWrapper<Worker, Response>
62 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
63
64 /**
65 * Worker choice strategy context referencing a worker choice algorithm implementation.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
/**
 * Builds the pool: validates the arguments, creates the worker choice
 * strategy context, runs the setup hook and then spawns the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; each check throws on invalid input.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // `enableEvents` has been defaulted by `checkPoolOptions()` above.
  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  // Spawn exactly `numberOfWorkers` workers.
  for (let remaining = numberOfWorkers; remaining > 0; remaining--) {
    this.createAndSetupWorker()
  }
}
116
/**
 * Ensures a worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
125
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is rejected when it has no worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
143
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when queueing is enabled.
  if (poolOptions.enableTasksQueue) {
    const requestedQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(requestedQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
170
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
180
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have one entry per `maxSize` worker node,
 * or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
208
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
232
/** @inheritDoc */
public get info (): PoolInfo {
  // Accumulate every per-node counter in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const { workerUsage, tasksQueue } of this.workerNodes) {
    const { executing, executed, failed } = workerUsage.tasks
    if (executing === 0) {
      idleWorkerNodes += 1
    }
    if (executing > 0) {
      busyWorkerNodes += 1
    }
    executedTasks += executed
    executingTasks += executing
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
    failedTasks += failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
281
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Read by `checkNumberOfWorkers()` (a fixed pool cannot have zero worker),
 * by `shallCreateDynamicWorker()` and reported through `info`.
 */
protected abstract get type (): PoolType

/**
 * Gets the worker type.
 *
 * Reported as the `worker` field of `info`.
 */
protected abstract get worker (): WorkerType

/**
 * Pool minimum size.
 *
 * Reported as the `minSize` field of `info`.
 */
protected abstract get minSize (): number

/**
 * Pool maximum size.
 *
 * Used by `full` to decide whether the pool has reached its capacity and by
 * the worker choice strategy options validation to check the number of weights.
 */
protected abstract get maxSize (): number
303
/**
 * Looks up the worker node key (its index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
315
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send each worker the
  // statistics the new strategy requires.
  let workerNodeKey = 0
  for (const workerNode of this.workerNodes) {
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getWorkerUsage(workerNodeKey)
    )
    this.setWorkerStatistics(workerNode.worker)
    workerNodeKey += 1
  }
}
337
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so invalid options are never stored nor propagated.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
348
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // Queueing is being switched off: flush the queued tasks to the workers.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
360
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queueing is disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
371
/**
 * Builds the effective tasks queue options, applying defaults.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly missing.
 * @returns The tasks queue options with `concurrency` defaulted to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
379
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
388
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Read by `execute()` to decide whether a task is queued (when the tasks
 * queue is enabled) and by `checkAndEmitEvents()` to emit the busy event.
 */
protected abstract get busy (): boolean
395
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
408
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const chosenNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    id: taskId
  }
  // Register the resolve/reject callbacks under the task id; the worker
  // message listener settles the promise when the response comes back.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[chosenNodeKey].worker
    })
  })
  // With the tasks queue enabled, queue when the pool is busy or the chosen
  // worker node already executes `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[chosenNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(chosenNodeKey, task)
  } else {
    this.executeTask(chosenNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
442
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the queued tasks to the worker before terminating it.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
453
/**
 * Terminates the given worker.
 *
 * Called for every worker node by `destroy()` and for a dynamic worker when
 * its kill message is accepted.
 *
 * @param worker - A worker within `workerNodes`.
 * @returns Nothing, or a promise that `destroy()` awaits.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
460
/**
 * Setup hook executed by the abstract constructor before the worker nodes are created.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to do by default
}
470
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`, so that a pool cannot be
 * started from a worker.
 */
protected abstract isMain (): boolean
475
/**
 * Hook run right before a task is sent to its worker: counts the task as
 * executing and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
491
/**
 * Hook run once a task response has been received: updates the task
 * counters, the run time and the ELU statistics of the worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
509
/**
 * Updates the task counters after a task response: one task less executing,
 * one more executed and, when the response carries an error, one more failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
521
/**
 * Updates the run time statistics of a worker after a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * run time aggregate. The average and median are only computed when required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the optional task run time.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the successful tasks only. Guard on that
  // count (not on `executed`) so that a worker whose tasks all failed does
  // not end up with a NaN or Infinity average.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
550
/**
 * Updates the wait time statistics of a worker right before a task is executed.
 *
 * The wait time is the delay between the task timestamp and now; a task
 * without timestamp counts for a wait time of zero. Nothing is updated unless
 * the current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // Always a number: the fallback to `now` yields zero.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the successful tasks count (not on `executed`) so that a worker
  // whose tasks all failed does not end up with a NaN or Infinity average.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
581
/**
 * Updates the event loop utilization (ELU) statistics of a worker after a
 * task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * ELU aggregate. The averages and medians are only computed when required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the optional task ELU.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // NOTE(review): the utilization is averaged with the previous value,
    // which starts at 0, so the first sample is halved - confirm intent.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the successful tasks count (not on `executed`) so that a worker
  // whose tasks all failed does not end up with NaN or Infinity averages.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
626
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker is created first when `shallCreateDynamicWorker()` allows
 * it; it is used directly if the strategy policy says so. Otherwise the
 * worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    return this.getWorkerNodeKey(dynamicWorker)
  }
  return this.workerChoiceStrategyContext.execute()
}
645
/**
 * Tells whether a dynamic worker shall be created: the pool must be
 * dynamic, not full, and all its worker nodes must be executing a task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
654
/**
 * Sends a message to the given worker.
 *
 * Used to submit tasks (`executeTask()`) and to tell a worker which
 * statistics it must collect (`setWorkerStatistics()`).
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void

/**
 * Registers a listener callback on the given worker.
 *
 * @typeParam Message - Type of the payload carried by the received messages.
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void

/**
 * Creates a new worker.
 *
 * Called by `createAndSetupWorker()`, which then attaches the event handlers
 * and adds the worker to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker

/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
691
/**
 * Creates a worker, attaches the user and pool event handlers to it and
 * registers it in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler = EMPTY_FUNCTION,
    errorHandler = EMPTY_FUNCTION,
    onlineHandler = EMPTY_FUNCTION,
    exitHandler = EMPTY_FUNCTION
  } = {
    messageHandler: this.opts.messageHandler ?? undefined,
    errorHandler: this.opts.errorHandler ?? undefined,
    onlineHandler: this.opts.onlineHandler ?? undefined,
    exitHandler: this.opts.exitHandler ?? undefined
  }

  worker.on('message', messageHandler)
  worker.on('error', errorHandler)
  // Pool level error handling: forward the error and optionally restart.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler)
  worker.on('exit', exitHandler)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
724
/**
 * Creates a worker like `createAndSetupWorker()` and additionally listens to
 * its kill messages in order to destroy it.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily, only when the tasks queue flag matches.
    const isNotExecuting = (): boolean =>
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
    const isDrained = (): boolean =>
      (this.opts.enableTasksQueue === false && isNotExecuting()) ||
      (this.opts.enableTasksQueue === true &&
        isNotExecuting() &&
        this.tasksQueueSize(workerNodeKey) === 0)
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isDrained())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
751
/**
 * Builds the listener registered for each worker message. The listener
 * settles the promise of the answered task, updates the worker usage and
 * runs the next queued task of that worker node, if any.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const taskId = message.id
    if (taskId == null) {
      // Not a task execution response
      return
    }
    const promiseResponse = this.promiseResponseMap.get(taskId)
    if (promiseResponse == null) {
      return
    }
    const { taskError } = message
    if (taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, taskError)
      promiseResponse.reject(taskError.message)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(taskId)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
788
/**
 * Emits the busy and full pool events when their conditions are met.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
799
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to store on the node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
812
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const newWorkerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(newWorkerNode)
  // Now that the node has a key, install a usage built with that key so
  // that its queued tasks count reads the node tasks queue.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedWorkerUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedWorkerUsage)
  return this.workerNodes.length
}
832
833 // /**
834 // * Sets the given worker in the pool worker nodes.
835 // *
836 // * @param workerNodeKey - The worker node key.
837 // * @param worker - The worker.
838 // * @param workerUsage - The worker usage.
839 // * @param tasksQueue - The worker task queue.
840 // */
841 // private setWorkerNode (
842 // workerNodeKey: number,
843 // worker: Worker,
844 // workerUsage: WorkerUsage,
845 // tasksQueue: Queue<Task<Data>>
846 // ): void {
847 // this.workerNodes[workerNodeKey] = {
848 // worker,
849 // workerUsage,
850 // tasksQueue
851 // }
852 // }
853
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context. Unknown workers are ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
866
/**
 * Runs the pre-execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
871
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
875
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
879
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
883
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and sent to the worker for execution.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. A counter compared against the size would
  // stop halfway, since the size shrinks by one on every dequeue.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      // Defensive: the queue reported a size but yielded nothing.
      break
    }
    this.executeTask(workerNodeKey, task)
  }
}
894
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
900
/**
 * Sends the given worker a message stating which statistics (run time, ELU)
 * the current worker choice strategy requires.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
912
/**
 * Builds a fresh, zeroed worker usage.
 *
 * The `queued` tasks count is a getter reading the tasks queue size of the
 * worker node found at `workerNodeKey` when this method is called. The node
 * itself is captured, not its index: indexes shift when `removeWorkerNode()`
 * splices `workerNodes`, which would make an index based getter read another
 * node queue or fail on a missing node.
 *
 * @param workerNodeKey - The worker node key; when omitted, or when no node
 * exists at that key, `queued` always reports `0`.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const workerNode =
    workerNodeKey != null ? this.workerNodes[workerNodeKey] : undefined
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        // Read lazily so the count always reflects the current queue size.
        return workerNode?.tasksQueue.size ?? 0
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
955 }