refactor: split WorkerConstants class
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract';
7 import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents,
16 } from './WorkerTypes';
17 import { sleep } from './WorkerUtils';
18
/**
 * Worker implementation that spreads elements over a growing set of worker
 * threads, each thread hosting at most `elementsPerWorker` elements.
 *
 * A new worker thread is spawned whenever every existing one has reached its
 * element quota.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /**
   * Event emitter used to publish `WorkerSetEvents`.
   * Only instantiated when `poolOptions.enableEvents` is truthy, hence the
   * optional chaining at every internal use site.
   */
  public readonly emitter!: EventEmitter;
  /** Bookkeeping of the spawned worker threads and their element counts. */
  private readonly workerSet: Set<WorkerSetElement>;

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script handed to each spawned `Worker`.
   * @param workerOptions - Worker options; `elementsPerWorker` must be a strictly positive safe integer.
   * @throws TypeError If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor(workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions);
    if (
      this.workerOptions.elementsPerWorker === null ||
      this.workerOptions.elementsPerWorker === undefined
    ) {
      throw new TypeError('Elements per worker is not defined');
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer');
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero');
    }
    // User supplied pool options take precedence over the defaults.
    this.workerOptions.poolOptions = {
      ...DEFAULT_POOL_OPTIONS,
      ...this.workerOptions.poolOptions,
    };
    this.workerSet = new Set<WorkerSetElement>();
    if (this.workerOptions.poolOptions?.enableEvents) {
      this.emitter = new EventEmitter();
    }
  }

  /**
   * Snapshot of the worker set state: number of workers, total number of
   * elements counted across all workers and the per worker quota.
   */
  get info(): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0,
      ),
      elementsPerWorker: this.maxElementsPerWorker!,
    };
  }

  /** Number of worker threads currently tracked in the set. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Maximum number of elements a single worker thread may host. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay! > 0) {
      await sleep(this.workerOptions.workerStartDelay!);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Workers are terminated one after the other; each one removes itself
    // from the set through the 'exit' listener installed at creation time.
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker;
      const waitWorkerExit = new Promise<void>((resolve) => {
        // 'exit' fires a single time per worker: a one-shot listener is enough
        worker.once('exit', () => {
          resolve();
        });
      });
      await worker.terminate();
      await waitWorkerExit;
    }
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay! > 0) {
      await sleep(this.workerOptions.elementStartDelay!);
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, wires the user supplied handlers and the internal
   * event forwarding, then registers the thread in the set with zero elements.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement(): WorkerSetElement {
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    });
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
      }
    });
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error);
      if (this.workerOptions.poolOptions?.restartWorkerOnError) {
        this.addWorkerSetElement();
      }
    });
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
    worker.once('exit', () => {
      // Drop the exited worker from the set; nothing to do if it is no longer tracked.
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.removeWorkerSetElement(exitedWorkerSetElement);
      }
    });
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    return workerSetElement;
  }

  /**
   * Removes a `WorkerSetElement` from the set.
   *
   * @param workerSetElement - The element to stop tracking.
   */
  private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
    this.workerSet.delete(workerSetElement);
  }

  /**
   * Picks the first worker set element that still has room for one more
   * element, spawning a new worker thread when all of them are full.
   *
   * @returns The worker set element that shall host the next element.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined;
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement;
        break;
      }
    }
    if (!chosenWorkerSetElement) {
      chosenWorkerSetElement = this.addWorkerSetElement();
      // Add worker set element sequentially to optimize memory at startup
      if (this.workerOptions.workerStartDelay! > 0) {
        await sleep(this.workerOptions.workerStartDelay!);
      }
    }
    return chosenWorkerSetElement;
  }

  /**
   * Finds the `WorkerSetElement` wrapping the given worker thread.
   *
   * The match is done on the `Worker` object identity rather than on its
   * `threadId`: this lookup runs from the 'exit' listener, and the thread id
   * of an exited worker is presumably no longer a unique key
   * (NOTE(review): Node.js appears to report -1 once a worker has exited - confirm).
   *
   * @param worker - The worker thread to look up.
   * @returns The matching worker set element, `undefined` if the worker is not tracked.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}