fix: fix version handling in pool information
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33 } from './worker'
34 import {
35 Measurements,
36 WorkerChoiceStrategies,
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
39 } from './selection-strategies/selection-strategies-types'
40 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
41 import { version } from './version'
42
43 /**
44 * Base class that implements some shared logic for all poolifier pools.
45 *
46 * @typeParam Worker - Type of worker which manages this pool.
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
49 */
50 export abstract class AbstractPool<
51 Worker extends IWorker,
52 Data = unknown,
53 Response = unknown
54 > implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * The execution response promise map.
 *
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
 *
 * When a message is received from a worker, the entry matching its message id is looked up
 * to settle the promise, then the entry is deleted.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map<string, PromiseResponseWrapper<Worker, Response>>()

/**
 * Worker choice strategy context referencing a worker choice algorithm implementation.
 * It is created in the constructor from the pool options.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The start timestamp of the pool, taken from `performance.now()` at the very end of the
 * constructor (once the initial worker nodes have been created).
 */
private readonly startTimestamp
87
/**
 * Builds a new poolifier pool: validates the arguments, wires the optional event emitter and
 * the worker choice strategy context, runs the setup hook and finally spawns the workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the pool is not being started from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation (the pool options are also defaulted in place).
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the internal callbacks to this pool instance.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Spawn workers until the requested number of worker nodes is reached.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
133
/**
 * Validates the worker file path given to the pool.
 *
 * Anything that is not a non-blank string is rejected. This covers `null`/`undefined` as well
 * as values of another type (number, object, ...) that untyped callers may pass.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is not a string or is blank once trimmed.
 */
