// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { Worker
} from
'node:worker_threads';
5 import { WorkerAbstract
} from
'./WorkerAbstract';
6 import { WorkerConstants
} from
'./WorkerConstants';
12 type WorkerSetElement
,
13 } from
'./WorkerTypes';
14 import { defaultErrorHandler
, defaultExitHandler
, sleep
} from
'./WorkerUtils';
16 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
17 private readonly workerSet
: Set
<WorkerSetElement
>;
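/**
 * Create a new `WorkerSet`.
 *
 * @param workerScript - Worker script, forwarded to the `WorkerAbstract` constructor.
 * @param workerOptions - Optional worker options, forwarded to the `WorkerAbstract` constructor.
 */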
constructor(workerScript: string, workerOptions?: WorkerOptions) {
  // The base class receives both arguments unchanged.
  super(workerScript, workerOptions);
  // Begin with no workers; the element type comes from the field declaration.
  this.workerSet = new Set();
}
// NOTE(review): the member header for this statement (embedded line 30) is absent from
// this extract — presumably the `size` accessor; confirm against the original file.
// Reports how many worker set elements `this.workerSet` currently holds.
31 return this.workerSet
.size
;
/**
 * Per-worker element limit taken from the worker options.
 *
 * @returns The `elementsPerWorker` option, or `undefined` when it is not set.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
/**
 * Adds one element to the set: posts a `startWorkerElement` message to the worker of the
 * last worker set element, increments that element's counter, then optionally sleeps for
 * `elementStartDelay`.
 *
 * NOTE(review): this extract has gaps (embedded lines 42, 44, 47-48, 52, 55-56 and 61+ are
 * absent). The `if (` wrapper around the condition below, the statement that presumably
 * starts a new worker, the rest of the message payload and the closing braces are not
 * visible here — confirm against the original file.
 *
 * @param elementData - Data of the element to add; where it is placed in the posted
 *   message is not visible in this extract.
 * @returns A promise that resolves after the optional element start delay.
 * @throws Error when `this.workerSet` is falsy.
 */
39 public async addElement(elementData
: WorkerData
): Promise
<void> {
// Guard: the set must exist before an element can be handed to a worker.
40 if (!this.workerSet
) {
41 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
// Last element in iteration order; unassigned when the set is empty, which is why the
// size check comes first in the condition below.
43 let lastWorkerSetElement
= this.getLastWorkerSetElement();
// Condition: no worker yet, or the last worker already holds at least `elementsPerWorker` elements.
45 this.workerSet
.size
=== 0 ||
46 lastWorkerSetElement
.numberOfWorkerElements
>= this.workerOptions
.elementsPerWorker
49 // Start worker sequentially to optimize memory at startup
50 this.workerOptions
.workerStartDelay
> 0 && (await sleep(this.workerOptions
.workerStartDelay
));
// Re-read the last element — presumably the worker that was just started; confirm.
51 lastWorkerSetElement
= this.getLastWorkerSetElement();
// Ask the selected worker to start the element.
53 lastWorkerSetElement
.worker
.postMessage({
54 id
: WorkerMessageEvents
.startWorkerElement
,
// Track how many elements this worker has been given.
57 ++lastWorkerSetElement
.numberOfWorkerElements
;
58 // Start element sequentially to optimize memory at startup
59 if (this.workerOptions
.elementStartDelay
> 0) {
60 await sleep(this.workerOptions
.elementStartDelay
);
/**
 * Starts the worker set.
 *
 * NOTE(review): embedded line 66 is absent from this extract, so the statement that
 * presumably launches the first worker is not visible here — confirm against the
 * original file. What is visible: when `workerStartDelay` is greater than 0, the method
 * sleeps for that delay.
 *
 * @returns A promise that resolves after the optional worker start delay.
 */
65 public async start(): Promise
<void> {
67 // Start worker sequentially to optimize memory at startup
// Short-circuit form: the sleep is only awaited for a positive delay.
68 this.workerOptions
.workerStartDelay
> 0 && (await sleep(this.workerOptions
.workerStartDelay
));
/**
 * Shuts the set down: terminates each worker while iterating the set, awaiting every
 * termination before moving on, then empties the set.
 *
 * @returns A promise that resolves once the workers are terminated and the set is cleared.
 */
public async stop(): Promise<void> {
  for (const { worker } of this.workerSet) {
    // Awaited one at a time, so terminations never overlap.
    await worker.terminate();
  }
  this.workerSet.clear();
}
/**
 * Start a new `Worker`.
 */
// Creates a `Worker` from `this.workerScript`, attaches error and exit listeners to it,
// and adds it to `this.workerSet` with an element count of 0.
// NOTE(review): embedded lines 84-85, 87 and 89 are absent from this extract, so the call
// that registers the message handler below (presumably `worker.on('message', ...)`) and
// the argument passed to `bind` are not visible — confirm against the original file.
82 private startWorker(): void {
83 const worker
= new Worker(this.workerScript
);
// Handler taken from the options, falling back to `WorkerConstants.EMPTY_FUNCTION`.
86 (this.workerOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
).bind(
88 ) as MessageHandler
<Worker
>
// Worker errors go to `defaultErrorHandler`, bound to this instance.
90 worker
.on('error', defaultErrorHandler
.bind(this) as (err
: Error) => void);
// A second 'error' listener starts another worker whenever this one errors.
91 worker
.on('error', () => this.startWorker());
// Worker exits go to `defaultExitHandler`, bound to this instance.
92 worker
.on('exit', defaultExitHandler
.bind(this) as (exitCode
: number) => void);
// On exit, the element looked up for this worker is deleted from the set.
93 worker
.on('exit', () => this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
)));
// Register the new worker; it has no elements assigned yet.
94 this.workerSet
.add({ worker
, numberOfWorkerElements
: 0 });
/**
 * Walks the whole set and keeps the final element produced by the iteration.
 *
 * @returns The last worker set element in iteration order; the result is left unassigned
 *   when the set is empty.
 */
private getLastWorkerSetElement(): WorkerSetElement {
  let lastElement: WorkerSetElement;
  for (const currentElement of this.workerSet) {
    // Each pass overwrites the previous candidate, so only the final one survives.
    lastElement = currentElement;
  }
  return lastElement;
}
105 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
106 let workerSetElt
: WorkerSetElement
;
107 for (const workerSetElement
of this.workerSet
) {
108 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
109 workerSetElt
= workerSetElement
;