71f765ac575da13933a1543cd069feb0b6b1984d
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30 } from './worker'
31 import {
32 Measurements,
33 WorkerChoiceStrategies,
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
36 } from './selection-strategies/selection-strategies-types'
37 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
38
39 /**
40 * Base class that implements some shared logic for all poolifier pools.
41 *
42 * @typeParam Worker - Type of worker which manages this pool.
43 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
44 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
45 */
46 export abstract class AbstractPool<
47 Worker extends IWorker,
48 Data = unknown,
49 Response = unknown
50 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * A worker response carrying a known message id is settled through the matching entry.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context object delegating worker node selection to the configured worker choice strategy.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>
78
/**
 * Builds a pool: validates the arguments, wires the optional event emitter and the
 * worker choice strategy context, runs the setup hook and finally spawns the workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Validation order matters: the first failing check decides which error is thrown.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  // Also fills the default values into this.opts.
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Ensures a worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish or a blank string.
 */
private checkFilePath (filePath: string): void {
  const missing = filePath == null
  const blank = typeof filePath === 'string' && filePath.trim().length === 0
  if (missing || blank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
/**
 * Ensures the requested number of workers is usable for this pool type.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is nullish, or zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and writes the resolved defaults onto `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
176
/**
 * Ensures the given strategy is one of the values of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
186
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement` is not a value of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
214
/**
 * Validates tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
/** @inheritDoc */
public get info (): PoolInfo {
  // Accumulate every per-node counter in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const workerNode of this.workerNodes) {
    const tasks = workerNode.usage.tasks
    if (tasks.executing === 0) {
      idleWorkerNodes += 1
    }
    if (tasks.executing > 0) {
      busyWorkerNodes += 1
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
286
287 /**
288 * Pool type.
289 *
290 * If it is `'dynamic'`, it provides the `max` property.
291 */
292 protected abstract get type (): PoolType
293
294 /**
295 * Gets the worker type.
296 */
297 protected abstract get worker (): WorkerType
298
299 /**
300 * Pool minimum size.
301 */
302 protected abstract get minSize (): number
303
304 /**
305 * Pool maximum size.
306 */
307 protected abstract get maxSize (): number
308
/**
 * Looks up the position of a worker inside the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker node key (array index) of the first node holding that worker, `-1` if none does.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
320
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every node's usage and resend the statistics requirements to its worker.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
342
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so invalid options never reach the pool options or the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
353
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks still queued before changing the flag.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
365
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any options left over from a previous configuration.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const normalizedOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = normalizedOptions
}
376
/**
 * Normalizes tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly nullish.
 * @returns A new options object holding only the resolved `concurrency`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
384
/**
 * Whether the pool is full or not.
 *
 * `true` once the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
393
394 /**
395 * Whether the pool is busy or not.
396 *
397 * The pool busyness boolean status.
398 */
399 protected abstract get busy (): boolean
400
/**
 * Whether every worker node is currently executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
413
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken first so the wait time includes worker selection.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the settle callbacks; workerListener() uses them when the response arrives.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  let shallEnqueue = false
  if (this.opts.enableTasksQueue === true) {
    // Queue when the pool is busy or the chosen node already runs `concurrency` tasks.
    shallEnqueue =
      this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number)
  }
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
447
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the node's queue before terminating its worker.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
458
459 /**
460 * Terminates the given worker.
461 *
462 * @param worker - A worker within `workerNodes`.
463 */
464 protected abstract destroyWorker (worker: Worker): void | Promise<void>
465
/**
 * Extension point invoked by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
475
476 /**
477 * Should return whether the worker is the main worker or not.
478 */
479 protected abstract isMain (): boolean
480
/**
 * Hook executed before the worker task execution: counts the task as executing
 * and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
496
/**
 * Hook executed after the worker task execution: refreshes the task counters,
 * run time and ELU statistics of the worker's node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
513
/**
 * Moves one task from "executing" to "executed" and counts it as failed
 * when the response carries a task error.
 *
 * @param workerUsage - The worker usage statistics to update in place.
 * @param message - The task response message received from the worker.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
525
/**
 * Folds the run time reported with a task response into the worker usage statistics.
 *
 * Nothing is recorded unless the current worker choice strategy requires run time aggregation.
 * The average is computed over the tasks that did not fail and is left untouched while
 * that count is zero, so it can never become `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage statistics to update in place.
 * @param message - The task response message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // Divisor of the average: guard on it rather than on `executed` alone,
  // since `executed === failed` would otherwise divide by zero.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
554
/**
 * Records how long a task waited between its submission timestamp and now.
 *
 * Nothing is recorded unless the current worker choice strategy requires wait time aggregation.
 * The average is computed over the tasks that did not fail and is left untouched while
 * that count is zero, so it can never become `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage statistics to update in place.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  // A task without timestamp counts as having waited 0 ms.
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Divisor of the average: guard on it rather than on `executed` alone,
  // since `executed === failed` would otherwise divide by zero.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
585
/**
 * Folds the event loop utilization (ELU) reported with a task response into the
 * worker usage statistics.
 *
 * Nothing is recorded unless the current worker choice strategy requires ELU aggregation.
 * Averages are computed over the tasks that did not fail and are left untouched while
 * that count is zero, so they can never become `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage statistics to update in place.
 * @param message - The task response message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Utilization is the mean of the previously stored value and the new sample.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Divisor of the averages: guard on it rather than on `executed` alone,
  // since `executed === failed` would otherwise divide by zero.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
630
/**
 * Picks the worker node that receives the next task.
 *
 * A dynamic worker is created first when the conditions for it are met. Its node is
 * used directly if the strategy policy asks for it, otherwise the worker choice
 * strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    return this.getWorkerNodeKey(dynamicWorker)
  }
  return this.workerChoiceStrategyContext.execute()
}
649
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and every
 * worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
658
659 /**
660 * Sends a message to the given worker.
661 *
662 * @param worker - The worker which should receive the message.
663 * @param message - The message.
664 */
665 protected abstract sendToWorker (
666 worker: Worker,
667 message: MessageValue<Data>
668 ): void
669
/**
 * Attaches a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
682
683 /**
684 * Creates a new worker.
685 *
686 * @returns Newly created worker.
687 */
688 protected abstract createWorker (): Worker
689
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * By default it subscribes the pool's task response listener to the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const responseListener = this.workerListener()
  this.registerWorkerMessageListener(worker, responseListener)
}
700
/**
 * Creates a new worker, attaches the user supplied and internal event handlers,
 * then registers it in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: emit the pool error event and optionally spawn a replacement.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
733
/**
 * Creates a new dynamic worker, sets it up in the pool worker nodes and registers
 * the listener that destroys it on a kill message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A HARD kill always destroys the worker. Any other kill message does so only
    // when the worker node has nothing executing (and nothing queued, if the tasks queue is enabled).
    const shallDestroy = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shallDestroy()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
758
/**
 * Builds the listener registered for each worker message.
 *
 * The listener only handles messages whose id matches a pending task: it settles the
 * task promise, updates the worker usage, runs the next queued task of that worker
 * node (if any) and notifies the worker choice strategy.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
795
/**
 * Emits the `busy` and `full` pool events, each with the current pool info,
 * when the matching condition holds. Does nothing if events are disabled.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
806
/**
 * Replaces the usage statistics object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
819
/**
 * Appends a node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // The node key is only known once the node is stored, so bind the usage to it afterwards.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].usage = this.getWorkerUsage(workerNodeKey)
  return this.workerNodes.length
}
839
840 // /**
841 // * Sets the given worker in the pool worker nodes.
842 // *
843 // * @param workerNodeKey - The worker node key.
844 // * @param worker - The worker.
845 // * @param workerUsage - The worker usage.
846 // * @param tasksQueue - The worker task queue.
847 // */
848 // private setWorkerNode (
849 // workerNodeKey: number,
850 // worker: Worker,
851 // workerUsage: WorkerUsage,
852 // tasksQueue: Queue<Task<Data>>
853 // ): void {
854 // this.workerNodes[workerNodeKey] = {
855 // worker,
856 // usage: workerUsage,
857 // tasksQueue
858 // }
859 // }
860
/**
 * Drops the node of the given worker from the pool worker nodes and tells the
 * worker choice strategy context about it. Unknown workers are ignored.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
873
/**
 * Runs the before-execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
878
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue's `enqueue()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
882
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue's `dequeue()`, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
886
/**
 * Reads the current size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The queue's `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
890
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The queue's `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
894
/**
 * Executes every task currently queued on the given worker node, then clears its queue.
 *
 * The number of tasks to run is read once, before the loop. Comparing a growing
 * index against the live queue size (which shrinks on every dequeue) would stop
 * half-way, and the following `clear()` would then drop the remaining tasks
 * without ever settling their promises.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  const pendingTasks = this.tasksQueueSize(workerNodeKey)
  for (let flushed = 0; flushed < pendingTasks; flushed++) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
906
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
912
/**
 * Sends the given worker a `statistics` message holding the run time and ELU
 * aggregate flags from the current worker choice strategy requirements.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
924
/**
 * Builds a zeroed worker usage statistics object.
 *
 * `tasks.queued` and `tasks.maxQueued` are live getters: they read the tasks queue of
 * the worker node at `workerNodeKey` on each access, or yield `0` when no key is given.
 *
 * @param workerNodeKey - The worker node key the queue getters are bound to, if any.
 * @returns A fresh worker usage object.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions keep `this` bound to the pool; inside the object getters below
  // `this` would be the `tasks` object instead.
  const currentQueueSize = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const currentMaxQueueSize = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return currentQueueSize()
      },
      get maxQueued (): number {
        return currentMaxQueueSize()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
973 }