private checkFilePath (filePath: string): void {
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
142
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
160
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts

  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)

  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )

  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!poolOptions.enableTasksQueue) {
    return
  }

  // The tasks queue options are only checked and normalized when queueing is enabled.
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  poolOptions.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
}
187
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
197
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement`
 * is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
225
/**
 * Validates the tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
249
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Utilization is only reported when both run time and wait time are aggregated.
  const reportUtilization =
    requirements.runTime.aggregate && requirements.waitTime.aggregate

  // Tally every per-node counter in one pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const { usage } of this.workerNodes) {
    const tasks = usage.tasks
    if (tasks.executing === 0) {
      idleWorkerNodes += 1
    }
    if (tasks.executing > 0) {
      busyWorkerNodes += 1
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    ...(reportUtilization && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
302
/**
 * Time elapsed since the pool start timestamp was recorded.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
311
/**
 * Approximate pool utilization: the aggregated tasks run time plus wait time of all worker
 * nodes, divided by the pool run time multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  const totalTasksTime = totalTasksRunTime + totalTasksWaitTime
  return totalTasksTime / poolRunTimeCapacity
}
331
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * The pool compares this value against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 */
protected abstract get type (): PoolType

/**
 * Gets the worker type.
 * The pool compares this value against `WorkerTypes.thread` and `WorkerTypes.cluster`
 * when resolving a worker id.
 */
protected abstract get worker (): WorkerType

/**
 * Pool minimum size. Reported as `minSize` in the pool information.
 */
protected abstract get minSize (): number

/**
 * Pool maximum size. Used for the pool information, the fullness check,
 * the utilization computation and the strategy weights validation.
 */
protected abstract get maxSize (): number
353
/**
 * Looks up a worker through the `info.id` of its worker node.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingNode = this.workerNodes.find(
    candidate => candidate.info.id === workerId
  )
  return matchingNode?.worker
}
364
/**
 * Resolves the worker node key (the index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
376
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and resend the statistics requirements to its worker.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    const freshUsage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  })
}
398
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so invalid options never reach the pool options nor the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
409
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // Queueing is being switched off: flush the tasks queues first.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
421
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queueing is disabled: drop any tasks queue options left in the pool options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
432
/**
 * Builds normalized tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options provided by the caller (may be nullish).
 * @returns The tasks queue options with `concurrency` always set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
440
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes has reached
 * the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const currentSize = this.workerNodes.length
  return currentSize >= this.maxSize
}
449
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 * It is read when deciding whether to queue a submitted task and whether to emit the busy event.
 */
protected abstract get busy (): boolean
456
/**
 * Whether every worker node is executing at least one task
 * (also `true` when the pool has no worker node).
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
469
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  const chosenWorkerNode = this.workerNodes[workerNodeKey]
  // Register the promise callbacks under the task id so the worker response can settle it.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: chosenWorkerNode.worker
    })
  })
  // With queueing enabled, the task is queued when the pool is busy or the chosen worker
  // node already executes as many tasks as the configured concurrency.
  const shallQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      chosenWorkerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallQueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
503
/** @inheritDoc */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before asking the worker to terminate.
    const workerExitPromise = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExitPromise
  }
  await Promise.all(this.workerNodes.map(destroyWorkerNode))
}
520
/**
 * Terminates the given worker.
 * May complete synchronously or return a promise; `destroy()` awaits the result.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
527
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * It runs after the worker choice strategy context has been created.
 * Can be overridden; the default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
537
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
542
/**
 * Hook executed before the worker task execution: counts the task as executing on the
 * worker node and updates the wait time statistics.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
558
/**
 * Hook executed after the worker task execution: updates the tasks counters, the run time
 * and the ELU statistics of the worker node owning the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
575
/**
 * Moves one task from "executing" to "executed" in the worker usage and counts it as failed
 * when the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const tasksCounters = workerUsage.tasks
  tasksCounters.executing -= 1
  tasksCounters.executed += 1
  if (message.taskError != null) {
    tasksCounters.failed += 1
  }
}
587
/**
 * Updates the run time statistics of a worker usage from a task execution response.
 *
 * Nothing is updated unless the current strategy requires the run time aggregate.
 * The average is computed over the tasks that did not fail (`executed - failed`) and is left
 * untouched while that count is not strictly positive, so it can never become `NaN` or
 * `Infinity` when every executed task has failed.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // Guard on the actual divisor, not on the executed tasks count alone.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
616
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 *
 * The task wait time is the delay between the task timestamp and now (`0` when the task has
 * no timestamp). Nothing is updated unless the current strategy requires the wait time
 * aggregate. The average is computed over the tasks that did not fail (`executed - failed`)
 * and is left untouched while that count is not strictly positive, so it can never become
 * `NaN` or `Infinity`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the actual divisor, not on the executed tasks count alone.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
647
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task
 * execution response.
 *
 * Nothing is updated unless the current strategy requires the ELU aggregate. When the message
 * carries ELU data, idle and active times are accumulated and the utilization becomes the
 * mean of the previous utilization and the task one. Averages are computed over the tasks
 * that did not fail (`executed - failed`) and are left untouched while that count is not
 * strictly positive, so they can never become `NaN` or `Infinity`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The task execution response message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  // The former fallback branch for a nullish `workerUsage.elu` dereferenced it anyway,
  // so a single accumulation path is kept.
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the actual divisor, not on the executed tasks count alone.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
692
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is chosen directly when
 * the strategy policy says so. Otherwise the worker choice strategy context decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorker = this.createAndSetupDynamicWorker()
    const strategyPolicy = this.workerChoiceStrategyContext.getStrategyPolicy()
    if (strategyPolicy.useDynamicWorker) {
      return this.getWorkerNodeKey(dynamicWorker)
    }
  }
  return this.workerChoiceStrategyContext.execute()
}
711
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and its worker
 * nodes are all busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
720
/**
 * Sends a message to the given worker.
 * The pool uses it both to submit tasks and to send the statistics requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
worker: Worker,
message: MessageValue<Data>
): void
731
/**
 * Attaches a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
744
/**
 * Creates a new worker.
 * The returned worker is not yet registered in the pool worker nodes;
 * `createAndSetupWorker()` takes care of that.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
751
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * The default implementation subscribes the pool worker listener to the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
}
762
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user provided handlers are attached first. On worker error, the pool emits the error
 * event (when events are enabled), redistributes the tasks queued on the failing worker node
 * (when the tasks queue is enabled) and creates a replacement worker (when
 * `restartWorkerOnError` is enabled). The worker node is removed once the worker exits.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.getWorkerNodeKey(worker))
    }
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}

/**
 * Moves every task queued on the given worker node to the other worker nodes, one at a time:
 * the first other node with an empty queue is preferred, otherwise the one with the fewest
 * queued tasks.
 *
 * Nothing is done when the worker node is unknown (`-1`) or is the only one in the pool:
 * without another target, each task would be dequeued and re-enqueued on the same queue and
 * the loop would never terminate.
 * NOTE(review): in that case the queued tasks stay on the failing worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      if (workerNode.usage.tasks.queued === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (workerNode.usage.tasks.queued < minQueuedTasks) {
        minQueuedTasks = workerNode.usage.tasks.queued
        targetWorkerNodeKey = workerNodeId
      }
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
822
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * An extra message listener destroys the worker on a hard kill message, or on any other kill
 * message once the worker node executes no task (and, with the tasks queue enabled, has an
 * empty tasks queue).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily, only when a non hard kill message has been received.
    const isWorkerNodeDrained = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeDrained())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
847
/**
 * Builds the listener registered for each worker message. It dispatches worker started
 * messages and task execution responses to their handlers; other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
864
/**
 * Records the `started` flag carried by a worker started message into the info of the
 * worker node owning the worker identified by `message.workerId`.
 *
 * @param message - The worker started message.
 * @throws Error if no worker with that id is found in the pool worker nodes.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  // Worker started message received
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
879
/**
 * Settles the promise bound to a task execution response, updates the worker usage, runs
 * the next queued task of the worker node (if any) and updates the strategy context.
 * Messages whose id has no entry in the promise response map are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, message.taskError)
    }
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
906
/**
 * Emits the busy event when the pool is busy and the full event when a dynamic pool is
 * full, each with the current pool information. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter?.emit(PoolEvents.busy, this.info)
  }
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    this.emitter?.emit(PoolEvents.full, this.info)
  }
}
917
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  const targetNode = workerNode
  targetNode.usage = workerUsage
}
930
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The node is first pushed with a usage that is not bound to any worker node key, then its
 * usage is replaced by one bound to its actual key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
951
/**
 * Gets the worker id: `threadId` for thread workers, `id` for cluster workers.
 *
 * @param worker - The worker.
 * @returns The worker id, or `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
