Commit | Line | Data |
---|---|---|
a19b897d | 1 | // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. |
c8eeb62b | 2 | |
65d22502 | 3 | import { randomUUID } from 'node:crypto' |
66a7748d JB |
4 | import { EventEmitterAsyncResource } from 'node:events' |
5 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 6 | |
66a7748d JB |
7 | import { WorkerAbstract } from './WorkerAbstract.js' |
8 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 9 | import { |
b779c0f8 | 10 | type SetInfo, |
0e4fa348 | 11 | type WorkerData, |
c26984f2 | 12 | type WorkerMessage, |
0e4fa348 JB |
13 | WorkerMessageEvents, |
14 | type WorkerOptions, | |
15 | type WorkerSetElement, | |
66a7748d JB |
16 | WorkerSetEvents |
17 | } from './WorkerTypes.js' | |
18 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 19 | |
65d22502 JB |
20 | interface ResponseWrapper<R extends WorkerData> { |
21 | resolve: (value: R | PromiseLike<R>) => void | |
22 | reject: (reason?: unknown) => void | |
23 | workerSetElement: WorkerSetElement | |
24 | } | |
25 | ||
3b09e788 | 26 | export class WorkerSet<D extends WorkerData, R extends WorkerData> extends WorkerAbstract<D, R> { |
66a7748d JB |
27 | public readonly emitter: EventEmitterAsyncResource | undefined |
28 | private readonly workerSet: Set<WorkerSetElement> | |
65d22502 JB |
29 | private readonly promiseResponseMap: Map< |
30 | `${string}-${string}-${string}-${string}`, | |
31 | ResponseWrapper<R> | |
32 | > | |
33 | ||
66a7748d JB |
34 | private started: boolean |
35 | private workerStartup: boolean | |
6013bc53 JB |
36 | |
37 | /** | |
361c98f5 | 38 | * Creates a new `WorkerSet`. |
6013bc53 | 39 | * |
0e4fa348 JB |
40 | * @param workerScript - |
41 | * @param workerOptions - | |
6013bc53 | 42 | */ |
66a7748d JB |
43 | constructor (workerScript: string, workerOptions: WorkerOptions) { |
44 | super(workerScript, workerOptions) | |
27662cf1 | 45 | if (this.workerOptions.elementsPerWorker == null) { |
66a7748d | 46 | throw new TypeError('Elements per worker is not defined') |
81027aa5 | 47 | } |
f028efca | 48 | if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) { |
66a7748d | 49 | throw new TypeError('Elements per worker must be an integer') |
81027aa5 | 50 | } |
2bb3c92f | 51 | if (this.workerOptions.elementsPerWorker <= 0) { |
66a7748d | 52 | throw new RangeError('Elements per worker must be greater than zero') |
81027aa5 | 53 | } |
66a7748d | 54 | this.workerSet = new Set<WorkerSetElement>() |
65d22502 JB |
55 | this.promiseResponseMap = new Map< |
56 | `${string}-${string}-${string}-${string}`, | |
57 | ResponseWrapper<R> | |
58 | >() | |
a4385edc | 59 | if (this.workerOptions.poolOptions?.enableEvents === true) { |
66a7748d | 60 | this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) |
29bb4dee | 61 | } |
66a7748d JB |
62 | this.started = false |
63 | this.workerStartup = false | |
6013bc53 JB |
64 | } |
65 | ||
66a7748d | 66 | get info (): SetInfo { |
b779c0f8 | 67 | return { |
769d3b10 | 68 | version: workerSetVersion, |
0bde1ea1 JB |
69 | type: 'set', |
70 | worker: 'thread', | |
4f02e9b4 | 71 | started: this.started, |
b779c0f8 | 72 | size: this.size, |
19bdf4ca | 73 | elementsExecuting: [...this.workerSet].reduce( |
b779c0f8 | 74 | (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, |
66a7748d | 75 | 0 |
b779c0f8 | 76 | ), |
66a7748d JB |
77 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
78 | elementsPerWorker: this.maxElementsPerWorker! | |
79 | } | |
b779c0f8 JB |
80 | } |
81 | ||
66a7748d JB |
82 | get size (): number { |
83 | return this.workerSet.size | |
6013bc53 JB |
84 | } |
85 | ||
66a7748d JB |
86 | get maxElementsPerWorker (): number | undefined { |
87 | return this.workerOptions.elementsPerWorker | |
4d7227e6 JB |
88 | } |
89 | ||
b0dee778 | 90 | /** @inheritDoc */ |
66a7748d JB |
91 | public async start (): Promise<void> { |
92 | this.addWorkerSetElement() | |
b0dee778 | 93 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 94 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
ab93b184 | 95 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
96 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
97 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
98 | this.emitter?.emit(WorkerSetEvents.started, this.info) | |
99 | this.started = true | |
b0dee778 JB |
100 | } |
101 | ||
102 | /** @inheritDoc */ | |
66a7748d | 103 | public async stop (): Promise<void> { |
b0dee778 | 104 | for (const workerSetElement of this.workerSet) { |
66a7748d | 105 | const worker = workerSetElement.worker |
a974c8e4 | 106 | const waitWorkerExit = new Promise<void>(resolve => { |
ae3a41a1 | 107 | worker.once('exit', () => { |
66a7748d JB |
108 | resolve() |
109 | }) | |
110 | }) | |
8ba9c854 | 111 | worker.unref() |
66a7748d JB |
112 | await worker.terminate() |
113 | await waitWorkerExit | |
b0dee778 | 114 | } |
4f02e9b4 | 115 | this.emitter?.emit(WorkerSetEvents.stopped, this.info) |
36c1166d | 116 | this.started = false |
4f02e9b4 | 117 | this.emitter?.emitDestroy() |
b0dee778 JB |
118 | } |
119 | ||
8baf3f8f | 120 | /** @inheritDoc */ |
3b09e788 | 121 | public async addElement (elementData: D): Promise<R> { |
c81424b8 | 122 | if (!this.started) { |
66a7748d | 123 | throw new Error('Cannot add a WorkerSet element: not started') |
c81424b8 | 124 | } |
66a7748d | 125 | const workerSetElement = await this.getWorkerSetElement() |
65d22502 JB |
126 | const sendMessageToWorker = new Promise<R>((resolve, reject) => { |
127 | const message = { | |
128 | uuid: randomUUID(), | |
129 | event: WorkerMessageEvents.addWorkerElement, | |
130 | data: elementData | |
131 | } satisfies WorkerMessage<D> | |
132 | workerSetElement.worker.postMessage(message) | |
133 | this.promiseResponseMap.set(message.uuid, { resolve, reject, workerSetElement }) | |
66a7748d | 134 | }) |
65d22502 | 135 | const response = await sendMessageToWorker |
962a8159 | 136 | // Add element sequentially to optimize memory at startup |
66a7748d | 137 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
da47bc29 | 138 | if (this.workerOptions.elementAddDelay! > 0) { |
66a7748d | 139 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
da47bc29 | 140 | await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)) |
d070d967 | 141 | } |
3b09e788 | 142 | return response |
6013bc53 JB |
143 | } |
144 | ||
6013bc53 | 145 | /** |
361c98f5 | 146 | * Adds a new `WorkerSetElement`. |
6013bc53 | 147 | */ |
66a7748d JB |
148 | private addWorkerSetElement (): WorkerSetElement { |
149 | this.workerStartup = true | |
be245fda JB |
150 | const worker = new Worker(this.workerScript, { |
151 | env: SHARE_ENV, | |
66a7748d JB |
152 | ...this.workerOptions.poolOptions?.workerOptions |
153 | }) | |
154 | worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) | |
3b09e788 | 155 | worker.on('message', (message: WorkerMessage<R>) => { |
1d7a504b JB |
156 | const { uuid, event, data } = message |
157 | if (this.promiseResponseMap.has(uuid)) { | |
65d22502 | 158 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
1d7a504b | 159 | const { resolve, reject, workerSetElement } = this.promiseResponseMap.get(uuid)! |
ce0abd82 JB |
160 | switch (event) { |
161 | case WorkerMessageEvents.addedWorkerElement: | |
162 | this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) | |
163 | ++workerSetElement.numberOfWorkerElements | |
164 | resolve(data) | |
165 | break | |
166 | case WorkerMessageEvents.workerElementError: | |
167 | this.emitter?.emit(WorkerSetEvents.elementError, data) | |
168 | reject(data) | |
169 | break | |
170 | default: | |
171 | reject( | |
172 | new Error( | |
173 | `Unknown worker message event: '${event}' received with data: '${JSON.stringify( | |
174 | data, | |
175 | undefined, | |
176 | 2 | |
177 | )}'` | |
178 | ) | |
179 | ) | |
65d22502 | 180 | } |
ccf11189 | 181 | this.promiseResponseMap.delete(uuid) |
c26984f2 | 182 | } |
66a7748d JB |
183 | }) |
184 | worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) | |
4119f480 | 185 | worker.once('error', error => { |
66a7748d | 186 | this.emitter?.emit(WorkerSetEvents.error, error) |
c81424b8 | 187 | if ( |
66a7748d | 188 | this.workerOptions.poolOptions?.restartWorkerOnError === true && |
c81424b8 JB |
189 | this.started && |
190 | !this.workerStartup | |
191 | ) { | |
66a7748d | 192 | this.addWorkerSetElement() |
29bb4dee | 193 | } |
8ba9c854 | 194 | worker.unref() |
ea32ea05 | 195 | worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error)) |
66a7748d JB |
196 | }) |
197 | worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) | |
198 | worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) | |
199 | worker.once('exit', () => { | |
200 | this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) | |
201 | }) | |
202 | const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 } | |
203 | this.workerSet.add(workerSetElement) | |
204 | this.workerStartup = false | |
205 | return workerSetElement | |
6013bc53 JB |
206 | } |
207 | ||
66a7748d JB |
208 | private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void { |
209 | if (workerSetElement == null) { | |
210 | return | |
211 | } | |
212 | this.workerSet.delete(workerSetElement) | |
dbc29904 JB |
213 | } |
214 | ||
66a7748d JB |
215 | private async getWorkerSetElement (): Promise<WorkerSetElement> { |
216 | let chosenWorkerSetElement: WorkerSetElement | undefined | |
962a8159 | 217 | for (const workerSetElement of this.workerSet) { |
66a7748d | 218 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 219 | if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) { |
66a7748d JB |
220 | chosenWorkerSetElement = workerSetElement |
221 | break | |
962a8159 | 222 | } |
e7aeea18 | 223 | } |
66a7748d JB |
224 | if (chosenWorkerSetElement == null) { |
225 | chosenWorkerSetElement = this.addWorkerSetElement() | |
962a8159 | 226 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 227 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 228 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
229 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
230 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
962a8159 | 231 | } |
66a7748d | 232 | return chosenWorkerSetElement |
c045d9a9 JB |
233 | } |
234 | ||
66a7748d JB |
235 | private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined { |
236 | let workerSetElt: WorkerSetElement | undefined | |
0e7a11e1 | 237 | for (const workerSetElement of this.workerSet) { |
81696bd5 | 238 | if (workerSetElement.worker.threadId === worker.threadId) { |
66a7748d JB |
239 | workerSetElt = workerSetElement |
240 | break | |
1e924543 | 241 | } |
0e7a11e1 | 242 | } |
66a7748d | 243 | return workerSetElt |
1e924543 | 244 | } |
6013bc53 | 245 | } |