Commit | Line | Data |
---|---|---|
a19b897d | 1 | // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. |
c8eeb62b | 2 | |
66a7748d JB |
3 | import { EventEmitterAsyncResource } from 'node:events' |
4 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 5 | |
66a7748d JB |
6 | import { WorkerAbstract } from './WorkerAbstract.js' |
7 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 8 | import { |
b779c0f8 | 9 | type SetInfo, |
0e4fa348 | 10 | type WorkerData, |
c26984f2 | 11 | type WorkerMessage, |
0e4fa348 JB |
12 | WorkerMessageEvents, |
13 | type WorkerOptions, | |
14 | type WorkerSetElement, | |
66a7748d JB |
15 | WorkerSetEvents |
16 | } from './WorkerTypes.js' | |
17 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 18 | |
268a74bb | 19 | export class WorkerSet extends WorkerAbstract<WorkerData> { |
66a7748d JB |
20 | public readonly emitter: EventEmitterAsyncResource | undefined |
21 | private readonly workerSet: Set<WorkerSetElement> | |
22 | private started: boolean | |
23 | private workerStartup: boolean | |
6013bc53 JB |
24 | |
25 | /** | |
361c98f5 | 26 | * Creates a new `WorkerSet`. |
6013bc53 | 27 | * |
0e4fa348 JB |
28 | * @param workerScript - |
29 | * @param workerOptions - | |
6013bc53 | 30 | */ |
66a7748d JB |
31 | constructor (workerScript: string, workerOptions: WorkerOptions) { |
32 | super(workerScript, workerOptions) | |
27662cf1 | 33 | if (this.workerOptions.elementsPerWorker == null) { |
66a7748d | 34 | throw new TypeError('Elements per worker is not defined') |
81027aa5 | 35 | } |
f028efca | 36 | if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) { |
66a7748d | 37 | throw new TypeError('Elements per worker must be an integer') |
81027aa5 JB |
38 | } |
39 | if (this.workerOptions.elementsPerWorker <= 0) { | |
66a7748d | 40 | throw new RangeError('Elements per worker must be greater than zero') |
81027aa5 | 41 | } |
66a7748d | 42 | this.workerSet = new Set<WorkerSetElement>() |
a4385edc | 43 | if (this.workerOptions.poolOptions?.enableEvents === true) { |
66a7748d | 44 | this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) |
29bb4dee | 45 | } |
66a7748d JB |
46 | this.started = false |
47 | this.workerStartup = false | |
6013bc53 JB |
48 | } |
49 | ||
66a7748d | 50 | get info (): SetInfo { |
b779c0f8 | 51 | return { |
769d3b10 | 52 | version: workerSetVersion, |
0bde1ea1 JB |
53 | type: 'set', |
54 | worker: 'thread', | |
4f02e9b4 | 55 | started: this.started, |
b779c0f8 | 56 | size: this.size, |
19bdf4ca | 57 | elementsExecuting: [...this.workerSet].reduce( |
b779c0f8 | 58 | (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, |
66a7748d | 59 | 0 |
b779c0f8 | 60 | ), |
66a7748d JB |
61 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
62 | elementsPerWorker: this.maxElementsPerWorker! | |
63 | } | |
b779c0f8 JB |
64 | } |
65 | ||
66a7748d JB |
66 | get size (): number { |
67 | return this.workerSet.size | |
6013bc53 JB |
68 | } |
69 | ||
66a7748d JB |
70 | get maxElementsPerWorker (): number | undefined { |
71 | return this.workerOptions.elementsPerWorker | |
4d7227e6 JB |
72 | } |
73 | ||
b0dee778 | 74 | /** @inheritDoc */ |
66a7748d JB |
75 | public async start (): Promise<void> { |
76 | this.addWorkerSetElement() | |
b0dee778 | 77 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 78 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
ab93b184 | 79 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
80 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
81 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
82 | this.emitter?.emit(WorkerSetEvents.started, this.info) | |
83 | this.started = true | |
b0dee778 JB |
84 | } |
85 | ||
86 | /** @inheritDoc */ | |
66a7748d | 87 | public async stop (): Promise<void> { |
b0dee778 | 88 | for (const workerSetElement of this.workerSet) { |
66a7748d | 89 | const worker = workerSetElement.worker |
a974c8e4 | 90 | const waitWorkerExit = new Promise<void>(resolve => { |
ae3a41a1 | 91 | worker.once('exit', () => { |
66a7748d JB |
92 | resolve() |
93 | }) | |
94 | }) | |
95 | await worker.terminate() | |
96 | await waitWorkerExit | |
b0dee778 | 97 | } |
4f02e9b4 JB |
98 | this.emitter?.emit(WorkerSetEvents.stopped, this.info) |
99 | this.emitter?.emitDestroy() | |
100 | this.emitter?.removeAllListeners() | |
101 | this.started = false | |
b0dee778 JB |
102 | } |
103 | ||
8baf3f8f | 104 | /** @inheritDoc */ |
66a7748d | 105 | public async addElement (elementData: WorkerData): Promise<void> { |
c81424b8 | 106 | if (!this.started) { |
66a7748d | 107 | throw new Error('Cannot add a WorkerSet element: not started') |
c81424b8 | 108 | } |
66a7748d | 109 | const workerSetElement = await this.getWorkerSetElement() |
962a8159 | 110 | workerSetElement.worker.postMessage({ |
244c1396 | 111 | event: WorkerMessageEvents.addWorkerElement, |
66a7748d JB |
112 | data: elementData |
113 | }) | |
114 | ++workerSetElement.numberOfWorkerElements | |
962a8159 | 115 | // Add element sequentially to optimize memory at startup |
66a7748d | 116 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 117 | if (this.workerOptions.elementStartDelay! > 0) { |
66a7748d JB |
118 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
119 | await sleep(randomizeDelay(this.workerOptions.elementStartDelay!)) | |
d070d967 | 120 | } |
6013bc53 JB |
121 | } |
122 | ||
6013bc53 | 123 | /** |
361c98f5 | 124 | * Adds a new `WorkerSetElement`. |
6013bc53 | 125 | */ |
66a7748d JB |
126 | private addWorkerSetElement (): WorkerSetElement { |
127 | this.workerStartup = true | |
be245fda JB |
128 | const worker = new Worker(this.workerScript, { |
129 | env: SHARE_ENV, | |
66a7748d JB |
130 | ...this.workerOptions.poolOptions?.workerOptions |
131 | }) | |
132 | worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) | |
c26984f2 | 133 | worker.on('message', (message: WorkerMessage<WorkerData>) => { |
244c1396 JB |
134 | if (message.event === WorkerMessageEvents.addedWorkerElement) { |
135 | this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) | |
136 | } else if (message.event === WorkerMessageEvents.workerElementError) { | |
66a7748d | 137 | this.emitter?.emit(WorkerSetEvents.elementError, message.data) |
c26984f2 | 138 | } |
66a7748d JB |
139 | }) |
140 | worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) | |
a974c8e4 | 141 | worker.on('error', error => { |
66a7748d | 142 | this.emitter?.emit(WorkerSetEvents.error, error) |
c81424b8 | 143 | if ( |
66a7748d | 144 | this.workerOptions.poolOptions?.restartWorkerOnError === true && |
c81424b8 JB |
145 | this.started && |
146 | !this.workerStartup | |
147 | ) { | |
66a7748d | 148 | this.addWorkerSetElement() |
29bb4dee | 149 | } |
66a7748d JB |
150 | }) |
151 | worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) | |
152 | worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) | |
153 | worker.once('exit', () => { | |
154 | this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) | |
155 | }) | |
156 | const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 } | |
157 | this.workerSet.add(workerSetElement) | |
158 | this.workerStartup = false | |
159 | return workerSetElement | |
6013bc53 JB |
160 | } |
161 | ||
66a7748d JB |
162 | private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void { |
163 | if (workerSetElement == null) { | |
164 | return | |
165 | } | |
166 | this.workerSet.delete(workerSetElement) | |
dbc29904 JB |
167 | } |
168 | ||
66a7748d JB |
169 | private async getWorkerSetElement (): Promise<WorkerSetElement> { |
170 | let chosenWorkerSetElement: WorkerSetElement | undefined | |
962a8159 | 171 | for (const workerSetElement of this.workerSet) { |
66a7748d | 172 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 173 | if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) { |
66a7748d JB |
174 | chosenWorkerSetElement = workerSetElement |
175 | break | |
962a8159 | 176 | } |
e7aeea18 | 177 | } |
66a7748d JB |
178 | if (chosenWorkerSetElement == null) { |
179 | chosenWorkerSetElement = this.addWorkerSetElement() | |
962a8159 | 180 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 181 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 182 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
183 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
184 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
962a8159 | 185 | } |
66a7748d | 186 | return chosenWorkerSetElement |
c045d9a9 JB |
187 | } |
188 | ||
66a7748d JB |
189 | private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined { |
190 | let workerSetElt: WorkerSetElement | undefined | |
0e7a11e1 | 191 | for (const workerSetElement of this.workerSet) { |
81696bd5 | 192 | if (workerSetElement.worker.threadId === worker.threadId) { |
66a7748d JB |
193 | workerSetElt = workerSetElement |
194 | break | |
1e924543 | 195 | } |
0e7a11e1 | 196 | } |
66a7748d | 197 | return workerSetElt |
1e924543 | 198 | } |
6013bc53 | 199 | } |