feat: add minimum and maximum to internal measurements
[poolifier.git] / src / pools / abstract-pool.ts
1 import crypto from 'node:crypto'
2 import { performance } from 'node:perf_hooks'
3 import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4 import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11 } from '../utils'
12 import { KillBehaviors } from '../worker/worker-options'
13 import { CircularArray } from '../circular-array'
14 import { Queue } from '../queue'
15 import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26 } from './pool'
27 import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33 } from './worker'
34 import {
35 Measurements,
36 WorkerChoiceStrategies,
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
39 } from './selection-strategies/selection-strategies-types'
40 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
41 import { version } from './version'
42
43 /**
44 * Base class that implements some shared logic for all poolifier pools.
45 *
46 * @typeParam Worker - Type of worker which manages this pool.
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
49 */
50 export abstract class AbstractPool<
51 Worker extends IWorker,
52 Data = unknown,
53 Response = unknown
54 > implements IPool<Worker, Data, Response> {
55 /** @inheritDoc */
56 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
57
58 /** @inheritDoc */
59 public readonly emitter?: PoolEmitter
60
61 /**
62 * The execution response promise map.
63 *
64 * - `key`: The message id of each submitted task.
65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
66 *
67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
68 */
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
73
74 /**
75 * Worker choice strategy context referencing a worker choice algorithm implementation.
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78 Worker,
79 Data,
80 Response
81 >
82
83 /**
84 * The start timestamp of the pool.
85 */
86 private readonly startTimestamp
87
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs
 * the setup hook and then creates worker nodes until `numberOfWorkers` of
 * them exist. The pool start timestamp is recorded last.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  // Validates the options and fills in their default values
  this.checkPoolOptions(opts)

  // Bind these methods to the pool instance
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool up to the requested number of worker nodes
  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
133
/**
 * Checks that a worker file path has been given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
142
/**
 * Checks that the requested number of workers is usable.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
160
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!poolOptions.enableTasksQueue) {
    return
  }
  // The tasks queue options only matter when the tasks queue is enabled
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  poolOptions.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
187
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
197
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if
 * `measurement` is not one of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const hasWeightsMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWeightsMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
225
/**
 * Validates the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if
 * `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to check: a default concurrency is applied elsewhere
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
249
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  // Gather every per worker node counter in a single pass
  for (const { usage } of this.workerNodes) {
    const { executed, executing, queued, maxQueued, failed } = usage.tasks
    if (executing === 0) {
      ++idleWorkerNodes
    }
    if (executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += executed
    executingTasks += executing
    queuedTasks += queued
    maxQueuedTasks += maxQueued
    failedTasks += failed
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization is only reported when both the run time and the wait time are aggregated
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
302
/**
 * Gets the approximate pool utilization.
 *
 * Computed as the summed tasks run time and wait time of all worker nodes,
 * divided by the time elapsed since the pool start multiplied by `maxSize`.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
323
324 /**
325 * Pool type.
326 *
327 * If it is `'dynamic'`, it provides the `max` property.
328 */
329 protected abstract get type (): PoolType
330
331 /**
332 * Gets the worker type.
333 */
334 protected abstract get worker (): WorkerType
335
336 /**
337 * Pool minimum size.
338 */
339 protected abstract get minSize (): number
340
341 /**
342 * Pool maximum size.
343 */
344 protected abstract get maxSize (): number
345
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
356
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  const ownsWorker = (workerNode: WorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(ownsWorker)
}
368
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and send it the statistics settings again
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    const workerNode = this.workerNodes[workerNodeKey]
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getWorkerUsage(workerNodeKey)
    )
    this.setWorkerStatistics(workerNode.worker)
  }
}
390
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate before storing so that invalid options never reach the context
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
401
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Execute what is still queued before the queue is switched off
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
413
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
424
/**
 * Builds the tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `null` or `undefined`.
 * @returns The tasks queue options with every value defined.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
432
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
441
442 /**
443 * Whether the pool is busy or not.
444 *
445 * The pool busyness boolean status.
446 */
447 protected abstract get busy (): boolean
448
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is idle (also when the pool has no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
461
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // The response promise is settled when the worker answers with the task id
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  const shallEnqueueTask =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueueTask) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
495
/** @inheritDoc */
public async destroy (): Promise<void> {
  const destroyWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // Listen for the exit before asking for the destruction so it cannot be missed
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(this.workerNodes.map(destroyWorkerNode))
}
512
513 /**
514 * Terminates the given worker.
515 *
516 * @param worker - A worker within `workerNodes`.
517 */
518 protected abstract destroyWorker (worker: Worker): void | Promise<void>
519
/**
 * Hook run by the abstract constructor before any worker node is created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No default setup
}
529
530 /**
531 * Should return whether the worker is the main worker or not.
532 */
533 protected abstract isMain (): boolean
534
/**
 * Hook executed before the worker task execution.
 * Counts the task as executing and updates the wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
550
/**
 * Hook executed after the worker task execution.
 * Updates the tasks counters, then the run time and ELU usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  // The tasks counters come first: the run time and ELU averages read them
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
567
/**
 * Moves one task from executing to executed and counts it as failed when the
 * message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  const hasFailed = message.taskError != null
  if (hasFailed) {
    tasks.failed += 1
  }
}
579
/**
 * Updates the run time usage of a worker node with the run time carried by
 * the given message.
 *
 * Nothing is updated unless the worker choice strategy requires the run time
 * aggregate. The average is computed over the tasks that did not fail and is
 * left untouched while there is none, so it never becomes `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  const measuredRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += measuredRunTime ?? 0
  if (measuredRunTime != null) {
    // A message without measurement must not be recorded as a zero run time
    workerUsage.runTime.minimum = Math.min(
      measuredRunTime,
      workerUsage.runTime.minimum
    )
    workerUsage.runTime.maximum = Math.max(
      measuredRunTime,
      workerUsage.runTime.maximum
    )
  }
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (requirements.median && measuredRunTime != null) {
    workerUsage.runTime.history.push(measuredRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
617
/**
 * Updates the wait time usage of a worker node with the time the given task
 * spent between its submission timestamp and now.
 *
 * Nothing is updated unless the worker choice strategy requires the wait time
 * aggregate. A task without timestamp counts as a zero wait time. The average
 * is computed over the tasks that did not fail and is left untouched while
 * there is none, so it never becomes `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  workerUsage.waitTime.minimum = Math.min(
    taskWaitTime,
    workerUsage.waitTime.minimum
  )
  workerUsage.waitTime.maximum = Math.max(
    taskWaitTime,
    workerUsage.waitTime.maximum
  )
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (requirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
656
/**
 * Updates the event loop utilization (ELU) usage of a worker node with the
 * ELU carried by the given message.
 *
 * Nothing is updated unless the worker choice strategy requires the ELU
 * aggregate. The stored utilization is the mean of the previous value and the
 * new one. The averages are computed over the tasks that did not fail and are
 * left untouched while there is none, so they never become `Infinity` or `NaN`.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const eluUsage = workerUsage.elu
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    eluUsage.idle.aggregate += taskElu.idle
    eluUsage.active.aggregate += taskElu.active
    if (eluUsage.utilization != null) {
      eluUsage.utilization = (eluUsage.utilization + taskElu.utilization) / 2
    } else {
      eluUsage.utilization = taskElu.utilization
    }
    eluUsage.idle.minimum = Math.min(taskElu.idle, eluUsage.idle.minimum)
    eluUsage.idle.maximum = Math.max(taskElu.idle, eluUsage.idle.maximum)
    eluUsage.active.minimum = Math.min(taskElu.active, eluUsage.active.minimum)
    eluUsage.active.maximum = Math.max(taskElu.active, eluUsage.active.maximum)
  }
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (requirements.average && successfulTasks > 0) {
    eluUsage.idle.average = eluUsage.idle.aggregate / successfulTasks
    eluUsage.active.average = eluUsage.active.aggregate / successfulTasks
  }
  if (requirements.median && taskElu != null) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
717
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * node is chosen if the strategy policy says to use dynamic workers.
 * Otherwise the worker choice strategy picks the node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
736
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
745
746 /**
747 * Sends a message to the given worker.
748 *
749 * @param worker - The worker which should receive the message.
750 * @param message - The message.
751 */
752 protected abstract sendToWorker (
753 worker: Worker,
754 message: MessageValue<Data>
755 ): void
756
/**
 * Registers a callback for the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
769
770 /**
771 * Creates a new worker.
772 *
773 * @returns Newly created worker.
774 */
775 protected abstract createWorker (): Worker
776
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
}
787
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * Registers the user supplied handlers and the pool's own `'error'` and
 * `'exit'` handlers. On error the pool emits the error event, restarts a
 * worker if `restartWorkerOnError` is enabled, and then moves the tasks
 * queued on the failed worker node to the other worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    // Restart first, so that a replacement worker node exists to receive the queued tasks
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.getWorkerNodeKey(worker))
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}

