feat: add event emitter to worker pool/set code
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import EventEmitterAsyncResource from 'node:events';
4 import { Worker } from 'node:worker_threads';
5
6 import { WorkerAbstract } from './WorkerAbstract';
7 import { WorkerConstants } from './WorkerConstants';
8 import {
9 type MessageHandler,
10 type WorkerData,
11 WorkerMessageEvents,
12 type WorkerOptions,
13 type WorkerSetElement,
14 } from './WorkerTypes';
15 import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
16
/**
 * Worker implementation that keeps a set of `Worker` threads, each one hosting
 * several elements. A new worker thread is spawned whenever no existing one
 * has spare capacity.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /**
   * Emitter on which worker `'error'` events are re-published.
   *
   * NOTE(review): the class is bound from the default export of `node:events`;
   * confirm that this is the intended class and not the named
   * `EventEmitterAsyncResource` export.
   */
  public readonly emitter: EventEmitterAsyncResource;
  /** Worker threads currently tracked, with their element counters. */
  private readonly workerSet: Set<WorkerSetElement>;

  /**
   * Create a new `WorkerSet`.
   *
   * @param workerScript - Script handed to every spawned `Worker`.
   * @param workerOptions - Options forwarded to the base class.
   */
  constructor(workerScript: string, workerOptions?: WorkerOptions) {
    super(workerScript, workerOptions);
    this.workerSet = new Set<WorkerSetElement>();
    this.emitter = new EventEmitterAsyncResource();
  }

  /** Number of worker threads currently tracked. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Configured `elementsPerWorker` option, if any. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      id: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay > 0) {
      await sleep(this.workerOptions.elementStartDelay);
    }
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay > 0) {
      await sleep(this.workerOptions.workerStartDelay);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Workers are terminated one after the other; the 'exit' listeners remove
    // their element from the set, the final clear() drops whatever remains.
    for (const workerSetElement of this.workerSet) {
      await workerSetElement.worker.terminate();
    }
    this.workerSet.clear();
  }

  /**
   * Add a new `WorkerSetElement`.
   *
   * Spawns a `Worker` running the configured script, wires its `message`,
   * `error` and `exit` listeners and registers it in the set.
   *
   * @returns The newly registered element, with an element counter of 0.
   */
  private addWorkerSetElement(): WorkerSetElement {
    const worker = new Worker(this.workerScript);
    worker.on(
      'message',
      (this.workerOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION).bind(
        this
      ) as MessageHandler<Worker>
    );
    worker.on('error', defaultErrorHandler.bind(this) as (err: Error) => void);
    worker.on('error', (error) => {
      // Emitting 'error' on an emitter without any 'error' listener throws the
      // error itself, which would abort this handler before the replacement
      // worker below is created.
      if (this.emitter.listenerCount('error') > 0) {
        this.emitter.emit('error', error);
      }
      // Replace the failed worker
      this.addWorkerSetElement();
    });
    worker.on('exit', defaultExitHandler.bind(this) as (exitCode: number) => void);
    worker.on('exit', () => {
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.workerSet.delete(exitedWorkerSetElement);
      }
    });
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    return workerSetElement;
  }

  /**
   * Pick the first worker set element whose element counter is below
   * `elementsPerWorker`, or create a new one when none qualifies.
   *
   * @returns The element that should host the next worker element.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined;
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker) {
        chosenWorkerSetElement = workerSetElement;
        break;
      }
    }
    if (chosenWorkerSetElement === undefined) {
      chosenWorkerSetElement = this.addWorkerSetElement();
      // Add worker set element sequentially to optimize memory at startup
      if (this.workerOptions.workerStartDelay > 0) {
        await sleep(this.workerOptions.workerStartDelay);
      }
    }
    return chosenWorkerSetElement;
  }

  /**
   * Find the set element wrapping the given worker.
   *
   * The lookup compares `Worker` object identity rather than `threadId`.
   * NOTE(review): `threadId` looks unreliable as a key once a worker has
   * exited (Node appears to report -1 at that point) — confirm.
   *
   * @param worker - Worker to look up.
   * @returns The matching element, or `undefined` when the worker is not tracked.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}