fix: add maximum tasks queue size to worker usage data
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
25 import {
26 Measurements,
27 WorkerChoiceStrategies,
28 type WorkerChoiceStrategy,
29 type WorkerChoiceStrategyOptions
30 } from './selection-strategies/selection-strategies-types'
31 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
32
33 /**
34 * Base class that implements some shared logic for all poolifier pools.
35 *
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
38 * @typeParam Response - Type of execution response. This can only be serializable data.
39 */
40 export abstract class AbstractPool<
41 Worker extends IWorker,
42 Data = unknown,
43 Response = unknown
44 > implements IPool<Worker, Data, Response> {
45 /** @inheritDoc */
46 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
47
48 /** @inheritDoc */
49 public readonly emitter?: PoolEmitter
50
51 /**
52 * The execution response promise map.
53 *
54 * - `key`: The message id of each submitted task.
55 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
56 *
57 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
58 */
59 protected promiseResponseMap: Map<
60 string,
61 PromiseResponseWrapper<Worker, Response>
62 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
63
64 /**
65 * Worker choice strategy context referencing a worker choice algorithm implementation.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
/**
 * Builds a new poolifier pool: validates the arguments, wires the optional
 * event emitter and the worker choice strategy context, then spawns the workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; checkPoolOptions() also fills in option defaults on this.opts
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods so they keep the pool as `this` when passed around
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Subclass hook, run before any worker node exists
  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
116
/**
 * Validates the worker file path given to the pool.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, is not a string, or is empty or blank.
 */
private checkFilePath (filePath: string): void {
  // The parameter is typed as `string`, but untyped callers can pass anything:
  // `typeof` rejects null, undefined and every non-string value in one test.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
125
/**
 * Validates the number of workers given to the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
143
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and defaulted when queueing is enabled
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
170
/**
 * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  const isKnown = knownStrategies.includes(workerChoiceStrategy)
  if (isKnown) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
180
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have exactly `maxSize` entries, or if
 * `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }

  const { weights, measurement } = workerChoiceStrategyOptions

  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }

  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
208
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are defined but not a plain object, or if
 * `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }

  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
232
/** @inheritDoc */
public get info (): PoolInfo {
  // Counts the worker nodes whose number of executing tasks matches the predicate
  const countNodes = (matches: (executing: number) => boolean): number => {
    let count = 0
    for (const { workerUsage } of this.workerNodes) {
      if (matches(workerUsage.tasks.executing)) {
        count += 1
      }
    }
    return count
  }
  // Sums one task counter over all the worker nodes
  const sumTasks = (pick: (tasks: WorkerUsage['tasks']) => number): number => {
    let total = 0
    for (const { workerUsage } of this.workerNodes) {
      total += pick(workerUsage.tasks)
    }
    return total
  }

  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countNodes(executing => executing === 0),
    busyWorkerNodes: countNodes(executing => executing > 0),
    executedTasks: sumTasks(tasks => tasks.executed),
    executingTasks: sumTasks(tasks => tasks.executing),
    queuedTasks: sumTasks(tasks => tasks.queued),
    maxQueuedTasks: sumTasks(tasks => tasks.maxQueued),
    failedTasks: sumTasks(tasks => tasks.failed)
  }
}
282
283 /**
284 * Pool type.
285 *
286 * If it is `'dynamic'`, it provides the `max` property.
287 */
288 protected abstract get type (): PoolType
289
290 /**
291 * Gets the worker type.
292 */
293 protected abstract get worker (): WorkerType
294
295 /**
296 * Pool minimum size.
297 */
298 protected abstract get minSize (): number
299
300 /**
301 * Pool maximum size.
302 */
303 protected abstract get maxSize (): number
304
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
316
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send the statistics requirements of the new strategy
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    const workerNode = this.workerNodes[workerNodeKey]
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  }
}
338
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so invalid options are never stored or forwarded
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
349
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Going from enabled to disabled: flush what is still queued
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
361
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queueing disabled: drop any previously stored options
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
}
372
/**
 * Builds the effective tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly nullish.
 * @returns The tasks queue options with `concurrency` set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
380
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches `maxSize`.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return this.maxSize <= workerNodesCount
}
389
390 /**
391 * Whether the pool is busy or not.
392 *
393 * The pool busyness boolean status.
394 */
395 protected abstract get busy (): boolean
396
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is idle (also `true` when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.workerUsage.tasks.executing !== 0
  )
}
409
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before choosing the worker node; used later to compute the task wait time
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the resolve/reject callbacks under the task id so the worker
  // message listener can settle the promise when the response arrives
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task when queueing is enabled and either the pool is busy or
  // the chosen worker node already runs `concurrency` tasks
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
443
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the queued tasks of the node before terminating its worker
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
454
455 /**
456 * Terminates the given worker.
457 *
458 * @param worker - A worker within `workerNodes`.
459 */
460 protected abstract destroyWorker (worker: Worker): void | Promise<void>
461
/**
 * Hook called by the abstract constructor before any worker node is created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
471
472 /**
473 * Should return whether the worker is the main worker or not.
474 */
475 protected abstract isMain (): boolean
476
/**
 * Hook executed before the worker task execution: increments the executing
 * tasks counter of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
492
/**
 * Hook executed after the worker task execution: updates the task counters,
 * run time and ELU usage of the worker node owning the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
510
/**
 * Updates the task counters once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  // A task that ended with an error counts as both executed and failed
  const taskFailed = message.taskError != null
  if (taskFailed) {
    tasks.failed += 1
  }
}
522
/**
 * Updates the run time statistics of a worker usage, according to what the
 * current worker choice strategy requires.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. When every
  // executed task failed the divisor is 0, so the previous average is kept
  // instead of storing Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
551
/**
 * Updates the wait time statistics of a worker usage, according to what the
 * current worker choice strategy requires.
 *
 * The wait time is the delay between the task timestamp and now; it is `0`
 * when the task carries no timestamp.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over the tasks that did not fail. When every
  // executed task failed the divisor is 0, so the previous average is kept
  // instead of storing Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
582
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage,
 * according to what the current worker choice strategy requires.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Utilization is the mean of the previous value and the new sample
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over the tasks that did not fail. When every
  // executed task failed the divisor is 0, so the previous averages are kept
  // instead of storing Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
627
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met. It is chosen
 * directly if the strategy policy says so, otherwise the worker choice
 * strategy picks the node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
646
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full,
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
655
656 /**
657 * Sends a message to the given worker.
658 *
659 * @param worker - The worker which should receive the message.
660 * @param message - The message.
661 */
662 protected abstract sendToWorker (
663 worker: Worker,
664 message: MessageValue<Data>
665 ): void
666
667 /**
668 * Registers a listener callback on the given worker.
669 *
670 * @param worker - The worker which should register a listener.
671 * @param listener - The message listener callback.
672 */
673 protected abstract registerWorkerMessageListener<
674 Message extends Data | Response
675 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
676
677 /**
678 * Creates a new worker.
679 *
680 * @returns Newly created worker.
681 */
682 protected abstract createWorker (): Worker
683
684 /**
685 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
686 *
687 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
688 *
689 * @param worker - The newly created worker.
690 */
691 protected abstract afterWorkerSetup (worker: Worker): void
692
/**
 * Creates a new worker, registers the pool and user event handlers on it,
 * and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // Create a replacement worker when restart on error is enabled
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
725
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * A message listener is registered that destroys the worker on a hard kill
 * message, or on any other kill message once the worker has no executing
 * task (and, with the tasks queue enabled, no queued task either).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
    const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    const shallDestroy =
      hardKill ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false && hasNoExecutingTask()) ||
          (this.opts.enableTasksQueue === true &&
            hasNoExecutingTask() &&
            this.tasksQueueSize(workerNodeKey) === 0)))
    if (!shallDestroy) {
      return
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    void (this.destroyWorker(worker) as Promise<void>)
  })
  return worker
}
752
/**
 * Builds the listener registered for each worker message.
 *
 * The listener settles the promise bound to the message id, updates the
 * worker usage, runs the next queued task of the worker node if any, and
 * notifies the worker choice strategy.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // Only messages carrying an id are task execution responses
    if (message.id == null) {
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
789
/**
 * Emits the `busy` and `full` pool events when their conditions are met.
 * Does nothing when the pool has no event emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolIsFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolIsFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
800
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to store on the node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // Plain reassignment: the previous usage object is left untouched
  workerNode.workerUsage = workerUsage
}
813
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // The node key is only known once the node is in the array: rebuild the
  // usage with it so the queue size getters read this node's tasks queue
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
833
834 // /**
835 // * Sets the given worker in the pool worker nodes.
836 // *
837 // * @param workerNodeKey - The worker node key.
838 // * @param worker - The worker.
839 // * @param workerUsage - The worker usage.
840 // * @param tasksQueue - The worker task queue.
841 // */
842 // private setWorkerNode (
843 // workerNodeKey: number,
844 // worker: Worker,
845 // workerUsage: WorkerUsage,
846 // tasksQueue: Queue<Task<Data>>
847 // ): void {
848 // this.workerNodes[workerNodeKey] = {
849 // worker,
850 // workerUsage,
851 // tasksQueue
852 // }
853 // }
854
/**
 * Removes the node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
867
/**
 * Runs the before-execution hook, then sends the task to the worker of the
 * given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
872
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
876
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
880
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
884
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
888
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and sent for execution, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. A counter compared against a size that
  // shrinks on each dequeue stops halfway, leaving tasks that are then
  // dropped by clear() with their promises never settled.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
900
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
906
/**
 * Sends to the given worker which statistics are required, based on the
 * `aggregate` flags of the run time and ELU task statistics requirements.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
918
/**
 * Builds a fresh, zeroed worker usage object.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters that read the tasks queue
 * of the given worker node on each access, or return `0` when no key is given.
 *
 * @param workerNodeKey - The worker node key, if the node is already in the pool.
 * @returns The initial worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const readQueueSize = (): number =>
    workerNodeKey == null ? 0 : this.tasksQueueSize(workerNodeKey)
  const readMaxQueueSize = (): number =>
    workerNodeKey == null ? 0 : this.tasksMaxQueueSize(workerNodeKey)
  // Each measurement gets its own history buffer
  const zeroedMeasurement = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readQueueSize()
      },
      get maxQueued (): number {
        return readMaxQueueSize()
      },
      failed: 0
    },
    runTime: zeroedMeasurement(),
    waitTime: zeroedMeasurement(),
    elu: {
      idle: zeroedMeasurement(),
      active: zeroedMeasurement(),
      utilization: 0
    }
  }
}
967 }