refactor: cleanup pool options defaults handling
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
d972af76 3import { EventEmitter } from 'node:events';
be245fda 4import { SHARE_ENV, Worker } from 'node:worker_threads';
c045d9a9 5
268a74bb 6import { WorkerAbstract } from './WorkerAbstract';
c81424b8 7import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
0e4fa348 8import {
b779c0f8 9 type SetInfo,
0e4fa348 10 type WorkerData,
c26984f2 11 type WorkerMessage,
0e4fa348
JB
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
810f4caf 15 WorkerSetEvents,
268a74bb 16} from './WorkerTypes';
be245fda 17import { sleep } from './WorkerUtils';
6013bc53 18
268a74bb 19export class WorkerSet extends WorkerAbstract<WorkerData> {
a37fc6dc 20 public readonly emitter!: EventEmitter;
f2bf9948 21 private readonly workerSet: Set<WorkerSetElement>;
c81424b8 22 private started: boolean;
156c5f4e 23 private workerStartup: boolean;
6013bc53
JB
24
25 /**
361c98f5 26 * Creates a new `WorkerSet`.
6013bc53 27 *
0e4fa348
JB
28 * @param workerScript -
29 * @param workerOptions -
6013bc53 30 */
4a3807d1 31 constructor(workerScript: string, workerOptions: WorkerOptions) {
4d7227e6 32 super(workerScript, workerOptions);
81027aa5
JB
33 if (
34 this.workerOptions.elementsPerWorker === null ||
35 this.workerOptions.elementsPerWorker === undefined
36 ) {
37 throw new TypeError('Elements per worker is not defined');
38 }
f028efca 39 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
81027aa5
JB
40 throw new TypeError('Elements per worker must be an integer');
41 }
42 if (this.workerOptions.elementsPerWorker <= 0) {
43 throw new RangeError('Elements per worker must be greater than zero');
44 }
ffd71f2c 45 this.workerSet = new Set<WorkerSetElement>();
a37fc6dc 46 if (this.workerOptions.poolOptions?.enableEvents) {
d972af76 47 this.emitter = new EventEmitter();
29bb4dee 48 }
c81424b8 49 this.started = false;
156c5f4e 50 this.workerStartup = false;
6013bc53
JB
51 }
52
b779c0f8
JB
53 get info(): SetInfo {
54 return {
769d3b10 55 version: workerSetVersion,
0bde1ea1
JB
56 type: 'set',
57 worker: 'thread',
b779c0f8 58 size: this.size,
19bdf4ca 59 elementsExecuting: [...this.workerSet].reduce(
b779c0f8 60 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
5edd8ba0 61 0,
b779c0f8 62 ),
e1d9a0f4 63 elementsPerWorker: this.maxElementsPerWorker!,
b779c0f8
JB
64 };
65 }
66
6013bc53 67 get size(): number {
ded13d97 68 return this.workerSet.size;
6013bc53
JB
69 }
70
72092cfc 71 get maxElementsPerWorker(): number | undefined {
4d7227e6
JB
72 return this.workerOptions.elementsPerWorker;
73 }
74
b0dee778
JB
75 /** @inheritDoc */
76 public async start(): Promise<void> {
77 this.addWorkerSetElement();
78 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4 79 this.workerOptions.workerStartDelay! > 0 && (await sleep(this.workerOptions.workerStartDelay!));
c81424b8 80 this.started = true;
b0dee778
JB
81 }
82
83 /** @inheritDoc */
84 public async stop(): Promise<void> {
85 for (const workerSetElement of this.workerSet) {
b3ded6ae 86 const worker = workerSetElement.worker;
bd62e88f 87 const waitWorkerExit = new Promise<void>((resolve) => {
b3ded6ae 88 worker.on('exit', () => {
dbc29904
JB
89 resolve();
90 });
91 });
b3ded6ae 92 await worker.terminate();
bd62e88f 93 await waitWorkerExit;
c81424b8 94 this.started = false;
b0dee778 95 }
b0dee778
JB
96 }
97
8baf3f8f 98 /** @inheritDoc */
c3ee95af 99 public async addElement(elementData: WorkerData): Promise<void> {
c81424b8
JB
100 if (!this.started) {
101 throw new Error('Cannot add a WorkerSet element: not started');
102 }
ded13d97 103 if (!this.workerSet) {
c81424b8 104 throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
6013bc53 105 }
962a8159
JB
106 const workerSetElement = await this.getWorkerSetElement();
107 workerSetElement.worker.postMessage({
2bb7a73e 108 event: WorkerMessageEvents.startWorkerElement,
e7aeea18 109 data: elementData,
d070d967 110 });
962a8159
JB
111 ++workerSetElement.numberOfWorkerElements;
112 // Add element sequentially to optimize memory at startup
e1d9a0f4
JB
113 if (this.workerOptions.elementStartDelay! > 0) {
114 await sleep(this.workerOptions.elementStartDelay!);
d070d967 115 }
6013bc53
JB
116 }
117
6013bc53 118 /**
361c98f5 119 * Adds a new `WorkerSetElement`.
6013bc53 120 */
962a8159 121 private addWorkerSetElement(): WorkerSetElement {
156c5f4e 122 this.workerStartup = true;
be245fda
JB
123 const worker = new Worker(this.workerScript, {
124 env: SHARE_ENV,
e1d9a0f4 125 ...this.workerOptions.poolOptions?.workerOptions,
be245fda 126 });
769d3b10 127 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
c26984f2
JB
128 worker.on('message', (message: WorkerMessage<WorkerData>) => {
129 if (message.event === WorkerMessageEvents.startedWorkerElement) {
bdb50f5a 130 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
c26984f2
JB
131 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
132 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
133 }
134 });
769d3b10 135 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
962a8159 136 worker.on('error', (error) => {
fac8866f 137 this.emitter?.emit(WorkerSetEvents.error, error);
c81424b8
JB
138 if (
139 this.workerOptions.poolOptions?.restartWorkerOnError &&
140 this.started &&
141 !this.workerStartup
142 ) {
29bb4dee
JB
143 this.addWorkerSetElement();
144 }
962a8159 145 });
769d3b10
JB
146 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
147 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
dbc29904 148 worker.once('exit', () =>
e1d9a0f4 149 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
dbc29904 150 );
962a8159
JB
151 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
152 this.workerSet.add(workerSetElement);
3fb26594 153 this.workerStartup = false;
962a8159 154 return workerSetElement;
6013bc53
JB
155 }
156
dbc29904
JB
157 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
158 this.workerSet.delete(workerSetElement);
159 }
160
962a8159 161 private async getWorkerSetElement(): Promise<WorkerSetElement> {
e1d9a0f4 162 let chosenWorkerSetElement: WorkerSetElement | undefined;
962a8159 163 for (const workerSetElement of this.workerSet) {
e1d9a0f4 164 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
962a8159
JB
165 chosenWorkerSetElement = workerSetElement;
166 break;
167 }
e7aeea18 168 }
962a8159
JB
169 if (!chosenWorkerSetElement) {
170 chosenWorkerSetElement = this.addWorkerSetElement();
171 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4
JB
172 this.workerOptions.workerStartDelay! > 0 &&
173 (await sleep(this.workerOptions.workerStartDelay!));
962a8159
JB
174 }
175 return chosenWorkerSetElement;
c045d9a9
JB
176 }
177
962a8159 178 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
e1d9a0f4 179 let workerSetElt: WorkerSetElement | undefined;
0e7a11e1 180 for (const workerSetElement of this.workerSet) {
81696bd5 181 if (workerSetElement.worker.threadId === worker.threadId) {
1e924543 182 workerSetElt = workerSetElement;
0e7a11e1 183 break;
1e924543 184 }
0e7a11e1 185 }
1e924543
JB
186 return workerSetElt;
187 }
6013bc53 188}