refactor: add sanity checks to worker set arguments
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import type { ThreadPoolOptions } from 'poolifier';
7
8 import { WorkerAbstract } from './WorkerAbstract';
9 import { WorkerConstants } from './WorkerConstants';
10 import {
11 type SetInfo,
12 type WorkerData,
13 type WorkerMessage,
14 WorkerMessageEvents,
15 type WorkerOptions,
16 type WorkerSetElement,
17 WorkerSetEvents,
18 } from './WorkerTypes';
19 import { sleep } from './WorkerUtils';
20
/**
 * Pool options applied by default. The `WorkerSet` constructor spreads these
 * first, so any caller-supplied `poolOptions` entry overrides them.
 */
const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = { enableEvents: true, restartWorkerOnError: true };
25
/**
 * Worker implementation that distributes elements over a set of worker
 * threads. Each worker thread hosts at most `workerOptions.elementsPerWorker`
 * elements; a new worker thread is added when every existing one is full.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Event emitter; only assigned when `poolOptions.enableEvents` is truthy. */
  public readonly emitter!: EventEmitter;
  /** Worker threads currently tracked, with their element counters. */
  private readonly workerSet: Set<WorkerSetElement>;

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script handed to the base class and used to spawn worker threads.
   * @param workerOptions - Worker options; `elementsPerWorker` must be a strictly positive safe integer.
   * @throws TypeError If `elementsPerWorker` is `null`, `undefined` or not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor(workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions);
    if (
      this.workerOptions.elementsPerWorker === null ||
      this.workerOptions.elementsPerWorker === undefined
    ) {
      throw new TypeError('Elements per worker is not defined');
    }
    // Reject anything that is NOT a safe integer (floats, NaN, Infinity, non-numbers)
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer');
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero');
    }
    // Caller-supplied pool options take precedence over the defaults
    this.workerOptions.poolOptions = {
      ...DEFAULT_POOL_OPTIONS,
      ...this.workerOptions.poolOptions,
    };
    this.workerSet = new Set<WorkerSetElement>();
    if (this.workerOptions.poolOptions?.enableEvents) {
      this.emitter = new EventEmitter();
    }
  }

  /**
   * Snapshot of the worker set state: number of workers, total number of
   * elements counted across all workers and the per-worker element limit.
   */
  get info(): SetInfo {
    return {
      version: WorkerConstants.version,
      type: 'set',
      worker: 'thread',
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0,
      ),
      elementsPerWorker: this.maxElementsPerWorker!,
    };
  }

  /** Number of worker threads currently tracked in the set. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Configured maximum number of elements per worker thread. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay! > 0) {
      await sleep(this.workerOptions.workerStartDelay!);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Workers are terminated one at a time; each one is awaited until it has exited
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker;
      // Register the exit listener before terminating so the event cannot be missed
      const waitWorkerExit = new Promise<void>((resolve) => {
        worker.on('exit', () => {
          resolve();
        });
      });
      await worker.terminate();
      await waitWorkerExit;
    }
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay! > 0) {
      await sleep(this.workerOptions.elementStartDelay!);
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, wires the user-supplied pool handlers and the
   * internal event forwarding on it, then registers it in the set with an
   * element counter of zero.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement(): WorkerSetElement {
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    });
    worker.on(
      'message',
      this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    // Forward element lifecycle messages as worker set events
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
      }
    });
    worker.on(
      'error',
      this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error);
      // Add a fresh worker set element when restart on error is enabled
      if (this.workerOptions.poolOptions?.restartWorkerOnError) {
        this.addWorkerSetElement();
      }
    });
    worker.on(
      'online',
      this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    worker.on(
      'exit',
      this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    // Drop the element from the set once its worker thread has exited
    worker.once('exit', () =>
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
    );
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    return workerSetElement;
  }

  /**
   * Removes a `WorkerSetElement` from the set.
   *
   * @param workerSetElement - Element to remove.
   */
  private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
    this.workerSet.delete(workerSetElement);
  }

  /**
   * Picks the first worker set element that still has room for an element,
   * adding a new one (and honoring `workerStartDelay`) when all are full.
   *
   * @returns The worker set element chosen to host the next element.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined;
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement;
        break;
      }
    }
    if (!chosenWorkerSetElement) {
      chosenWorkerSetElement = this.addWorkerSetElement();
      // Add worker set element sequentially to optimize memory at startup
      if (this.workerOptions.workerStartDelay! > 0) {
        await sleep(this.workerOptions.workerStartDelay!);
      }
    }
    return chosenWorkerSetElement;
  }

  /**
   * Finds the worker set element whose worker has the same `threadId` as the
   * given worker.
   *
   * @param worker - Worker thread to look up.
   * @returns The matching element, or `undefined` when none matches.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    let workerSetElt: WorkerSetElement | undefined;
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker.threadId === worker.threadId) {
        workerSetElt = workerSetElement;
        break;
      }
    }
    return workerSetElt;
  }
}