refactor: refine prettier configuration
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
2
3 import { EventEmitterAsyncResource } from 'node:events'
4 import { SHARE_ENV, Worker } from 'node:worker_threads'
5
6 import { WorkerAbstract } from './WorkerAbstract.js'
7 import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
8 import {
9 type SetInfo,
10 type WorkerData,
11 type WorkerMessage,
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
15 WorkerSetEvents
16 } from './WorkerTypes.js'
17 import { randomizeDelay, sleep } from './WorkerUtils.js'
18
19 export class WorkerSet extends WorkerAbstract<WorkerData> {
20 public readonly emitter: EventEmitterAsyncResource | undefined
21 private readonly workerSet: Set<WorkerSetElement>
22 private started: boolean
23 private workerStartup: boolean
24
25 /**
26 * Creates a new `WorkerSet`.
27 *
28 * @param workerScript -
29 * @param workerOptions -
30 */
31 constructor (workerScript: string, workerOptions: WorkerOptions) {
32 super(workerScript, workerOptions)
33 if (this.workerOptions.elementsPerWorker == null) {
34 throw new TypeError('Elements per worker is not defined')
35 }
36 if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
37 throw new TypeError('Elements per worker must be an integer')
38 }
39 if (this.workerOptions.elementsPerWorker <= 0) {
40 throw new RangeError('Elements per worker must be greater than zero')
41 }
42 this.workerSet = new Set<WorkerSetElement>()
43 if (this.workerOptions.poolOptions?.enableEvents === true) {
44 this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
45 }
46 this.started = false
47 this.workerStartup = false
48 }
49
50 get info (): SetInfo {
51 return {
52 version: workerSetVersion,
53 type: 'set',
54 worker: 'thread',
55 size: this.size,
56 elementsExecuting: [...this.workerSet].reduce(
57 (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
58 0
59 ),
60 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
61 elementsPerWorker: this.maxElementsPerWorker!
62 }
63 }
64
65 get size (): number {
66 return this.workerSet.size
67 }
68
69 get maxElementsPerWorker (): number | undefined {
70 return this.workerOptions.elementsPerWorker
71 }
72
73 /** @inheritDoc */
74 public async start (): Promise<void> {
75 this.addWorkerSetElement()
76 // Add worker set element sequentially to optimize memory at startup
77 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
78 this.workerOptions.workerStartDelay! > 0 &&
79 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
80 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
81 this.emitter?.emit(WorkerSetEvents.started, this.info)
82 this.started = true
83 }
84
85 /** @inheritDoc */
86 public async stop (): Promise<void> {
87 for (const workerSetElement of this.workerSet) {
88 const worker = workerSetElement.worker
89 const waitWorkerExit = new Promise<void>(resolve => {
90 worker.once('exit', () => {
91 resolve()
92 })
93 })
94 await worker.terminate()
95 await waitWorkerExit
96 this.emitter?.emit(WorkerSetEvents.stopped, this.info)
97 this.emitter?.emitDestroy()
98 this.emitter?.removeAllListeners()
99 this.started = false
100 }
101 }
102
103 /** @inheritDoc */
104 public async addElement (elementData: WorkerData): Promise<void> {
105 if (!this.started) {
106 throw new Error('Cannot add a WorkerSet element: not started')
107 }
108 const workerSetElement = await this.getWorkerSetElement()
109 workerSetElement.worker.postMessage({
110 event: WorkerMessageEvents.startWorkerElement,
111 data: elementData
112 })
113 ++workerSetElement.numberOfWorkerElements
114 // Add element sequentially to optimize memory at startup
115 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
116 if (this.workerOptions.elementStartDelay! > 0) {
117 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
118 await sleep(randomizeDelay(this.workerOptions.elementStartDelay!))
119 }
120 }
121
122 /**
123 * Adds a new `WorkerSetElement`.
124 */
125 private addWorkerSetElement (): WorkerSetElement {
126 this.workerStartup = true
127 const worker = new Worker(this.workerScript, {
128 env: SHARE_ENV,
129 ...this.workerOptions.poolOptions?.workerOptions
130 })
131 worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
132 worker.on('message', (message: WorkerMessage<WorkerData>) => {
133 if (message.event === WorkerMessageEvents.startedWorkerElement) {
134 this.emitter?.emit(WorkerSetEvents.elementStarted, this.info)
135 } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
136 this.emitter?.emit(WorkerSetEvents.elementError, message.data)
137 }
138 })
139 worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
140 worker.on('error', error => {
141 this.emitter?.emit(WorkerSetEvents.error, error)
142 if (
143 this.workerOptions.poolOptions?.restartWorkerOnError === true &&
144 this.started &&
145 !this.workerStartup
146 ) {
147 this.addWorkerSetElement()
148 }
149 })
150 worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
151 worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
152 worker.once('exit', () => {
153 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
154 })
155 const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
156 this.workerSet.add(workerSetElement)
157 this.workerStartup = false
158 return workerSetElement
159 }
160
161 private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
162 if (workerSetElement == null) {
163 return
164 }
165 this.workerSet.delete(workerSetElement)
166 }
167
168 private async getWorkerSetElement (): Promise<WorkerSetElement> {
169 let chosenWorkerSetElement: WorkerSetElement | undefined
170 for (const workerSetElement of this.workerSet) {
171 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
172 if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
173 chosenWorkerSetElement = workerSetElement
174 break
175 }
176 }
177 if (chosenWorkerSetElement == null) {
178 chosenWorkerSetElement = this.addWorkerSetElement()
179 // Add worker set element sequentially to optimize memory at startup
180 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
181 this.workerOptions.workerStartDelay! > 0 &&
182 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
183 (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)))
184 }
185 return chosenWorkerSetElement
186 }
187
188 private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
189 let workerSetElt: WorkerSetElement | undefined
190 for (const workerSetElement of this.workerSet) {
191 if (workerSetElement.worker.threadId === worker.threadId) {
192 workerSetElt = workerSetElement
193 break
194 }
195 }
196 return workerSetElt
197 }
198 }