1 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
3 import { WorkerData
, WorkerMessageEvents
, WorkerOptions
, WorkerSetElement
} from
'../types/Worker';
5 import Utils from
'../utils/Utils';
6 import { Worker
} from
'worker_threads';
7 import WorkerAbstract from
'./WorkerAbstract';
8 import { WorkerUtils
} from
'./WorkerUtils';
10 export default class WorkerSet
extends WorkerAbstract
<WorkerData
> {
/** Entries for the workers managed by this set; each entry pairs a worker with its element count. */
private readonly workerSet: Set<WorkerSetElement>;
/** Callback invoked with every message received from a worker. */
private readonly messageHandler: (message: unknown) => void | Promise<void>;
/**
 * Create a new `WorkerSet`.
 *
 * @param workerScript - forwarded to the parent constructor
 * @param workerOptions - optional settings, forwarded to the parent constructor;
 *   its `messageHandler`, when provided, is used for worker messages
 */
constructor(workerScript: string, workerOptions?: WorkerOptions) {
  super(workerScript, workerOptions);
  // Fall back to a no-op so a message handler is always callable.
  const ignoreMessage = (): void => { /* This is intentional */ };
  this.messageHandler = workerOptions?.messageHandler ?? ignoreMessage;
  this.workerSet = new Set<WorkerSetElement>();
}
27 return this.workerSet
.size
;
/**
 * Number of elements a worker may hold before `addElement` starts another worker.
 *
 * @returns the configured `elementsPerWorker`, or `null` when it is not set
 */
get maxElementsPerWorker(): number | null {
  // Normalise an unset option to `null` so the declared return type holds at runtime.
  return this.workerOptions.elementsPerWorker ?? null;
}
/**
 * Hand a new element to the most recently added worker, starting a fresh
 * worker first when the set is empty or the last worker is already full.
 *
 * @param elementData - payload posted to the worker together with the
 *   `START_WORKER_ELEMENT` message id
 * @throws Error if the workers' set does not exist
 * @public
 */
public async addElement(elementData: WorkerData): Promise<void> {
  if (!this.workerSet) {
    throw new Error('Cannot add a WorkerSet element: workers\' set does not exist');
  }
  // The set is empty until a worker has been started and again once every
  // worker has exited; treat that like a full worker instead of reading a
  // property of `undefined`.
  const lastElement = this.getLastWorkerSetElement();
  if (lastElement === undefined || lastElement.numberOfWorkerElements >= this.workerOptions.elementsPerWorker) {
    await this.startWorker();
  }
  this.getLastWorker().postMessage({
    id: WorkerMessageEvents.START_WORKER_ELEMENT,
    // NOTE(review): payload key assumed to be `data` — confirm against the worker script
    data: elementData
  });
  this.getLastWorkerSetElement().numberOfWorkerElements++;
  // Start element sequentially to optimize memory at startup
  if (this.workerOptions.elementStartDelay > 0) {
    await Utils.sleep(this.workerOptions.elementStartDelay);
  }
}
/**
 * Start the worker set by spawning one worker.
 *
 * @returns a promise settled once `startWorker()` has completed
 * @public
 */
public async start(): Promise<void> {
  await this.startWorker();
}
/**
 * Stop the worker set: terminate the workers one after the other, then
 * empty the set.
 *
 * @returns a promise settled once every termination has been awaited
 * @public
 */
public async stop(): Promise<void> {
  for (const { worker } of this.workerSet) {
    // Sequential on purpose: each termination is awaited before the next one starts.
    await worker.terminate();
  }
  this.workerSet.clear();
}
/**
 * Spawn a worker running the worker script, attach its event handlers and
 * register it in the set with an element count of zero.
 *
 * @returns a promise settled after the optional start delay has elapsed
 */
private async startWorker(): Promise<void> {
  const ignore = (): void => { /* This is intentional */ };
  // Calling the handler from inside an async function turns a synchronous
  // throw into a rejection, so both failure modes end up in the same `catch`.
  const dispatchMessage = async (message: unknown): Promise<void> => {
    await this.messageHandler(message);
  };
  const worker = new Worker(this.workerScript);
  worker.on('message', (message) => {
    dispatchMessage(message).catch(ignore);
  });
  worker.on('error', ignore);
  worker.on('exit', (exitCode) => {
    WorkerUtils.defaultExitHandler(exitCode);
    // Remove the exited worker's entry from the set.
    this.workerSet.delete(this.getWorkerSetElementByWorker(worker));
  });
  this.workerSet.add({ worker, numberOfWorkerElements: 0 });
  // Space out worker startups to limit memory usage at startup
  const { workerStartDelay } = this.workerOptions;
  if (workerStartDelay > 0) {
    await Utils.sleep(workerStartDelay);
  }
}
/**
 * Fetch the entry that comes last when iterating the set.
 *
 * @returns the last entry, or `undefined` at runtime when the set is empty
 */
private getLastWorkerSetElement(): WorkerSetElement {
  let lastElement: WorkerSetElement;
  // Walk the whole set; whatever was visited last is the one we keep.
  this.workerSet.forEach((element) => {
    lastElement = element;
  });
  return lastElement;
}
/**
 * Fetch the worker held by the last entry of the set.
 *
 * @returns the `worker` of the entry returned by `getLastWorkerSetElement()`
 */
private getLastWorker(): Worker {
  const lastElement = this.getLastWorkerSetElement();
  return lastElement.worker;
}
110 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
111 let workerSetElt
: WorkerSetElement
;
112 for (const workerSetElement
of this.workerSet
) {
113 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
114 workerSetElt
= workerSetElement
;