1 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
3 import { WorkerEvents
, WorkerSetElement
} from
'../types/Worker';
5 import Utils from
'../utils/Utils';
6 import { Worker
} from
'worker_threads';
7 import WorkerAbstract from
'./WorkerAbstract';
8 import { WorkerUtils
} from
'./WorkerUtils';
10 export default class WorkerSet
<T
> extends WorkerAbstract
{
11 public maxElementsPerWorker
: number;
12 private workerSet
: Set
<WorkerSetElement
>;
15 * Create a new `WorkerSet`.
18 * @param maxElementsPerWorker
19 * @param workerStartDelay
20 * @param messageListenerCallback
22 constructor(workerScript
: string, maxElementsPerWorker
= 1, workerStartDelay
?: number, messageListenerCallback
: (message
: any) => void = () => { /* This is intentional */ }) {
23 super(workerScript
, workerStartDelay
, messageListenerCallback
);
24 this.workerSet
= new Set
<WorkerSetElement
>();
25 this.maxElementsPerWorker
= maxElementsPerWorker
;
29 return this.workerSet
.size
;
38 public async addElement(elementData
: T
): Promise
<void> {
39 if (!this.workerSet
) {
40 throw new Error('Cannot add a WorkerSet element: workers\' set does not exist');
42 if (this.getLastWorkerSetElement().numberOfWorkerElements
>= this.maxElementsPerWorker
) {
44 // Start worker sequentially to optimize memory at startup
45 await Utils
.sleep(this.workerStartDelay
);
47 this.getLastWorker().postMessage({ id
: WorkerEvents
.START_WORKER_ELEMENT
, data
: elementData
});
48 this.getLastWorkerSetElement().numberOfWorkerElements
++;
56 public async start(): Promise
<void> {
58 // Start worker sequentially to optimize memory at startup
59 await Utils
.sleep(this.workerStartDelay
);
67 public async stop(): Promise
<void> {
68 for (const workerSetElement
of this.workerSet
) {
69 await workerSetElement
.worker
.terminate();
71 this.workerSet
.clear();
78 private startWorker(): void {
79 const worker
= new Worker(this.workerScript
);
80 worker
.on('message', this.messageListener
);
81 worker
.on('error', () => { /* This is intentional */ });
82 worker
.on('exit', (code
) => {
83 WorkerUtils
.defaultExitHandler(code
);
84 this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
));
86 this.workerSet
.add({ worker
, numberOfWorkerElements
: 0 });
89 private getLastWorkerSetElement(): WorkerSetElement
{
90 let workerSetElement
: WorkerSetElement
;
91 // eslint-disable-next-line no-empty
92 for (workerSetElement
of this.workerSet
) { /* This is intentional */ }
93 return workerSetElement
;
96 private getLastWorker(): Worker
{
97 return this.getLastWorkerSetElement().worker
;
100 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
101 let workerSetElt
: WorkerSetElement
;
102 for (const workerSetElement
of this.workerSet
) {
103 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
104 workerSetElt
= workerSetElement
;