10606d2e498639bb6d0097f2d6c3740465746113
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
2
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { SHARE_ENV, Worker } from 'node:worker_threads'
5
6 import { WorkerAbstract } from './WorkerAbstract.js'
7 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents
16 } from './WorkerTypes.js'
17 import { randomizeDelay, sleep } from './WorkerUtils.js'
18
19 export class WorkerSet extends WorkerAbstract<WorkerData> {
20 public readonly emitter: EventEmitterAsyncResource | undefined
21 private readonly workerSet: Set<WorkerSetElement>
22 private started: boolean
23 private workerStartup: boolean
24
25 /**
26 * Creates a new `WorkerSet`.
27 *
28 * @param workerScript -
29 * @param workerOptions -
30 */
31 constructor (workerScript: string, workerOptions: WorkerOptions) {
32 super(workerScript, workerOptions)
33 if (this.workerOptions.elementsPerWorker == null) {
34 throw new TypeError('Elements per worker is not defined')
35 }
36 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
37 throw new TypeError('Elements per worker must be an integer')
38 }
39 if (
40 typeof this.workerOptions.elementsPerWorker === 'number' &&
41 this.workerOptions.elementsPerWorker <= 0
42 ) {
43 throw new RangeError('Elements per worker must be greater than zero')
44 }
45 this.workerSet = new Set<WorkerSetElement>()
46 if (this.workerOptions.poolOptions?.enableEvents === true) {
47 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
48 }
49 this.started = false
50 this.workerStartup = false
51 }
52
53 get info (): SetInfo {
54 return {
55 version: workerSetVersion,
56 type: 'set',
57 worker: 'thread',
58 started: this.started,
59 size: this.size,
60 elementsExecuting: [...this.workerSet].reduce(
61 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
62 0
63 ),
64 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
65 elementsPerWorker: this.maxElementsPerWorker!
66 }
67 }
68
69 get size (): number {
70 return this.workerSet.size
71 }
72
73 get maxElementsPerWorker (): number | undefined {
74 return this.workerOptions.elementsPerWorker
75 }
76
77 /** @inheritDoc */
78 public async start (): Promise<void> {
79 this.addWorkerSetElement()
80 // Add worker set element sequentially to optimize memory at startup
81 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
82 this.workerOptions.workerStartDelay! > 0 &&
83 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
84 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
85 this.emitter?.emit(WorkerSetEvents.started, this.info)
86 this.started = true
87 }
88
89 /** @inheritDoc */
90 public async stop (): Promise<void> {
91 for (const workerSetElement of this.workerSet) {
92 const worker = workerSetElement.worker
93 const waitWorkerExit = new Promise<void>(resolve => {
94 worker.once('exit', () => {
95 resolve()
96 })
97 })
98 worker.unref()
99 await worker.terminate()
100 await waitWorkerExit
101 }
102 this.emitter?.emit(WorkerSetEvents.stopped, this.info)
103 this.started = false
104 this.emitter?.emitDestroy()
105 }
106
107 /** @inheritDoc */
108 public async addElement (elementData: WorkerData): Promise<void> {
109 if (!this.started) {
110 throw new Error('Cannot add a WorkerSet element: not started')
111 }
112 const workerSetElement = await this.getWorkerSetElement()
113 workerSetElement.worker.postMessage({
114 event: WorkerMessageEvents.addWorkerElement,
115 data: elementData
116 })
117 ++workerSetElement.numberOfWorkerElements
118 // Add element sequentially to optimize memory at startup
119 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
120 if (this.workerOptions.elementAddDelay! > 0) {
121 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
122 await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))
123 }
124 }
125
126 /**
127 * Adds a new `WorkerSetElement`.
128 */
129 private addWorkerSetElement (): WorkerSetElement {
130 this.workerStartup = true
131 const worker = new Worker(this.workerScript, {
132 env: SHARE_ENV,
133 ...this.workerOptions.poolOptions?.workerOptions
134 })
135 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
136 worker.on('message', (message: WorkerMessage<WorkerData>) => {
137 if (message.event === WorkerMessageEvents.addedWorkerElement) {
138 this.emitter?.emit(WorkerSetEvents.elementAdded, this.info)
139 } else if (message.event === WorkerMessageEvents.workerElementError) {
140 this.emitter?.emit(WorkerSetEvents.elementError, message.data)
141 }
142 })
143 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
144 worker.on('error', error => {
145 this.emitter?.emit(WorkerSetEvents.error, error)
146 if (
147 this.workerOptions.poolOptions?.restartWorkerOnError === true &&
148 this.started &&
149 !this.workerStartup
150 ) {
151 this.addWorkerSetElement()
152 }
153 worker.unref()
154 worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error))
155 })
156 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
157 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
158 worker.once('exit', () => {
159 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
160 })
161 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
162 this.workerSet.add(workerSetElement)
163 this.workerStartup = false
164 return workerSetElement
165 }
166
167 private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
168 if (workerSetElement == null) {
169 return
170 }
171 this.workerSet.delete(workerSetElement)
172 }
173
174 private async getWorkerSetElement (): Promise<WorkerSetElement> {
175 let chosenWorkerSetElement: WorkerSetElement | undefined
176 for (const workerSetElement of this.workerSet) {
177 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
178 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
179 chosenWorkerSetElement = workerSetElement
180 break
181 }
182 }
183 if (chosenWorkerSetElement == null) {
184 chosenWorkerSetElement = this.addWorkerSetElement()
185 // Add worker set element sequentially to optimize memory at startup
186 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
187 this.workerOptions.workerStartDelay! > 0 &&
188 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
189 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
190 }
191 return chosenWorkerSetElement
192 }
193
194 private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
195 let workerSetElt: WorkerSetElement | undefined
196 for (const workerSetElement of this.workerSet) {
197 if (workerSetElement.worker.threadId === worker.threadId) {
198 workerSetElt = workerSetElement
199 break
200 }
201 }
202 return workerSetElt
203 }
204 }