fix: wait for worker exit to clean the worker set
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitter } from 'node:events';
4 import { SHARE_ENV, Worker } from 'node:worker_threads';
5
6 import type { ThreadPoolOptions } from 'poolifier';
7
8 import { WorkerAbstract } from './WorkerAbstract';
9 import { WorkerConstants } from './WorkerConstants';
10 import {
11 type SetInfo,
12 type WorkerData,
13 type WorkerMessage,
14 WorkerMessageEvents,
15 type WorkerOptions,
16 type WorkerSetElement,
17 WorkerSetEvents,
18 } from './WorkerTypes';
19 import { sleep } from './WorkerUtils';
20
/**
 * Baseline pool options for a worker set.
 *
 * The `WorkerSet` constructor spreads these first and the caller-supplied
 * `poolOptions` second, so any key provided by the caller takes precedence.
 */
const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
  // Makes the constructor instantiate the event emitter.
  enableEvents: true,
  // Makes the 'error' listener add a replacement worker set element.
  restartWorkerOnError: true,
};
25
/**
 * Worker implementation backed by a growing set of worker threads.
 *
 * Elements are assigned to the first tracked worker whose element count is
 * below `elementsPerWorker`; when none qualifies a new worker thread is spawned.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /**
   * Emitter used to publish `WorkerSetEvents`.
   *
   * It is only instantiated when `poolOptions.enableEvents` is truthy, which is
   * why every emission site in this class goes through optional chaining.
   */
  public readonly emitter!: EventEmitter;
  /** Worker threads currently tracked, each with its assigned element count. */
  private readonly workerSet: Set<WorkerSetElement>;

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Script handed to every `Worker` spawned by this set.
   * @param workerOptions - Worker options; its `poolOptions` are merged over `DEFAULT_POOL_OPTIONS`.
   */
  constructor(workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions);
    // Defaults first, so that caller-supplied pool options take precedence.
    this.workerOptions.poolOptions = {
      ...DEFAULT_POOL_OPTIONS,
      ...this.workerOptions.poolOptions,
    };
    this.workerSet = new Set<WorkerSetElement>();
    if (this.workerOptions.poolOptions?.enableEvents) {
      this.emitter = new EventEmitter();
    }
  }

  /**
   * Snapshot of the worker set: number of tracked workers, sum of the element
   * counts over all tracked workers and the configured per-worker limit.
   */
  get info(): SetInfo {
    return {
      version: WorkerConstants.version,
      type: 'set',
      worker: 'thread',
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0,
      ),
      elementsPerWorker: this.maxElementsPerWorker!,
    };
  }

  /** Number of worker threads currently tracked. */
  get size(): number {
    return this.workerSet.size;
  }

  /** Configured `elementsPerWorker` option, `undefined` when not set. */
  get maxElementsPerWorker(): number | undefined {
    return this.workerOptions.elementsPerWorker;
  }

  /** @inheritDoc */
  public async start(): Promise<void> {
    this.addWorkerSetElement();
    // Add worker set element sequentially to optimize memory at startup
    if (this.workerOptions.workerStartDelay! > 0) {
      await sleep(this.workerOptions.workerStartDelay!);
    }
  }

  /** @inheritDoc */
  public async stop(): Promise<void> {
    // Workers are terminated one at a time. The 'exit' listener installed by
    // addWorkerSetElement() deletes the element from the set being iterated.
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker;
      // Subscribe before terminating so the 'exit' event cannot be missed;
      // `once` because this listener is only needed for this single exit.
      const workerExitPromise = new Promise<void>((resolve) => {
        worker.once('exit', () => {
          resolve();
        });
      });
      await worker.terminate();
      await workerExitPromise;
    }
  }

  /** @inheritDoc */
  public async addElement(elementData: WorkerData): Promise<void> {
    if (!this.workerSet) {
      throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
    }
    const workerSetElement = await this.getWorkerSetElement();
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData,
    });
    ++workerSetElement.numberOfWorkerElements;
    // Add element sequentially to optimize memory at startup
    if (this.workerOptions.elementStartDelay! > 0) {
      await sleep(this.workerOptions.elementStartDelay!);
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, wires the user-supplied handlers from `poolOptions`
   * and the internal listeners, then tracks the worker with an element count of 0.
   *
   * @returns The newly tracked worker set element.
   */
  private addWorkerSetElement(): WorkerSetElement {
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions,
    });
    worker.on(
      'message',
      this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    // Translate worker messages into worker set events.
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
      }
    });
    worker.on(
      'error',
      this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error);
      if (this.workerOptions.poolOptions?.restartWorkerOnError) {
        // The replacement is a fresh element: its element count starts at 0.
        this.addWorkerSetElement();
      }
    });
    worker.on(
      'online',
      this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    worker.on(
      'exit',
      this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
    );
    // Stop tracking the worker as soon as its thread has exited.
    worker.once('exit', () => {
      const exitedWorkerSetElement = this.getWorkerSetElementByWorker(worker);
      if (exitedWorkerSetElement !== undefined) {
        this.removeWorkerSetElement(exitedWorkerSetElement);
      }
    });
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
    this.workerSet.add(workerSetElement);
    return workerSetElement;
  }

  /**
   * Stops tracking the given worker set element.
   *
   * @param workerSetElement - Element to delete from the set.
   */
  private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
    this.workerSet.delete(workerSetElement);
  }

  /**
   * Picks the worker set element that receives the next element.
   *
   * @returns The first tracked element whose count is below the per-worker
   *   limit, or a newly added element when none qualifies.
   */
  private async getWorkerSetElement(): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined;
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.numberOfWorkerElements < this.maxElementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement;
        break;
      }
    }
    if (chosenWorkerSetElement === undefined) {
      chosenWorkerSetElement = this.addWorkerSetElement();
      // Add worker set element sequentially to optimize memory at startup
      if (this.workerOptions.workerStartDelay! > 0) {
        await sleep(this.workerOptions.workerStartDelay!);
      }
    }
    return chosenWorkerSetElement;
  }

  /**
   * Finds the tracked element wrapping the given worker.
   *
   * @param worker - Worker instance to look up.
   * @returns The matching element, or `undefined` when the worker is not tracked.
   */
  private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      // Match on object identity: this lookup runs from the 'exit' listener,
      // i.e. once the thread is gone, so it should not rely on `threadId`.
      if (workerSetElement.worker === worker) {
        return workerSetElement;
      }
    }
    return undefined;
  }
}