]> Piment Noir Git Repositories - e-mobility-charging-stations-simulator.git/blame_incremental - src/worker/WorkerSet.ts
chore(deps-dev): apply updates
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
... / ...
CommitLineData
1// Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3import { randomUUID } from 'node:crypto'
4import { EventEmitterAsyncResource } from 'node:events'
5import { SHARE_ENV, Worker } from 'node:worker_threads'
6
7import { WorkerAbstract } from './WorkerAbstract.js'
8import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
9import {
10 type SetInfo,
11 type WorkerData,
12 type WorkerMessage,
13 WorkerMessageEvents,
14 type WorkerOptions,
15 type WorkerSetElement,
16 WorkerSetEvents,
17} from './WorkerTypes.js'
18import { randomizeDelay, sleep } from './WorkerUtils.js'
19
20interface ResponseWrapper<R extends WorkerData> {
21 reject: (reason?: unknown) => void
22 resolve: (value: PromiseLike<R> | R) => void
23 workerSetElement: WorkerSetElement
24}
25
26export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> {
27 public readonly emitter: EventEmitterAsyncResource | undefined
28
29 get info (): SetInfo {
30 return {
31 elementsExecuting: [...this.workerSet].reduce(
32 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
33 0
34 ),
35 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
36 elementsPerWorker: this.maxElementsPerWorker!,
37 size: this.size,
38 started: this.started,
39 type: 'set',
40 version: workerSetVersion,
41 worker: 'thread',
42 }
43 }
44
45 get maxElementsPerWorker (): number | undefined {
46 return this.workerOptions.elementsPerWorker
47 }
48
49 get size (): number {
50 return this.workerSet.size
51 }
52
53 private readonly promiseResponseMap: Map<
54 `${string}-${string}-${string}-${string}`,
55 ResponseWrapper<R>
56 >
57
58 private started: boolean
59 private readonly workerSet: Set<WorkerSetElement>
60 private workerStartup: boolean
61
62 /**
63 * Creates a new `WorkerSet`.
64 * @param workerScript -
65 * @param workerOptions -
66 */
67 constructor (workerScript: string, workerOptions: WorkerOptions) {
68 super(workerScript, workerOptions)
69 if (this.workerOptions.elementsPerWorker == null) {
70 throw new TypeError('Elements per worker is not defined')
71 }
72 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
73 throw new TypeError('Elements per worker must be an integer')
74 }
75 if (this.workerOptions.elementsPerWorker <= 0) {
76 throw new RangeError('Elements per worker must be greater than zero')
77 }
78 this.workerSet = new Set<WorkerSetElement>()
79 this.promiseResponseMap = new Map<
80 `${string}-${string}-${string}-${string}`,
81 ResponseWrapper<R>
82 >()
83 if (this.workerOptions.poolOptions?.enableEvents === true) {
84 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
85 }
86 this.started = false
87 this.workerStartup = false
88 }
89
90 /** @inheritDoc */
91 public async addElement (elementData: D): Promise<R> {
92 if (!this.started) {
93 throw new Error('Cannot add a WorkerSet element: not started')
94 }
95 const workerSetElement = await this.getWorkerSetElement()
96 const sendMessageToWorker = new Promise<R>((resolve, reject) => {
97 const message = {
98 data: elementData,
99 event: WorkerMessageEvents.addWorkerElement,
100 uuid: randomUUID(),
101 } satisfies WorkerMessage<D>
102 workerSetElement.worker.postMessage(message)
103 this.promiseResponseMap.set(message.uuid, {
104 reject,
105 resolve,
106 workerSetElement,
107 })
108 })
109 const response = await sendMessageToWorker
110 // Add element sequentially to optimize memory at startup
111 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
112 if (this.workerOptions.elementAddDelay! > 0) {
113 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
114 await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))
115 }
116 return response
117 }
118
119 /** @inheritDoc */
120 public async start (): Promise<void> {
121 this.addWorkerSetElement()
122 // Add worker set element sequentially to optimize memory at startup
123 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
124 this.workerOptions.workerStartDelay! > 0 &&
125 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
126 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
127 this.emitter?.emit(WorkerSetEvents.started, this.info)
128 this.started = true
129 }
130
131 /** @inheritDoc */
132 public async stop (): Promise<void> {
133 for (const workerSetElement of this.workerSet) {
134 const worker = workerSetElement.worker
135 const waitWorkerExit = new Promise<void>(resolve => {
136 worker.once('exit', () => {
137 resolve()
138 })
139 })
140 worker.unref()
141 await worker.terminate()
142 await waitWorkerExit
143 }
144 this.emitter?.emit(WorkerSetEvents.stopped, this.info)
145 this.started = false
146 this.emitter?.emitDestroy()
147 }
148
149 /**
150 * Adds a new `WorkerSetElement`.
151 * @returns The new `WorkerSetElement`.
152 */
153 private addWorkerSetElement (): WorkerSetElement {
154 this.workerStartup = true
155 const worker = new Worker(this.workerScript, {
156 env: SHARE_ENV,
157 ...this.workerOptions.poolOptions?.workerOptions,
158 })
159 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
160 worker.on('message', (message: WorkerMessage<R>) => {
161 const { data, event, uuid } = message
162 if (this.promiseResponseMap.has(uuid)) {
163 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
164 const { reject, resolve, workerSetElement } = this.promiseResponseMap.get(uuid)!
165 switch (event) {
166 case WorkerMessageEvents.addedWorkerElement:
167 this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
168 ++workerSetElement.numberOfWorkerElements
169 resolve(data)
170 break
171 case WorkerMessageEvents.workerElementError:
172 this.emitter?.emit(WorkerSetEvents.elementError, data)
173 reject(data)
174 break
175 default:
176 reject(
177 new Error(
178 `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
179 data,
180 undefined,
181 2
182 )}'`
183 )
184 )
185 }
186 this.promiseResponseMap.delete(uuid)
187 }
188 })
189 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
190 worker.once('error', error => {
191 this.emitter?.emit(WorkerSetEvents.error, error)
192 if (
193 this.workerOptions.poolOptions?.restartWorkerOnError === true &&
194 this.started &&
195 !this.workerStartup
196 ) {
197 this.addWorkerSetElement()
198 }
199 worker.unref()
200 // eslint-disable-next-line promise/no-promise-in-callback
201 worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
202 })
203 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
204 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
205 worker.once('exit', () => {
206 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
207 })
208 const workerSetElement: WorkerSetElement = {
209 numberOfWorkerElements: 0,
210 worker,
211 }
212 this.workerSet.add(workerSetElement)
213 this.workerStartup = false
214 return workerSetElement
215 }
216
217 private async getWorkerSetElement (): Promise<WorkerSetElement> {
218 let chosenWorkerSetElement: undefined | WorkerSetElement
219 for (const workerSetElement of this.workerSet) {
220 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
221 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
222 chosenWorkerSetElement = workerSetElement
223 break
224 }
225 }
226 if (chosenWorkerSetElement == null) {
227 chosenWorkerSetElement = this.addWorkerSetElement()
228 // Add worker set element sequentially to optimize memory at startup
229 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
230 this.workerOptions.workerStartDelay! > 0 &&
231 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
232 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
233 }
234 return chosenWorkerSetElement
235 }
236
237 private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement {
238 let workerSetElt: undefined | WorkerSetElement
239 for (const workerSetElement of this.workerSet) {
240 if (workerSetElement.worker.threadId === worker.threadId) {
241 workerSetElt = workerSetElement
242 break
243 }
244 }
245 return workerSetElt
246 }
247
248 private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void {
249 if (workerSetElement == null) {
250 return
251 }
252 this.workerSet.delete(workerSetElement)
253 }
254}