refactor: unflag worker startup at the right time on worker set
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract';
7 import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents,
16 } from './WorkerTypes';
17 import { sleep } from './WorkerUtils';
18
/**
 * Worker implementation backed by a set of worker threads that grows on demand.
 *
 * Each worker thread is handed at most `workerOptions.elementsPerWorker`
 * elements; when every known worker thread has reached that count, a new one
 * is spawned.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Event emitter, only instantiated when `poolOptions.enableEvents` is truthy. */
  public readonly emitter!: EventEmitter;
  /** Known worker threads, each paired with the count of elements posted to it. */
  private readonly workerSet: Set<WorkerSetElement>;
  /** Raised while `addWorkerSetElement()` is creating and wiring a worker thread. */
  private workerStartup: boolean;

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script handed to every spawned worker thread.
   * @param workerOptions - Worker options; `elementsPerWorker` is mandatory.
   * @throws TypeError If `elementsPerWorker` is `null`, `undefined` or not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor(workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions);
    if (
      this.workerOptions.elementsPerWorker === null ||
      this.workerOptions.elementsPerWorker === undefined
    ) {
      throw new TypeError('Elements per worker is not defined');
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer');
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero');
    }
    // User supplied pool options take precedence over the defaults
    this.workerOptions.poolOptions = {
      ...DEFAULT_POOL_OPTIONS,
      ...this.workerOptions.poolOptions,
    };
    this.workerSet = new Set<WorkerSetElement>();
    if (this.workerOptions.poolOptions?.enableEvents) {
      this.emitter = new EventEmitter();
    }
    this.workerStartup = false;
  }

  /** Snapshot of the worker set: version, size and element counters. */
  get info(): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      size: this.size,
      // Sum of the per worker thread element counters
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0,
      ),
      elementsPerWorker: this.maxElementsPerWorker!,
    };
  }

  /** Number of worker threads currently held in the set. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Configured maximum number of elements per worker thread. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay! > 0) {
      await sleep(this.workerOptions.workerStartDelay!);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Worker threads are terminated one after the other
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker;
      // Register the one-shot listener before terminating so that the 'exit'
      // event cannot be missed
      const waitWorkerExit = new Promise<void>((resolve) => {
        worker.once('exit', () => {
          resolve();
        });
      });
      await worker.terminate();
      await waitWorkerExit;
    }
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay! > 0) {
      await sleep(this.workerOptions.elementStartDelay!);
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, attaches the user supplied handlers followed by the
   * internal ones, then registers the worker thread in the set with an element
   * counter of zero.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement(): WorkerSetElement {
    // NOTE(review): this flag is raised and lowered synchronously within this
    // method, so the 'error' listener below, which is invoked later, presumably
    // always observes `false`. Confirm whether it should be lowered elsewhere
    // (e.g. once the worker thread is online).
    this.workerStartup = true;
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    });
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
    // Relay element lifecycle messages as worker set events
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
      }
    });
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error);
      // Spawn a replacement worker thread when restart on error is enabled
      if (this.workerOptions.poolOptions?.restartWorkerOnError && !this.workerStartup) {
        this.addWorkerSetElement();
      }
    });
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
    // Forget the worker thread once it has exited
    worker.once('exit', () => {
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.removeWorkerSetElement(exitedWorkerSetElement);
      }
    });
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    this.workerStartup = false;
    return workerSetElement;
  }

  /**
   * Removes a `WorkerSetElement` from the set.
   *
   * @param workerSetElement - Worker set element to remove.
   */
  private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
    this.workerSet.delete(workerSetElement);
  }

  /**
   * Selects the worker set element that will receive the next element.
   *
   * The first worker set element, in insertion order, whose element counter is
   * below `elementsPerWorker` is chosen; when there is none, a new worker set
   * element is added and the configured worker start delay is awaited.
   *
   * @returns The chosen worker set element.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        return workerSetElement;
      }
    }
    const addedWorkerSetElement = this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay! > 0) {
      await sleep(this.workerOptions.workerStartDelay!);
    }
    return addedWorkerSetElement;
  }

  /**
   * Finds the worker set element wrapping the given worker thread.
   *
   * The lookup matches on the `Worker` reference rather than on its `threadId`:
   * it is called from an 'exit' listener, where the thread id may no longer
   * identify the worker thread.
   *
   * @param worker - Worker thread to look for.
   * @returns The matching worker set element, `undefined` if there is none.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}