/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the first other worker node with an empty queue, or else
 * to the other worker node with the fewest queued tasks. Stops when no other
 * worker node exists, instead of re-enqueueing on the same node forever.
 *
 * @param workerNodeKey - The key of the worker node to empty, `-1` if unknown.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1) {
    // The worker is no longer among the pool worker nodes
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other worker node can take over the queued tasks
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
847
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, the worker gets a message listener that
 * destroys it on a hard kill message, or on any other kill message once it
 * executes no task and, with the tasks queue enabled, has an empty queue.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const canBeSoftKilled = (): boolean => {
      if (message.kill == null) {
        return false
      }
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue !== false && enableTasksQueue !== true) {
        return false
      }
      if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
        return false
      }
      return (
        enableTasksQueue === false || this.tasksQueueSize(workerNodeKey) === 0
      )
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      canBeSoftKilled()
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
872
/**
 * Builds the listener registered for each worker message.
 *
 * The listener handles a worker started message when `workerId` and
 * `started` are both set, otherwise a task execution response when `id` is
 * set. Any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  const listener = (message: MessageValue<Response>): void => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      this.handleTaskExecutionResponse(message)
    }
  }
  return listener
}
889
/**
 * Handles a worker started message by storing its `started` flag in the
 * matching worker node info.
 *
 * @param message - The worker started message.
 * @throws Error if no worker node matches the worker id of the message.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId as number)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
904
/**
 * Handles a task execution response.
 *
 * Settles the promise bound to the message id, updates the worker node
 * usage, executes the next queued task of that worker node if there is one,
 * and notifies the worker choice strategy. A message whose id matches no
 * pending promise is ignored.
 *
 * @param message - The task execution response.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
931
/**
 * Emits the busy event when the pool is busy and the full event when a
 * dynamic pool is full. Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isFullDynamicPool = this.type === PoolTypes.dynamic && this.full
  if (isFullDynamicPool) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
942
/**
 * Replaces the usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to assign.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // The previous usage object is dropped, not merged
  workerNode.usage = workerUsage
}
955
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The node is pushed with a usage that is not bound to any key. Once its key
 * is known, the usage is replaced by one bound to that key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const keyedUsage = this.getWorkerUsage(workerNodeKey)
  this.setWorkerNodeTasksUsage(this.workerNodes[workerNodeKey], keyedUsage)
  return this.workerNodes.length
}
976
/**
 * Gets the worker id: `threadId` for a thread worker, `id` for a cluster worker.
 *
 * @param worker - The worker.
 * @returns The worker id, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
990
991 // /**
992 // * Sets the given worker in the pool worker nodes.
993 // *
994 // * @param workerNodeKey - The worker node key.
995 // * @param worker - The worker.
996 // * @param workerInfo - The worker info.
997 // * @param workerUsage - The worker usage.
998 // * @param tasksQueue - The worker task queue.
999 // */
1000 // private setWorkerNode (
1001 // workerNodeKey: number,
1002 // worker: Worker,
1003 // workerInfo: WorkerInfo,
1004 // workerUsage: WorkerUsage,
1005 // tasksQueue: Queue<Task<Data>>
1006 // ): void {
1007 // this.workerNodes[workerNodeKey] = {
1008 // worker,
1009 // info: workerInfo,
1010 // usage: workerUsage,
1011 // tasksQueue
1012 // }
1013 // }
1014
/**
 * Removes the given worker from the pool worker nodes and tells the worker
 * choice strategy context about it. Does nothing if the worker is unknown.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
1027
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1032
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1036
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1040
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
1044
/**
 * Gets the tasks queue maximum size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1048
/**
 * Executes every task queued on the given worker node, then clears its queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  tasksQueue.clear()
}
1058
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1064
/**
 * Sends the given worker the statistics settings: whether the worker choice
 * strategy requires the run time aggregate and the ELU aggregate.
 *
 * @param worker - The worker to send the settings to.
 */
