chore: generate documentation
[poolifier.git] / src / pools / abstract-pool.ts
... / ...
CommitLineData
1import crypto from 'node:crypto'
2import { performance } from 'node:perf_hooks'
3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
7 isKillBehavior,
8 isPlainObject,
9 median,
10 round
11} from '../utils'
12import { KillBehaviors } from '../worker/worker-options'
13import { CircularArray } from '../circular-array'
14import { Queue } from '../queue'
15import {
16 type IPool,
17 PoolEmitter,
18 PoolEvents,
19 type PoolInfo,
20 type PoolOptions,
21 type PoolType,
22 PoolTypes,
23 type TasksQueueOptions,
24 type WorkerType,
25 WorkerTypes
26} from './pool'
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
31 WorkerNode,
32 WorkerUsage
33} from './worker'
34import {
35 Measurements,
36 WorkerChoiceStrategies,
37 type WorkerChoiceStrategy,
38 type WorkerChoiceStrategyOptions
39} from './selection-strategies/selection-strategies-types'
40import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
41
42/**
43 * Base class that implements some shared logic for all poolifier pools.
44 *
45 * @typeParam Worker - Type of worker which manages this pool.
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
48 */
49export abstract class AbstractPool<
50 Worker extends IWorker,
51 Data = unknown,
52 Response = unknown
53> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was assigned to, together with the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * A worker response carrying a known message id is used to look up its entry and settle the matching promise.
 */
protected promiseResponseMap = new Map<
string,
PromiseResponseWrapper<Worker, Response>
>()

/**
 * Context through which the configured worker choice strategy is executed.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * `performance.now()` value captured at the end of the pool construction.
 */