965
966 // /**
967 // * Sets the given worker in the pool worker nodes.
968 // *
969 // * @param workerNodeKey - The worker node key.
970 // * @param worker - The worker.
971 // * @param workerInfo - The worker info.
972 // * @param workerUsage - The worker usage.
973 // * @param tasksQueue - The worker task queue.
974 // */
975 // private setWorkerNode (
976 // workerNodeKey: number,
977 // worker: Worker,
978 // workerInfo: WorkerInfo,
979 // workerUsage: WorkerUsage,
980 // tasksQueue: Queue<Task<Data>>
981 // ): void {
982 // this.workerNodes[workerNodeKey] = {
983 // worker,
984 // info: workerInfo,
985 // usage: workerUsage,
986 // tasksQueue
987 // }
988 // }
989
/**
 * Removes the worker node of the given worker from the pool worker nodes and from the
 * worker choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1002
/**
 * Runs the before task execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1007
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1011
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1015
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
1019
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1023
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1033
/**
 * Flushes the tasks queue of every worker node, in worker node key order.
 */
private flushTasksQueues (): void {
  this.workerNodes.forEach((_workerNode, workerNodeKey) => {
    this.flushTasksQueue(workerNodeKey)
  })
}
1039
/**
 * Sends to the given worker which task statistics are required, i.e. whether the current
 * strategy requires the run time and the ELU aggregates.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
1051
/**
 * Builds a zeroed worker usage object.
 *
 * The `queued` and `maxQueued` task counters are getters reading the tasks queue of the
 * given worker node on every access; they return `0` when no worker node key is given.
 *
 * @param workerNodeKey - The key of the worker node the usage is bound to, if any.
 * @returns A fresh worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const readQueueSize = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const readMaxQueueSize = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  // Each measurement gets its own zeroed counters and its own history array.
  const emptyMeasurement = (): {
    aggregate: number
    average: number
    median: number
    history: CircularArray<number>
  } => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readQueueSize()
      },
      get maxQueued (): number {
        return readMaxQueueSize()
      },
      failed: 0
    },
    runTime: emptyMeasurement(),
    waitTime: emptyMeasurement(),
    elu: {
      idle: emptyMeasurement(),
      active: emptyMeasurement(),
      utilization: 0
    }
  }
}
1100 }