refactor: cleanup performance statistics code
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
d972af76 3import { EventEmitter } from 'node:events';
be245fda 4import { SHARE_ENV, Worker } from 'node:worker_threads';
c045d9a9 5
268a74bb 6import { WorkerAbstract } from './WorkerAbstract';
769d3b10 7import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
0e4fa348 8import {
b779c0f8 9 type SetInfo,
0e4fa348 10 type WorkerData,
c26984f2 11 type WorkerMessage,
0e4fa348
JB
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
810f4caf 15 WorkerSetEvents,
268a74bb 16} from './WorkerTypes';
be245fda 17import { sleep } from './WorkerUtils';
6013bc53 18
/**
 * Worker implementation that spreads elements over a dynamically growing set
 * of worker threads, each one hosting at most `elementsPerWorker` elements.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Event emitter, only instantiated when `poolOptions.enableEvents` is truthy. */
  public readonly emitter!: EventEmitter;
  /** Bookkeeping of the spawned workers and the number of elements assigned to each. */
  private readonly workerSet: Set<WorkerSetElement>;
  /** Raised while `addWorkerSetElement()` is wiring up a new worker. */
  private workerStartup: boolean;

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Worker script handed to each spawned `Worker`.
   * @param workerOptions - Worker options; `elementsPerWorker` is mandatory.
   * @throws TypeError If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor(workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions);
    if (
      this.workerOptions.elementsPerWorker === null ||
      this.workerOptions.elementsPerWorker === undefined
    ) {
      throw new TypeError('Elements per worker is not defined');
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer');
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero');
    }
    // User supplied pool options take precedence over the defaults.
    this.workerOptions.poolOptions = {
      ...DEFAULT_POOL_OPTIONS,
      ...this.workerOptions.poolOptions,
    };
    this.workerSet = new Set<WorkerSetElement>();
    if (this.workerOptions.poolOptions?.enableEvents) {
      this.emitter = new EventEmitter();
    }
    this.workerStartup = false;
  }

  /** Snapshot of the worker set state. */
  get info(): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      size: this.size,
      // Sum of the elements assigned to every worker of the set.
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0,
      ),
      elementsPerWorker: this.maxElementsPerWorker!,
    };
  }

  /** Number of workers currently tracked in the set. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Maximum number of elements a single worker may host. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay!;
    if (workerStartDelay > 0) {
      await sleep(workerStartDelay);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Workers are terminated one after the other; each one removes itself
    // from the set through the 'exit' listener installed at creation time.
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker;
      // 'exit' fires once per worker: a one-shot listener avoids leaving a
      // dangling handler behind.
      const waitWorkerExit = new Promise<void>((resolve) => {
        worker.once('exit', () => {
          resolve();
        });
      });
      await worker.terminate();
      await waitWorkerExit;
    }
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    const elementStartDelay = this.workerOptions.elementStartDelay!;
    if (elementStartDelay > 0) {
      await sleep(elementStartDelay);
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, wires the user supplied and the internal event
   * handlers, then registers the worker in the set with zero elements.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement(): WorkerSetElement {
    this.workerStartup = true;
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    });
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
    // Relay element lifecycle messages as worker set events.
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
      }
    });
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error);
      // Spawn a replacement worker unless one is already being set up.
      if (this.workerOptions.poolOptions?.restartWorkerOnError && !this.workerStartup) {
        this.addWorkerSetElement();
      }
    });
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
    // Drop the bookkeeping entry as soon as the thread is gone.
    worker.once('exit', () => {
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.removeWorkerSetElement(exitedWorkerSetElement);
      }
    });
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    this.workerStartup = false;
    return workerSetElement;
  }

  /**
   * Removes a `WorkerSetElement` from the set.
   *
   * @param workerSetElement - Element to remove.
   */
  private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
    this.workerSet.delete(workerSetElement);
  }

  /**
   * Gets a worker set element with spare capacity, spawning a new worker when
   * every existing one already hosts `elementsPerWorker` elements.
   *
   * @returns The chosen worker set element.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined;
    // First fit: pick the first worker that is not full yet.
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement;
        break;
      }
    }
    if (!chosenWorkerSetElement) {
      chosenWorkerSetElement = this.addWorkerSetElement();
      // Add worker set element sequentially to optimize memory at startup
      const workerStartDelay = this.workerOptions.workerStartDelay!;
      if (workerStartDelay > 0) {
        await sleep(workerStartDelay);
      }
    }
    return chosenWorkerSetElement;
  }

  /**
   * Finds the worker set element wrapping the given worker.
   *
   * The lookup compares `Worker` object identity rather than `threadId`: this
   * method is called from the 'exit' handler, and Node.js reports a `threadId`
   * of -1 for a worker whose thread has already exited, so several exited
   * workers would be indistinguishable by `threadId`.
   *
   * @param worker - Worker to look up.
   * @returns The matching worker set element, or `undefined` if none is found.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}