]>
Commit | Line | Data |
---|---|---|
a19b897d | 1 | // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. |
c8eeb62b | 2 | |
65d22502 | 3 | import { randomUUID } from 'node:crypto' |
66a7748d JB |
4 | import { EventEmitterAsyncResource } from 'node:events' |
5 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 6 | |
66a7748d JB |
7 | import { WorkerAbstract } from './WorkerAbstract.js' |
8 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 9 | import { |
b779c0f8 | 10 | type SetInfo, |
0e4fa348 | 11 | type WorkerData, |
c26984f2 | 12 | type WorkerMessage, |
0e4fa348 JB |
13 | WorkerMessageEvents, |
14 | type WorkerOptions, | |
15 | type WorkerSetElement, | |
d1f5bfd8 | 16 | WorkerSetEvents, |
66a7748d JB |
17 | } from './WorkerTypes.js' |
18 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 19 | |
65d22502 | 20 | interface ResponseWrapper<R extends WorkerData> { |
65d22502 | 21 | reject: (reason?: unknown) => void |
0749233f | 22 | resolve: (value: PromiseLike<R> | R) => void |
65d22502 JB |
23 | workerSetElement: WorkerSetElement |
24 | } | |
25 | ||
3b09e788 | 26 | export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> { |
c4a89082 JB |
27 | public readonly emitter: EventEmitterAsyncResource | undefined |
28 | ||
29 | get info (): SetInfo { | |
30 | return { | |
31 | elementsExecuting: [...this.workerSet].reduce( | |
32 | (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, | |
33 | 0 | |
34 | ), | |
35 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
36 | elementsPerWorker: this.maxElementsPerWorker!, | |
37 | size: this.size, | |
38 | started: this.started, | |
39 | type: 'set', | |
40 | version: workerSetVersion, | |
41 | worker: 'thread', | |
42 | } | |
43 | } | |
44 | ||
45 | get maxElementsPerWorker (): number | undefined { | |
46 | return this.workerOptions.elementsPerWorker | |
47 | } | |
48 | ||
49 | get size (): number { | |
50 | return this.workerSet.size | |
51 | } | |
52 | ||
65d22502 JB |
53 | private readonly promiseResponseMap: Map< |
54 | `${string}-${string}-${string}-${string}`, | |
d1f5bfd8 | 55 | ResponseWrapper<R> |
65d22502 JB |
56 | > |
57 | ||
66a7748d | 58 | private started: boolean |
0749233f | 59 | private readonly workerSet: Set<WorkerSetElement> |
66a7748d | 60 | private workerStartup: boolean |
6013bc53 JB |
61 | |
62 | /** | |
361c98f5 | 63 | * Creates a new `WorkerSet`. |
0e4fa348 JB |
64 | * @param workerScript - |
65 | * @param workerOptions - | |
6013bc53 | 66 | */ |
66a7748d JB |
67 | constructor (workerScript: string, workerOptions: WorkerOptions) { |
68 | super(workerScript, workerOptions) | |
27662cf1 | 69 | if (this.workerOptions.elementsPerWorker == null) { |
66a7748d | 70 | throw new TypeError('Elements per worker is not defined') |
81027aa5 | 71 | } |
f028efca | 72 | if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) { |
66a7748d | 73 | throw new TypeError('Elements per worker must be an integer') |
81027aa5 | 74 | } |
2bb3c92f | 75 | if (this.workerOptions.elementsPerWorker <= 0) { |
66a7748d | 76 | throw new RangeError('Elements per worker must be greater than zero') |
81027aa5 | 77 | } |
66a7748d | 78 | this.workerSet = new Set<WorkerSetElement>() |
65d22502 JB |
79 | this.promiseResponseMap = new Map< |
80 | `${string}-${string}-${string}-${string}`, | |
d1f5bfd8 | 81 | ResponseWrapper<R> |
65d22502 | 82 | >() |
a4385edc | 83 | if (this.workerOptions.poolOptions?.enableEvents === true) { |
66a7748d | 84 | this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) |
29bb4dee | 85 | } |
66a7748d JB |
86 | this.started = false |
87 | this.workerStartup = false | |
6013bc53 JB |
88 | } |
89 | ||
c4a89082 JB |
90 | /** @inheritDoc */ |
91 | public async addElement (elementData: D): Promise<R> { | |
92 | if (!this.started) { | |
93 | throw new Error('Cannot add a WorkerSet element: not started') | |
94 | } | |
95 | const workerSetElement = await this.getWorkerSetElement() | |
96 | const sendMessageToWorker = new Promise<R>((resolve, reject) => { | |
97 | const message = { | |
98 | data: elementData, | |
99 | event: WorkerMessageEvents.addWorkerElement, | |
100 | uuid: randomUUID(), | |
101 | } satisfies WorkerMessage<D> | |
102 | workerSetElement.worker.postMessage(message) | |
103 | this.promiseResponseMap.set(message.uuid, { | |
104 | reject, | |
105 | resolve, | |
106 | workerSetElement, | |
107 | }) | |
108 | }) | |
109 | const response = await sendMessageToWorker | |
110 | // Add element sequentially to optimize memory at startup | |
111 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
112 | if (this.workerOptions.elementAddDelay! > 0) { | |
113 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
114 | await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)) | |
115 | } | |
116 | return response | |
117 | } | |
118 | ||
119 | /** @inheritDoc */ | |
120 | public async start (): Promise<void> { | |
121 | this.addWorkerSetElement() | |
122 | // Add worker set element sequentially to optimize memory at startup | |
123 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
124 | this.workerOptions.workerStartDelay! > 0 && | |
125 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion | |
126 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
127 | this.emitter?.emit(WorkerSetEvents.started, this.info) | |
128 | this.started = true | |
129 | } | |
130 | ||
131 | /** @inheritDoc */ | |
132 | public async stop (): Promise<void> { | |
133 | for (const workerSetElement of this.workerSet) { | |
134 | const worker = workerSetElement.worker | |
135 | const waitWorkerExit = new Promise<void>(resolve => { | |
136 | worker.once('exit', () => { | |
137 | resolve() | |
138 | }) | |
139 | }) | |
140 | worker.unref() | |
141 | await worker.terminate() | |
142 | await waitWorkerExit | |
143 | } | |
144 | this.emitter?.emit(WorkerSetEvents.stopped, this.info) | |
145 | this.started = false | |
146 | this.emitter?.emitDestroy() | |
147 | } | |
148 | ||
6013bc53 | 149 | /** |
361c98f5 | 150 | * Adds a new `WorkerSetElement`. |
d1f5bfd8 | 151 | * @returns The new `WorkerSetElement`. |
6013bc53 | 152 | */ |
66a7748d JB |
153 | private addWorkerSetElement (): WorkerSetElement { |
154 | this.workerStartup = true | |
be245fda JB |
155 | const worker = new Worker(this.workerScript, { |
156 | env: SHARE_ENV, | |
d1f5bfd8 | 157 | ...this.workerOptions.poolOptions?.workerOptions, |
66a7748d JB |
158 | }) |
159 | worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) | |
3b09e788 | 160 | worker.on('message', (message: WorkerMessage<R>) => { |
0749233f | 161 | const { data, event, uuid } = message |
1d7a504b | 162 | if (this.promiseResponseMap.has(uuid)) { |
65d22502 | 163 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
0749233f | 164 | const { reject, resolve, workerSetElement } = this.promiseResponseMap.get(uuid)! |
ce0abd82 JB |
165 | switch (event) { |
166 | case WorkerMessageEvents.addedWorkerElement: | |
167 | this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) | |
168 | ++workerSetElement.numberOfWorkerElements | |
169 | resolve(data) | |
170 | break | |
171 | case WorkerMessageEvents.workerElementError: | |
172 | this.emitter?.emit(WorkerSetEvents.elementError, data) | |
173 | reject(data) | |
174 | break | |
175 | default: | |
176 | reject( | |
177 | new Error( | |
178 | `Unknown worker message event: '${event}' received with data: '${JSON.stringify( | |
179 | data, | |
180 | undefined, | |
181 | 2 | |
182 | )}'` | |
183 | ) | |
184 | ) | |
65d22502 | 185 | } |
ccf11189 | 186 | this.promiseResponseMap.delete(uuid) |
c26984f2 | 187 | } |
66a7748d JB |
188 | }) |
189 | worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) | |
4119f480 | 190 | worker.once('error', error => { |
66a7748d | 191 | this.emitter?.emit(WorkerSetEvents.error, error) |
c81424b8 | 192 | if ( |
66a7748d | 193 | this.workerOptions.poolOptions?.restartWorkerOnError === true && |
c81424b8 JB |
194 | this.started && |
195 | !this.workerStartup | |
196 | ) { | |
66a7748d | 197 | this.addWorkerSetElement() |
29bb4dee | 198 | } |
8ba9c854 | 199 | worker.unref() |
d1f5bfd8 | 200 | // eslint-disable-next-line promise/no-promise-in-callback |
ea32ea05 | 201 | worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error)) |
66a7748d JB |
202 | }) |
203 | worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) | |
204 | worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) | |
205 | worker.once('exit', () => { | |
206 | this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) | |
207 | }) | |
48847bc0 | 208 | const workerSetElement: WorkerSetElement = { |
d1f5bfd8 | 209 | numberOfWorkerElements: 0, |
0749233f | 210 | worker, |
48847bc0 | 211 | } |
66a7748d JB |
212 | this.workerSet.add(workerSetElement) |
213 | this.workerStartup = false | |
214 | return workerSetElement | |
6013bc53 JB |
215 | } |
216 | ||
66a7748d | 217 | private async getWorkerSetElement (): Promise<WorkerSetElement> { |
0749233f | 218 | let chosenWorkerSetElement: undefined | WorkerSetElement |
962a8159 | 219 | for (const workerSetElement of this.workerSet) { |
66a7748d | 220 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 221 | if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) { |
66a7748d JB |
222 | chosenWorkerSetElement = workerSetElement |
223 | break | |
962a8159 | 224 | } |
e7aeea18 | 225 | } |
66a7748d JB |
226 | if (chosenWorkerSetElement == null) { |
227 | chosenWorkerSetElement = this.addWorkerSetElement() | |
962a8159 | 228 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 229 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 230 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
231 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
232 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
962a8159 | 233 | } |
66a7748d | 234 | return chosenWorkerSetElement |
c045d9a9 JB |
235 | } |
236 | ||
0749233f JB |
237 | private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement { |
238 | let workerSetElt: undefined | WorkerSetElement | |
0e7a11e1 | 239 | for (const workerSetElement of this.workerSet) { |
81696bd5 | 240 | if (workerSetElement.worker.threadId === worker.threadId) { |
66a7748d JB |
241 | workerSetElt = workerSetElement |
242 | break | |
1e924543 | 243 | } |
0e7a11e1 | 244 | } |
66a7748d | 245 | return workerSetElt |
1e924543 | 246 | } |
0749233f JB |
247 | |
248 | private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void { | |
249 | if (workerSetElement == null) { | |
250 | return | |
251 | } | |
252 | this.workerSet.delete(workerSetElement) | |
253 | } | |
6013bc53 | 254 | } |