build(deps-dev): apply updates
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
d972af76 3import { EventEmitter } from 'node:events';
be245fda 4import { SHARE_ENV, Worker } from 'node:worker_threads';
c045d9a9 5
4a3807d1
JB
6import type { ThreadPoolOptions } from 'poolifier';
7
268a74bb 8import { WorkerAbstract } from './WorkerAbstract';
59b6ed8d 9import { WorkerConstants } from './WorkerConstants';
0e4fa348 10import {
b779c0f8 11 type SetInfo,
0e4fa348 12 type WorkerData,
c26984f2 13 type WorkerMessage,
0e4fa348
JB
14 WorkerMessageEvents,
15 type WorkerOptions,
16 type WorkerSetElement,
810f4caf 17 WorkerSetEvents,
268a74bb 18} from './WorkerTypes';
be245fda 19import { sleep } from './WorkerUtils';
6013bc53 20
4a3807d1
JB
21const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
22 enableEvents: true,
23 restartWorkerOnError: true,
24};
25
268a74bb 26export class WorkerSet extends WorkerAbstract<WorkerData> {
a37fc6dc 27 public readonly emitter!: EventEmitter;
f2bf9948 28 private readonly workerSet: Set<WorkerSetElement>;
6013bc53
JB
29
30 /**
361c98f5 31 * Creates a new `WorkerSet`.
6013bc53 32 *
0e4fa348
JB
33 * @param workerScript -
34 * @param workerOptions -
6013bc53 35 */
4a3807d1 36 constructor(workerScript: string, workerOptions: WorkerOptions) {
4d7227e6 37 super(workerScript, workerOptions);
81027aa5
JB
38 if (
39 this.workerOptions.elementsPerWorker === null ||
40 this.workerOptions.elementsPerWorker === undefined
41 ) {
42 throw new TypeError('Elements per worker is not defined');
43 }
44 if (Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
45 throw new TypeError('Elements per worker must be an integer');
46 }
47 if (this.workerOptions.elementsPerWorker <= 0) {
48 throw new RangeError('Elements per worker must be greater than zero');
49 }
29bb4dee 50 this.workerOptions.poolOptions = {
4a3807d1 51 ...DEFAULT_POOL_OPTIONS,
29bb4dee
JB
52 ...this.workerOptions.poolOptions,
53 };
ffd71f2c 54 this.workerSet = new Set<WorkerSetElement>();
a37fc6dc 55 if (this.workerOptions.poolOptions?.enableEvents) {
d972af76 56 this.emitter = new EventEmitter();
29bb4dee 57 }
6013bc53
JB
58 }
59
b779c0f8
JB
60 get info(): SetInfo {
61 return {
628c30e5 62 version: WorkerConstants.version,
0bde1ea1
JB
63 type: 'set',
64 worker: 'thread',
b779c0f8 65 size: this.size,
19bdf4ca 66 elementsExecuting: [...this.workerSet].reduce(
b779c0f8 67 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
5edd8ba0 68 0,
b779c0f8 69 ),
e1d9a0f4 70 elementsPerWorker: this.maxElementsPerWorker!,
b779c0f8
JB
71 };
72 }
73
6013bc53 74 get size(): number {
ded13d97 75 return this.workerSet.size;
6013bc53
JB
76 }
77
72092cfc 78 get maxElementsPerWorker(): number | undefined {
4d7227e6
JB
79 return this.workerOptions.elementsPerWorker;
80 }
81
b0dee778
JB
82 /** @inheritDoc */
83 public async start(): Promise<void> {
84 this.addWorkerSetElement();
85 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4 86 this.workerOptions.workerStartDelay! > 0 && (await sleep(this.workerOptions.workerStartDelay!));
b0dee778
JB
87 }
88
89 /** @inheritDoc */
90 public async stop(): Promise<void> {
91 for (const workerSetElement of this.workerSet) {
b3ded6ae 92 const worker = workerSetElement.worker;
bd62e88f 93 const waitWorkerExit = new Promise<void>((resolve) => {
b3ded6ae 94 worker.on('exit', () => {
dbc29904
JB
95 resolve();
96 });
97 });
b3ded6ae 98 await worker.terminate();
bd62e88f 99 await waitWorkerExit;
b0dee778 100 }
b0dee778
JB
101 }
102
8baf3f8f 103 /** @inheritDoc */
c3ee95af 104 public async addElement(elementData: WorkerData): Promise<void> {
ded13d97 105 if (!this.workerSet) {
e7aeea18 106 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
6013bc53 107 }
962a8159
JB
108 const workerSetElement = await this.getWorkerSetElement();
109 workerSetElement.worker.postMessage({
2bb7a73e 110 event: WorkerMessageEvents.startWorkerElement,
e7aeea18 111 data: elementData,
d070d967 112 });
962a8159
JB
113 ++workerSetElement.numberOfWorkerElements;
114 // Add element sequentially to optimize memory at startup
e1d9a0f4
JB
115 if (this.workerOptions.elementStartDelay! > 0) {
116 await sleep(this.workerOptions.elementStartDelay!);
d070d967 117 }
6013bc53
JB
118 }
119
6013bc53 120 /**
361c98f5 121 * Adds a new `WorkerSetElement`.
6013bc53 122 */
962a8159 123 private addWorkerSetElement(): WorkerSetElement {
be245fda
JB
124 const worker = new Worker(this.workerScript, {
125 env: SHARE_ENV,
e1d9a0f4 126 ...this.workerOptions.poolOptions?.workerOptions,
be245fda 127 });
0e4fa348
JB
128 worker.on(
129 'message',
a37fc6dc 130 this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda 131 );
c26984f2
JB
132 worker.on('message', (message: WorkerMessage<WorkerData>) => {
133 if (message.event === WorkerMessageEvents.startedWorkerElement) {
bdb50f5a 134 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
c26984f2
JB
135 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
136 this.emitter?.emit(WorkerSetEvents.elementError, message.data);
137 }
138 });
be245fda
JB
139 worker.on(
140 'error',
a37fc6dc 141 this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
0e4fa348 142 );
962a8159 143 worker.on('error', (error) => {
fac8866f 144 this.emitter?.emit(WorkerSetEvents.error, error);
a37fc6dc 145 if (this.workerOptions.poolOptions?.restartWorkerOnError) {
29bb4dee
JB
146 this.addWorkerSetElement();
147 }
962a8159 148 });
be245fda
JB
149 worker.on(
150 'online',
a37fc6dc 151 this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda
JB
152 );
153 worker.on(
154 'exit',
a37fc6dc 155 this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
be245fda 156 );
dbc29904 157 worker.once('exit', () =>
e1d9a0f4 158 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
dbc29904 159 );
962a8159
JB
160 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
161 this.workerSet.add(workerSetElement);
162 return workerSetElement;
6013bc53
JB
163 }
164
dbc29904
JB
165 private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
166 this.workerSet.delete(workerSetElement);
167 }
168
962a8159 169 private async getWorkerSetElement(): Promise<WorkerSetElement> {
e1d9a0f4 170 let chosenWorkerSetElement: WorkerSetElement | undefined;
962a8159 171 for (const workerSetElement of this.workerSet) {
e1d9a0f4 172 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
962a8159
JB
173 chosenWorkerSetElement = workerSetElement;
174 break;
175 }
e7aeea18 176 }
962a8159
JB
177 if (!chosenWorkerSetElement) {
178 chosenWorkerSetElement = this.addWorkerSetElement();
179 // Add worker set element sequentially to optimize memory at startup
e1d9a0f4
JB
180 this.workerOptions.workerStartDelay! > 0 &&
181 (await sleep(this.workerOptions.workerStartDelay!));
962a8159
JB
182 }
183 return chosenWorkerSetElement;
c045d9a9
JB
184 }
185
962a8159 186 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
e1d9a0f4 187 let workerSetElt: WorkerSetElement | undefined;
0e7a11e1 188 for (const workerSetElement of this.workerSet) {
81696bd5 189 if (workerSetElement.worker.threadId === worker.threadId) {
1e924543 190 workerSetElt = workerSetElement;
0e7a11e1 191 break;
1e924543 192 }
0e7a11e1 193 }
1e924543
JB
194 return workerSetElt;
195 }
6013bc53 196}