private setWorkerStatistics (worker: Worker): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const statistics = {
    runTime: requirements.runTime.aggregate,
    elu: requirements.elu.aggregate
  }
  this.sendToWorker(worker, { statistics })
}
1076
/**
 * Builds a fresh worker usage with every counter and measurement reset.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters that read the tasks queue
 * of the worker node at `workerNodeKey` on each access, or return `0` when no
 * key is given.
 *
 * Every `minimum` starts at `Infinity`: the usage updates compute
 * `Math.min(value, minimum)`, which could never rise above an initial `0`.
 * A `minimum` still at `Infinity` means that nothing has been measured yet.
 *
 * @param workerNodeKey - The worker node key to bind the queue getters to.
 * @returns The new worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const getTasksQueueSize = (workerNodeKey?: number): number => {
    return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  }
  const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
    return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  }
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return getTasksQueueSize(workerNodeKey)
      },
      get maxQueued (): number {
        return getTasksMaxQueueSize(workerNodeKey)
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      maximum: 0,
      minimum: Infinity,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      maximum: 0,
      minimum: Infinity,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        maximum: 0,
        minimum: Infinity,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        maximum: 0,
        minimum: Infinity,
        average: 0,
        median: 0,
        history: new CircularArray()
      }
    }
  }
}
1132 }