1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import EventEmitterAsyncResource from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import { WorkerConstants
} from
'./WorkerConstants';
13 type WorkerSetElement
,
15 } from
'./WorkerTypes';
16 import { sleep
} from
'./WorkerUtils';
18 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
19 public readonly emitter
: EventEmitterAsyncResource
;
20 private readonly workerSet
: Set
<WorkerSetElement
>;
23 * Create a new `WorkerSet`.
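25 * @param workerScript - Script forwarded to the base class and used as the script of each `Worker` the set spawns.
26 * @param workerOptions - Worker set options forwarded to the base class (e.g. `elementsPerWorker`, `workerStartDelay`, `elementStartDelay`, `poolOptions`).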
28 constructor(workerScript
: string, workerOptions
?: WorkerOptions
) {
29 super(workerScript
, workerOptions
);
30 this.workerSet
= new Set
<WorkerSetElement
>();
31 this.emitter
= new EventEmitterAsyncResource();
37 runningElements
: [...this.workerSet
].reduce(
38 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
41 elementsPerWorker
: this.maxElementsPerWorker
,
46 return this.workerSet
.size
;
49 get
maxElementsPerWorker(): number | undefined {
50 return this.workerOptions
.elementsPerWorker
;
54 public async start(): Promise
<void> {
55 this.addWorkerSetElement();
56 // Add worker set element sequentially to optimize memory at startup
57 this.workerOptions
.workerStartDelay
> 0 && (await sleep(this.workerOptions
.workerStartDelay
));
61 public async stop(): Promise
<void> {
62 for (const workerSetElement
of this.workerSet
) {
63 await workerSetElement
.worker
.terminate();
65 this.workerSet
.clear();
69 public async addElement(elementData
: WorkerData
): Promise
<void> {
70 if (!this.workerSet
) {
71 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
73 const workerSetElement
= await this.getWorkerSetElement();
74 workerSetElement
.worker
.postMessage({
75 id
: WorkerMessageEvents
.startWorkerElement
,
78 ++workerSetElement
.numberOfWorkerElements
;
79 // Add element sequentially to optimize memory at startup
80 if (this.workerOptions
.elementStartDelay
> 0) {
81 await sleep(this.workerOptions
.elementStartDelay
);
86 * Spawn a new `Worker`, register its event handlers and add it to the set as a `WorkerSetElement` holding zero elements.
88 private addWorkerSetElement(): WorkerSetElement
{
89 const worker
= new Worker(this.workerScript
, {
91 ...this.workerOptions
.poolOptions
.workerOptions
,
95 this.workerOptions
?.poolOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
99 this.workerOptions
?.poolOptions
?.errorHandler
?? WorkerConstants
.EMPTY_FUNCTION
101 worker
.on('error', (error
) => {
102 this.emitter
.emit(WorkerSetEvents
.error
, error
);
103 this.addWorkerSetElement();
107 this.workerOptions
?.poolOptions
?.onlineHandler
?? WorkerConstants
.EMPTY_FUNCTION
111 this.workerOptions
?.poolOptions
?.exitHandler
?? WorkerConstants
.EMPTY_FUNCTION
113 worker
.on('exit', () => this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
)));
114 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
115 this.workerSet
.add(workerSetElement
);
116 return workerSetElement
;
119 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
120 let chosenWorkerSetElement
: WorkerSetElement
;
121 for (const workerSetElement
of this.workerSet
) {
122 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
) {
123 chosenWorkerSetElement
= workerSetElement
;
127 if (!chosenWorkerSetElement
) {
128 chosenWorkerSetElement
= this.addWorkerSetElement();
129 // Add worker set element sequentially to optimize memory at startup
130 this.workerOptions
.workerStartDelay
> 0 && (await sleep(this.workerOptions
.workerStartDelay
));
132 return chosenWorkerSetElement
;
135 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
136 let workerSetElt
: WorkerSetElement
;
137 for (const workerSetElement
of this.workerSet
) {
138 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
139 workerSetElt
= workerSetElement
;