1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
3 import { randomUUID
} from
'node:crypto'
4 import { EventEmitterAsyncResource
} from
'node:events'
5 import { SHARE_ENV
, Worker
} from
'node:worker_threads'
7 import { WorkerAbstract
} from
'./WorkerAbstract.js'
8 import { EMPTY_FUNCTION
, workerSetVersion
} from
'./WorkerConstants.js'
15 type WorkerSetElement
,
17 } from
'./WorkerTypes.js'
18 import { randomizeDelay
, sleep
} from
'./WorkerUtils.js'
/**
 * Pending-response record: the settle callbacks of a promise together with the
 * worker set element the request is associated with.
 * @typeParam R - Type carried by the resolved value.
 */
interface ResponseWrapper<R extends WorkerData> {
  /** Rejects the pending promise, optionally with a reason. */
  reject: (reason?: unknown) => void
  /** Resolves the pending promise with a value or a promise-like of it. */
  resolve: (value: PromiseLike<R> | R) => void
  /** Worker set element tied to the pending request. */
  workerSetElement: WorkerSetElement
}
/**
 * Worker set: a WorkerAbstract implementation that tracks its workers in a Set of
 * WorkerSetElement and assigns elements to a worker until it holds
 * elementsPerWorker of them, creating a further worker otherwise.
 * @typeParam D - Type of the element data passed to addElement.
 * @typeParam R - Type the promise returned by addElement resolves to.
 */
26 export class WorkerSet
<D
extends WorkerData
, R
extends WorkerData
> extends WorkerAbstract
<D
, R
> {
// Event emitter; the constructor only instantiates it when poolOptions.enableEvents is true.
27 public readonly emitter
: EventEmitterAsyncResource
| undefined
/**
 * Snapshot of the worker set state, typed as SetInfo.
 *
 * Fields visible in this extract: elementsExecuting (numberOfWorkerElements summed
 * over every entry of this.workerSet), elementsPerWorker, started and version.
 * NOTE(review): the embedded numbering skips 30, 33-34, 37, 39 and 41-44, so the
 * opening of the returned object, the reduce seed and possibly further fields are
 * not visible here - confirm against the full file.
 */
29 get
info (): SetInfo
{
31 elementsExecuting
: [...this.workerSet
].reduce(
32 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
// The non-null assertion below relies on the constructor throwing when
// workerOptions.elementsPerWorker is nullish.
35 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
36 elementsPerWorker
: this.maxElementsPerWorker
!,
38 started
: this.started
,
40 version
: workerSetVersion
,
/**
 * Configured maximum number of elements a single worker may hold.
 * @returns The elementsPerWorker worker option, or undefined when it is not set.
 */
get maxElementsPerWorker (): number | undefined {
  return this.workerOptions.elementsPerWorker
}
50 return this.workerSet
.size
// Pending requests keyed by message uuid: addElement registers an entry under
// message.uuid and the worker message handler looks it up and deletes it.
// NOTE(review): the value type of the Map is not visible in this extract
// (embedded numbering skips 55-57) - presumably ResponseWrapper, confirm.
53 private readonly promiseResponseMap
: Map
<
54 `${string}-${string}-${string}-${string}`,
// Started flag, exposed through the info getter.
58 private started
: boolean
// Worker set elements currently tracked; filled by addWorkerSetElement and
// emptied by removeWorkerSetElement.
59 private readonly workerSet
: Set
<WorkerSetElement
>
// Set to true on entry of addWorkerSetElement and back to false before it returns.
60 private workerStartup
: boolean
63 * Creates a new `WorkerSet`.
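64 * @param workerScript - Script handed to each `Worker` created by this set.
65 * @param workerOptions - Worker options; `elementsPerWorker` must be a safe integer greater than zero.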
/**
 * Validates the elementsPerWorker option and initializes the internal state.
 *
 * Forwards both arguments to the parent constructor, creates the empty worker
 * Set and the pending-response Map, instantiates the event emitter (async
 * resource name: workerset) only when poolOptions.enableEvents is strictly
 * true, and resets the workerStartup flag.
 * @throws TypeError when elementsPerWorker is nullish or not a safe integer.
 * @throws RangeError when elementsPerWorker is lower than or equal to zero.
 * NOTE(review): the embedded numbering skips 71, 74, 77, 81-82 and 85-86, so
 * closing braces, the Map value type and possibly an assignment (for instance of
 * the started flag) are not visible here - confirm against the full file.
 */
67 constructor (workerScript
: string, workerOptions
: WorkerOptions
) {
68 super(workerScript
, workerOptions
)
69 if (this.workerOptions
.elementsPerWorker
== null) {
70 throw new TypeError('Elements per worker is not defined')
72 if (!Number.isSafeInteger(this.workerOptions
.elementsPerWorker
)) {
73 throw new TypeError('Elements per worker must be an integer')
75 if (this.workerOptions
.elementsPerWorker
<= 0) {
76 throw new RangeError('Elements per worker must be greater than zero')
78 this.workerSet
= new Set
<WorkerSetElement
>()
79 this.promiseResponseMap
= new Map
<
80 `${string}-${string}-${string}-${string}`,
83 if (this.workerOptions
.poolOptions
?.enableEvents
=== true) {
84 this.emitter
= new EventEmitterAsyncResource({ name
: 'workerset' })
87 this.workerStartup
= false
/**
 * Adds an element to a worker of the set and waits for the worker response.
 *
 * Obtains a worker set element through getWorkerSetElement, posts a message with
 * the addWorkerElement event to its worker, registers the pending promise in
 * promiseResponseMap under message.uuid and awaits it. Afterwards, when
 * workerOptions.elementAddDelay is greater than zero, sleeps for a randomized
 * delay derived from it.
 * @param elementData - Data of the element to add.
 * @returns Promise typed with the worker response type R.
 * @throws Error with a not started message.
 * NOTE(review): the condition guarding that throw is not visible in this extract,
 * presumably a check of the started flag - confirm.
 * NOTE(review): the embedded numbering skips 92, 94, 97-98, 100, 104-108 and
 * 115-118, so the message construction, the stored map entry and the final
 * return are not visible here.
 */
91 public async addElement (elementData
: D
): Promise
<R
> {
93 throw new Error('Cannot add a WorkerSet element: not started')
95 const workerSetElement
= await this.getWorkerSetElement()
96 const sendMessageToWorker
= new Promise
<R
>((resolve
, reject
) => {
99 event
: WorkerMessageEvents
.addWorkerElement
,
101 } satisfies WorkerMessage
<D
>
102 workerSetElement
.worker
.postMessage(message
)
103 this.promiseResponseMap
.set(message
.uuid
, {
109 const response
= await sendMessageToWorker
110 // Add element sequentially to optimize memory at startup
111 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
112 if (this.workerOptions
.elementAddDelay
! > 0) {
113 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
114 await sleep(randomizeDelay(this.workerOptions
.elementAddDelay
!))
/**
 * Starts the worker set.
 *
 * Creates a first worker set element, then, when workerOptions.workerStartDelay
 * is greater than zero, sleeps for a randomized delay derived from it, and
 * finally emits the started event with the current info on the optional emitter.
 * NOTE(review): the embedded numbering skips 128-131; an update of the started
 * flag is presumably located there - confirm against the full file.
 */
120 public async start (): Promise
<void> {
121 this.addWorkerSetElement()
122 // Add worker set element sequentially to optimize memory at startup
123 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
124 this.workerOptions
.workerStartDelay
! > 0 &&
125 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
126 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
127 this.emitter
?.emit(WorkerSetEvents
.started
, this.info
)
/**
 * Stops the worker set.
 *
 * Iterates over the tracked worker set elements one at a time; for each worker it
 * builds a promise around a one-shot exit listener and awaits worker.terminate().
 * Once the loop is done it emits the stopped event with the current info and
 * calls emitDestroy on the optional emitter.
 * NOTE(review): the embedded numbering skips 137-140, 142-143 and 145, so the
 * body of the exit listener, any await of waitWorkerExit and any update of the
 * started flag are not visible here - confirm against the full file.
 */
132 public async stop (): Promise
<void> {
133 for (const workerSetElement
of this.workerSet
) {
134 const worker
= workerSetElement
.worker
135 const waitWorkerExit
= new Promise
<void>(resolve
=> {
136 worker
.once('exit', () => {
141 await worker
.terminate()
144 this.emitter
?.emit(WorkerSetEvents
.stopped
, this.info
)
146 this.emitter
?.emitDestroy()
150 * Adds a new `WorkerSetElement`.
151 * @returns The new `WorkerSetElement`.
/**
 * Creates a worker, wires its event listeners and registers it in the worker set.
 *
 * Visible steps: raises the workerStartup flag; instantiates a Worker from
 * this.workerScript with poolOptions.workerOptions spread into its options;
 * attaches the user supplied message, error, online and exit handlers (each
 * falling back to EMPTY_FUNCTION); attaches an internal message listener that
 * looks up the pending entry by uuid and deletes it after handling; attaches a
 * one-shot error listener that emits the error event, may add a replacement
 * worker set element when poolOptions.restartWorkerOnError is strictly true and
 * terminates the worker, reporting a termination failure through the error
 * event; attaches a one-shot exit listener that removes the matching element
 * from the set; finally builds the element with numberOfWorkerElements at 0,
 * adds it to the set, lowers workerStartup and returns it.
 * @returns The worker set element that was added.
 * NOTE(review): on addedWorkerElement the elementAdded event is emitted with
 * this.info BEFORE numberOfWorkerElements is incremented, so the emitted
 * elementsExecuting does not yet count the new element - confirm this is intended.
 * NOTE(review): the embedded numbering has many gaps in this method (among them
 * 156, 158, 165, 169-170, 173-177, 179-185, 187-188, 192, 194-196, 198-199, 202,
 * 207 and 210-211), so the remaining worker options, the switch header, the
 * settle calls, the rest of the restart condition and the worker field of the
 * element are not visible here.
 */
153 private addWorkerSetElement (): WorkerSetElement
{
154 this.workerStartup
= true
155 const worker
= new Worker(this.workerScript
, {
157 ...this.workerOptions
.poolOptions
?.workerOptions
,
// User supplied message handler, falling back to EMPTY_FUNCTION.
159 worker
.on('message', this.workerOptions
.poolOptions
?.messageHandler
?? EMPTY_FUNCTION
)
// Internal listener: only acts on messages whose uuid has a pending entry.
160 worker
.on('message', (message
: WorkerMessage
<R
>) => {
161 const { data
, event
, uuid
} = message
162 if (this.promiseResponseMap
.has(uuid
)) {
163 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
164 const { reject
, resolve
, workerSetElement
} = this.promiseResponseMap
.get(uuid
)!
166 case WorkerMessageEvents
.addedWorkerElement
:
167 this.emitter
?.emit(WorkerSetEvents
.elementAdded
, this.info
)
168 ++workerSetElement
.numberOfWorkerElements
171 case WorkerMessageEvents
.workerElementError
:
172 this.emitter
?.emit(WorkerSetEvents
.elementError
, data
)
178 `Unknown worker message event: '${event}' received with data: '${JSON.stringify(
186 this.promiseResponseMap
.delete(uuid
)
189 worker
.on('error', this.workerOptions
.poolOptions
?.errorHandler
?? EMPTY_FUNCTION
)
190 worker
.once('error', error
=> {
191 this.emitter
?.emit(WorkerSetEvents
.error
, error
)
193 this.workerOptions
.poolOptions
?.restartWorkerOnError
=== true &&
197 this.addWorkerSetElement()
200 // eslint-disable-next-line promise/no-promise-in-callback
201 worker
.terminate().catch((error
: unknown
) => this.emitter
?.emit(WorkerSetEvents
.error
, error
))
203 worker
.on('online', this.workerOptions
.poolOptions
?.onlineHandler
?? EMPTY_FUNCTION
)
204 worker
.on('exit', this.workerOptions
.poolOptions
?.exitHandler
?? EMPTY_FUNCTION
)
205 worker
.once('exit', () => {
206 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
))
208 const workerSetElement
: WorkerSetElement
= {
209 numberOfWorkerElements
: 0,
212 this.workerSet
.add(workerSetElement
)
213 this.workerStartup
= false
214 return workerSetElement
/**
 * Selects the worker set element that will receive the next element.
 *
 * Scans the tracked elements for one whose numberOfWorkerElements is below
 * workerOptions.elementsPerWorker. When none qualifies, a new worker set element
 * is added and, if workerOptions.workerStartDelay is greater than zero, a
 * randomized delay derived from it is awaited before returning.
 * @returns The chosen or newly added worker set element.
 * NOTE(review): the embedded numbering skips 223-225, so whether the scan stops
 * at the first matching element is not visible here - confirm.
 */
217 private async getWorkerSetElement (): Promise
<WorkerSetElement
> {
218 let chosenWorkerSetElement
: undefined | WorkerSetElement
219 for (const workerSetElement
of this.workerSet
) {
220 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
221 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
222 chosenWorkerSetElement
= workerSetElement
// No element with spare capacity: grow the set by one worker.
226 if (chosenWorkerSetElement
== null) {
227 chosenWorkerSetElement
= this.addWorkerSetElement()
228 // Add worker set element sequentially to optimize memory at startup
229 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
230 this.workerOptions
.workerStartDelay
! > 0 &&
231 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
232 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
234 return chosenWorkerSetElement
/**
 * Looks up the tracked worker set element that wraps the given worker.
 *
 * Elements are matched by comparing worker.threadId values.
 * @param worker - Worker whose element is searched for.
 * @returns Declared as the matching element or undefined.
 * NOTE(review): the embedded numbering skips 242-247, so the return statement
 * and whether the loop stops at the first match are not visible here - confirm.
 */
237 private getWorkerSetElementByWorker (worker
: Worker
): undefined | WorkerSetElement
{
238 let workerSetElt
: undefined | WorkerSetElement
239 for (const workerSetElement
of this.workerSet
) {
240 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
241 workerSetElt
= workerSetElement
/**
 * Removes a worker set element from the tracked set.
 * @param workerSetElement - Element to remove; may be undefined, which is what
 *   getWorkerSetElementByWorker is declared to return when nothing matches.
 * NOTE(review): the body of the nullish guard is not visible in this extract
 * (embedded numbering skips 250-251) - presumably an early return, confirm.
 */
248 private removeWorkerSetElement (workerSetElement
: undefined | WorkerSetElement
): void {
249 if (workerSetElement
== null) {
252 this.workerSet
.delete(workerSetElement
)