refactor(simulator): switch to named exports
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { Worker } from 'worker_threads';
4
5 import { WorkerAbstract } from './WorkerAbstract';
6 import {
7 type MessageHandler,
8 type WorkerData,
9 WorkerMessageEvents,
10 type WorkerOptions,
11 type WorkerSetElement,
12 } from './WorkerTypes';
13 import { WorkerUtils } from './WorkerUtils';
14
/**
 * Worker implementation that keeps a set of `worker_threads` workers and
 * dispatches elements to the most recently started one, starting a new worker
 * whenever the last one has reached `workerOptions.elementsPerWorker` elements.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Started workers with their element counts, in start (insertion) order. */
  private readonly workerSet: Set<WorkerSetElement>;

  /**
   * Create a new `WorkerSet`.
   *
   * @param workerScript - Script path handed to each `Worker` that gets started.
   * @param workerOptions - Options forwarded to the `WorkerAbstract` constructor.
   */
  constructor(workerScript: string, workerOptions?: WorkerOptions) {
    super(workerScript, workerOptions);
    this.workerSet = new Set<WorkerSetElement>();
  }

  /** Number of workers currently tracked in the set. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Configured `elementsPerWorker` option, if any. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /**
   * Post a `START_WORKER_ELEMENT` message carrying `elementData` to the last
   * worker of the set, starting a new worker first when the set is empty or the
   * last worker already holds `elementsPerWorker` elements.
   *
   * @param elementData - Payload sent as the `data` field of the message.
   * @returns A promise resolved once the message is posted and the optional
   *   `elementStartDelay` has elapsed.
   * @throws Error if no worker is available after the start attempt.
   * @public
   */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (
      this.workerSet.size === 0 ||
      this.getLastWorkerSetElement().numberOfWorkerElements >= this.workerOptions.elementsPerWorker
    ) {
      await this.startWorker();
    }
    // Resolve the target once so the message and the counter hit the same element
    const lastWorkerSetElement = this.getLastWorkerSetElement();
    lastWorkerSetElement.worker.postMessage({
      id: WorkerMessageEvents.START_WORKER_ELEMENT,
      data: elementData,
    });
    lastWorkerSetElement.numberOfWorkerElements++;
    // Start element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay > 0) {
      await WorkerUtils.sleep(this.workerOptions.elementStartDelay);
    }
  }

  /**
   * Start a first worker.
   *
   * @returns A promise resolved once the worker is registered in the set.
   * @public
   */
  public async start(): Promise<void> {
    await this.startWorker();
  }

  /**
   * Terminate every worker of the set, one after the other, then empty the set.
   *
   * @returns A promise resolved once all `terminate()` calls have settled.
   * @public
   */
  public async stop(): Promise<void> {
    for (const workerSetElement of this.workerSet) {
      await workerSetElement.worker.terminate();
    }
    this.workerSet.clear();
  }

  /**
   * Start a new `Worker`, wire its `message`, `error` and `exit` handlers and
   * register it in the set with an element count of zero.
   */
  private async startWorker(): Promise<void> {
    const worker = new Worker(this.workerScript);
    worker.on(
      'message',
      (
        this.workerOptions?.messageHandler ??
        (() => {
          /* This is intentional */
        })
      ).bind(this) as MessageHandler<Worker>
    );
    worker.on('error', WorkerUtils.defaultErrorHandler.bind(this) as (err: Error) => void);
    worker.on('exit', (code) => {
      WorkerUtils.defaultExitHandler(code);
      // The element may already be gone (e.g. the set was cleared by stop())
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.workerSet.delete(exitedWorkerSetElement);
      }
    });
    this.workerSet.add({ worker, numberOfWorkerElements: 0 });
    // Start worker sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay > 0) {
      await WorkerUtils.sleep(this.workerOptions.workerStartDelay);
    }
  }

  /**
   * Get the most recently added element of the set.
   *
   * @returns The last element in insertion order.
   * @throws Error if the set is empty.
   */
  private getLastWorkerSetElement(): WorkerSetElement {
    let lastWorkerSetElement: WorkerSetElement | undefined;
    // A Set has no direct "last" accessor: walk it and keep the final element
    for (const workerSetElement of this.workerSet) {
      lastWorkerSetElement = workerSetElement;
    }
    if (lastWorkerSetElement === undefined) {
      throw new Error("Cannot get the last WorkerSet element: workers' set is empty");
    }
    return lastWorkerSetElement;
  }

  /**
   * Get the `Worker` of the most recently added element of the set.
   *
   * @returns The last started worker.
   * @throws Error if the set is empty.
   */
  private getLastWorker(): Worker {
    return this.getLastWorkerSetElement().worker;
  }

  /**
   * Find the set element that wraps the given worker.
   *
   * @param worker - Worker instance to look up.
   * @returns The matching element, or `undefined` when the worker is not in the set.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      // Match on object identity rather than `threadId`: this lookup runs from the
      // 'exit' handler, where the thread id of a finished worker is not a reliable key
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}