// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { Worker
} from
'worker_threads';
5 import { WorkerAbstract
} from
'./WorkerAbstract';
11 type WorkerSetElement
,
12 } from
'./WorkerTypes';
13 import { WorkerUtils
} from
'./WorkerUtils';
15 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
/**
 * Workers started by this instance, each paired with the number of
 * elements that have been dispatched to it.
 */
private readonly workerSet: Set<WorkerSetElement>;
/**
 * Create a new `WorkerSet`.
 *
 * @param workerScript - forwarded unchanged to the `WorkerAbstract` constructor
 * @param workerOptions - optional settings, forwarded unchanged to the `WorkerAbstract` constructor
 */
constructor(workerScript: string, workerOptions?: WorkerOptions) {
  super(workerScript, workerOptions);
  // The set starts empty; entries are only added by `startWorker()`.
  this.workerSet = new Set<WorkerSetElement>();
}
30 return this.workerSet
.size
;
/**
 * Value of the `elementsPerWorker` worker option.
 *
 * @returns the configured value, or `undefined` when the option is not set
 */
get maxElementsPerWorker(): number | undefined {
  return this.workerOptions.elementsPerWorker;
}
/**
 * Dispatch a new element to the most recently started worker, starting a
 * fresh worker first when there is none yet or when the last one has
 * reached `workerOptions.elementsPerWorker` elements.
 *
 * @param elementData - payload describing the element to start
 * @returns a promise resolved once the start message has been posted and
 *   the optional `elementStartDelay` has elapsed
 * @throws Error when the internal workers' set does not exist
 */
public async addElement(elementData: WorkerData): Promise<void> {
  if (!this.workerSet) {
    throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
  }
  // The size check short-circuits, so the last element is only read when
  // at least one worker exists.
  if (
    this.workerSet.size === 0 ||
    this.getLastWorkerSetElement().numberOfWorkerElements >= this.workerOptions.elementsPerWorker
  ) {
    await this.startWorker();
  }
  this.getLastWorker().postMessage({
    id: WorkerMessageEvents.START_WORKER_ELEMENT,
    // NOTE(review): this payload line was missing from the extracted source;
    // `data: elementData` is assumed - confirm against the original file.
    data: elementData,
  });
  this.getLastWorkerSetElement().numberOfWorkerElements++;
  // Start element sequentially to optimize memory at startup
  if (this.workerOptions.elementStartDelay > 0) {
    await WorkerUtils.sleep(this.workerOptions.elementStartDelay);
  }
}
/**
 * Start the worker set by starting one worker.
 *
 * @returns a promise resolved once `startWorker()` has completed
 */
public async start(): Promise<void> {
  await this.startWorker();
}
/**
 * Terminate every worker in the set, one after the other, then empty the set.
 *
 * @returns a promise resolved once all workers have been terminated
 */
public async stop(): Promise<void> {
  for (const workerSetElement of this.workerSet) {
    await workerSetElement.worker.terminate();
  }
  this.workerSet.clear();
}
/**
 * Start a new `Worker`.
 *
 * Spawns a worker running `this.workerScript`, wires its event handlers,
 * registers it in the set with an element count of zero, then waits for
 * `workerOptions.workerStartDelay` when that delay is positive.
 *
 * @returns a promise resolved once the worker is registered and the
 *   optional start delay has elapsed
 */
private async startWorker(): Promise<void> {
  const worker = new Worker(this.workerScript);
  // NOTE(review): the `worker.on('message', ...)` scaffolding was missing from
  // the extracted source and is reconstructed from the surviving fragments -
  // confirm the event name against the original file.
  worker.on(
    'message',
    (
      this.workerOptions?.messageHandler ??
      (() => {
        /* This is intentional */
      })
    ).bind(this) as MessageHandler<Worker>
  );
  worker.on('error', WorkerUtils.defaultErrorHandler.bind(this) as (err: Error) => void);
  worker.on('exit', (code) => {
    WorkerUtils.defaultExitHandler(code);
    // Drop the exited worker's entry from the set.
    this.workerSet.delete(this.getWorkerSetElementByWorker(worker));
  });
  this.workerSet.add({ worker, numberOfWorkerElements: 0 });
  // Start worker sequentially to optimize memory at startup
  this.workerOptions.workerStartDelay > 0 &&
    (await WorkerUtils.sleep(this.workerOptions.workerStartDelay));
}
/**
 * Get the last element of the workers' set.
 *
 * A `Set` iterates in insertion order, so after the loop the variable holds
 * the most recently added element. When the set is empty the variable is
 * never assigned and `undefined` is returned at runtime.
 *
 * @returns the last inserted element
 */
private getLastWorkerSetElement(): WorkerSetElement {
  let workerSetElement: WorkerSetElement;
  for (workerSetElement of this.workerSet) {
    /* This is intentional */
  }
  return workerSetElement;
}
/**
 * Get the `Worker` held by the last element of the workers' set.
 *
 * @returns the `worker` property of the element returned by `getLastWorkerSetElement()`
 */
private getLastWorker(): Worker {
  return this.getLastWorkerSetElement().worker;
}
122 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
123 let workerSetElt
: WorkerSetElement
;
124 for (const workerSetElement
of this.workerSet
) {
125 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
126 workerSetElt
= workerSetElement
;