refactor: cleanup internal pool messaging code
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30 } from './worker'
31 import {
32 Measurements,
33 WorkerChoiceStrategies,
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
36 } from './selection-strategies/selection-strategies-types'
37 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
38
39 /**
40 * Base class that implements some shared logic for all poolifier pools.
41 *
42 * @typeParam Worker - Type of worker which manages this pool.
43 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
44 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
45 */
46 export abstract class AbstractPool<
47 Worker extends IWorker,
48 Data = unknown,
49 Response = unknown
50 > implements IPool<Worker, Data, Response> {
51 /** @inheritDoc */
52 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
53
54 /** @inheritDoc */
55 public readonly emitter?: PoolEmitter
56
57 /**
58 * The execution response promise map.
59 *
60 * - `key`: The message id of each submitted task.
61 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
62 *
63 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
64 */
65 protected promiseResponseMap: Map<
66 string,
67 PromiseResponseWrapper<Worker, Response>
68 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
69
70 /**
71 * Worker choice strategy context referencing a worker choice algorithm implementation.
72 */
73 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
74 Worker,
75 Data,
76 Response
77 >
78
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker choice
 * strategy context and creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation happens before anything is allocated.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed around as plain callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled (checkPoolOptions defaults the flag to true).
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Ensures a worker file path was provided.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and writes the default values of the missing ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const strategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(strategy)

  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
176
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
186
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size, or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per potential worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
214
/**
 * Validates the tasks queue options. Nullish options and a nullish concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or the concurrency is not a safe integer.
 * @throws Error if the concurrency is not strictly positive.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
/** @inheritDoc */
public get info (): PoolInfo {
  const type = this.type
  const worker = this.worker
  const minSize = this.minSize
  const maxSize = this.maxSize
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  // One pass over the worker nodes gathers every counter.
  for (const { workerUsage } of this.workerNodes) {
    const { tasks } = workerUsage
    if (tasks.executing === 0) {
      idleWorkerNodes += 1
    }
    if (tasks.executing > 0) {
      busyWorkerNodes += 1
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }
  return {
    type,
    worker,
    minSize,
    maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
288
289 /**
290 * Pool type.
291 *
292 * If it is `'dynamic'`, it provides the `max` property.
293 */
294 protected abstract get type (): PoolType
295
296 /**
297 * Gets the worker type.
298 */
299 protected abstract get worker (): WorkerType
300
301 /**
302 * Pool minimum size.
303 */
304 protected abstract get minSize (): number
305
306 /**
307 * Pool maximum size.
308 */
309 protected abstract get maxSize (): number
310
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
322
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets a fresh usage object and is sent the statistics flags of the new strategy.
  for (let key = 0; key < this.workerNodes.length; key++) {
    const node = this.workerNodes[key]
    this.setWorkerNodeTasksUsage(node, this.getWorkerUsage(key))
    this.setWorkerStatistics(node.worker)
  }
}
344
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options or the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
355
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues before the flag changes.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
367
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const normalizedOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = normalizedOptions
}
378
/**
 * Builds normalized tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options provided by the user, possibly nullish.
 * @returns The normalized tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
386
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
395
396 /**
397 * Whether the pool is busy or not.
398 *
399 * The pool busyness boolean status.
400 */
401 protected abstract get busy (): boolean
402
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is idle (vacuously `true` when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
415
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The submission timestamp is taken before the worker node is chosen.
  const submittedAt = performance.now()
  const chosenNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submittedAt,
    id: crypto.randomUUID()
  }
  // Register the resolve/reject callbacks under the task id so the worker listener can settle the promise.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[chosenNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or the worker node is at its concurrency limit.
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[chosenNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(chosenNodeKey, task)
  } else {
    this.executeTask(chosenNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
449
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the tasks queue of the worker node before destroying its worker.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
460
461 /**
462 * Terminates the given worker.
463 *
464 * @param worker - A worker within `workerNodes`.
465 */
466 protected abstract destroyWorker (worker: Worker): void | Promise<void>
467
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * The constructor calls it after the worker choice strategy context is built and
 * before the first worker is created. The default implementation does nothing.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
477
478 /**
479 * Should return whether the worker is the main worker or not.
480 */
481 protected abstract isMain (): boolean
482
/**
 * Hook executed before the worker task execution.
 * Counts the task as executing on the worker node and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
498
/**
 * Hook executed after the worker task execution.
 * Updates the task counters, run time and ELU statistics of the worker usage.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
516
/**
 * Updates the task counters of a worker usage once a task execution response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  // A task that ended with an error is counted as executed and as failed.
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
528
/**
 * Updates the run time statistics of a worker usage from a task execution response.
 *
 * Nothing is recorded unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // Failed tasks are excluded from the divisor. When every executed task has
  // failed the divisor is 0: skip the update instead of storing Infinity/NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  // The median is only refreshed when the response carries a run time sample.
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
557
/**
 * Updates the wait time statistics of a worker usage just before a task is executed.
 *
 * The wait time is the delay between the task timestamp and now; a task without
 * timestamp counts as a zero wait. Nothing is recorded unless the worker choice
 * strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Failed tasks are excluded from the divisor. When every executed task has
  // failed the divisor is 0: skip the update instead of storing Infinity/NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
588
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task execution response.
 *
 * Nothing is recorded unless the worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  // The former fallback branch dereferenced `workerUsage.elu` exactly when it
  // was nullish, so it could only throw: it has been removed.
  if (workerUsage.elu != null && taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The utilization is averaged pairwise with the previously stored value.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Failed tasks are excluded from the divisor. When every executed task has
  // failed the divisor is 0: skip the update instead of storing Infinity/NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  // The medians are only refreshed when the response carries an ELU sample.
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
633
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; it is
 * chosen directly if the strategy policy says so, otherwise the worker choice
 * strategy picks the worker node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
652
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and every worker node is executing a task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
661
662 /**
663 * Sends a message to the given worker.
664 *
665 * @param worker - The worker which should receive the message.
666 * @param message - The message.
667 */
668 protected abstract sendToWorker (
669 worker: Worker,
670 message: MessageValue<Data>
671 ): void
672
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
685
686 /**
687 * Creates a new worker.
688 *
689 * @returns Newly created worker.
690 */
691 protected abstract createWorker (): Worker
692
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * The default implementation subscribes the pool message listener to the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
}
703
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const newWorker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User supplied handlers fall back to a no-op when not provided.
  newWorker.on('message', messageHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', errorHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // A replacement worker is created when restart on error is enabled.
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  newWorker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  newWorker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is dropped from the pool worker nodes.
  newWorker.once('exit', () => {
    this.removeWorkerNode(newWorker)
  })

  this.pushWorkerNode(newWorker)
  this.setWorkerStatistics(newWorker)
  this.afterWorkerSetup(newWorker)

  return newWorker
}
736
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 * On top of the regular setup, the worker gets a listener that destroys it when it sends a kill message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is only dereferenced once a kill message is known to be present.
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
    const canBeKilledSoftly = (): boolean => {
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return hasNoExecutingTask()
      }
      if (this.opts.enableTasksQueue === true) {
        return hasNoExecutingTask() && this.tasksQueueSize(workerNodeKey) === 0
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      canBeKilledSoftly()
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
763
/**
 * Builds the listener registered for each worker message.
 *
 * The listener settles the promise bound to the message id, updates the worker
 * usage, then runs the next queued task of that worker node when there is one.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    // Only messages carrying an id are task execution responses.
    if (message.id == null) {
      return
    }
    const pendingResponse = this.promiseResponseMap.get(message.id)
    if (pendingResponse == null) {
      return
    }
    const { taskError } = message
    if (taskError == null) {
      pendingResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, taskError)
      pendingResponse.reject(taskError.message)
    }
    this.afterTaskExecutionHook(pendingResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(pendingResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
800
/**
 * Emits the `busy` and `full` pool events when the matching condition holds.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
811
/**
 * Sets the given worker node its tasks usage in the pool.
 * The worker node is mutated in place: its `workerUsage` is replaced by the given object.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
824
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Now that the node has a key, give it a usage object whose queue counters are bound to that key.
  const key = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(key)
  this.setWorkerNodeTasksUsage(this.workerNodes[key], keyedUsage)
  return this.workerNodes.length
}
844
845 // /**
846 // * Sets the given worker in the pool worker nodes.
847 // *
848 // * @param workerNodeKey - The worker node key.
849 // * @param worker - The worker.
850 // * @param workerUsage - The worker usage.
851 // * @param tasksQueue - The worker task queue.
852 // */
853 // private setWorkerNode (
854 // workerNodeKey: number,
855 // worker: Worker,
856 // workerUsage: WorkerUsage,
857 // tasksQueue: Queue<Task<Data>>
858 // ): void {
859 // this.workerNodes[workerNodeKey] = {
860 // worker,
861 // workerUsage,
862 // tasksQueue
863 // }
864 // }
865
/**
 * Removes the given worker from the pool worker nodes and from the worker choice strategy context.
 * Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const key = this.getWorkerNodeKey(worker)
  if (key === -1) {
    return
  }
  this.workerNodes.splice(key, 1)
  this.workerChoiceStrategyContext.remove(key)
}
878
/**
 * Runs the before-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
883
/**
 * Appends the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
887
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
891
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
895
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
899
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Re-read the live queue size on every pass: each dequeue shrinks the queue,
  // so comparing an increasing counter against the size would stop about
  // halfway and leave the remaining tasks to be discarded by `clear()`.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
911
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (let key = 0; key < this.workerNodes.length; key++) {
    this.flushTasksQueue(key)
  }
}
917
/**
 * Sends the given worker a `statistics` message carrying the run time and ELU
 * aggregate flags of the current task statistics requirements.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
929
/**
 * Builds a fresh worker usage object with every counter and measurement zeroed.
 *
 * The `queued` and `maxQueued` counters are getters reading the tasks queue of
 * the given worker node, or `0` when no worker node key is given.
 *
 * @param workerNodeKey - The worker node key the queue counters are bound to.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const currentQueueSize = (): number =>
    workerNodeKey == null ? 0 : this.tasksQueueSize(workerNodeKey)
  const currentMaxQueueSize = (): number =>
    workerNodeKey == null ? 0 : this.tasksMaxQueueSize(workerNodeKey)
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return currentQueueSize()
      },
      get maxQueued (): number {
        return currentMaxQueueSize()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
978 }