1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { EventEmitterAsyncResource
} from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import { EMPTY_FUNCTION
, workerSetVersion
} from
'./WorkerConstants';
14 type WorkerSetElement
,
16 } from
'./WorkerTypes';
17 import { randomizeDelay
, sleep
} from
'./WorkerUtils';
19 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
20 public readonly emitter
: EventEmitterAsyncResource
| undefined;
21 private readonly workerSet
: Set
<WorkerSetElement
>;
22 private started
: boolean;
23 private workerStartup
: boolean;
26 * Creates a new `WorkerSet`.
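   * @param workerScript - Script handed to each `Worker` thread created by the set.
   * @param workerOptions - Worker options; `elementsPerWorker` must be defined and be a safe integer greater than zero.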
31 constructor(workerScript
: string, workerOptions
: WorkerOptions
) {
32 super(workerScript
, workerOptions
);
33 if (this.workerOptions
.elementsPerWorker
== null) {
34 throw new TypeError('Elements per worker is not defined');
36 if (!Number.isSafeInteger(this.workerOptions
.elementsPerWorker
)) {
37 throw new TypeError('Elements per worker must be an integer');
39 if (this.workerOptions
.elementsPerWorker
<= 0) {
40 throw new RangeError('Elements per worker must be greater than zero');
42 this.workerSet
= new Set
<WorkerSetElement
>();
43 if (this.workerOptions
.poolOptions
?.enableEvents
) {
44 this.emitter
= new EventEmitterAsyncResource({ name
: 'workerset' });
47 this.workerStartup
= false;
52 version
: workerSetVersion
,
56 elementsExecuting
: [...this.workerSet
].reduce(
57 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
60 elementsPerWorker
: this.maxElementsPerWorker
!,
65 return this.workerSet
.size
;
/**
 * Upper bound on the number of elements a single worker of the set may host.
 *
 * @returns The `elementsPerWorker` value read from the worker options.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
73 public async start(): Promise
<void> {
74 this.addWorkerSetElement();
75 // Add worker set element sequentially to optimize memory at startup
76 this.workerOptions
.workerStartDelay
! > 0 &&
77 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)));
78 this.emitter
?.emit(WorkerSetEvents
.started
, this.info
);
83 public async stop(): Promise
<void> {
84 for (const workerSetElement
of this.workerSet
) {
85 const worker
= workerSetElement
.worker
;
86 const waitWorkerExit
= new Promise
<void>((resolve
) => {
87 worker
.once('exit', () => {
91 await worker
.terminate();
93 this.emitter
?.emit(WorkerSetEvents
.stopped
, this.info
);
94 this.emitter
?.emitDestroy();
95 this.emitter
?.removeAllListeners();
101 public async addElement(elementData
: WorkerData
): Promise
<void> {
103 throw new Error('Cannot add a WorkerSet element: not started');
105 if (this.workerSet
== null) {
106 throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
108 const workerSetElement
= await this.getWorkerSetElement();
109 workerSetElement
.worker
.postMessage({
110 event
: WorkerMessageEvents
.startWorkerElement
,
113 ++workerSetElement
.numberOfWorkerElements
;
114 // Add element sequentially to optimize memory at startup
115 if (this.workerOptions
.elementStartDelay
! > 0) {
116 await sleep(randomizeDelay(this.workerOptions
.elementStartDelay
!));
121 * Adds a new `WorkerSetElement`.
123 private addWorkerSetElement(): WorkerSetElement
{
124 this.workerStartup
= true;
125 const worker
= new Worker(this.workerScript
, {
127 ...this.workerOptions
.poolOptions
?.workerOptions
,
129 worker
.on('message', this.workerOptions
.poolOptions
?.messageHandler
?? EMPTY_FUNCTION
);
130 worker
.on('message', (message
: WorkerMessage
<WorkerData
>) => {
131 if (message
.event
=== WorkerMessageEvents
.startedWorkerElement
) {
132 this.emitter
?.emit(WorkerSetEvents
.elementStarted
, this.info
);
133 } else if (message
.event
=== WorkerMessageEvents
.startWorkerElementError
) {
134 this.emitter
?.emit(WorkerSetEvents
.elementError
, message
.data
);
137 worker
.on('error', this.workerOptions
.poolOptions
?.errorHandler
?? EMPTY_FUNCTION
);
138 worker
.on('error', (error
) => {
139 this.emitter
?.emit(WorkerSetEvents
.error
, error
);
141 this.workerOptions
.poolOptions
?.restartWorkerOnError
&&
145 this.addWorkerSetElement();
148 worker
.on('online', this.workerOptions
.poolOptions
?.onlineHandler
?? EMPTY_FUNCTION
);
149 worker
.on('exit', this.workerOptions
.poolOptions
?.exitHandler
?? EMPTY_FUNCTION
);
150 worker
.once('exit', () =>
151 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
)!),
153 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
154 this.workerSet
.add(workerSetElement
);
155 this.workerStartup
= false;
156 return workerSetElement
;
/**
 * Stops tracking a worker set element by deleting it from `this.workerSet`.
 *
 * @param workerSetElement - The element to remove from the set.
 */
private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
  const { workerSet } = this;
  workerSet.delete(workerSetElement);
}
163 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
164 let chosenWorkerSetElement
: WorkerSetElement
| undefined;
165 for (const workerSetElement
of this.workerSet
) {
166 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
167 chosenWorkerSetElement
= workerSetElement
;
171 if (!chosenWorkerSetElement
) {
172 chosenWorkerSetElement
= this.addWorkerSetElement();
173 // Add worker set element sequentially to optimize memory at startup
174 this.workerOptions
.workerStartDelay
! > 0 &&
175 (await sleep(randomizeDelay(this.workerOptions
.workerStartDelay
!)));
177 return chosenWorkerSetElement
;
180 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
181 let workerSetElt
: WorkerSetElement
| undefined;
182 for (const workerSetElement
of this.workerSet
) {
183 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
184 workerSetElt
= workerSetElement
;