62ddf8079f2f6377cd0f95fe62daac629a27a9bb
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitterAsyncResource } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract';
7 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents,
16 } from './WorkerTypes';
17 import { randomizeDelay, sleep } from './WorkerUtils';
18
19 export class WorkerSet extends WorkerAbstract<WorkerData> {
20 public readonly emitter: EventEmitterAsyncResource | undefined;
21 private readonly workerSet: Set<WorkerSetElement>;
22 private started: boolean;
23 private workerStartup: boolean;
24
25 /**
26 * Creates a new `WorkerSet`.
27 *
28 * @param workerScript -
29 * @param workerOptions -
30 */
31 constructor(workerScript: string, workerOptions: WorkerOptions) {
32 super(workerScript, workerOptions);
33 if (
34 this.workerOptions.elementsPerWorker === null ||
35 this.workerOptions.elementsPerWorker === undefined
36 ) {
37 throw new TypeError('Elements per worker is not defined');
38 }
39 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
40 throw new TypeError('Elements per worker must be an integer');
41 }
42 if (this.workerOptions.elementsPerWorker <= 0) {
43 throw new RangeError('Elements per worker must be greater than zero');
44 }
45 this.workerSet = new Set<WorkerSetElement>();
46 if (this.workerOptions.poolOptions?.enableEvents) {
47 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' });
48 }
49 this.started = false;
50 this.workerStartup = false;
51 }
52
53 get info(): SetInfo {
54 return {
55 version: workerSetVersion,
56 type: 'set',
57 worker: 'thread',
58 size: this.size,
59 elementsExecuting: [...this.workerSet].reduce(
60 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
61 0,
62 ),
63 elementsPerWorker: this.maxElementsPerWorker!,
64 };
65 }
66
67 get size(): number {
68 return this.workerSet.size;
69 }
70
71 get maxElementsPerWorker(): number | undefined {
72 return this.workerOptions.elementsPerWorker;
73 }
74
75 /** @inheritDoc */
76 public async start(): Promise<void> {
77 this.addWorkerSetElement();
78 // Add worker set element sequentially to optimize memory at startup
79 this.workerOptions.workerStartDelay! > 0 &&
80 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
81 this.emitter?.emit(WorkerSetEvents.started, this.info);
82 this.started = true;
83 }
84
85 /** @inheritDoc */
86 public async stop(): Promise<void> {
87 for (const workerSetElement of this.workerSet) {
88 const worker = workerSetElement.worker;
89 const waitWorkerExit = new Promise<void>((resolve) => {
90 worker.once('exit', () => {
91 resolve();
92 });
93 });
94 await worker.terminate();
95 await waitWorkerExit;
96 this.emitter?.emit(WorkerSetEvents.stopped, this.info);
97 this.emitter?.emitDestroy();
98 this.emitter?.removeAllListeners();
99 this.started = false;
100 }
101 }
102
103 /** @inheritDoc */
104 public async addElement(elementData: WorkerData): Promise<void> {
105 if (!this.started) {
106 throw new Error('Cannot add a WorkerSet element: not started');
107 }
108 if (!this.workerSet) {
109 throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
110 }
111 const workerSetElement = await this.getWorkerSetElement();
112 workerSetElement.worker.postMessage({
113 event: WorkerMessageEvents.startWorkerElement,
114 data: elementData,
115 });
116 ++workerSetElement.numberOfWorkerElements;
117 // Add element sequentially to optimize memory at startup
118 if (this.workerOptions.elementStartDelay! > 0) {
119 await sleep(randomizeDelay(this.workerOptions.elementStartDelay!));
120 }
121 }
122
123 /**
124 * Adds a new `WorkerSetElement`.
125 */
126 private addWorkerSetElement(): WorkerSetElement {
127 this.workerStartup = true;
128 const worker = new Worker(this.workerScript, {
129 env: SHARE_ENV,
130 ...this.workerOptions.poolOptions?.workerOptions,
131 });
132 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
133 worker.on('message', (message: WorkerMessage<WorkerData>) => {
134 if (message.event === WorkerMessageEvents.startedWorkerElement) {
135 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
136 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
137 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
138 }
139 });
140 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
141 worker.on('error', (error) => {
142 this.emitter?.emit(WorkerSetEvents.error, error);
143 if (
144 this.workerOptions.poolOptions?.restartWorkerOnError &&
145 this.started &&
146 !this.workerStartup
147 ) {
148 this.addWorkerSetElement();
149 }
150 });
151 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
152 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
153 worker.once('exit', () =>
154 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
155 );
156 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
157 this.workerSet.add(workerSetElement);
158 this.workerStartup = false;
159 return workerSetElement;
160 }
161
162 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
163 this.workerSet.delete(workerSetElement);
164 }
165
166 private async getWorkerSetElement(): Promise<WorkerSetElement> {
167 let chosenWorkerSetElement: WorkerSetElement | undefined;
168 for (const workerSetElement of this.workerSet) {
169 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
170 chosenWorkerSetElement = workerSetElement;
171 break;
172 }
173 }
174 if (!chosenWorkerSetElement) {
175 chosenWorkerSetElement = this.addWorkerSetElement();
176 // Add worker set element sequentially to optimize memory at startup
177 this.workerOptions.workerStartDelay! > 0 &&
178 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
179 }
180 return chosenWorkerSetElement;
181 }
182
183 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
184 let workerSetElt: WorkerSetElement | undefined;
185 for (const workerSetElement of this.workerSet) {
186 if (workerSetElement.worker.threadId === worker.threadId) {
187 workerSetElt = workerSetElement;
188 break;
189 }
190 }
191 return workerSetElt;
192 }
193 }