ba05d92959a83f362fd721b4ca3615e66c86d196
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isPlainObject,
8 median
9 } from '../utils'
10 import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
11 import { CircularArray } from '../circular-array'
12 import { Queue } from '../queue'
13 import {
14 type IPool,
15 PoolEmitter,
16 PoolEvents,
17 type PoolInfo,
18 type PoolOptions,
19 type PoolType,
20 PoolTypes,
21 type TasksQueueOptions,
22 type WorkerType,
23 WorkerTypes
24 } from './pool'
25 import type {
26 IWorker,
27 MessageHandler,
28 Task,
29 WorkerNode,
30 WorkerUsage
31 } from './worker'
32 import {
33 Measurements,
34 WorkerChoiceStrategies,
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
37 } from './selection-strategies/selection-strategies-types'
38 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
39
40 /**
41 * Base class that implements some shared logic for all poolifier pools.
42 *
43 * @typeParam Worker - Type of worker which manages this pool.
44 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
45 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
46 */
47 export abstract class AbstractPool<
48 Worker extends IWorker,
49 Data = unknown,
50 Response = unknown
51 > implements IPool<Worker, Data, Response> {
52 /** @inheritDoc */
53 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
54
55 /** @inheritDoc */
56 public readonly emitter?: PoolEmitter
57
58 /**
59 * The execution response promise map.
60 *
61 * - `key`: The message id of each submitted task.
62 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
63 *
64 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
65 */
66 protected promiseResponseMap: Map<
67 string,
68 PromiseResponseWrapper<Worker, Response>
69 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
70
71 /**
72 * Worker choice strategy context referencing a worker choice algorithm implementation.
73 */
74 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
75 Worker,
76 Data,
77 Response
78 >
79
/**
 * Builds a poolifier pool: validates the arguments, creates the optional
 * event emitter and the worker choice strategy context, runs the setup hook
 * and finally creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (pool options are also filled with their defaults).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
123
/**
 * Ensures that a worker file path has been given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
132
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is `null` or `undefined`, or if it is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
150
/**
 * Validates the pool options and stores them, completed with their default
 * values, into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false

  if (poolOptions.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
177
/**
 * Ensures the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
187
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size,
 * or if the measurement is not one of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (measurement != null) {
    const knownMeasurements = Object.values(Measurements)
    if (!knownMeasurements.includes(measurement)) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${measurement}'`
      )
    }
  }
}
215
/**
 * Validates the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or if the
 * concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
239
/**
 * Whether at least one worker node has not been flagged as started yet.
 */
private get starting (): boolean {
  const allStarted = this.workerNodes.every(
    workerNode => workerNode.info.started
  )
  return !allStarted
}
243
/**
 * Whether at least one worker node has been flagged as started.
 */
private get started (): boolean {
  const noneStarted = this.workerNodes.every(
    workerNode => !workerNode.info.started
  )
  return !noneStarted
}
247
/** @inheritDoc */
public get info (): PoolInfo {
  // Accumulate every per worker node counter in a single pass.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const workerNode of this.workerNodes) {
    const tasksUsage = workerNode.usage.tasks
    if (tasksUsage.executing === 0) {
      ++idleWorkerNodes
    }
    if (tasksUsage.executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += tasksUsage.executed
    executingTasks += tasksUsage.executing
    queuedTasks += tasksUsage.queued
    maxQueuedTasks += tasksUsage.maxQueued
    failedTasks += tasksUsage.failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
295
296 /**
297 * Pool type.
298 *
299 * If it is `'dynamic'`, it provides the `max` property.
300 */
301 protected abstract get type (): PoolType
302
303 /**
304 * Gets the worker type.
305 */
306 protected abstract get worker (): WorkerType
307
308 /**
309 * Pool minimum size.
310 */
311 protected abstract get minSize (): number
312
313 /**
314 * Pool maximum size.
315 */
316 protected abstract get maxSize (): number
317
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
328
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
340
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send the statistics requirements of
  // the new strategy to its worker.
  for (let key = 0; key < this.workerNodes.length; key++) {
    const workerNode = this.workerNodes[key]
    const freshUsage = this.getWorkerUsage(key)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  }
}
362
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
373
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // The queue is being switched off: flush the tasks queues.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
385
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
396
/**
 * Builds the tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly `null` or `undefined`.
 * @returns A new tasks queue options object holding only the concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
404
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes has reached its maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
413
414 /**
415 * Whether the pool is busy or not.
416 *
417 * The pool busyness boolean status.
418 */
419 protected abstract get busy (): boolean
420
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks (which is also
 * the case when there are no worker nodes), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
433
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId: string = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the promise callbacks under the task id so that the worker
  // message listener can settle the promise when the response comes back.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // A task is queued only if the tasks queue is enabled and either the pool
  // is busy or the chosen worker node has reached its tasks concurrency.
  const shallEnqueue = (): boolean => {
    if (this.opts.enableTasksQueue !== true) {
      return false
    }
    if (this.busy) {
      return true
    }
    const concurrency = (this.opts.tasksQueueOptions as TasksQueueOptions)
      .concurrency as number
    return this.workerNodes[workerNodeKey].usage.tasks.executing >= concurrency
  }
  if (shallEnqueue()) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
467
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
478
479 /**
480 * Terminates the given worker.
481 *
482 * @param worker - A worker within `workerNodes`.
483 */
484 protected abstract destroyWorker (worker: Worker): void | Promise<void>
485
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
495
496 /**
497 * Should return whether the worker is the main worker or not.
498 */
499 protected abstract isMain (): boolean
500
/**
 * Hook executed before the worker task execution: increments the number of
 * executing tasks of the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
516
/**
 * Hook executed after the worker task execution: updates the tasks
 * statistics, the run time and the ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
533
/**
 * Updates the tasks counters of a worker usage once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    // The task ended with an error.
    tasks.failed += 1
  }
}
545
/**
 * Updates the run time statistics of a worker usage, according to the task
 * statistics requirements of the current worker choice strategy.
 *
 * Nothing is updated unless the run time aggregate is required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, whose `taskPerformance.runTime` is used when present.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. Skip it while
  // that count is zero (e.g. only failed tasks so far), otherwise the
  // division would store NaN or Infinity as the average.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
574
/**
 * Updates the wait time statistics of a worker usage, according to the task
 * statistics requirements of the current worker choice strategy.
 *
 * The task wait time is the time elapsed between the task timestamp and now;
 * it is zero if the task has no timestamp. Nothing is updated unless the wait
 * time aggregate is required.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const { waitTime: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over the tasks that did not fail. Skip it while
  // that count is zero (e.g. only failed tasks so far), otherwise the
  // division would store NaN or Infinity as the average.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
605
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage,
 * according to the task statistics requirements of the current worker choice
 * strategy.
 *
 * Nothing is updated unless the ELU aggregate is required.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, whose `taskPerformance.elu` is used when present.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: requirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.aggregate) {
    return
  }
  const eluUsage = workerUsage.elu
  if (eluUsage == null) {
    // No ELU usage to update: never dereference a missing usage object.
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    // Zero aggregates are taken as "no ELU sample recorded yet": the first
    // sample is stored as is instead of being averaged with the initial 0,
    // which would halve it.
    const firstSample =
      eluUsage.idle.aggregate === 0 && eluUsage.active.aggregate === 0
    eluUsage.idle.aggregate += taskElu.idle
    eluUsage.active.aggregate += taskElu.active
    eluUsage.utilization = firstSample
      ? taskElu.utilization
      : (eluUsage.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over the tasks that did not fail. Skip them
  // while that count is zero, otherwise the divisions would store NaN or
  // Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    eluUsage.idle.average = eluUsage.idle.aggregate / successfulTasks
    eluUsage.active.average = eluUsage.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
650
/**
 * Chooses a worker node for the next task.
 *
 * When the conditions for a dynamic worker creation are met, a dynamic worker
 * is created first; its worker node is chosen if the strategy policy says to
 * use dynamic workers. Otherwise the worker choice strategy picks the worker node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
669
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and
 * all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
678
679 /**
680 * Sends a message to the given worker.
681 *
682 * @param worker - The worker which should receive the message.
683 * @param message - The message.
684 */
685 protected abstract sendToWorker (
686 worker: Worker,
687 message: MessageValue<Data>
688 ): void
689
/**
 * Subscribes a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
702
703 /**
704 * Creates a new worker.
705 *
706 * @returns Newly created worker.
707 */
708 protected abstract createWorker (): Worker
709
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolMessageListener)
}
720
/**
 * Creates a new worker, registers its event handlers and adds it to the pool
 * worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // Emits the pool error event and, if enabled and no worker node is still
  // starting, replaces the worker with a new one.
  const onWorkerError = (error: Error): void => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      this.createAndSetupWorker()
    }
  }

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', onWorkerError)
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
753
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when it
 * sends a kill message and the kill conditions are met.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Without tasks queue: no executing task.
    // With tasks queue: no executing task and an empty tasks queue.
    const isWorkerNodeIdle = (): boolean => {
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    const shallKill =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeIdle())
    if (shallKill) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
778
/**
 * Builds the listener registered for each worker message.
 *
 * The listener handles two kinds of messages: the worker started message and
 * the task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.workerId != null && message.started != null) {
      // Worker started message received
      const worker = this.getWorkerById(message.workerId)
      if (worker == null) {
        throw new Error('Worker started message received from unknown worker')
      }
      const startedWorkerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
      startedWorkerNode.info.started = message.started
      return
    }
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError != null) {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      // The worker node has queued tasks: run the next one.
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
824
/**
 * Emits the `busy` event if the pool is busy and the `full` event if the
 * pool is dynamic and full. Does nothing if the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
835
/**
 * Replaces the usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to set on the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
848
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNodesCount = this.workerNodes.push({
    worker,
    info: { id: this.getWorkerId(worker), started: false },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  })
  // Now that the worker node is in the pool, give it a usage built from its
  // worker node key.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return workerNodesCount
}
869
/**
 * Gets the worker id: the `threadId` for a thread worker, the `id` for a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
883
884 // /**
885 // * Sets the given worker in the pool worker nodes.
886 // *
887 // * @param workerNodeKey - The worker node key.
888 // * @param worker - The worker.
889 // * @param workerInfo - The worker info.
890 // * @param workerUsage - The worker usage.
891 // * @param tasksQueue - The worker task queue.
892 // */
893 // private setWorkerNode (
894 // workerNodeKey: number,
895 // worker: Worker,
896 // workerInfo: WorkerInfo,
897 // workerUsage: WorkerUsage,
898 // tasksQueue: Queue<Task<Data>>
899 // ): void {
900 // this.workerNodes[workerNodeKey] = {
901 // worker,
902 // info: workerInfo,
903 // usage: workerUsage,
904 // tasksQueue
905 // }
906 // }
907
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not
 * in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
