b33d917c14a4b7865f60acd80029dd5522c3ebd0
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitterAsyncResource } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract.js';
7 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js';
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents,
16 } from './WorkerTypes.js';
17 import { randomizeDelay, sleep } from './WorkerUtils.js';
18
19 export class WorkerSet extends WorkerAbstract<WorkerData> {
20 public readonly emitter: EventEmitterAsyncResource | undefined;
21 private readonly workerSet: Set<WorkerSetElement>;
22 private started: boolean;
23 private workerStartup: boolean;
24
25 /**
26 * Creates a new `WorkerSet`.
27 *
28 * @param workerScript -
29 * @param workerOptions -
30 */
31 constructor(workerScript: string, workerOptions: WorkerOptions) {
32 super(workerScript, workerOptions);
33 if (this.workerOptions.elementsPerWorker == null) {
34 throw new TypeError('Elements per worker is not defined');
35 }
36 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
37 throw new TypeError('Elements per worker must be an integer');
38 }
39 if (this.workerOptions.elementsPerWorker <= 0) {
40 throw new RangeError('Elements per worker must be greater than zero');
41 }
42 this.workerSet = new Set<WorkerSetElement>();
43 if (this.workerOptions.poolOptions?.enableEvents) {
44 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' });
45 }
46 this.started = false;
47 this.workerStartup = false;
48 }
49
50 get info(): SetInfo {
51 return {
52 version: workerSetVersion,
53 type: 'set',
54 worker: 'thread',
55 size: this.size,
56 elementsExecuting: [...this.workerSet].reduce(
57 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
58 0,
59 ),
60 elementsPerWorker: this.maxElementsPerWorker!,
61 };
62 }
63
64 get size(): number {
65 return this.workerSet.size;
66 }
67
68 get maxElementsPerWorker(): number | undefined {
69 return this.workerOptions.elementsPerWorker;
70 }
71
72 /** @inheritDoc */
73 public async start(): Promise<void> {
74 this.addWorkerSetElement();
75 // Add worker set element sequentially to optimize memory at startup
76 this.workerOptions.workerStartDelay! > 0 &&
77 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
78 this.emitter?.emit(WorkerSetEvents.started, this.info);
79 this.started = true;
80 }
81
82 /** @inheritDoc */
83 public async stop(): Promise<void> {
84 for (const workerSetElement of this.workerSet) {
85 const worker = workerSetElement.worker;
86 const waitWorkerExit = new Promise<void>((resolve) => {
87 worker.once('exit', () => {
88 resolve();
89 });
90 });
91 await worker.terminate();
92 await waitWorkerExit;
93 this.emitter?.emit(WorkerSetEvents.stopped, this.info);
94 this.emitter?.emitDestroy();
95 this.emitter?.removeAllListeners();
96 this.started = false;
97 }
98 }
99
100 /** @inheritDoc */
101 public async addElement(elementData: WorkerData): Promise<void> {
102 if (!this.started) {
103 throw new Error('Cannot add a WorkerSet element: not started');
104 }
105 if (this.workerSet == null) {
106 throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
107 }
108 const workerSetElement = await this.getWorkerSetElement();
109 workerSetElement.worker.postMessage({
110 event: WorkerMessageEvents.startWorkerElement,
111 data: elementData,
112 });
113 ++workerSetElement.numberOfWorkerElements;
114 // Add element sequentially to optimize memory at startup
115 if (this.workerOptions.elementStartDelay! > 0) {
116 await sleep(randomizeDelay(this.workerOptions.elementStartDelay!));
117 }
118 }
119
120 /**
121 * Adds a new `WorkerSetElement`.
122 */
123 private addWorkerSetElement(): WorkerSetElement {
124 this.workerStartup = true;
125 const worker = new Worker(this.workerScript, {
126 env: SHARE_ENV,
127 ...this.workerOptions.poolOptions?.workerOptions,
128 });
129 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
130 worker.on('message', (message: WorkerMessage<WorkerData>) => {
131 if (message.event === WorkerMessageEvents.startedWorkerElement) {
132 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
133 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
134 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
135 }
136 });
137 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
138 worker.on('error', (error) => {
139 this.emitter?.emit(WorkerSetEvents.error, error);
140 if (
141 this.workerOptions.poolOptions?.restartWorkerOnError &&
142 this.started &&
143 !this.workerStartup
144 ) {
145 this.addWorkerSetElement();
146 }
147 });
148 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
149 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
150 worker.once('exit', () =>
151 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
152 );
153 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
154 this.workerSet.add(workerSetElement);
155 this.workerStartup = false;
156 return workerSetElement;
157 }
158
159 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
160 this.workerSet.delete(workerSetElement);
161 }
162
163 private async getWorkerSetElement(): Promise<WorkerSetElement> {
164 let chosenWorkerSetElement: WorkerSetElement | undefined;
165 for (const workerSetElement of this.workerSet) {
166 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
167 chosenWorkerSetElement = workerSetElement;
168 break;
169 }
170 }
171 if (!chosenWorkerSetElement) {
172 chosenWorkerSetElement = this.addWorkerSetElement();
173 // Add worker set element sequentially to optimize memory at startup
174 this.workerOptions.workerStartDelay! > 0 &&
175 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
176 }
177 return chosenWorkerSetElement;
178 }
179
180 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
181 let workerSetElt: WorkerSetElement | undefined;
182 for (const workerSetElement of this.workerSet) {
183 if (workerSetElement.worker.threadId === worker.threadId) {
184 workerSetElt = workerSetElement;
185 break;
186 }
187 }
188 return workerSetElt;
189 }
190 }