1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { EventEmitter
} from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import { WorkerAbstract
} from
'./WorkerAbstract';
7 import { WorkerConstants
} from
'./WorkerConstants';
13 type WorkerSetElement
,
15 } from
'./WorkerTypes';
16 import { sleep
} from
'./WorkerUtils';
// Worker implementation built on `WorkerAbstract<WorkerData>` that keeps its
// worker threads in a `Set` of `WorkerSetElement` entries.
18 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
// Event emitter exposed to consumers. The constructor only instantiates it
// when `poolOptions.enableEvents` is truthy, and the worker 'error' listener
// emits `WorkerSetEvents.error` through it.
19 public readonly emitter
: EventEmitter
;
// Bookkeeping for the workers: each entry pairs a `Worker` with the number of
// elements currently assigned to it (`numberOfWorkerElements`).
20 private readonly workerSet
: Set
<WorkerSetElement
>;
23 * Creates a new `WorkerSet`.
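25 * @param workerScript - Script file passed to each `Worker` this set spawns.
26 * @param workerOptions - Optional worker settings; `poolOptions.restartWorkerOnError` defaults to `true` unless overridden.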
// Builds the set: forwards both arguments to the `WorkerAbstract` constructor,
// applies pool option defaults, allocates the empty worker set and, when
// events are enabled, creates the emitter.
28 constructor(workerScript
: string, workerOptions
?: WorkerOptions
) {
29 super(workerScript
, workerOptions
);
// `restartWorkerOnError` defaults to true; the existing `poolOptions` are
// spread after it, so an explicit caller-supplied value takes precedence.
30 this.workerOptions
.poolOptions
= {
33 restartWorkerOnError
: true,
35 ...this.workerOptions
.poolOptions
,
// The set starts empty; entries are added by `addWorkerSetElement()`.
37 this.workerSet
= new Set
<WorkerSetElement
>();
// The emitter is only instantiated when `poolOptions.enableEvents` is truthy,
// which is why later uses go through optional chaining (`this.emitter?.emit`).
38 if (this.workerOptions
?.poolOptions
?.enableEvents
) {
39 this.emitter
= new EventEmitter();
// NOTE(review): the declaration enclosing these object-literal fields is not
// visible in this excerpt — presumably an accessor returning a status/info
// object; confirm against the full file.
// Version value read from the shared worker constants.
45 version
: WorkerConstants
.version
,
// Sum of `numberOfWorkerElements` over every entry currently in the set
// (the set is spread into an array so `reduce` can be used).
49 elementsExecuting
: [...this.workerSet
].reduce(
50 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
// Non-null assertion: `maxElementsPerWorker` is declared `number | undefined`.
53 elementsPerWorker
: this.maxElementsPerWorker
!,
// Number of worker set elements currently tracked in `workerSet`.
// NOTE(review): the enclosing declaration is not visible in this excerpt.
58 return this.workerSet
.size
;
// Per-worker element limit, read straight from `workerOptions.elementsPerWorker`;
// `undefined` when the option is not set. `getWorkerSetElement()` compares each
// entry's `numberOfWorkerElements` against the same option.
61 get
maxElementsPerWorker(): number | undefined {
62 return this.workerOptions
.elementsPerWorker
;
// Starts the set by creating one worker set element, then sleeps for
// `workerOptions.workerStartDelay` when that delay is greater than 0.
// The returned promise resolves once the optional delay has elapsed.
66 public async start(): Promise
<void> {
67 this.addWorkerSetElement();
68 // Add worker set element sequentially to optimize memory at startup
// The `!` assertions assume `workerStartDelay` is defined at this point —
// TODO confirm where the default is set; an undefined value compares false.
69 this.workerOptions
.workerStartDelay
! > 0 && (await sleep(this.workerOptions
.workerStartDelay
!));
// Stops the workers one after another: for each entry a promise tied to the
// worker's 'exit' event is created, `terminate()` is awaited, then the exit
// promise is awaited before moving to the next entry. Finally the set is
// emptied.
73 public async stop(): Promise
<void> {
74 for (const workerSetElement
of this.workerSet
) {
// The listener is attached before `terminate()` is called so the 'exit'
// event cannot be missed.
// NOTE(review): the listener body is not visible in this excerpt —
// presumably it calls `resolve()`; confirm.
75 const workerExitPromise
= new Promise
<void>((resolve
) => {
76 workerSetElement
.worker
.on('exit', () => {
80 await workerSetElement
.worker
.terminate();
81 await workerExitPromise
;
// Drop whatever entries are left in the set.
83 this.workerSet
.clear();
// Dispatches one element to a worker: obtains a worker set element via
// `getWorkerSetElement()`, posts a `startWorkerElement` message to its worker,
// increments that entry's element count, and sleeps for `elementStartDelay`
// when that delay is greater than 0.
// Throws an `Error` when `this.workerSet` is falsy.
87 public async addElement(elementData
: WorkerData
): Promise
<void> {
88 if (!this.workerSet
) {
89 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
91 const workerSetElement
= await this.getWorkerSetElement();
// NOTE(review): only the `id` field of the message is visible in this
// excerpt — presumably `elementData` is sent alongside it; confirm.
92 workerSetElement
.worker
.postMessage({
93 id
: WorkerMessageEvents
.startWorkerElement
,
// Count the element against this worker; `getWorkerSetElement()` compares
// this counter with `elementsPerWorker` when choosing an entry.
96 ++workerSetElement
.numberOfWorkerElements
;
97 // Add element sequentially to optimize memory at startup
98 if (this.workerOptions
.elementStartDelay
! > 0) {
99 await sleep(this.workerOptions
.elementStartDelay
!);
104 * Adds a new `WorkerSetElement`.
// Spawns a `Worker` running `workerScript`, wires its event handling, registers
// it in the set with an element count of 0 and returns the new entry.
106 private addWorkerSetElement(): WorkerSetElement
{
// Worker constructor options include everything from
// `poolOptions.workerOptions`; other keys of this literal are not visible in
// this excerpt.
107 const worker
= new Worker(this.workerScript
, {
109 ...this.workerOptions
.poolOptions
?.workerOptions
,
// User-supplied handlers fall back to `WorkerConstants.EMPTY_FUNCTION` when the
// matching pool option is absent.
// NOTE(review): the `worker.on(...)` calls these expressions are passed to are
// not visible in this excerpt — presumably one registration per event
// ('message', 'error', 'online', 'exit'); confirm.
113 this.workerOptions
?.poolOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
117 this.workerOptions
?.poolOptions
?.errorHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
// Internal 'error' listener: emits `WorkerSetEvents.error` with the error on
// the emitter (when one exists) and, if `restartWorkerOnError` is set, adds a
// new worker set element.
119 worker
.on('error', (error
) => {
120 this.emitter
?.emit(WorkerSetEvents
.error
, error
);
121 if (this.workerOptions
?.poolOptions
?.restartWorkerOnError
) {
122 this.addWorkerSetElement();
127 this.workerOptions
?.poolOptions
?.onlineHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
131 this.workerOptions
?.poolOptions
?.exitHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
// Internal one-shot 'exit' listener: looks the entry up by worker and removes
// it from the set. The `!` asserts the lookup succeeds.
133 worker
.once('exit', () =>
134 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
)!),
// New entries start with no elements assigned.
136 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
137 this.workerSet
.add(workerSetElement
);
138 return workerSetElement
;
// Deletes the given entry from the worker set. Called from the worker's
// one-shot 'exit' listener registered in `addWorkerSetElement()`.
141 private removeWorkerSetElement(workerSetElement
: WorkerSetElement
): void {
142 this.workerSet
.delete(workerSetElement
);
// Returns a worker set element that can take another element: an existing
// entry whose `numberOfWorkerElements` is below `workerOptions.elementsPerWorker`
// or, when none qualifies, a newly added one (followed by a sleep of
// `workerStartDelay` when that delay is greater than 0).
145 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
146 let chosenWorkerSetElement
: WorkerSetElement
| undefined;
// Scan the set for an entry with spare capacity.
// NOTE(review): whether the loop stops at the first match is not visible in
// this excerpt — presumably a `break` follows the assignment; confirm.
147 for (const workerSetElement
of this.workerSet
) {
148 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
149 chosenWorkerSetElement
= workerSetElement
;
// No entry had spare capacity (or the set is empty): spawn a new worker.
153 if (!chosenWorkerSetElement
) {
154 chosenWorkerSetElement
= this.addWorkerSetElement();
155 // Add worker set element sequentially to optimize memory at startup
156 this.workerOptions
.workerStartDelay
! > 0 &&
157 (await sleep(this.workerOptions
.workerStartDelay
!));
159 return chosenWorkerSetElement
;
162 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
163 let workerSetElt
: WorkerSetElement
| undefined;
164 for (const workerSetElement
of this.workerSet
) {
165 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
166 workerSetElt
= workerSetElement
;