920
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
925
/**
 * Enqueues a task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
929
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the tasks queue `dequeue()`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
933
/**
 * Gets the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
937
/**
 * Gets the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue maximum size.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
941
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * sent for execution, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Snapshot the number of queued tasks before dequeuing. Comparing a rising
  // index against the live queue size, which shrinks on each dequeue, stops
  // halfway and leaves the remaining tasks to be dropped by `clear()`, so
  // their promises would never settle.
  const queuedTasks = this.tasksQueueSize(workerNodeKey)
  for (let flushed = 0; flushed < queuedTasks; flushed++) {
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      // Queue exhausted earlier than expected
      break
    }
    this.executeTask(workerNodeKey, task)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
953
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (let key = 0; key < this.workerNodes.length; key++) {
    this.flushTasksQueue(key)
  }
}
959
/**
 * Sends to the given worker which statistics (run time, ELU) are required,
 * based on the aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
971
/**
 * Builds a fresh worker usage with every counter and measurement reset.
 *
 * The `queued` and `maxQueued` task counters are live getters reading the
 * tasks queue of the worker node found at `workerNodeKey` when this method is
 * called. They return `0` when no key is given or no worker node is found.
 *
 * @param workerNodeKey - The worker node key, if the usage is for a worker node already in the pool.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Capture the worker node itself rather than its key. Keys are array
  // indexes that shift when a worker node is removed, so a getter bound to
  // the key would later read another node's queue or an undefined entry.
  const workerNode: WorkerNode<Worker, Data> | undefined =
    workerNodeKey != null ? this.workerNodes[workerNodeKey] : undefined
  const getTasksQueueSize = (): number => workerNode?.tasksQueue.size ?? 0
  const getTasksMaxQueueSize = (): number =>
    workerNode?.tasksQueue.maxSize ?? 0
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return getTasksQueueSize()
      },
      get maxQueued (): number {
        return getTasksMaxQueueSize()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
1020 }