build(deps): apply updates
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
962a8159 3import EventEmitterAsyncResource from 'node:events';
01f4001e 4import { Worker } from 'node:worker_threads';
c045d9a9 5
268a74bb 6import { WorkerAbstract } from './WorkerAbstract';
59b6ed8d 7import { WorkerConstants } from './WorkerConstants';
0e4fa348
JB
8import {
9 type MessageHandler,
10 type WorkerData,
11 WorkerMessageEvents,
12 type WorkerOptions,
13 type WorkerSetElement,
268a74bb 14} from './WorkerTypes';
789871d6 15import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
6013bc53 16
268a74bb 17export class WorkerSet extends WorkerAbstract<WorkerData> {
962a8159 18 public readonly emitter: EventEmitterAsyncResource;
f2bf9948 19 private readonly workerSet: Set<WorkerSetElement>;
6013bc53
JB
20
21 /**
22 * Create a new `WorkerSet`.
23 *
0e4fa348
JB
24 * @param workerScript -
25 * @param workerOptions -
6013bc53 26 */
4d7227e6
JB
27 constructor(workerScript: string, workerOptions?: WorkerOptions) {
28 super(workerScript, workerOptions);
ffd71f2c 29 this.workerSet = new Set<WorkerSetElement>();
962a8159 30 this.emitter = new EventEmitterAsyncResource();
6013bc53
JB
31 }
32
33 get size(): number {
ded13d97 34 return this.workerSet.size;
6013bc53
JB
35 }
36
72092cfc 37 get maxElementsPerWorker(): number | undefined {
4d7227e6
JB
38 return this.workerOptions.elementsPerWorker;
39 }
40
8baf3f8f 41 /** @inheritDoc */
c3ee95af 42 public async addElement(elementData: WorkerData): Promise<void> {
ded13d97 43 if (!this.workerSet) {
e7aeea18 44 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
6013bc53 45 }
962a8159
JB
46 const workerSetElement = await this.getWorkerSetElement();
47 workerSetElement.worker.postMessage({
721646e9 48 id: WorkerMessageEvents.startWorkerElement,
e7aeea18 49 data: elementData,
d070d967 50 });
962a8159
JB
51 ++workerSetElement.numberOfWorkerElements;
52 // Add element sequentially to optimize memory at startup
d070d967 53 if (this.workerOptions.elementStartDelay > 0) {
789871d6 54 await sleep(this.workerOptions.elementStartDelay);
d070d967 55 }
6013bc53
JB
56 }
57
8baf3f8f 58 /** @inheritDoc */
6013bc53 59 public async start(): Promise<void> {
962a8159
JB
60 this.addWorkerSetElement();
61 // Add worker set element sequentially to optimize memory at startup
8baf3f8f 62 this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
6013bc53
JB
63 }
64
8baf3f8f 65 /** @inheritDoc */
ded13d97
JB
66 public async stop(): Promise<void> {
67 for (const workerSetElement of this.workerSet) {
68 await workerSetElement.worker.terminate();
69 }
70 this.workerSet.clear();
71 }
72
6013bc53 73 /**
962a8159 74 * Add a new `WorkerSetElement`.
6013bc53 75 */
962a8159 76 private addWorkerSetElement(): WorkerSetElement {
a4624c96 77 const worker = new Worker(this.workerScript);
0e4fa348
JB
78 worker.on(
79 'message',
59b6ed8d
JB
80 (this.workerOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION).bind(
81 this
82 ) as MessageHandler<Worker>
0e4fa348 83 );
789871d6 84 worker.on('error', defaultErrorHandler.bind(this) as (err: Error) => void);
962a8159
JB
85 worker.on('error', (error) => {
86 this.emitter.emit('error', error);
87 this.addWorkerSetElement();
88 });
8baf3f8f
JB
89 worker.on('exit', defaultExitHandler.bind(this) as (exitCode: number) => void);
90 worker.on('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker)));
962a8159
JB
91 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
92 this.workerSet.add(workerSetElement);
93 return workerSetElement;
6013bc53
JB
94 }
95
962a8159
JB
96 private async getWorkerSetElement(): Promise<WorkerSetElement> {
97 let chosenWorkerSetElement: WorkerSetElement;
98 for (const workerSetElement of this.workerSet) {
99 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker) {
100 chosenWorkerSetElement = workerSetElement;
101 break;
102 }
e7aeea18 103 }
962a8159
JB
104 if (!chosenWorkerSetElement) {
105 chosenWorkerSetElement = this.addWorkerSetElement();
106 // Add worker set element sequentially to optimize memory at startup
107 this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
108 }
109 return chosenWorkerSetElement;
c045d9a9
JB
110 }
111
962a8159 112 private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
1e924543 113 let workerSetElt: WorkerSetElement;
0e7a11e1 114 for (const workerSetElement of this.workerSet) {
81696bd5 115 if (workerSetElement.worker.threadId === worker.threadId) {
1e924543 116 workerSetElt = workerSetElement;
0e7a11e1 117 break;
1e924543 118 }
0e7a11e1 119 }
1e924543
JB
120 return workerSetElt;
121 }
6013bc53 122}