Commit | Line | Data |
---|---|---|
a19b897d | 1 | // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. |
c8eeb62b | 2 | |
66a7748d JB |
3 | import { EventEmitterAsyncResource } from 'node:events' |
4 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 5 | |
66a7748d JB |
6 | import { WorkerAbstract } from './WorkerAbstract.js' |
7 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 8 | import { |
b779c0f8 | 9 | type SetInfo, |
0e4fa348 | 10 | type WorkerData, |
c26984f2 | 11 | type WorkerMessage, |
0e4fa348 JB |
12 | WorkerMessageEvents, |
13 | type WorkerOptions, | |
14 | type WorkerSetElement, | |
66a7748d JB |
15 | WorkerSetEvents |
16 | } from './WorkerTypes.js' | |
17 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 18 | |
268a74bb | 19 | export class WorkerSet extends WorkerAbstract<WorkerData> { |
66a7748d JB |
20 | public readonly emitter: EventEmitterAsyncResource | undefined |
21 | private readonly workerSet: Set<WorkerSetElement> | |
22 | private started: boolean | |
23 | private workerStartup: boolean | |
6013bc53 JB |
24 | |
25 | /** | |
361c98f5 | 26 | * Creates a new `WorkerSet`. |
6013bc53 | 27 | * |
0e4fa348 JB |
28 | * @param workerScript - |
29 | * @param workerOptions - | |
6013bc53 | 30 | */ |
66a7748d JB |
31 | constructor (workerScript: string, workerOptions: WorkerOptions) { |
32 | super(workerScript, workerOptions) | |
27662cf1 | 33 | if (this.workerOptions.elementsPerWorker == null) { |
66a7748d | 34 | throw new TypeError('Elements per worker is not defined') |
81027aa5 | 35 | } |
f028efca | 36 | if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) { |
66a7748d | 37 | throw new TypeError('Elements per worker must be an integer') |
81027aa5 | 38 | } |
195c627f JB |
39 | if ( |
40 | typeof this.workerOptions.elementsPerWorker === 'number' && | |
41 | this.workerOptions.elementsPerWorker <= 0 | |
42 | ) { | |
66a7748d | 43 | throw new RangeError('Elements per worker must be greater than zero') |
81027aa5 | 44 | } |
66a7748d | 45 | this.workerSet = new Set<WorkerSetElement>() |
a4385edc | 46 | if (this.workerOptions.poolOptions?.enableEvents === true) { |
66a7748d | 47 | this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) |
29bb4dee | 48 | } |
66a7748d JB |
49 | this.started = false |
50 | this.workerStartup = false | |
6013bc53 JB |
51 | } |
52 | ||
66a7748d | 53 | get info (): SetInfo { |
b779c0f8 | 54 | return { |
769d3b10 | 55 | version: workerSetVersion, |
0bde1ea1 JB |
56 | type: 'set', |
57 | worker: 'thread', | |
4f02e9b4 | 58 | started: this.started, |
b779c0f8 | 59 | size: this.size, |
19bdf4ca | 60 | elementsExecuting: [...this.workerSet].reduce( |
b779c0f8 | 61 | (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, |
66a7748d | 62 | 0 |
b779c0f8 | 63 | ), |
66a7748d JB |
64 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
65 | elementsPerWorker: this.maxElementsPerWorker! | |
66 | } | |
b779c0f8 JB |
67 | } |
68 | ||
66a7748d JB |
69 | get size (): number { |
70 | return this.workerSet.size | |
6013bc53 JB |
71 | } |
72 | ||
66a7748d JB |
73 | get maxElementsPerWorker (): number | undefined { |
74 | return this.workerOptions.elementsPerWorker | |
4d7227e6 JB |
75 | } |
76 | ||
b0dee778 | 77 | /** @inheritDoc */ |
66a7748d JB |
78 | public async start (): Promise<void> { |
79 | this.addWorkerSetElement() | |
b0dee778 | 80 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 81 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
ab93b184 | 82 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
83 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
84 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
85 | this.emitter?.emit(WorkerSetEvents.started, this.info) | |
86 | this.started = true | |
b0dee778 JB |
87 | } |
88 | ||
89 | /** @inheritDoc */ | |
66a7748d | 90 | public async stop (): Promise<void> { |
b0dee778 | 91 | for (const workerSetElement of this.workerSet) { |
66a7748d | 92 | const worker = workerSetElement.worker |
a974c8e4 | 93 | const waitWorkerExit = new Promise<void>(resolve => { |
ae3a41a1 | 94 | worker.once('exit', () => { |
66a7748d JB |
95 | resolve() |
96 | }) | |
97 | }) | |
8ba9c854 | 98 | worker.unref() |
66a7748d JB |
99 | await worker.terminate() |
100 | await waitWorkerExit | |
b0dee778 | 101 | } |
4f02e9b4 | 102 | this.emitter?.emit(WorkerSetEvents.stopped, this.info) |
36c1166d | 103 | this.started = false |
4f02e9b4 | 104 | this.emitter?.emitDestroy() |
b0dee778 JB |
105 | } |
106 | ||
8baf3f8f | 107 | /** @inheritDoc */ |
66a7748d | 108 | public async addElement (elementData: WorkerData): Promise<void> { |
c81424b8 | 109 | if (!this.started) { |
66a7748d | 110 | throw new Error('Cannot add a WorkerSet element: not started') |
c81424b8 | 111 | } |
66a7748d | 112 | const workerSetElement = await this.getWorkerSetElement() |
962a8159 | 113 | workerSetElement.worker.postMessage({ |
244c1396 | 114 | event: WorkerMessageEvents.addWorkerElement, |
66a7748d JB |
115 | data: elementData |
116 | }) | |
117 | ++workerSetElement.numberOfWorkerElements | |
962a8159 | 118 | // Add element sequentially to optimize memory at startup |
66a7748d | 119 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
da47bc29 | 120 | if (this.workerOptions.elementAddDelay! > 0) { |
66a7748d | 121 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
da47bc29 | 122 | await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)) |
d070d967 | 123 | } |
6013bc53 JB |
124 | } |
125 | ||
6013bc53 | 126 | /** |
361c98f5 | 127 | * Adds a new `WorkerSetElement`. |
6013bc53 | 128 | */ |
66a7748d JB |
129 | private addWorkerSetElement (): WorkerSetElement { |
130 | this.workerStartup = true | |
be245fda JB |
131 | const worker = new Worker(this.workerScript, { |
132 | env: SHARE_ENV, | |
66a7748d JB |
133 | ...this.workerOptions.poolOptions?.workerOptions |
134 | }) | |
135 | worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) | |
c26984f2 | 136 | worker.on('message', (message: WorkerMessage<WorkerData>) => { |
244c1396 JB |
137 | if (message.event === WorkerMessageEvents.addedWorkerElement) { |
138 | this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) | |
139 | } else if (message.event === WorkerMessageEvents.workerElementError) { | |
66a7748d | 140 | this.emitter?.emit(WorkerSetEvents.elementError, message.data) |
c26984f2 | 141 | } |
66a7748d JB |
142 | }) |
143 | worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) | |
a974c8e4 | 144 | worker.on('error', error => { |
66a7748d | 145 | this.emitter?.emit(WorkerSetEvents.error, error) |
c81424b8 | 146 | if ( |
66a7748d | 147 | this.workerOptions.poolOptions?.restartWorkerOnError === true && |
c81424b8 JB |
148 | this.started && |
149 | !this.workerStartup | |
150 | ) { | |
66a7748d | 151 | this.addWorkerSetElement() |
29bb4dee | 152 | } |
8ba9c854 | 153 | worker.unref() |
ea32ea05 | 154 | worker.terminate().catch((error: unknown) => this.emitter?.emit(WorkerSetEvents.error, error)) |
66a7748d JB |
155 | }) |
156 | worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) | |
157 | worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) | |
158 | worker.once('exit', () => { | |
159 | this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) | |
160 | }) | |
161 | const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 } | |
162 | this.workerSet.add(workerSetElement) | |
163 | this.workerStartup = false | |
164 | return workerSetElement | |
6013bc53 JB |
165 | } |
166 | ||
66a7748d JB |
167 | private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void { |
168 | if (workerSetElement == null) { | |
169 | return | |
170 | } | |
171 | this.workerSet.delete(workerSetElement) | |
dbc29904 JB |
172 | } |
173 | ||
66a7748d JB |
174 | private async getWorkerSetElement (): Promise<WorkerSetElement> { |
175 | let chosenWorkerSetElement: WorkerSetElement | undefined | |
962a8159 | 176 | for (const workerSetElement of this.workerSet) { |
66a7748d | 177 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 178 | if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) { |
66a7748d JB |
179 | chosenWorkerSetElement = workerSetElement |
180 | break | |
962a8159 | 181 | } |
e7aeea18 | 182 | } |
66a7748d JB |
183 | if (chosenWorkerSetElement == null) { |
184 | chosenWorkerSetElement = this.addWorkerSetElement() | |
962a8159 | 185 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 186 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 187 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
188 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
189 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
962a8159 | 190 | } |
66a7748d | 191 | return chosenWorkerSetElement |
c045d9a9 JB |
192 | } |
193 | ||
66a7748d JB |
194 | private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined { |
195 | let workerSetElt: WorkerSetElement | undefined | |
0e7a11e1 | 196 | for (const workerSetElement of this.workerSet) { |
81696bd5 | 197 | if (workerSetElement.worker.threadId === worker.threadId) { |
66a7748d JB |
198 | workerSetElt = workerSetElement |
199 | break | |
1e924543 | 200 | } |
0e7a11e1 | 201 | } |
66a7748d | 202 | return workerSetElt |
1e924543 | 203 | } |
6013bc53 | 204 | } |