1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import EventEmitterAsyncResource from
'node:events';
4 import { Worker
} from
'node:worker_threads';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import { WorkerConstants
} from
'./WorkerConstants';
14 type WorkerSetElement
,
16 } from
'./WorkerTypes';
17 import { defaultErrorHandler
, defaultExitHandler
, sleep
} from
'./WorkerUtils';
19 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
20 public readonly emitter
: EventEmitterAsyncResource
;
21 private readonly workerSet
: Set
<WorkerSetElement
>;
/**
 * Build a `WorkerSet` that starts with an empty set of workers and its own event emitter.
 *
 * @param workerScript - Forwarded unchanged to the `WorkerAbstract` constructor.
 * @param workerOptions - Optional settings, forwarded unchanged to the `WorkerAbstract` constructor.
 */
constructor(workerScript: string, workerOptions?: WorkerOptions) {
  super(workerScript, workerOptions);
  // NOTE(review): `EventEmitterAsyncResource` is bound to the default export of
  // 'node:events' by this file's import — confirm that is the intended class.
  this.emitter = new EventEmitterAsyncResource();
  // No worker thread is created here; the set is populated later.
  this.workerSet = new Set<WorkerSetElement>();
}
// Total element count: sums `numberOfWorkerElements` over every entry of `workerSet`.
38 runningElements
: [...this.workerSet
].reduce(
39 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
// Per-worker element limit, read through the `maxElementsPerWorker` getter.
42 elementsPerWorker
: this.maxElementsPerWorker
,
// Number of entries currently held in `workerSet`.
47 return this.workerSet
.size
;
/**
 * Expose the `elementsPerWorker` worker option.
 *
 * @returns The `elementsPerWorker` value read from the worker options; may be `undefined`.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
/**
 * Start the set by adding one worker set element.
 *
 * When the `workerStartDelay` option is greater than zero, the returned promise
 * only resolves after `sleep(workerStartDelay)` has completed.
 */
public async start(): Promise<void> {
  this.addWorkerSetElement();
  // Add worker set element sequentially to optimize memory at startup
  if (this.workerOptions.workerStartDelay > 0) {
    await sleep(this.workerOptions.workerStartDelay);
  }
}
/**
 * Stop the set: terminate every worker it holds, then empty the set.
 *
 * Workers are terminated one after another; each `terminate()` call is awaited
 * before the next worker is handled.
 */
public async stop(): Promise<void> {
  for (const { worker } of this.workerSet) {
    await worker.terminate();
  }
  // Forget every entry once all terminations have been awaited.
  this.workerSet.clear();
}
// Hand a new element to one of the workers of the set.
//
// Obtains a worker set element from `getWorkerSetElement()`, posts a message whose
// `id` is `WorkerMessageEvents.startWorkerElement` to its worker, and increments
// that element's `numberOfWorkerElements` counter.
// Throws an `Error` when `workerSet` is falsy.
// `elementData` is presumably what the worker needs to start the element — TODO confirm
// how it is carried in the posted message.
70 public async addElement(elementData
: WorkerData
): Promise
<void> {
71 if (!this.workerSet
) {
72 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
// Choose the worker set element that will host the new element.
74 const workerSetElement
= await this.getWorkerSetElement();
// Ask the chosen worker to start the element.
75 workerSetElement
.worker
.postMessage({
76 id
: WorkerMessageEvents
.startWorkerElement
,
// Record that this worker now hosts one more element.
79 ++workerSetElement
.numberOfWorkerElements
;
80 // Add element sequentially to optimize memory at startup
81 if (this.workerOptions
.elementStartDelay
> 0) {
82 await sleep(this.workerOptions
.elementStartDelay
);
87 * Add a new `WorkerSetElement`.
// Create a worker thread for `workerScript`, attach its handlers, and register it
// in `workerSet` with an element count of zero. Returns the newly registered entry.
89 private addWorkerSetElement(): WorkerSetElement
{
// Spawn the worker thread from the configured script.
90 const worker
= new Worker(this.workerScript
);
// Use the configured `messageHandler`, or `WorkerConstants.EMPTY_FUNCTION` when none is set.
93 (this.workerOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
).bind(
95 ) as MessageHandler
<Worker
>
// Two 'error' listeners: the shared default handler, then the set-specific one below.
97 worker
.on('error', defaultErrorHandler
.bind(this) as (err
: Error) => void);
// Re-emit the error on `emitter`, then add another worker set element
// (presumably to replace the failed worker — confirm).
98 worker
.on('error', (error
) => {
99 this.emitter
.emit(WorkerSetEvents
.error
, error
);
100 this.addWorkerSetElement();
// Two 'exit' listeners: the shared default handler, then removal from the set.
102 worker
.on('exit', defaultExitHandler
.bind(this) as (exitCode
: number) => void);
// Drop this worker's entry from `workerSet` once it has exited.
103 worker
.on('exit', () => this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
)));
// A new entry starts with no hosted elements.
104 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
105 this.workerSet
.add(workerSetElement
);
106 return workerSetElement
;
// Find a worker set element that still has room for another element; when none
// has room, add a new worker set element and return that one instead.
109 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
110 let chosenWorkerSetElement
: WorkerSetElement
;
111 for (const workerSetElement
of this.workerSet
) {
// An entry has room while its element count is below the `elementsPerWorker` option.
// NOTE(review): if `elementsPerWorker` is undefined this comparison is always false,
// so a new worker would be added on every call — confirm a default is applied elsewhere.
112 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
) {
113 chosenWorkerSetElement
= workerSetElement
;
// No existing entry had room: create a new worker set element.
117 if (!chosenWorkerSetElement
) {
118 chosenWorkerSetElement
= this.addWorkerSetElement();
119 // Add worker set element sequentially to optimize memory at startup
120 this.workerOptions
.workerStartDelay
> 0 && (await sleep(this.workerOptions
.workerStartDelay
));
122 return chosenWorkerSetElement
;
125 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
126 let workerSetElt
: WorkerSetElement
;
127 for (const workerSetElement
of this.workerSet
) {
128 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
129 workerSetElt
= workerSetElement
;