Commit | Line | Data |
---|---|---|
edd13439 | 1 | // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved. |
c8eeb62b | 2 | |
66a7748d JB |
3 | import { EventEmitterAsyncResource } from 'node:events' |
4 | import { SHARE_ENV, Worker } from 'node:worker_threads' | |
c045d9a9 | 5 | |
66a7748d JB |
6 | import { WorkerAbstract } from './WorkerAbstract.js' |
7 | import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants.js' | |
0e4fa348 | 8 | import { |
b779c0f8 | 9 | type SetInfo, |
0e4fa348 | 10 | type WorkerData, |
c26984f2 | 11 | type WorkerMessage, |
0e4fa348 JB |
12 | WorkerMessageEvents, |
13 | type WorkerOptions, | |
14 | type WorkerSetElement, | |
66a7748d JB |
15 | WorkerSetEvents |
16 | } from './WorkerTypes.js' | |
17 | import { randomizeDelay, sleep } from './WorkerUtils.js' | |
6013bc53 | 18 | |
/**
 * Worker implementation backed by a set of worker threads.
 *
 * Elements are dispatched to the first worker thread that still has spare
 * capacity (fewer than `elementsPerWorker` elements); a new worker thread is
 * spawned when every existing one is full.
 */
export class WorkerSet extends WorkerAbstract<WorkerData> {
  /** Event emitter, only instantiated when `poolOptions.enableEvents` is `true`. */
  public readonly emitter: EventEmitterAsyncResource | undefined
  /** Live worker threads together with their element counters. */
  private readonly workerSet: Set<WorkerSetElement>
  /** Whether `start()` has completed and `stop()` has not been called since. */
  private started: boolean
  /** Raised while a worker thread is being created by `addWorkerSetElement()`. */
  private workerStartup: boolean

