build: switch to NodeNext module resolution
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
38b2428f 3import { EventEmitterAsyncResource } from 'node:events';
be245fda 4import { SHARE_ENV, Worker } from 'node:worker_threads';
c045d9a9 5
a6ef1ece
JB
6import { WorkerAbstract } from './WorkerAbstract.js';
7import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js';
0e4fa348 8import {
b779c0f8 9 type SetInfo,
0e4fa348 10 type WorkerData,
c26984f2 11 type WorkerMessage,
0e4fa348
JB
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
810f4caf 15 WorkerSetEvents,
a6ef1ece
JB
16} from './WorkerTypes.js';
17import { randomizeDelay, sleep } from './WorkerUtils.js';
6013bc53 18
268a74bb 19export class WorkerSet extends WorkerAbstract<WorkerData> {
38b2428f 20 public readonly emitter: EventEmitterAsyncResource | undefined;
f2bf9948 21 private readonly workerSet: Set<WorkerSetElement>;
c81424b8 22 private started: boolean;
156c5f4e 23 private workerStartup: boolean;
6013bc53
JB
24
25 /**
361c98f5 26 * Creates a new `WorkerSet`.
6013bc53 27 *
0e4fa348
JB
28 * @param workerScript -
29 * @param workerOptions -
6013bc53 30 */
4a3807d1 31 constructor(workerScript: string, workerOptions: WorkerOptions) {
4d7227e6 32 super(workerScript, workerOptions);
27662cf1 33 if (this.workerOptions.elementsPerWorker == null) {
81027aa5
JB
34 throw new TypeError('Elements per worker is not defined');
35 }
f028efca 36 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
81027aa5
JB
37 throw new TypeError('Elements per worker must be an integer');
38 }
39 if (this.workerOptions.elementsPerWorker <= 0) {
40 throw new RangeError('Elements per worker must be greater than zero');
41 }
ffd71f2c 42 this.workerSet = new Set<WorkerSetElement>();
a37fc6dc 43 if (this.workerOptions.poolOptions?.enableEvents) {
38b2428f 44 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' });
29bb4dee 45 }
c81424b8 46 this.started = false;
156c5f4e 47 this.workerStartup = false;
6013bc53
JB
48 }
49
b779c0f8
JB
50 get info(): SetInfo {
51 return {
769d3b10 52 version: workerSetVersion,
0bde1ea1
JB
53 type: 'set',
54 worker: 'thread',
b779c0f8 55 size: this.size,
19bdf4ca 56 elementsExecuting: [...this.workerSet].reduce(
b779c0f8 57 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
5edd8ba0 58 0,
b779c0f8 59 ),
e1d9a0f4 60 elementsPerWorker: this.maxElementsPerWorker!,
b779c0f8
JB
61 };
62 }
63
6013bc53 64 get size(): number {
ded13d97 65 return this.workerSet.size;
6013bc53
JB
66 }
67
72092cfc 68 get maxElementsPerWorker(): number | undefined {
4d7227e6
JB
69 return this.workerOptions.elementsPerWorker;
70 }
71
b0dee778
JB
72 /** @inheritDoc */
73 public async start(): Promise<void> {
74 this.addWorkerSetElement();
75 // Add worker set element sequentially to optimize memory at startup
ab93b184
JB
76 this.workerOptions.workerStartDelay! > 0 &&
77 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
4c7c767b 78 this.emitter?.emit(WorkerSetEvents.started, this.info);
c81424b8 79 this.started = true;
b0dee778
JB
80 }
81
82 /** @inheritDoc */
83 public async stop(): Promise<void> {
84 for (const workerSetElement of this.workerSet) {
b3ded6ae 85 const worker = workerSetElement.worker;
bd62e88f 86 const waitWorkerExit = new Promise<void>((resolve) => {
ae3a41a1 87 worker.once('exit', () => {
dbc29904
JB
88 resolve();
89 });
90 });
b3ded6ae 91 await worker.terminate();
bd62e88f 92 await waitWorkerExit;
4c7c767b 93 this.emitter?.emit(WorkerSetEvents.stopped, this.info);
38b2428f 94 this.emitter?.emitDestroy();
f0737681 95 this.emitter?.removeAllListeners();
c81424b8 96 this.started = false;
b0dee778 97 }
b0dee778
JB
98 }
99
8baf3f8f 100 /** @inheritDoc */
c3ee95af 101 public async addElement(elementData: WorkerData): Promise<void> {
c81424b8
JB
102 if (!this.started) {
103 throw new Error('Cannot add a WorkerSet element: not started');
104 }
a78c196b 105 if (this.workerSet == null) {
c81424b8 106 throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
6013bc53 107 }
962a8159
JB
108 const workerSetElement = await this.getWorkerSetElement();
109 workerSetElement.worker.postMessage({
2bb7a73e 110 event: WorkerMessageEvents.startWorkerElement,
e7aeea18 111 data: elementData,
d070d967 112 });
962a8159
JB
113 ++workerSetElement.numberOfWorkerElements;
114 // Add element sequentially to optimize memory at startup
e1d9a0f4 115 if (this.workerOptions.elementStartDelay! > 0) {
ab93b184 116 await sleep(randomizeDelay(this.workerOptions.elementStartDelay!));
d070d967 117 }
6013bc53
JB
118 }
119
6013bc53 120 /**
361c98f5 121 * Adds a new `WorkerSetElement`.
6013bc53 122 */
962a8159 123 private addWorkerSetElement(): WorkerSetElement {
156c5f4e 124 this.workerStartup = true;
be245fda
JB
125 const worker = new Worker(this.workerScript, {
126 env: SHARE_ENV,
e1d9a0f4 127 ...this.workerOptions.poolOptions?.workerOptions,
be245fda 128 });
769d3b10 129 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
c26984f2
JB
130 worker.on('message', (message: WorkerMessage<WorkerData>) => {
131 if (message.event === WorkerMessageEvents.startedWorkerElement) {
bdb50f5a 132 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
c26984f2
JB
133 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
134 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
135 }
136 });
769d3b10 137 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
962a8159 138 worker.on('error', (error) => {
fac8866f 139 this.emitter?.emit(WorkerSetEvents.error, error);
c81424b8
JB
140 if (
141 this.workerOptions.poolOptions?.restartWorkerOnError &&
142 this.started &&
143 !this.workerStartup
144 ) {
29bb4dee
JB
145 this.addWorkerSetElement();
146 }
962a8159 147 });
769d3b10
JB
148 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
149 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
dbc29904 150 worker.once('exit', () =>
e1d9a0f4 151 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
dbc29904 152 );
962a8159
JB
153 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
154 this.workerSet.add(workerSetElement);
3fb26594 155 this.workerStartup = false;
962a8159 156 return workerSetElement;
6013bc53
JB
157 }
158
dbc29904
JB
159 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
160 this.workerSet.delete(workerSetElement);
161 }
162
962a8159 163 private async getWorkerSetElement(): Promise<WorkerSetElement> {
e1d9a0f4 164 let chosenWorkerSetElement: WorkerSetElement | undefined;
962a8159 165 for (const workerSetElement of this.workerSet) {
e1d9a0f4 166 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
962a8159
JB
167 chosenWorkerSetElement = workerSetElement;
168 break;
169 }
e7aeea18 170 }
962a8159
JB
171 if (!chosenWorkerSetElement) {
172 chosenWorkerSetElement = this.addWorkerSetElement();
173 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4 174 this.workerOptions.workerStartDelay! > 0 &&
ab93b184 175 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
962a8159
JB
176 }
177 return chosenWorkerSetElement;
c045d9a9
JB
178 }
179
962a8159 180 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
e1d9a0f4 181 let workerSetElt: WorkerSetElement | undefined;
0e7a11e1 182 for (const workerSetElement of this.workerSet) {
81696bd5 183 if (workerSetElement.worker.threadId === worker.threadId) {
1e924543 184 workerSetElt = workerSetElement;
0e7a11e1 185 break;
1e924543 186 }
0e7a11e1 187 }
1e924543
JB
188 return workerSetElt;
189 }
6013bc53 190}