1 // Partial Copyright Jerome Benoit. 2021-2024. All Rights Reserved.
3 import { EventEmitterAsyncResource
} from
'node:events'
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads'
6 import { WorkerAbstract
} from
'./WorkerAbstract.js'
7 import { EMPTY_FUNCTION
, workerSetVersion
} from
'./WorkerConstants.js'
14 type WorkerSetElement
,
16 } from
'./WorkerTypes.js'
17 import { randomizeDelay
, sleep
} from
'./WorkerUtils.js'
// Worker set built on WorkerAbstract. D is the type of the element data accepted by
// addElement and R is the type it resolves with; both are constrained to WorkerData.
19 export class WorkerSet
<D
extends WorkerData
, R
extends WorkerData
> extends WorkerAbstract
<D
, R
> {
// Event emitter, left undefined unless the constructor finds poolOptions.enableEvents set to true.
20 public readonly emitter
: EventEmitterAsyncResource
| undefined
// Registered worker set elements; each one pairs a worker with its numberOfWorkerElements counter.
21 private readonly workerSet
: Set
<WorkerSetElement
>
// Started flag, exposed through the info getter.
22 private started
: boolean
// Raised at the beginning of addWorkerSetElement and lowered once the new element is registered.
23 private workerStartup
: boolean
26 * Creates a new `WorkerSet`.
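28 * @param workerScript - Script handed to each `Worker` created by the set.
29 * @param workerOptions - Worker options; `elementsPerWorker` must be a safe integer greater than zero.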
// Forwards both arguments to the parent constructor, validates elementsPerWorker,
// then initializes the element set, the optional emitter and the startup flag.
// Throws TypeError when elementsPerWorker is null, undefined or not a safe integer.
// Throws RangeError when elementsPerWorker is zero or negative.
31 constructor (workerScript
: string, workerOptions
: WorkerOptions
) {
32 super(workerScript
, workerOptions
)
// Loose equality with null rejects both null and undefined.
33 if (this.workerOptions
.elementsPerWorker
== null) {
34 throw new TypeError('Elements per worker is not defined')
// Rejects NaN, infinities, fractional values and integers outside the safe range.
36 if (!Number.isSafeInteger(this.workerOptions
.elementsPerWorker
)) {
37 throw new TypeError('Elements per worker must be an integer')
// A worker must be able to hold at least one element.
39 if (this.workerOptions
.elementsPerWorker
<= 0) {
40 throw new RangeError('Elements per worker must be greater than zero')
// The set starts empty; elements are added by addWorkerSetElement.
42 this.workerSet
= new Set
<WorkerSetElement
>()
// Events are opt-in: the emitter only exists when enableEvents is strictly true.
43 if (this.workerOptions
.poolOptions
?.enableEvents
=== true) {
44 this.emitter
= new EventEmitterAsyncResource({ name
: 'workerset' })
// No worker creation is in progress at construction time.
47 this.workerStartup
= false
// Describes the current state of the worker set as a SetInfo object.
// NOTE(review): only part of the object literal is visible in this view.
50 get
info (): SetInfo
{
// Version constant imported from WorkerConstants.
52 version
: workerSetVersion
,
// Mirrors the private started flag.
55 started
: this.started
,
// Sums numberOfWorkerElements over a snapshot array of every registered worker set element.
57 elementsExecuting
: [...this.workerSet
].reduce(
58 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
// The getter is typed number or undefined; presumably the assertion is safe because the
// constructor throws when elementsPerWorker is missing - confirm.
61 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
62 elementsPerWorker
: this.maxElementsPerWorker
!
67 return this.workerSet
.size
// Maximum number of elements per worker, read directly from workerOptions.elementsPerWorker.
70 get
maxElementsPerWorker (): number | undefined {
71 return this.workerOptions
.elementsPerWorker
// Starts the worker set: creates a first worker set element, optionally pauses,
// then emits the started event carrying the current info.
75 public async start (): Promise
<void> {
// The returned element is not needed here; addWorkerSetElement registers it in the set itself.
76 this.addWorkerSetElement()
77 // Add worker set element sequentially to optimize memory at startup
// The pause only happens when workerStartDelay is greater than zero (an undefined delay
// compares false); the delay goes through randomizeDelay before being passed to sleep.
78 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
79 this.workerOptions
.workerStartDelay
! > 0 &&
80 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
81 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
// Optional chaining makes this a no-op when events are disabled.
82 this.emitter
?.emit(WorkerSetEvents
.started
, this.info
)
// Stops the worker set: terminates the worker of every registered element, then emits
// the stopped event with the current info and calls emitDestroy on the emitter.
87 public async stop (): Promise
<void> {
// Workers are handled one at a time because terminate is awaited inside the loop.
88 for (const workerSetElement
of this.workerSet
) {
89 const worker
= workerSetElement
.worker
// Promise tied to the exit event of this worker; presumably the listener resolves it and the
// method awaits it after terminate - neither is visible in this view, confirm.
90 const waitWorkerExit
= new Promise
<void>(resolve
=> {
91 worker
.once('exit', () => {
96 await worker
.terminate()
// Both emitter calls are skipped when events are disabled.
99 this.emitter
?.emit(WorkerSetEvents
.stopped
, this.info
)
101 this.emitter
?.emitDestroy()
// Adds an element: obtains a worker set element from getWorkerSetElement, posts an
// addWorkerElement message to its worker and waits for the matching reply.
// Throws Error with a not started message; the guarding condition is outside this view,
// presumably a check of the started flag - confirm.
105 public async addElement (elementData
: D
): Promise
<R
> {
107 throw new Error('Cannot add a WorkerSet element: not started')
// May create a new worker when no registered one has spare capacity.
109 const workerSetElement
= await this.getWorkerSetElement()
// Settles from the message handler below, which detaches itself in both branches.
110 const waitAddedWorkerElement
= new Promise
<R
>((resolve
, reject
) => {
111 const messageHandler
= (message
: WorkerMessage
<R
>): void => {
// Success reply: count the element against this worker and resolve with the reply data.
112 if (message
.event
=== WorkerMessageEvents
.addedWorkerElement
) {
113 ++workerSetElement
.numberOfWorkerElements
114 resolve(message
.data
)
115 workerSetElement
.worker
.off('message', messageHandler
)
// Error reply: the handler is detached; presumably the promise is rejected here, as the
// eslint directive below suggests - the reject call is not visible in this view.
116 } else if (message
.event
=== WorkerMessageEvents
.workerElementError
) {
117 // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
119 workerSetElement
.worker
.off('message', messageHandler
)
// The handler is attached before the request is posted.
122 workerSetElement
.worker
.on('message', messageHandler
)
// Request sent to the worker; only the event field of the payload is visible in this view.
124 workerSetElement
.worker
.postMessage({
125 event
: WorkerMessageEvents
.addWorkerElement
,
// Holds the data carried by the addedWorkerElement reply.
128 const response
= await waitAddedWorkerElement
129 // Add element sequentially to optimize memory at startup
// Randomized pause after the element was added, only when elementAddDelay is greater than zero.
130 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
131 if (this.workerOptions
.elementAddDelay
! > 0) {
132 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
133 await sleep(randomizeDelay(this.workerOptions
.elementAddDelay
!))
139 * Adds a new `WorkerSetElement`.
// Creates a worker running workerScript, wires its message, error, online and exit
// listeners, wraps it in a WorkerSetElement with a zero element counter, registers that
// element in the set and returns it.
141 private addWorkerSetElement (): WorkerSetElement
{
// Marks that a worker creation is in progress until the element is registered below.
142 this.workerStartup
= true
143 const worker
= new Worker(this.workerScript
, {
// User supplied worker options are spread into the options object.
145 ...this.workerOptions
.poolOptions
?.workerOptions
// User message handler, or EMPTY_FUNCTION when none is configured.
147 worker
.on('message', this.workerOptions
.poolOptions
?.messageHandler
?? EMPTY_FUNCTION
)
// Internal listener translating worker replies into worker set events.
148 worker
.on('message', (message
: WorkerMessage
<R
>) => {
149 if (message
.event
=== WorkerMessageEvents
.addedWorkerElement
) {
// Success is reported with the current info of the set.
150 this.emitter
?.emit(WorkerSetEvents
.elementAdded
, this.info
)
151 } else if (message
.event
=== WorkerMessageEvents
.workerElementError
) {
// Failure is reported with the data carried by the worker message.
152 this.emitter
?.emit(WorkerSetEvents
.elementError
, message
.data
)
// User error handler, or EMPTY_FUNCTION when none is configured.
155 worker
.on('error', this.workerOptions
.poolOptions
?.errorHandler
?? EMPTY_FUNCTION
)
// Internal error listener: forwards the error as a worker set error event.
156 worker
.on('error', error
=> {
157 this.emitter
?.emit(WorkerSetEvents
.error
, error
)
// A replacement element is added when restartWorkerOnError is strictly true;
// NOTE(review): the remaining operands of this condition are not visible in this view.
159 this.workerOptions
.poolOptions
?.restartWorkerOnError
=== true &&
163 this.addWorkerSetElement()
// Terminates the worker; a rejected terminate is reported through the error event.
166 worker
.terminate().catch((error
: unknown
) => this.emitter
?.emit(WorkerSetEvents
.error
, error
))
// User online and exit handlers, each defaulting to EMPTY_FUNCTION.
168 worker
.on('online', this.workerOptions
.poolOptions
?.onlineHandler
?? EMPTY_FUNCTION
)
169 worker
.on('exit', this.workerOptions
.poolOptions
?.exitHandler
?? EMPTY_FUNCTION
)
// On the first exit event the element owning this worker is looked up and removed from the set.
170 worker
.once('exit', () => {
171 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
))
// A new element starts with no elements assigned to its worker.
173 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 }
174 this.workerSet
.add(workerSetElement
)
// Creation is finished.
175 this.workerStartup
= false
176 return workerSetElement
// Removes the given worker set element from the set. The parameter accepts undefined
// because the exit listener passes the result of a lookup by worker, which may find nothing.
179 private removeWorkerSetElement (workerSetElement
: WorkerSetElement
| undefined): void {
// Guard for a missing element (null or undefined); its body is outside this view.
180 if (workerSetElement
== null) {
183 this.workerSet
.delete(workerSetElement
)
// Picks a worker set element with spare capacity, meaning its numberOfWorkerElements is
// below elementsPerWorker. When none qualifies, a new element is created through
// addWorkerSetElement, followed by an optional randomized pause.
186 private async getWorkerSetElement (): Promise
<WorkerSetElement
> {
// Stays undefined when every registered worker is full or the set is empty.
187 let chosenWorkerSetElement
: WorkerSetElement
| undefined
188 for (const workerSetElement
of this.workerSet
) {
// Capacity check; presumably the assertion is safe because the constructor rejects a
// missing elementsPerWorker - confirm.
189 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
190 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
191 chosenWorkerSetElement
= workerSetElement
// No candidate found: grow the set by one worker.
195 if (chosenWorkerSetElement
== null) {
196 chosenWorkerSetElement
= this.addWorkerSetElement()
197 // Add worker set element sequentially to optimize memory at startup
// The pause only happens when workerStartDelay is greater than zero.
198 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
199 this.workerOptions
.workerStartDelay
! > 0 &&
200 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
201 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)))
203 return chosenWorkerSetElement
206 private getWorkerSetElementByWorker (worker
: Worker
): WorkerSetElement
| undefined {
207 let workerSetElt
: WorkerSetElement
| undefined
208 for (const workerSetElement
of this.workerSet
) {
209 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
210 workerSetElt
= workerSetElement