1 // Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
3 import { Worker
} from
'worker_threads';
10 type WorkerSetElement
,
11 } from
'../types/Worker';
12 import Utils from
'../utils/Utils';
13 import WorkerAbstract from
'./WorkerAbstract';
14 import { WorkerUtils
} from
'./WorkerUtils';
16 export default class WorkerSet
extends WorkerAbstract
<WorkerData
> {
private readonly workerSet: Set<WorkerSetElement>;

/**
 * Create a new `WorkerSet`.
 *
 * Both arguments are handed unchanged to the `WorkerAbstract` constructor,
 * after which the set of worker elements is initialised empty.
 *
 * @param workerScript - worker script, forwarded to the base class
 * @param workerOptions - optional worker options, forwarded to the base class
 */
constructor(workerScript: string, workerOptions?: WorkerOptions) {
  super(workerScript, workerOptions);
  // Element type is taken from the field declaration
  this.workerSet = new Set();
}
31 return this.workerSet
.size
;
/**
 * Expose the `elementsPerWorker` worker option, i.e. the threshold that
 * `addElement()` compares a worker's element count against.
 *
 * @returns the configured `elementsPerWorker` value
 */
get maxElementsPerWorker(): number | null {
  const options = this.workerOptions;
  return options.elementsPerWorker;
}
40 * @param elementData -
/**
 * Add one element to the worker set.
 *
 * Visible steps, in order:
 * - throws an `Error` when `this.workerSet` is falsy;
 * - awaits `startWorker()` when the set is empty or the last element's
 *   `numberOfWorkerElements` has reached `workerOptions.elementsPerWorker`;
 * - posts a message whose `id` is `WorkerMessageEvents.START_WORKER_ELEMENT`
 *   to the last worker;
 * - increments the last element's `numberOfWorkerElements`;
 * - sleeps for `workerOptions.elementStartDelay` when that delay is > 0.
 *
 * NOTE(review): several lines of this method are absent from the reviewed
 * text (the `if (` wrapping the worker-start condition, the rest of the
 * posted message after `id`, closing braces). Presumably `elementData` is
 * carried in the message payload - confirm against the full file.
 *
 * @param elementData - data describing the element to add
 * @throws Error when the workers' set does not exist
 */
44 public async addElement(elementData
: WorkerData
): Promise
<void> {
45 if (!this.workerSet
) {
46 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
49 this.workerSet
.size
=== 0 ||
50 this.getLastWorkerSetElement().numberOfWorkerElements
>= this.workerOptions
.elementsPerWorker
52 await this.startWorker();
54 this.getLastWorker().postMessage({
55 id
: WorkerMessageEvents
.START_WORKER_ELEMENT
,
58 this.getLastWorkerSetElement().numberOfWorkerElements
++;
59 // Start element sequentially to optimize memory at startup
60 if (this.workerOptions
.elementStartDelay
> 0) {
61 await Utils
.sleep(this.workerOptions
.elementStartDelay
);
/**
 * Start the worker set by running `startWorker()` once.
 *
 * @returns a promise that settles when `startWorker()` has completed
 */
public async start(): Promise<void> {
  await this.startWorker();
}
/**
 * Terminate every worker held in the set, then empty the set.
 *
 * @returns a promise resolved once all workers are terminated and the set is cleared
 */
public async stop(): Promise<void> {
  for (const { worker } of this.workerSet) {
    // One termination is awaited before the next one is requested
    await worker.terminate();
  }
  this.workerSet.clear();
}
/**
 * Spawn one worker and register it in the worker set.
 *
 * Visible steps, in order:
 * - creates a `Worker` from `this.workerScript`;
 * - builds a handler from `workerOptions.messageHandler`, or a fallback when
 *   that is nullish, bound to this instance and cast to `MessageHandler<Worker>`;
 * - registers `WorkerUtils.defaultErrorHandler`, bound to this instance, for
 *   the `'error'` event;
 * - on `'exit'`, calls `WorkerUtils.defaultExitHandler(code)` and deletes the
 *   worker's element from the set;
 * - adds `{ worker, numberOfWorkerElements: 0 }` to the set;
 * - sleeps for `workerOptions.workerStartDelay` when that delay is > 0.
 *
 * NOTE(review): the call that registers the first handler (and the event name
 * it listens to) is absent from the reviewed text, as is the fallback function
 * surrounding the "This is intentional" comment - presumably an empty handler
 * attached to the worker's message event; confirm against the full file.
 * NOTE(review): `workerOptions` is read with `?.` for the handler but with
 * plain `.` for `workerStartDelay` - confirm whether it can be undefined here.
 */
90 private async startWorker(): Promise
<void> {
91 const worker
= new Worker(this.workerScript
);
95 this.workerOptions
?.messageHandler
??
97 /* This is intentional */
99 ).bind(this) as MessageHandler
<Worker
>
101 worker
.on('error', WorkerUtils
.defaultErrorHandler
.bind(this) as (err
: Error) => void);
102 worker
.on('exit', (code
) => {
103 WorkerUtils
.defaultExitHandler(code
);
104 this.workerSet
.delete(this.getWorkerSetElementByWorker(worker
));
106 this.workerSet
.add({ worker
, numberOfWorkerElements
: 0 });
107 // Start worker sequentially to optimize memory at startup
108 this.workerOptions
.workerStartDelay
> 0 &&
109 (await Utils
.sleep(this.workerOptions
.workerStartDelay
));
/**
 * Return the element most recently added to the worker set.
 *
 * A `Set` is visited in insertion order, so the value seen last is the
 * newest one. When the set is empty nothing is assigned and the result is
 * `undefined` at runtime.
 *
 * @returns the last element of `workerSet`
 */
private getLastWorkerSetElement(): WorkerSetElement {
  let lastElement: WorkerSetElement;
  this.workerSet.forEach((element) => {
    // Keep overwriting: the final assignment wins
    lastElement = element;
  });
  return lastElement;
}
/**
 * Return the worker held by the last element of the worker set.
 *
 * @returns the `worker` property of the element given by `getLastWorkerSetElement()`
 */
private getLastWorker(): Worker {
  const lastElement = this.getLastWorkerSetElement();
  return lastElement.worker;
}
124 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
{
125 let workerSetElt
: WorkerSetElement
;
126 for (const workerSetElement
of this.workerSet
) {
127 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
128 workerSetElt
= workerSetElement
;