refactor: rename ChargingStationConfigurationUtils.ts -> ChargingStationConfiguration...
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
d972af76 3import { EventEmitter } from 'node:events';
be245fda 4import { SHARE_ENV, Worker } from 'node:worker_threads';
c045d9a9 5
4a3807d1
JB
6import type { ThreadPoolOptions } from 'poolifier';
7
268a74bb 8import { WorkerAbstract } from './WorkerAbstract';
59b6ed8d 9import { WorkerConstants } from './WorkerConstants';
0e4fa348 10import {
b779c0f8 11 type SetInfo,
0e4fa348 12 type WorkerData,
c26984f2 13 type WorkerMessage,
0e4fa348
JB
14 WorkerMessageEvents,
15 type WorkerOptions,
16 type WorkerSetElement,
810f4caf 17 WorkerSetEvents,
268a74bb 18} from './WorkerTypes';
be245fda 19import { sleep } from './WorkerUtils';
6013bc53 20
4a3807d1
JB
21const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
22 enableEvents: true,
23 restartWorkerOnError: true,
24};
25
268a74bb 26export class WorkerSet extends WorkerAbstract<WorkerData> {
a37fc6dc 27 public readonly emitter!: EventEmitter;
f2bf9948 28 private readonly workerSet: Set<WorkerSetElement>;
6013bc53
JB
29
30 /**
361c98f5 31 * Creates a new `WorkerSet`.
6013bc53 32 *
0e4fa348
JB
33 * @param workerScript -
34 * @param workerOptions -
6013bc53 35 */
4a3807d1 36 constructor(workerScript: string, workerOptions: WorkerOptions) {
4d7227e6 37 super(workerScript, workerOptions);
29bb4dee 38 this.workerOptions.poolOptions = {
4a3807d1 39 ...DEFAULT_POOL_OPTIONS,
29bb4dee
JB
40 ...this.workerOptions.poolOptions,
41 };
ffd71f2c 42 this.workerSet = new Set<WorkerSetElement>();
a37fc6dc 43 if (this.workerOptions.poolOptions?.enableEvents) {
d972af76 44 this.emitter = new EventEmitter();
29bb4dee 45 }
6013bc53
JB
46 }
47
b779c0f8
JB
48 get info(): SetInfo {
49 return {
628c30e5 50 version: WorkerConstants.version,
0bde1ea1
JB
51 type: 'set',
52 worker: 'thread',
b779c0f8 53 size: this.size,
19bdf4ca 54 elementsExecuting: [...this.workerSet].reduce(
b779c0f8 55 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
5edd8ba0 56 0,
b779c0f8 57 ),
e1d9a0f4 58 elementsPerWorker: this.maxElementsPerWorker!,
b779c0f8
JB
59 };
60 }
61
6013bc53 62 get size(): number {
ded13d97 63 return this.workerSet.size;
6013bc53
JB
64 }
65
72092cfc 66 get maxElementsPerWorker(): number | undefined {
4d7227e6
JB
67 return this.workerOptions.elementsPerWorker;
68 }
69
b0dee778
JB
70 /** @inheritDoc */
71 public async start(): Promise<void> {
72 this.addWorkerSetElement();
73 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4 74 this.workerOptions.workerStartDelay! > 0 && (await sleep(this.workerOptions.workerStartDelay!));
b0dee778
JB
75 }
76
77 /** @inheritDoc */
78 public async stop(): Promise<void> {
79 for (const workerSetElement of this.workerSet) {
b3ded6ae 80 const worker = workerSetElement.worker;
bd62e88f 81 const waitWorkerExit = new Promise<void>((resolve) => {
b3ded6ae 82 worker.on('exit', () => {
dbc29904
JB
83 resolve();
84 });
85 });
b3ded6ae 86 await worker.terminate();
bd62e88f 87 await waitWorkerExit;
b0dee778 88 }
b0dee778
JB
89 }
90
8baf3f8f 91 /** @inheritDoc */
c3ee95af 92 public async addElement(elementData: WorkerData): Promise<void> {
ded13d97 93 if (!this.workerSet) {
e7aeea18 94 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
6013bc53 95 }
962a8159
JB
96 const workerSetElement = await this.getWorkerSetElement();
97 workerSetElement.worker.postMessage({
2bb7a73e 98 event: WorkerMessageEvents.startWorkerElement,
e7aeea18 99 data: elementData,
d070d967 100 });
962a8159
JB
101 ++workerSetElement.numberOfWorkerElements;
102 // Add element sequentially to optimize memory at startup
e1d9a0f4
JB
103 if (this.workerOptions.elementStartDelay! > 0) {
104 await sleep(this.workerOptions.elementStartDelay!);
d070d967 105 }
6013bc53
JB
106 }
107
6013bc53 108 /**
361c98f5 109 * Adds a new `WorkerSetElement`.
6013bc53 110 */
962a8159 111 private addWorkerSetElement(): WorkerSetElement {
be245fda
JB
112 const worker = new Worker(this.workerScript, {
113 env: SHARE_ENV,
e1d9a0f4 114 ...this.workerOptions.poolOptions?.workerOptions,
be245fda 115 });
0e4fa348
JB
116 worker.on(
117 'message',
a37fc6dc 118 this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda 119 );
c26984f2
JB
120 worker.on('message', (message: WorkerMessage<WorkerData>) => {
121 if (message.event === WorkerMessageEvents.startedWorkerElement) {
bdb50f5a 122 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
c26984f2
JB
123 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
124 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
125 }
126 });
be245fda
JB
127 worker.on(
128 'error',
a37fc6dc 129 this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
0e4fa348 130 );
962a8159 131 worker.on('error', (error) => {
fac8866f 132 this.emitter?.emit(WorkerSetEvents.error, error);
a37fc6dc 133 if (this.workerOptions.poolOptions?.restartWorkerOnError) {
29bb4dee
JB
134 this.addWorkerSetElement();
135 }
962a8159 136 });
be245fda
JB
137 worker.on(
138 'online',
a37fc6dc 139 this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda
JB
140 );
141 worker.on(
142 'exit',
a37fc6dc 143 this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda 144 );
dbc29904 145 worker.once('exit', () =>
e1d9a0f4 146 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
dbc29904 147 );
962a8159
JB
148 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
149 this.workerSet.add(workerSetElement);
150 return workerSetElement;
6013bc53
JB
151 }
152
dbc29904
JB
153 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
154 this.workerSet.delete(workerSetElement);
155 }
156
962a8159 157 private async getWorkerSetElement(): Promise<WorkerSetElement> {
e1d9a0f4 158 let chosenWorkerSetElement: WorkerSetElement | undefined;
962a8159 159 for (const workerSetElement of this.workerSet) {
e1d9a0f4 160 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
962a8159
JB
161 chosenWorkerSetElement = workerSetElement;
162 break;
163 }
e7aeea18 164 }
962a8159
JB
165 if (!chosenWorkerSetElement) {
166 chosenWorkerSetElement = this.addWorkerSetElement();
167 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4
JB
168 this.workerOptions.workerStartDelay! > 0 &&
169 (await sleep(this.workerOptions.workerStartDelay!));
962a8159
JB
170 }
171 return chosenWorkerSetElement;
c045d9a9
JB
172 }
173
962a8159 174 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
e1d9a0f4 175 let workerSetElt: WorkerSetElement | undefined;
0e7a11e1 176 for (const workerSetElement of this.workerSet) {
81696bd5 177 if (workerSetElement.worker.threadId === worker.threadId) {
1e924543 178 workerSetElt = workerSetElement;
0e7a11e1 179 break;
1e924543 180 }
0e7a11e1 181 }
1e924543
JB
182 return workerSetElt;
183 }
6013bc53 184}