chore: switch coding style to JS standard
[e-mobility-charging-stations-simulator.git] / src / worker / WorkerSet.ts
CommitLineData
edd13439 1// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
c8eeb62b 2
66a7748d
JB
3import { EventEmitterAsyncResource } from 'node:events'
4import { SHARE_ENV, Worker } from 'node:worker_threads'
c045d9a9 5
66a7748d
JB
6import { WorkerAbstract } from './WorkerAbstract.js'
7import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js'
0e4fa348 8import {
b779c0f8 9 type SetInfo,
0e4fa348 10 type WorkerData,
c26984f2 11 type WorkerMessage,
0e4fa348
JB
12 WorkerMessageEvents,
13 type WorkerOptions,
14 type WorkerSetElement,
66a7748d
JB
15 WorkerSetEvents
16} from './WorkerTypes.js'
17import { randomizeDelay, sleep } from './WorkerUtils.js'
6013bc53 18
/**
 * Worker implementation backed by a set of `node:worker_threads` workers.
 *
 * Each worker in the set hosts up to `workerOptions.elementsPerWorker` elements;
 * a new worker is spawned when every existing one is full.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Event emitter, only defined when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined
  /** Live workers together with the number of elements assigned to each. */
  private readonly workerSet: Set<WorkerSetElement>
  /** `true` between a completed `start()` and a completed `stop()`. */
  private started: boolean
  /** `true` while `addWorkerSetElement()` is wiring up a new worker. */
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Worker script, forwarded to the base class and used to spawn each worker.
   * @param workerOptions - Worker options, forwarded to the base class; `elementsPerWorker` is mandatory here.
   * @throws TypeError If `elementsPerWorker` is not defined or is not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    // Only an explicit `true` enables events: `enableEvents: false` must not create an emitter
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /** Snapshot of the worker set state, used as payload of the lifecycle events. */
  get info (): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0
      ),
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!
    }
  }

  /** Number of workers currently in the set. */
  get size (): number {
    return this.workerSet.size
  }

  /** Maximum number of elements a single worker may host. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    if (this.workerOptions.workerStartDelay! > 0) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))
    }
    this.emitter?.emit(WorkerSetEvents.started, this.info)
    this.started = true
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker
      // Register the 'exit' listener before terminating so the event cannot be missed
      const waitWorkerExit = new Promise<void>((resolve) => {
        worker.once('exit', () => {
          resolve()
        })
      })
      await worker.terminate()
      await waitWorkerExit
    }
    // Tear down the emitter once, after every worker has exited: emitDestroy() must
    // not be called more than once, and listeners must still be attached when the
    // 'stopped' event is emitted.
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
    this.emitter?.removeAllListeners()
    this.started = false
  }

  /** @inheritDoc */
  public async addElement (elementData: WorkerData): Promise<void> {
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    if (this.workerSet == null) {
      throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist")
    }
    const workerSetElement = await this.getWorkerSetElement()
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData
    })
    ++workerSetElement.numberOfWorkerElements
    // Add element sequentially to optimize memory at startup
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    if (this.workerOptions.elementStartDelay! > 0) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      await sleep(randomizeDelay(this.workerOptions.elementStartDelay!))
    }
  }

  /**
   * Adds a new `WorkerSetElement`.
   *
   * Spawns a worker thread, attaches the user supplied handlers and the internal
   * event forwarding, then registers the worker in the set.
   *
   * @returns The newly registered worker set element, hosting zero elements.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info)
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data)
      }
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      // Replace the failed worker only when requested and while the set is running
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    // Drop the worker from the set as soon as its thread has exited
    worker.once('exit', () => {
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
    })
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Removes a `WorkerSetElement` from the set.
   *
   * @param workerSetElement - Element to remove; `undefined` is ignored.
   */
  private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }

  /**
   * Gets a `WorkerSetElement` with spare capacity, spawning a new worker if all are full.
   *
   * @returns The first element hosting fewer than `elementsPerWorker` elements, or a new one.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined
    for (const workerSetElement of this.workerSet) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement
        break
      }
    }
    if (chosenWorkerSetElement == null) {
      chosenWorkerSetElement = this.addWorkerSetElement()
      // Add worker set element sequentially to optimize memory at startup
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (this.workerOptions.workerStartDelay! > 0) {
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))
      }
    }
    return chosenWorkerSetElement
  }

  /**
   * Finds the `WorkerSetElement` wrapping the given worker.
   *
   * @param worker - Worker instance to look up.
   * @returns The matching element, or `undefined` if the worker is not in the set.
   */
  private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
    // Match on object identity so the lookup does not depend on thread id values
    for (const workerSetElement of this.workerSet) {
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }
}