refactor: cleanup imports
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract';
7 import { WorkerConstants } from './WorkerConstants';
8 import {
9 type SetInfo,
10 type WorkerData,
11 WorkerMessageEvents,
12 type WorkerOptions,
13 type WorkerSetElement,
14 WorkerSetEvents,
15 } from './WorkerTypes';
16 import { sleep } from './WorkerUtils';
17
18 export class WorkerSet extends WorkerAbstract<WorkerData> {
19 public readonly emitter: EventEmitter;
20 private readonly workerSet: Set<WorkerSetElement>;
21
22 /**
23 * Creates a new `WorkerSet`.
24 *
25 * @param workerScript -
26 * @param workerOptions -
27 */
28 constructor(workerScript: string, workerOptions?: WorkerOptions) {
29 super(workerScript, workerOptions);
30 this.workerOptions.poolOptions = {
31 ...{
32 enableEvents: true,
33 restartWorkerOnError: true,
34 },
35 ...this.workerOptions.poolOptions,
36 };
37 this.workerSet = new Set<WorkerSetElement>();
38 if (this.workerOptions?.poolOptions?.enableEvents) {
39 this.emitter = new EventEmitter();
40 }
41 }
42
43 get info(): SetInfo {
44 return {
45 version: WorkerConstants.version,
46 type: 'set',
47 worker: 'thread',
48 size: this.size,
49 elementsExecuting: [...this.workerSet].reduce(
50 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
51 0
52 ),
53 elementsPerWorker: this.maxElementsPerWorker,
54 };
55 }
56
57 get size(): number {
58 return this.workerSet.size;
59 }
60
61 get maxElementsPerWorker(): number | undefined {
62 return this.workerOptions.elementsPerWorker;
63 }
64
65 /** @inheritDoc */
66 public async start(): Promise<void> {
67 this.addWorkerSetElement();
68 // Add worker set element sequentially to optimize memory at startup
69 this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
70 }
71
72 /** @inheritDoc */
73 public async stop(): Promise<void> {
74 for (const workerSetElement of this.workerSet) {
75 await workerSetElement.worker.terminate();
76 }
77 this.workerSet.clear();
78 }
79
80 /** @inheritDoc */
81 public async addElement(elementData: WorkerData): Promise<void> {
82 if (!this.workerSet) {
83 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
84 }
85 const workerSetElement = await this.getWorkerSetElement();
86 workerSetElement.worker.postMessage({
87 id: WorkerMessageEvents.startWorkerElement,
88 data: elementData,
89 });
90 ++workerSetElement.numberOfWorkerElements;
91 // Add element sequentially to optimize memory at startup
92 if (this.workerOptions.elementStartDelay > 0) {
93 await sleep(this.workerOptions.elementStartDelay);
94 }
95 }
96
97 /**
98 * Adds a new `WorkerSetElement`.
99 */
100 private addWorkerSetElement(): WorkerSetElement {
101 const worker = new Worker(this.workerScript, {
102 env: SHARE_ENV,
103 ...this.workerOptions.poolOptions.workerOptions,
104 });
105 worker.on(
106 'message',
107 this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION
108 );
109 worker.on(
110 'error',
111 this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION
112 );
113 worker.on('error', (error) => {
114 if (this.emitter !== undefined) {
115 this.emitter.emit(WorkerSetEvents.error, error);
116 }
117 if (this.workerOptions?.poolOptions?.restartWorkerOnError) {
118 this.addWorkerSetElement();
119 }
120 });
121 worker.on(
122 'online',
123 this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION
124 );
125 worker.on(
126 'exit',
127 this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION
128 );
129 worker.once('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker)));
130 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
131 this.workerSet.add(workerSetElement);
132 return workerSetElement;
133 }
134
135 private async getWorkerSetElement(): Promise<WorkerSetElement> {
136 let chosenWorkerSetElement: WorkerSetElement;
137 for (const workerSetElement of this.workerSet) {
138 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker) {
139 chosenWorkerSetElement = workerSetElement;
140 break;
141 }
142 }
143 if (!chosenWorkerSetElement) {
144 chosenWorkerSetElement = this.addWorkerSetElement();
145 // Add worker set element sequentially to optimize memory at startup
146 this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
147 }
148 return chosenWorkerSetElement;
149 }
150
151 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
152 let workerSetElt: WorkerSetElement;
153 for (const workerSetElement of this.workerSet) {
154 if (workerSetElement.worker.threadId === worker.threadId) {
155 workerSetElt = workerSetElement;
156 break;
157 }
158 }
159 return workerSetElt;
160 }
161 }