1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
3 import { randomUUID
} from
'node:crypto'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { SHARE_ENV
, Worker
} from
'node:worker_threads'
7 import { WorkerAbstract
} from
'./WorkerAbstract.js'
8 import { EMPTY_FUNCTION
, workerSetVersion
} from
'./WorkerConstants.js'
15 type WorkerSetElement
,
17 } from
'./WorkerTypes.js'
18 import { randomizeDelay
, sleep
} from
'./WorkerUtils.js'
20 interface ResponseWrapper
<R
extends WorkerData
> {
21 resolve
: (value
: R
| PromiseLike
<R
>) => void
22 reject
: (reason
?: unknown
) => void
23 workerSetElement
: WorkerSetElement
26 export class WorkerSet
<D
extends WorkerData
, R
extends WorkerData
> extends WorkerAbstract
<D
, R
> {
27 public readonly emitter
: EventEmitterAsyncResource
| undefined
28 private readonly workerSet
: Set
<WorkerSetElement
>
29 private readonly promiseResponseMap
: Map
<
30 `${string}-${string}-${string}-${string}`,
34 private started
: boolean
35 private workerStartup
: boolean
38 * Creates a new `WorkerSet`.
40 * @param workerScript -
41 * @param workerOptions -
43 constructor (workerScript
: string, workerOptions
: WorkerOptions
) {
44 super(workerScript
, workerOptions
)
45 if (this.workerOptions
.elementsPerWorker
== null) {
46 throw new TypeError('Elements per worker is not defined')
48 if (!Number.isSafeInteger(this.workerOptions
.elementsPerWorker
)) {
49 throw new TypeError('Elements per worker must be an integer')
51 if (this.workerOptions
.elementsPerWorker
<= 0) {
52 throw new RangeError('Elements per worker must be greater than zero')
54 this.workerSet
= new Set
<WorkerSetElement
>()
55 this.promiseResponseMap
= new Map
<
56 `${string}-${string}-${string}-${string}`,
59 if (this.workerOptions
.poolOptions
?.enableEvents
=== true) {
60 this.emitter
= new EventEmitterAsyncResource({ name
: 'workerset' })
63 this.workerStartup
= false
66 get
info (): SetInfo
{
68 version
: workerSetVersion
,
71 started
: this.started
,
73 elementsExecuting
: [...this.workerSet
].reduce(
74 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
77 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
78 elementsPerWorker
: this.maxElementsPerWorker
!
83 return this.workerSet
.size
86 get
maxElementsPerWorker (): number | undefined {
87 return this.workerOptions
.elementsPerWorker
91 public async start (): Promise
<void> {
92 this.addWorkerSetElement()
93 // Add worker set element sequentially to optimize memory at startup
94 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
95 this.workerOptions
.workerStartDelay
! > 0 &&
96 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
97 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
98 this.emitter
?.emit(WorkerSetEvents
.started
, this.info
)
103 public async stop (): Promise
<void> {
104 for (const workerSetElement
of this.workerSet
) {
105 const worker
= workerSetElement
.worker
106 const waitWorkerExit
= new Promise
<void>(resolve
=> {
107 worker
.once('exit', () => {
112 await worker
.terminate()
115 this.emitter
?.emit(WorkerSetEvents
.stopped
, this.info
)
117 this.emitter
?.emitDestroy()
121 public async addElement (elementData
: D
): Promise
<R
> {
123 throw new Error('Cannot add a WorkerSet element: not started')
125 const workerSetElement
= await this.getWorkerSetElement()
126 const sendMessageToWorker
= new Promise
<R
>((resolve
, reject
) => {
129 event
: WorkerMessageEvents
.addWorkerElement
,
131 } satisfies WorkerMessage
<D
>
132 workerSetElement
.worker
.postMessage(message
)
133 this.promiseResponseMap
.set(message
.uuid
, {
139 const response
= await sendMessageToWorker
140 // Add element sequentially to optimize memory at startup
141 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
142 if (this.workerOptions
.elementAddDelay
! > 0) {
143 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
144 await sleep(randomizeDelay(this.workerOptions
.elementAddDelay
!))
150 * Adds a new `WorkerSetElement`.
152 private addWorkerSetElement (): WorkerSetElement
{
153 this.workerStartup
= true
154 const worker
= new Worker(this.workerScript
, {
156 ...this.workerOptions
.poolOptions
?.workerOptions
158 worker
.on('message', this.workerOptions
.poolOptions
?.messageHandler
?? EMPTY_FUNCTION
)
159 worker
.on('message', (message
: WorkerMessage
<R
>) => {
160 const { uuid
, event
, data
} = message
161 if (this.promiseResponseMap
.has(uuid
)) {
162 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
163 const { resolve
, reject
, workerSetElement
} = this.promiseResponseMap
.get(uuid
)!
165 case WorkerMessageEvents
.addedWorkerElement
:
166 this.emitter
?.emit(WorkerSetEvents
.elementAdded
, this.info
)
167 ++workerSetElement
.numberOfWorkerElements
170 case WorkerMessageEvents
.workerElementError
:
171 this.emitter
?.emit(WorkerSetEvents
.elementError
, data
)
177 `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
185 this.promiseResponseMap
.delete(uuid
)
188 worker
.on('error', this.workerOptions
.poolOptions
?.errorHandler
?? EMPTY_FUNCTION
)
189 worker
.once('error', error
=> {
190 this.emitter
?.emit(WorkerSetEvents
.error
, error
)
192 this.workerOptions
.poolOptions
?.restartWorkerOnError
=== true &&
196 this.addWorkerSetElement()
199 worker
.terminate().catch((error
: unknown
) => this.emitter
?.emit(WorkerSetEvents
.error
, error
))
201 worker
.on('online', this.workerOptions
.poolOptions
?.onlineHandler
?? EMPTY_FUNCTION
)
202 worker
.on('exit', this.workerOptions
.poolOptions
?.exitHandler
?? EMPTY_FUNCTION
)
203 worker
.once('exit', () => {
204 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
))
206 const workerSetElement
: WorkerSetElement
= {
208 numberOfWorkerElements
: 0
210 this.workerSet
.add(workerSetElement
)
211 this.workerStartup
= false
212 return workerSetElement
215 private removeWorkerSetElement (workerSetElement
: WorkerSetElement
| undefined): void {
216 if (workerSetElement
== null) {
219 this.workerSet
.delete(workerSetElement
)
222 private async getWorkerSetElement (): Promise
<WorkerSetElement
> {
223 let chosenWorkerSetElement
: WorkerSetElement
| undefined
224 for (const workerSetElement
of this.workerSet
) {
225 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
226 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
227 chosenWorkerSetElement
= workerSetElement
231 if (chosenWorkerSetElement
== null) {
232 chosenWorkerSetElement
= this.addWorkerSetElement()
233 // Add worker set element sequentially to optimize memory at startup
234 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
235 this.workerOptions
.workerStartDelay
! > 0 &&
236 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
237 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
239 return chosenWorkerSetElement
242 private getWorkerSetElementByWorker (worker
: Worker
): WorkerSetElement
| undefined {
243 let workerSetElt
: WorkerSetElement
| undefined
244 for (const workerSetElement
of this.workerSet
) {
245 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
246 workerSetElt
= workerSetElement