6fde2491b6e1a565ba48ce26d821407eff67c8a2
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType
23 } from './pool'
24 import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30 } from './worker'
31 import {
32 Measurements,
33 WorkerChoiceStrategies,
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
36 } from './selection-strategies/selection-strategies-types'
37 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
38
39 /**
40 * Base class that implements some shared logic for all poolifier pools.
41 *
42 * @typeParam Worker - Type of worker which manages this pool.
43 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
44 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
45 */
46 export abstract class AbstractPool<
47 Worker extends IWorker,
48 Data = unknown,
49 Response = unknown
50 > implements IPool<Worker, Data, Response> {
51 /** @inheritDoc */
52 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
53
54 /** @inheritDoc */
55 public readonly emitter?: PoolEmitter
56
57 /**
58 * The execution response promise map.
59 *
60 * - `key`: The message id of each submitted task.
61 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
62 *
63 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
64 */
65 protected promiseResponseMap: Map<
66 string,
67 PromiseResponseWrapper<Worker, Response>
68 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
69
70 /**
71 * Worker choice strategy context referencing a worker choice algorithm implementation.
72 */
73 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
74 Worker,
75 Data,
76 Response
77 >
78
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker choice
 * strategy context, calls `setupHook()`, then creates `numberOfWorkers` workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws If `isMain()` returns `false` or if one of the arguments fails validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation. checkPoolOptions() also writes the option defaults into this.opts.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Hook for subclasses, executed before any worker is created.
  this.setupHook()

  for (let i = 1; i <= this.numberOfWorkers; i++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Validates the worker file path given to the pool constructor.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined`, is not a string, or is blank.
 */
private checkFilePath (filePath: string): void {
  // The typeof test also covers null/undefined and rejects non-string values
  // (e.g. a number) passed by untyped JavaScript callers.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are required to have at least one worker.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and writes the default values of the unset ones into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless specified.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  // Boolean switches.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
176
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  const isKnown = knownStrategies.includes(workerChoiceStrategy)
  if (!isKnown) {
    throw new Error(
      `Invalid worker choice strategy '${workerChoiceStrategy}'`
    )
  }
}
186
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not have `maxSize` entries or if `measurement` is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
214
/**
 * Validates the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to validate: the default concurrency is applied elsewhere.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
/**
 * Whether at least one worker node has not reported itself as started yet.
 */
private get starting (): boolean {
  const allStarted = this.workerNodes.every(
    workerNode => workerNode.info.started
  )
  return !allStarted
}
242
/**
 * Whether at least one worker node has reported itself as started.
 */
private get started (): boolean {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.started) {
      return true
    }
  }
  return false
}
246
/** @inheritDoc */
public get info (): PoolInfo {
  // Number of worker nodes satisfying the given condition.
  const countNodes = (
    matches: (workerNode: WorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(matches).length
  // Sum of a per worker node figure over all the worker nodes.
  const sumOverNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sumOverNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverNodes(workerNode => workerNode.usage.tasks.failed)
  }
}
294
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Implemented by the concrete pools. This class compares it against `PoolTypes.fixed`
 * and `PoolTypes.dynamic` and reports it in the pool `info`.
 */
protected abstract get type (): PoolType
301
/**
 * Gets the worker type.
 *
 * Implemented by the concrete pools and reported as `worker` in the pool `info`.
 */
protected abstract get worker (): WorkerType
306
/**
 * Pool minimum size.
 *
 * Implemented by the concrete pools and reported as `minSize` in the pool `info`.
 */
protected abstract get minSize (): number
311
/**
 * Pool maximum size.
 *
 * Implemented by the concrete pools. Used by `full`, by the worker choice strategy
 * weights validation, and reported as `maxSize` in the pool `info`.
 */
protected abstract get maxSize (): number
316
/**
 * Looks up a worker by its id among the pool worker nodes.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node with a matching id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
327
/**
 * Gets the worker node key (index in the pool worker nodes) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
339
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and tell each worker which
  // statistics the newly selected strategy requires.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    const workerNode = this.workerNodes[workerNodeKey]
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  }
}
361
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool or the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
372
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: run what is still queued before disabling it.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
384
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any previously stored queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
395
/**
 * Builds the tasks queue options, applying the default values of the unset ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly `null`/`undefined`.
 * @returns Tasks queue options with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
403
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached `maxSize`.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
412
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Implemented by the concrete pools. Read by `execute()` to decide whether to queue a
 * task and by `checkAndEmitEvents()` to emit the `busy` event.
 */
protected abstract get busy (): boolean
419
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
432
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const targetNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    id: taskId
  }
  // The resolve/reject callbacks are stored under the task id so that the
  // worker message listener can settle the promise when the response arrives.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[targetNodeKey].worker
    })
  })
  // With the tasks queue enabled, a task is queued when the pool is busy or
  // when the chosen worker node already runs `concurrency` tasks.
  const shallQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[targetNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallQueue) {
    this.enqueueTask(targetNodeKey, task)
  } else {
    this.executeTask(targetNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
466
/** @inheritDoc */
public async destroy (): Promise<void> {
  // For each worker node: flush its tasks queue, then destroy its worker.
  // The returned promise settles once every destroyWorker() call has settled.
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    })
  )
}
477
/**
 * Terminates the given worker.
 *
 * Implemented by the concrete pools. It may complete synchronously or return a promise;
 * `destroy()` awaits the result.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
484
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden.
 *
 * The constructor calls it after the worker choice strategy context is created.
 * This default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
494
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor throws when this returns `false`, so a pool cannot be started from a worker.
 */
protected abstract isMain (): boolean
499
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the number of executing tasks of the worker node and updates its wait time usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
515
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and ELU usage of the worker node owning the given worker.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // Tasks counters first: the run time and ELU averages read them.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
532
/**
 * Updates the tasks counters of a worker usage once a task response is received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  // A failed task counts both as executed and as failed.
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
544
/**
 * Updates the run time statistics of a worker usage once a task response is received.
 *
 * Nothing is updated unless the current worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. Skip it while
  // that count is zero (nothing executed yet, or every executed task failed)
  // to avoid a division by zero yielding NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
573
/**
 * Updates the wait time statistics of a worker usage just before a task is executed.
 *
 * The task wait time is the time elapsed since `task.timestamp`, or `0` if the task
 * carries no timestamp. Nothing is updated unless the current worker choice strategy
 * requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over the tasks that did not fail. Skip it while
  // that count is zero (nothing executed yet, or every executed task failed)
  // to avoid a division by zero yielding NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
604
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage once a task
 * response is received.
 *
 * Nothing is updated unless the current worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // NOTE(review): utilization starts at 0, so the first sample is halved by
    // this running mean; left unchanged here, confirm whether that is intended.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over the tasks that did not fail. Skip them
  // while that count is zero (nothing executed yet, or every executed task
  // failed) to avoid a division by zero yielding NaN or Infinity.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
649
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when `shallCreateDynamicWorker()` allows it. Its
 * worker node is chosen directly if the strategy policy has `useDynamicWorker` set,
 * otherwise the choice is delegated to the worker choice strategy.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
668
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
677
/**
 * Sends a message to the given worker.
 *
 * Implemented by the concrete pools. This class uses it to send tasks to execute
 * and the statistics settings to the workers.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
688
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
701
/**
 * Creates a new worker.
 *
 * Implemented by the concrete pools and called by `createAndSetupWorker()`.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
708
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * This default implementation registers the pool message listener on the worker.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const messageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, messageListener)
}
719
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * Registers the user-provided handlers and the pool internal handlers on the worker,
 * adds it to the pool worker nodes, sends it the statistics settings and finally
 * calls `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // User-provided handlers, falling back to a no-op when not specified.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    // Forward the error as a pool event when events are enabled.
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    // Create a replacement worker, unless a worker node is still starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  // Tell the worker which statistics the current strategy requires.
  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
752
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, an additional message listener destroys the worker
 * when it sends a kill message and the conditions below are met.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Destroy the worker on a HARD kill message, or on any other kill message
    // once the worker node has no executing task and, when the tasks queue is
    // enabled, no queued task either.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        ((this.opts.enableTasksQueue === false &&
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
          (this.opts.enableTasksQueue === true &&
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0)))
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
777
/**
 * This function is the listener registered for each worker message.
 *
 * The returned listener handles two kinds of messages:
 * - worker started messages (`workerId` and `started` set): the `started` flag of the matching worker node is updated;
 * - task execution responses (`id` set): the matching promise is settled, the worker usage is updated and the next queued task of the worker node, if any, is executed.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.workerId != null && message.started != null) {
      // Worker started message received
      const worker = this.getWorkerById(message.workerId)
      const workerNodeKey =
        worker != null ? this.getWorkerNodeKey(worker) : -1
      // The worker may not be in the pool worker nodes (e.g. already removed):
      // ignore the message instead of throwing on an undefined worker node.
      if (workerNodeKey !== -1) {
        this.workerNodes[workerNodeKey].info.started = message.started
      }
    } else if (message.id != null) {
      // Task execution response received
      const promiseResponse = this.promiseResponseMap.get(message.id)
      if (promiseResponse != null) {
        if (message.taskError != null) {
          if (this.emitter != null) {
            this.emitter.emit(PoolEvents.taskError, message.taskError)
          }
          promiseResponse.reject(message.taskError.message)
        } else {
          promiseResponse.resolve(message.data as Response)
        }
        this.afterTaskExecutionHook(promiseResponse.worker, message)
        this.promiseResponseMap.delete(message.id)
        const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
        // The worker just finished a task: execute its next queued task, if any.
        if (
          this.opts.enableTasksQueue === true &&
          this.tasksQueueSize(workerNodeKey) > 0
        ) {
          this.executeTask(
            workerNodeKey,
            this.dequeueTask(workerNodeKey) as Task<Data>
          )
        }
        this.workerChoiceStrategyContext.update(workerNodeKey)
      }
    }
  }
}
819
/**
 * Emits the `busy` and `full` pool events when their conditions are met.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The full event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
830
/**
 * Sets the usage of the given worker node, replacing its current `usage` object.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
843
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: worker.threadId ?? worker.id, started: false },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Now that the worker node has a key, replace its usage with one whose
  // queued tasks counters read the tasks queue of that worker node.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
864
865 // /**
866 // * Sets the given worker in the pool worker nodes.
867 // *
868 // * @param workerNodeKey - The worker node key.
869 // * @param worker - The worker.
870 // * @param workerInfo - The worker info.
871 // * @param workerUsage - The worker usage.
872 // * @param tasksQueue - The worker task queue.
873 // */
874 // private setWorkerNode (
875 // workerNodeKey: number,
876 // worker: Worker,
877 // workerInfo: WorkerInfo,
878 // workerUsage: WorkerUsage,
879 // tasksQueue: Queue<Task<Data>>
880 // ): void {
881 // this.workerNodes[workerNodeKey] = {
882 // worker,
883 // info: workerInfo,
884 // usage: workerUsage,
885 // tasksQueue
886 // }
887 // }
888
/**
 * Removes the given worker from the pool worker nodes and from the worker choice
 * strategy context. Does nothing if the worker is not in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
901
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
906
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
910
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` method.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
914
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
918
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
922
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue is empty. Comparing an increasing index with the
  // queue size would stop halfway, since each dequeue shrinks the queue, and
  // the remaining tasks would be cleared without ever being executed.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
934
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
940
/**
 * Sends to the given worker which statistics (run time, ELU) the current worker
 * choice strategy requires in aggregated form.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
952
/**
 * Builds a fresh, zeroed worker usage.
 *
 * The `queued` and `maxQueued` tasks counters are getters: when a worker node key
 * is given they read the tasks queue of that worker node on each access, otherwise
 * they always return `0`.
 *
 * @param workerNodeKey - The worker node key, if any.
 * @returns The worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions, so that `this` is still the pool inside the getters below.
  const currentQueueSize = (): number => {
    if (workerNodeKey == null) {
      return 0
    }
    return this.tasksQueueSize(workerNodeKey)
  }
  const currentMaxQueueSize = (): number => {
    if (workerNodeKey == null) {
      return 0
    }
    return this.tasksMaxQueueSize(workerNodeKey)
  }
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return currentQueueSize()
      },
      get maxQueued (): number {
        return currentMaxQueueSize()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
1001 }