private readonly startTimestamp
86
/**
 * Builds the pool: validates the arguments, wires the worker choice strategy and creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers created when the pool is instantiated.
 * @param filePath - Path to the file containing the worker implementation.
 * @param opts - Pool options. Unset values are defaulted in place during validation.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  // Give subclasses a chance to run code before any worker exists.
  this.setupHook()

  // Each call appends one worker node.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
132
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
/**
 * Validates the number of workers the pool is instantiated with.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws Error if the number is `null` or `undefined`, or if it is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
159
/**
 * Validates the pool options and stores the effective values, defaults included, on `this.opts`.
 *
 * @param opts - The pool options.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
186
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
196
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` is set without exactly `maxSize` entries, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
224
/**
 * Validates tasks queue options. Nullish options and a nullish `concurrency` are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
248
/** @inheritDoc */
public get info (): PoolInfo {
  // Tally every per-node counter in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const workerNode of this.workerNodes) {
    const { tasks } = workerNode.usage
    if (tasks.executing === 0) {
      ++idleWorkerNodes
    } else if (tasks.executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    utilization: round(this.utilization),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
297
/**
 * Time elapsed since the pool start timestamp was taken.
 *
 * @returns The pool run time in milliseconds.
 */
private get runTime (): number {
  const now = performance.now()
  return now - this.startTimestamp
}
306
/**
 * Approximate pool utilization: aggregated task run and wait times of all worker nodes,
 * divided by the pool run time multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity = this.runTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate
    totalTasksWaitTime += usage.waitTime.aggregate
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
326
/**
 * Pool type, provided by the concrete pool.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 * Within this class it is compared against `PoolTypes.fixed` and `PoolTypes.dynamic`.
 */
protected abstract get type (): PoolType
333
/**
 * Worker type, provided by the concrete pool.
 *
 * Within this class it is compared against `WorkerTypes.thread` and `WorkerTypes.cluster` to resolve worker ids.
 */
protected abstract get worker (): WorkerType
338
/**
 * Pool minimum size, provided by the concrete pool and reported in the pool information.
 */
protected abstract get minSize (): number
343
/**
 * Pool maximum size, provided by the concrete pool.
 *
 * Used in this class to tell whether the pool is full, to compute the utilization and to validate strategy weights.
 */
protected abstract get maxSize (): number
348
/**
 * Looks up a worker from its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` when there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const { info, worker } of this.workerNodes) {
    if (info.id === workerId) {
      return worker
    }
  }
  return undefined
}
359
/**
 * Resolves the worker node key, i.e. the index in the pool worker nodes, of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
371
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and tell each worker which statistics to collect.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.usage = this.getWorkerUsage(workerNodeKey)
    this.setWorkerStatistics(workerNode.worker)
  })
}
393
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
404
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush what is still queued first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
416
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Queue disabled: drop any previously stored options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
427
/**
 * Builds normalized tasks queue options.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns Tasks queue options whose `concurrency` defaults to `1` when not provided.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
435
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
444
/**
 * Whether the pool is busy or not, as defined by the concrete pool.
 *
 * Within this class the status decides whether a submitted task is queued (when the tasks queue is enabled)
 * and whether the `busy` pool event is emitted.
 */
protected abstract get busy (): boolean
451
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
464
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken first so that it can serve as the task submission time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the promise callbacks under the task id; the worker response settles the promise.
  const response = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // With the tasks queue enabled, a busy pool or a worker node at its concurrency limit queues the task.
  let queueTask = false
  if (this.opts.enableTasksQueue === true) {
    queueTask =
      this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number)
  }
  if (queueTask) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
498
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Flush the worker node queue before terminating its worker.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
509
/**
 * Terminates the given worker. The termination may be synchronous or return a promise.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
516
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
526
/**
 * Should return whether the worker is the main worker or not.
 * The constructor throws when it returns `false`.
 */
protected abstract isMain (): boolean
531
/**
 * Hook run right before a task is sent to its worker: counts the task as executing
 * and records its wait time. Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
547
/**
 * Hook run once a task execution response is received: updates the task counters,
 * run time and event loop utilization statistics of the worker node. Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
564
/**
 * Updates the task counters of a worker usage after a task response.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a non-nullish `taskError` counts the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
576
/**
 * Updates the run time statistics of a worker usage after a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over successful tasks only. Skip it while there is
  // none, otherwise the division by zero yields `Infinity` or `NaN`.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
605
/**
 * Updates the wait time statistics of a worker usage right before a task is executed.
 *
 * The wait time is the delay between the task timestamp and now; a task without timestamp waits `0`.
 * Nothing is updated unless the current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over successful tasks only. Skip it while there is
  // none, otherwise the division by zero yields `Infinity` or `NaN`.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
636
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage after a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: eluRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // NOTE(review): the utilization is averaged with the previous value, which
    // is 0 for a freshly created usage, so the first sample is halved.
    // Behavior kept as is; confirm whether this is intended.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over successful tasks only. Skip them while there
  // is none, otherwise the division by zero yields `Infinity` or `NaN`.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
681
/**
 * Picks the worker node that will receive the next task.
 *
 * A dynamic worker is created first when the conditions are met; it is picked directly
 * if the strategy policy asks for it. Otherwise the worker choice strategy decides.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
700
/**
 * Tells whether a dynamic worker shall be created: the pool is dynamic, not full,
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
709
/**
 * Sends a message to the given worker. Implemented by the concrete pool.
 *
 * Used by this class to submit tasks and to send the statistics requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
720
/**
 * Subscribes a callback to the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const handler = listener as MessageHandler<Worker>
  worker.on('message', handler)
}
733
/**
 * Creates a new worker. Implemented by the concrete pool.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
740
/**
 * Hook run once a newly created worker has been added to the pool worker nodes.
 * By default it registers the pool message listener on the worker. Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
}
751
/**
 * Creates a worker, attaches the user and pool event handlers to it, and registers it in the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool-level error handling: forward the error as a pool event and optionally create a replacement worker.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
784
/**
 * Creates a worker like `createAndSetupWorker()` and additionally listens for its kill messages,
 * destroying the worker when one is received under the right conditions.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const shallDestroy = (): boolean => {
      // A hard kill is honored unconditionally.
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (message.kill == null) {
        return false
      }
      // Any other kill message is honored only when the worker node has no task left.
      const workerNodeKey = this.getWorkerNodeKey(worker)
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (shallDestroy()) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
809
/**
 * Builds the listener registered on each worker: it dispatches worker started messages
 * and task execution responses to their handlers and ignores anything else.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // A message id identifies a task execution response.
      this.handleTaskExecutionResponse(message)
    }
  }
}
826
/**
 * Handles a worker started message by storing the `started` status in the info of the matching worker node.
 *
 * @param message - The received message, carrying `workerId` and `started`.
 * @throws Error if no worker node matches the worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const workerId = message.workerId as number
  const worker = this.getWorkerById(workerId)
  if (worker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${workerId}'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
841
/**
 * Handles a task execution response: settles the promise bound to the message id, updates the
 * worker usage, runs the next queued task of the worker node if any, and notifies the strategy context.
 * Messages whose id is not in the promise response map are ignored.
 *
 * @param message - The received message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
868
/**
 * Emits the `busy` and `full` pool events when their condition holds. Does nothing without emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event only applies to dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
879
/**
 * Replaces the usage statistics object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to attach to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
892
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // Now that the node is in the pool its key is known: rebuild the usage with that key
  // so that the queue counters read the worker node tasks queue.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.workerNodes[workerNodeKey].usage = this.getWorkerUsage(workerNodeKey)
  return this.workerNodes.length
}
913
/**
 * Resolves the id of a worker according to the pool worker type.
 *
 * @param worker - The worker.
 * @returns The worker `threadId` for thread workers, its `id` for cluster workers, `undefined` otherwise.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
927
928 // /**
929 // * Sets the given worker in the pool worker nodes.
930 // *
931 // * @param workerNodeKey - The worker node key.
932 // * @param worker - The worker.
933 // * @param workerInfo - The worker info.
934 // * @param workerUsage - The worker usage.
935 // * @param tasksQueue - The worker task queue.
936 // */
937 // private setWorkerNode (
938 // workerNodeKey: number,
939 // worker: Worker,
940 // workerInfo: WorkerInfo,
941 // workerUsage: WorkerUsage,
942 // tasksQueue: Queue<Task<Data>>
943 // ): void {
944 // this.workerNodes[workerNodeKey] = {
945 // worker,
946 // info: workerInfo,
947 // usage: workerUsage,
948 // tasksQueue
949 // }
950 // }
951
/**
 * Removes the worker node of the given worker from the pool and from the worker choice strategy context.
 * Does nothing if the worker is not in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
964
/**
 * Runs the before-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
969
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
973
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
977
/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
981
/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
985
/**
 * Flushes the tasks queue of the given worker node: every queued task is dequeued
 * and executed, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue is empty. Comparing an increasing index against the
  // shrinking queue size would stop halfway and leave the remaining tasks to be
  // dropped by clear() without ever being executed.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
997
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
1003
/**
 * Sends to the given worker which task statistics are required, as reported by
 * the worker choice strategy context: run time and event loop utilization aggregates.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
1015
/**
 * Creates a zeroed worker usage statistics object.
 *
 * @param workerNodeKey - Key of the worker node whose tasks queue backs the `queued` and `maxQueued` counters. When omitted, both counters read `0`.
 * @returns A fresh worker usage object.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Arrow functions so that `this` is the pool inside the accessors below.
  const queuedTasks = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const maxQueuedTasks = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
  // Each measurement gets its own zeroed statistics object and history.
  const zeroedStatistics = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: {
      executed: 0,
      executing: 0,
      // Queue counters are read live from the worker node tasks queue.
      get queued (): number {
        return queuedTasks()
      },
      get maxQueued (): number {
        return maxQueuedTasks()
      },
      failed: 0
    },
    runTime: zeroedStatistics(),
    waitTime: zeroedStatistics(),
    elu: {
      idle: zeroedStatistics(),
      active: zeroedStatistics(),
      utilization: 0
    }
  }
}
1064}