Commit | Line | Data |
---|---|---|
a19b897d | 1 | // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved. |
c8eeb62b | 2 | |
66a7748d JB |
3 | import { EventEmitterAsyncResource } from 'node:events' |
4 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 5 | |
66a7748d JB |
6 | import { WorkerAbstract } from './WorkerAbstract.js' |
7 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 8 | import { |
b779c0f8 | 9 | type SetInfo, |
0e4fa348 | 10 | type WorkerData, |
c26984f2 | 11 | type WorkerMessage, |
0e4fa348 JB |
12 | WorkerMessageEvents, |
13 | type WorkerOptions, | |
14 | type WorkerSetElement, | |
66a7748d JB |
15 | WorkerSetEvents |
16 | } from './WorkerTypes.js' | |
17 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 18 | |
268a74bb | 19 | export class WorkerSet extends WorkerAbstract<WorkerData> { |
66a7748d JB |
20 | public readonly emitter: EventEmitterAsyncResource | undefined |
21 | private readonly workerSet: Set<WorkerSetElement> | |
22 | private started: boolean | |
23 | private workerStartup: boolean | |
6013bc53 JB |
24 | |
25 | /** | |
361c98f5 | 26 | * Creates a new `WorkerSet`. |
6013bc53 | 27 | * |
0e4fa348 JB |
28 | * @param workerScript - |
29 | * @param workerOptions - | |
6013bc53 | 30 | */ |
66a7748d JB |
31 | constructor (workerScript: string, workerOptions: WorkerOptions) { |
32 | super(workerScript, workerOptions) | |
27662cf1 | 33 | if (this.workerOptions.elementsPerWorker == null) { |
66a7748d | 34 | throw new TypeError('Elements per worker is not defined') |
81027aa5 | 35 | } |
f028efca | 36 | if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) { |
66a7748d | 37 | throw new TypeError('Elements per worker must be an integer') |
81027aa5 JB |
38 | } |
39 | if (this.workerOptions.elementsPerWorker <= 0) { | |
66a7748d | 40 | throw new RangeError('Elements per worker must be greater than zero') |
81027aa5 | 41 | } |
66a7748d | 42 | this.workerSet = new Set<WorkerSetElement>() |
a4385edc | 43 | if (this.workerOptions.poolOptions?.enableEvents === true) { |
66a7748d | 44 | this.emitter = new EventEmitterAsyncResource({ name: 'workerset' }) |
29bb4dee | 45 | } |
66a7748d JB |
46 | this.started = false |
47 | this.workerStartup = false | |
6013bc53 JB |
48 | } |
49 | ||
66a7748d | 50 | get info (): SetInfo { |
b779c0f8 | 51 | return { |
769d3b10 | 52 | version: workerSetVersion, |
0bde1ea1 JB |
53 | type: 'set', |
54 | worker: 'thread', | |
4f02e9b4 | 55 | started: this.started, |
b779c0f8 | 56 | size: this.size, |
19bdf4ca | 57 | elementsExecuting: [...this.workerSet].reduce( |
b779c0f8 | 58 | (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, |
66a7748d | 59 | 0 |
b779c0f8 | 60 | ), |
66a7748d JB |
61 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
62 | elementsPerWorker: this.maxElementsPerWorker! | |
63 | } | |
b779c0f8 JB |
64 | } |
65 | ||
66a7748d JB |
66 | get size (): number { |
67 | return this.workerSet.size | |
6013bc53 JB |
68 | } |
69 | ||
66a7748d JB |
70 | get maxElementsPerWorker (): number | undefined { |
71 | return this.workerOptions.elementsPerWorker | |
4d7227e6 JB |
72 | } |
73 | ||
b0dee778 | 74 | /** @inheritDoc */ |
66a7748d JB |
75 | public async start (): Promise<void> { |
76 | this.addWorkerSetElement() | |
b0dee778 | 77 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 78 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
ab93b184 | 79 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
80 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
81 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
82 | this.emitter?.emit(WorkerSetEvents.started, this.info) | |
83 | this.started = true | |
b0dee778 JB |
84 | } |
85 | ||
86 | /** @inheritDoc */ | |
66a7748d | 87 | public async stop (): Promise<void> { |
b0dee778 | 88 | for (const workerSetElement of this.workerSet) { |
66a7748d | 89 | const worker = workerSetElement.worker |
a974c8e4 | 90 | const waitWorkerExit = new Promise<void>(resolve => { |
ae3a41a1 | 91 | worker.once('exit', () => { |
66a7748d JB |
92 | resolve() |
93 | }) | |
94 | }) | |
8ba9c854 | 95 | worker.unref() |
66a7748d JB |
96 | await worker.terminate() |
97 | await waitWorkerExit | |
b0dee778 | 98 | } |
4f02e9b4 | 99 | this.emitter?.emit(WorkerSetEvents.stopped, this.info) |
36c1166d | 100 | this.started = false |
4f02e9b4 JB |
101 | this.emitter?.emitDestroy() |
102 | this.emitter?.removeAllListeners() | |
b0dee778 JB |
103 | } |
104 | ||
8baf3f8f | 105 | /** @inheritDoc */ |
66a7748d | 106 | public async addElement (elementData: WorkerData): Promise<void> { |
c81424b8 | 107 | if (!this.started) { |
66a7748d | 108 | throw new Error('Cannot add a WorkerSet element: not started') |
c81424b8 | 109 | } |
66a7748d | 110 | const workerSetElement = await this.getWorkerSetElement() |
962a8159 | 111 | workerSetElement.worker.postMessage({ |
244c1396 | 112 | event: WorkerMessageEvents.addWorkerElement, |
66a7748d JB |
113 | data: elementData |
114 | }) | |
115 | ++workerSetElement.numberOfWorkerElements | |
962a8159 | 116 | // Add element sequentially to optimize memory at startup |
66a7748d | 117 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 118 | if (this.workerOptions.elementStartDelay! > 0) { |
66a7748d JB |
119 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
120 | await sleep(randomizeDelay(this.workerOptions.elementStartDelay!)) | |
d070d967 | 121 | } |
6013bc53 JB |
122 | } |
123 | ||
6013bc53 | 124 | /** |
361c98f5 | 125 | * Adds a new `WorkerSetElement`. |
6013bc53 | 126 | */ |
66a7748d JB |
127 | private addWorkerSetElement (): WorkerSetElement { |
128 | this.workerStartup = true | |
be245fda JB |
129 | const worker = new Worker(this.workerScript, { |
130 | env: SHARE_ENV, | |
66a7748d JB |
131 | ...this.workerOptions.poolOptions?.workerOptions |
132 | }) | |
133 | worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION) | |
c26984f2 | 134 | worker.on('message', (message: WorkerMessage<WorkerData>) => { |
244c1396 JB |
135 | if (message.event === WorkerMessageEvents.addedWorkerElement) { |
136 | this.emitter?.emit(WorkerSetEvents.elementAdded, this.info) | |
137 | } else if (message.event === WorkerMessageEvents.workerElementError) { | |
66a7748d | 138 | this.emitter?.emit(WorkerSetEvents.elementError, message.data) |
c26984f2 | 139 | } |
66a7748d JB |
140 | }) |
141 | worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) | |
a974c8e4 | 142 | worker.on('error', error => { |
66a7748d | 143 | this.emitter?.emit(WorkerSetEvents.error, error) |
c81424b8 | 144 | if ( |
66a7748d | 145 | this.workerOptions.poolOptions?.restartWorkerOnError === true && |
c81424b8 JB |
146 | this.started && |
147 | !this.workerStartup | |
148 | ) { | |
66a7748d | 149 | this.addWorkerSetElement() |
29bb4dee | 150 | } |
8ba9c854 JB |
151 | worker.unref() |
152 | worker.terminate().catch(error => this.emitter?.emit(WorkerSetEvents.error, error)) | |
66a7748d JB |
153 | }) |
154 | worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) | |
155 | worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) | |
156 | worker.once('exit', () => { | |
157 | this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) | |
158 | }) | |
159 | const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 } | |
160 | this.workerSet.add(workerSetElement) | |
161 | this.workerStartup = false | |
162 | return workerSetElement | |
6013bc53 JB |
163 | } |
164 | ||
66a7748d JB |
165 | private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void { |
166 | if (workerSetElement == null) { | |
167 | return | |
168 | } | |
169 | this.workerSet.delete(workerSetElement) | |
dbc29904 JB |
170 | } |
171 | ||
66a7748d JB |
172 | private async getWorkerSetElement (): Promise<WorkerSetElement> { |
173 | let chosenWorkerSetElement: WorkerSetElement | undefined | |
962a8159 | 174 | for (const workerSetElement of this.workerSet) { |
66a7748d | 175 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 176 | if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) { |
66a7748d JB |
177 | chosenWorkerSetElement = workerSetElement |
178 | break | |
962a8159 | 179 | } |
e7aeea18 | 180 | } |
66a7748d JB |
181 | if (chosenWorkerSetElement == null) { |
182 | chosenWorkerSetElement = this.addWorkerSetElement() | |
962a8159 | 183 | // Add worker set element sequentially to optimize memory at startup |
66a7748d | 184 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
e1d9a0f4 | 185 | this.workerOptions.workerStartDelay! > 0 && |
66a7748d JB |
186 | // eslint-disable-next-line @typescript-eslint/no-non-null-assertion |
187 | (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) | |
962a8159 | 188 | } |
66a7748d | 189 | return chosenWorkerSetElement |
c045d9a9 JB |
190 | } |
191 | ||
66a7748d JB |
192 | private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined { |
193 | let workerSetElt: WorkerSetElement | undefined | |
0e7a11e1 | 194 | for (const workerSetElement of this.workerSet) { |
81696bd5 | 195 | if (workerSetElement.worker.threadId === worker.threadId) { |
66a7748d JB |
196 | workerSetElt = workerSetElement |
197 | break | |
1e924543 | 198 | } |
0e7a11e1 | 199 | } |
66a7748d | 200 | return workerSetElt |
1e924543 | 201 | } |
6013bc53 | 202 | } |