1 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
3 import { WorkerMessageEvents
, WorkerOptions
, WorkerSetElement
} from
'../types/Worker';
5 import Utils from
'../utils/Utils';
6 import { Worker
} from
'worker_threads';
7 import WorkerAbstract from
'./WorkerAbstract';
8 import { WorkerUtils
} from
'./WorkerUtils';
10 export default class WorkerSet
<T
> extends WorkerAbstract
{
11 public readonly maxElementsPerWorker
: number;
12 private readonly messageHandler
: (message
: unknown
) => void | Promise
<void>;
13 private workerSet
: Set
<WorkerSetElement
>;
16 * Create a new `WorkerSet`.
19 * @param maxElementsPerWorker
20 * @param workerStartDelay
23 constructor(workerScript
: string, maxElementsPerWorker
= 1, workerStartDelay
?: number, opts
?: WorkerOptions
) {
24 super(workerScript
, workerStartDelay
);
25 this.maxElementsPerWorker
= maxElementsPerWorker
;
26 this.messageHandler
= opts
?.messageHandler
?? (() => { /* This is intentional */ });
27 this.workerSet
= new Set
<WorkerSetElement
>();
31 return this.workerSet
.size
;
40 public async addElement(elementData
: T
): Promise
<void> {
41 if (!this.workerSet
) {
42 throw new Error('Cannot add a WorkerSet element: workers\' set does not exist');
44 if (this.getLastWorkerSetElement().numberOfWorkerElements
>= this.maxElementsPerWorker
) {
46 // Start worker sequentially to optimize memory at startup
47 await Utils
.sleep(this.workerStartDelay
);
49 this.getLastWorker().postMessage({ id
: WorkerMessageEvents
.START_WORKER_ELEMENT
, data
: elementData
});
50 this.getLastWorkerSetElement().numberOfWorkerElements
++;
58 public async start(): Promise
<void> {
60 // Start worker sequentially to optimize memory at startup
61 await Utils
.sleep(this.workerStartDelay
);
69 public async stop(): Promise
<void> {
70 for (const workerSetElement
of this.workerSet
) {
71 await workerSetElement
.worker
.terminate();
73 this.workerSet
.clear();
80 private startWorker(): void {
81 const worker
= new Worker(this.workerScript
);
82 worker
.on('message', (msg
) => {
84 await this.messageHandler(msg
);
85 })().catch(() => { /* This is intentional */ });
87 worker
.on('error', () => { /* This is intentional */ });
88 worker
.on('exit', (code
) => {
89 WorkerUtils
.defaultExitHandler(code
);
90 this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
));
92 this.workerSet
.add({ worker
, numberOfWorkerElements
: 0 });
95 private getLastWorkerSetElement(): WorkerSetElement
{
96 let workerSetElement
: WorkerSetElement
;
97 // eslint-disable-next-line no-empty
98 for (workerSetElement
of this.workerSet
) { /* This is intentional */ }
99 return workerSetElement
;
102 private getLastWorker(): Worker
{
103 return this.getLastWorkerSetElement().worker
;
106 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
107 let workerSetElt
: WorkerSetElement
;
108 for (const workerSetElement
of this.workerSet
) {
109 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
110 workerSetElt
= workerSetElement
;