  /**
   * Creates a new `WorkerSet`.
   *
   * @param workerScript - Worker script handed to every spawned worker thread.
   * @param workerOptions - Worker options; `elementsPerWorker` is mandatory here.
   * @throws TypeError If `elementsPerWorker` is missing or is not a safe integer.
   * @throws RangeError If `elementsPerWorker` is lower than or equal to zero.
   */
  constructor (workerScript: string, workerOptions: WorkerOptions) {
    super(workerScript, workerOptions)
    if (this.workerOptions.elementsPerWorker == null) {
      throw new TypeError('Elements per worker is not defined')
    }
    if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
      throw new TypeError('Elements per worker must be an integer')
    }
    if (this.workerOptions.elementsPerWorker <= 0) {
      throw new RangeError('Elements per worker must be greater than zero')
    }
    this.workerSet = new Set<WorkerSetElement>()
    if (this.workerOptions.poolOptions?.enableEvents === true) {
      this.emitter = new EventEmitterAsyncResource({ name: 'workerset' })
    }
    this.started = false
    this.workerStartup = false
  }

  /** Snapshot of the worker set state, used as payload of the emitted events. */
  get info (): SetInfo {
    return {
      version: workerSetVersion,
      type: 'set',
      worker: 'thread',
      size: this.size,
      elementsExecuting: [...this.workerSet].reduce(
        (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements,
        0
      ),
      // Validated as defined in the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      elementsPerWorker: this.maxElementsPerWorker!
    }
  }

  /** Number of worker threads currently held in the set. */
  get size (): number {
    return this.workerSet.size
  }

  /** Maximum number of elements a single worker thread may host. */
  get maxElementsPerWorker (): number | undefined {
    return this.workerOptions.elementsPerWorker
  }

  /** @inheritDoc */
  public async start (): Promise<void> {
    this.addWorkerSetElement()
    // Add worker set element sequentially to optimize memory at startup
    const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
    if (workerStartDelay > 0) {
      await sleep(randomizeDelay(workerStartDelay))
    }
    // Flip the flag before notifying so that a `started` listener can call addElement() right away
    this.started = true
    this.emitter?.emit(WorkerSetEvents.started, this.info)
  }

  /** @inheritDoc */
  public async stop (): Promise<void> {
    for (const workerSetElement of this.workerSet) {
      const worker = workerSetElement.worker
      // Register the exit listener before terminating so the event cannot be missed
      const waitWorkerExit = new Promise<void>((resolve) => {
        worker.once('exit', () => {
          resolve()
        })
      })
      await worker.terminate()
      await waitWorkerExit
    }
    // Finalize exactly once, after every worker has exited: the `stopped` event must be
    // emitted a single time and the async resource destroy hook must not be triggered twice
    this.started = false
    this.emitter?.emit(WorkerSetEvents.stopped, this.info)
    this.emitter?.emitDestroy()
    this.emitter?.removeAllListeners()
  }

  /** @inheritDoc */
  public async addElement (elementData: WorkerData): Promise<void> {
    if (!this.started) {
      throw new Error('Cannot add a WorkerSet element: not started')
    }
    const workerSetElement = await this.getWorkerSetElement()
    workerSetElement.worker.postMessage({
      event: WorkerMessageEvents.startWorkerElement,
      data: elementData
    })
    ++workerSetElement.numberOfWorkerElements
    // Add element sequentially to optimize memory at startup
    const elementStartDelay = this.workerOptions.elementStartDelay ?? 0
    if (elementStartDelay > 0) {
      await sleep(randomizeDelay(elementStartDelay))
    }
  }

  /**
   * Adds a new `WorkerSetElement`: spawns a worker thread, wires its event
   * handlers and registers it in the set.
   *
   * @returns The newly registered worker set element.
   */
  private addWorkerSetElement (): WorkerSetElement {
    this.workerStartup = true
    const worker = new Worker(this.workerScript, {
      env: SHARE_ENV,
      ...this.workerOptions.poolOptions?.workerOptions
    })
    worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION)
    worker.on('message', (message: WorkerMessage<WorkerData>) => {
      if (message.event === WorkerMessageEvents.startedWorkerElement) {
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info)
      } else if (message.event === WorkerMessageEvents.startWorkerElementError) {
        this.emitter?.emit(WorkerSetEvents.elementError, message.data)
      }
    })
    worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', (error) => {
      this.emitter?.emit(WorkerSetEvents.error, error)
      // Only replace a failed worker once the set is started and no worker is being created
      if (
        this.workerOptions.poolOptions?.restartWorkerOnError === true &&
        this.started &&
        !this.workerStartup
      ) {
        this.addWorkerSetElement()
      }
    })
    worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => {
      this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker))
    })
    const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }
    this.workerSet.add(workerSetElement)
    this.workerStartup = false
    return workerSetElement
  }

  /**
   * Removes a worker set element from the set.
   *
   * @param workerSetElement - Element to remove; `undefined` is a no-op.
   */
  private removeWorkerSetElement (workerSetElement: WorkerSetElement | undefined): void {
    if (workerSetElement == null) {
      return
    }
    this.workerSet.delete(workerSetElement)
  }

  /**
   * Gets the first worker set element with spare capacity, spawning a new
   * worker thread when all the existing ones are full.
   *
   * @returns The worker set element that will host the next element.
   */
  private async getWorkerSetElement (): Promise<WorkerSetElement> {
    let chosenWorkerSetElement: WorkerSetElement | undefined
    for (const workerSetElement of this.workerSet) {
      // Validated as defined in the constructor
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker!) {
        chosenWorkerSetElement = workerSetElement
        break
      }
    }
    if (chosenWorkerSetElement == null) {
      chosenWorkerSetElement = this.addWorkerSetElement()
      // Add worker set element sequentially to optimize memory at startup
      const workerStartDelay = this.workerOptions.workerStartDelay ?? 0
      if (workerStartDelay > 0) {
        await sleep(randomizeDelay(workerStartDelay))
      }
    }
    return chosenWorkerSetElement
  }

  /**
   * Finds the worker set element wrapping the given worker thread.
   *
   * @param worker - Worker thread to look up.
   * @returns The matching worker set element, `undefined` if the worker is not in the set.
   */
  private getWorkerSetElementByWorker (worker: Worker): WorkerSetElement | undefined {
    for (const workerSetElement of this.workerSet) {
      // Match on object identity: this lookup runs from the 'exit' handler, where the
      // thread id is not a reliable key (Node.js appears to report -1 for exited workers)
      if (workerSetElement.worker === worker) {
        return workerSetElement
      }
    }
    return undefined
  }
}