12f29294ea54b98ad82b1cca9013662725f3c223
1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { EventEmitter
} from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import { DEFAULT_POOL_OPTIONS
, EMPTY_FUNCTION
, workerSetVersion
} from
'./WorkerConstants';
14 type WorkerSetElement
,
16 } from
'./WorkerTypes';
17 import { sleep
} from
'./WorkerUtils';
19 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
20 public readonly emitter
!: EventEmitter
;
21 private readonly workerSet
: Set
<WorkerSetElement
>;
22 private workerStartup
: boolean;
/**
 * Creates a new `WorkerSet`.
 *
 * @param workerScript - Passed as-is to the `WorkerAbstract` constructor.
 * @param workerOptions - Passed as-is to the `WorkerAbstract` constructor. Its
 *   `elementsPerWorker` must be a safe integer greater than zero; its `poolOptions`
 *   are merged over `DEFAULT_POOL_OPTIONS`.
 * @throws TypeError If `elementsPerWorker` is `null`, `undefined` or not a safe integer.
 * @throws RangeError If `elementsPerWorker` is less than or equal to zero.
 */
constructor(workerScript: string, workerOptions: WorkerOptions) {
  super(workerScript, workerOptions);
  // Validate the per-worker capacity once, on a local copy of the option.
  const { elementsPerWorker } = this.workerOptions;
  if (elementsPerWorker === null || elementsPerWorker === undefined) {
    throw new TypeError('Elements per worker is not defined');
  }
  if (!Number.isSafeInteger(elementsPerWorker)) {
    throw new TypeError('Elements per worker must be an integer');
  }
  if (elementsPerWorker <= 0) {
    throw new RangeError('Elements per worker must be greater than zero');
  }
  // Caller-supplied pool options take precedence over the defaults.
  const mergedPoolOptions = {
    ...DEFAULT_POOL_OPTIONS,
    ...this.workerOptions.poolOptions,
  };
  this.workerOptions.poolOptions = mergedPoolOptions;
  this.workerSet = new Set<WorkerSetElement>();
  // The emitter only exists when events are enabled; other code reaches it through `?.`.
  if (this.workerOptions.poolOptions?.enableEvents) {
    this.emitter = new EventEmitter();
  }
  this.workerStartup = false;
}
57 version
: workerSetVersion
,
61 elementsExecuting
: [...this.workerSet
].reduce(
62 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
65 elementsPerWorker
: this.maxElementsPerWorker
!,
70 return this.workerSet
.size
;
/**
 * Maximum number of elements a single worker may host.
 *
 * @returns The `elementsPerWorker` value from the worker options, which may be `undefined`.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
public async start(): Promise<void> {
  // Create the first worker set element up front.
  this.addWorkerSetElement();
  // Add worker set element sequentially to optimize memory at startup
  const startDelay = this.workerOptions.workerStartDelay!;
  if (startDelay > 0) {
    // An undefined delay fails the comparison above, so `sleep` is skipped in that case.
    await sleep(startDelay);
  }
}
public async stop(): Promise<void> {
  // Workers are terminated one after the other, not in parallel.
  for (const { worker } of this.workerSet) {
    // Subscribe to 'exit' before asking for termination so the event cannot be missed.
    const workerExited = new Promise<void>((resolve) => {
      worker.on('exit', () => {
        resolve();
      });
    });
    await worker.terminate();
    await workerExited;
  }
}
public async addElement(elementData: WorkerData): Promise<void> {
  if (!this.workerSet) {
    throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
  }
  // Pick a worker with spare capacity; a new one is created when none has room.
  const hostElement = await this.getWorkerSetElement();
  const startRequest = {
    event: WorkerMessageEvents.startWorkerElement,
    data: elementData,
  };
  hostElement.worker.postMessage(startRequest);
  // The slot is counted as soon as the request is posted, before the worker confirms anything.
  ++hostElement.numberOfWorkerElements;
  // Add element sequentially to optimize memory at startup
  const startDelay = this.workerOptions.elementStartDelay!;
  if (startDelay > 0) {
    await sleep(startDelay);
  }
}
/**
 * Adds a new `WorkerSetElement`.
 *
 * Spawns a `Worker` running `this.workerScript`, attaches the optional handlers from the
 * pool options together with the internal message/error/exit handlers, and registers the
 * worker in the set with an element count of zero.
 *
 * @returns The newly registered worker set element.
 */
private addWorkerSetElement(): WorkerSetElement {
  // The flag stays raised for the duration of this synchronous setup.
  this.workerStartup = true;
  const poolOptions = this.workerOptions.poolOptions;
  const worker = new Worker(this.workerScript, {
    env: SHARE_ENV,
    ...poolOptions?.workerOptions,
  });

  // User-supplied handler first, then the internal translation into WorkerSet events.
  worker.on('message', poolOptions?.messageHandler ?? EMPTY_FUNCTION);
  worker.on('message', (message: WorkerMessage<WorkerData>) => {
    switch (message.event) {
      case WorkerMessageEvents.startedWorkerElement:
        this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
        break;
      case WorkerMessageEvents.startWorkerElementError:
        this.emitter?.emit(WorkerSetEvents.elementError, message.data);
        break;
      default:
        // Any other event is not translated into a WorkerSet event here.
        break;
    }
  });

  worker.on('error', poolOptions?.errorHandler ?? EMPTY_FUNCTION);
  worker.on('error', (error) => {
    this.emitter?.emit(WorkerSetEvents.error, error);
    // The options are read when the error fires, not when the handler is registered.
    const restartOnError = this.workerOptions.poolOptions?.restartWorkerOnError;
    if (restartOnError && !this.workerStartup) {
      this.addWorkerSetElement();
    }
  });

  worker.on('online', poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
  worker.on('exit', poolOptions?.exitHandler ?? EMPTY_FUNCTION);
  // Drop the element from the set once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!);
  });

  this.workerStartup = false;
  const createdElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
  this.workerSet.add(createdElement);
  return createdElement;
}
/**
 * Removes a worker set element from the set.
 *
 * @param workerSetElement - Element to remove; nothing happens if it is not in the set.
 */
private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
  const { workerSet } = this;
  workerSet.delete(workerSetElement);
}
/**
 * Selects the worker set element that will host the next element.
 *
 * The first element (in set iteration order) whose element count is below
 * `elementsPerWorker` is returned. When every element is full, or the set is empty, a new
 * element is created and the optional `workerStartDelay` is awaited before returning it.
 *
 * @returns A worker set element with spare capacity.
 */
private async getWorkerSetElement(): Promise<WorkerSetElement> {
  const capacity = this.workerOptions.elementsPerWorker!;
  for (const candidate of this.workerSet) {
    if (candidate.numberOfWorkerElements < capacity) {
      return candidate;
    }
  }
  const createdElement = this.addWorkerSetElement();
  // Add worker set element sequentially to optimize memory at startup
  const startDelay = this.workerOptions.workerStartDelay!;
  if (startDelay > 0) {
    await sleep(startDelay);
  }
  return createdElement;
}
171 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
172 let workerSetElt
: WorkerSetElement
| undefined;
173 for (const workerSetElement
of this.workerSet
) {
174 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
175 workerSetElt
= workerSetElement
;