1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { EventEmitter
} from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import type { ThreadPoolOptions
} from
'poolifier';
8 import { WorkerAbstract
} from
'./WorkerAbstract';
9 import { WorkerConstants
} from
'./WorkerConstants';
15 type WorkerSetElement
,
17 } from
'./WorkerTypes';
18 import { sleep
} from
'./WorkerUtils';
20 const DEFAULT_POOL_OPTIONS
: ThreadPoolOptions
= {
22 restartWorkerOnError
: true,
25 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
26 public readonly emitter
!: EventEmitter
;
27 private readonly workerSet
: Set
<WorkerSetElement
>;
/**
 * Creates a new `WorkerSet`.
 *
 * The pool options are rebuilt from the module defaults overlaid with the
 * caller-supplied ones (caller values win), and the event emitter is only
 * instantiated when the resulting options enable events.
 *
 * @param workerScript - Forwarded unchanged to the `WorkerAbstract` constructor.
 * @param workerOptions - Forwarded unchanged to the `WorkerAbstract` constructor.
 */
constructor(workerScript: string, workerOptions: WorkerOptions) {
  super(workerScript, workerOptions);
  // Later spreads override earlier ones: explicit options beat the defaults.
  const mergedPoolOptions = {
    ...DEFAULT_POOL_OPTIONS,
    ...this.workerOptions.poolOptions,
  };
  this.workerOptions.poolOptions = mergedPoolOptions;
  this.workerSet = new Set<WorkerSetElement>();
  const eventsEnabled = this.workerOptions.poolOptions?.enableEvents;
  if (eventsEnabled) {
    this.emitter = new EventEmitter();
  }
}
49 version
: WorkerConstants
.version
,
53 elementsExecuting
: [...this.workerSet
].reduce(
54 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
57 elementsPerWorker
: this.maxElementsPerWorker
!,
62 return this.workerSet
.size
;
/**
 * Per-worker element limit, read from `workerOptions.elementsPerWorker`.
 *
 * @returns The configured value, or `undefined` when it is not set.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
/**
 * Adds one worker set element, then awaits `sleep()` for
 * `workerOptions.workerStartDelay` when that delay is greater than zero.
 *
 * @returns A promise settled after the element was added and the optional delay awaited.
 */
public async start(): Promise<void> {
  this.addWorkerSetElement();
  // Add worker set element sequentially to optimize memory at startup
  const startDelay = this.workerOptions.workerStartDelay!;
  if (startDelay > 0) {
    await sleep(startDelay);
  }
}
/**
 * Terminates every worker of the set, one after the other, then empties the set.
 *
 * For each worker an `exit` listener is registered before `terminate()` is
 * called; both the termination promise and the `exit` notification are awaited
 * before moving on to the next worker.
 *
 * @returns A promise settled once all workers were terminated and the set cleared.
 */
public async stop(): Promise<void> {
  for (const workerSetElement of this.workerSet) {
    const workerExitPromise = new Promise<void>((resolve) => {
      // One-shot listener: it only has to resolve this promise once, so it
      // must not stay attached to the worker afterwards.
      workerSetElement.worker.once('exit', () => {
        resolve();
      });
    });
    await workerSetElement.worker.terminate();
    await workerExitPromise;
  }
  this.workerSet.clear();
}
91 public async addElement(elementData
: WorkerData
): Promise
<void> {
92 if (!this.workerSet
) {
93 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
95 const workerSetElement
= await this.getWorkerSetElement();
96 workerSetElement
.worker
.postMessage({
97 id
: WorkerMessageEvents
.startWorkerElement
,
100 ++workerSetElement
.numberOfWorkerElements
;
101 // Add element sequentially to optimize memory at startup
102 if (this.workerOptions
.elementStartDelay
! > 0) {
103 await sleep(this.workerOptions
.elementStartDelay
!);
108 * Adds a new `WorkerSetElement`.
110 private addWorkerSetElement(): WorkerSetElement
{
111 const worker
= new Worker(this.workerScript
, {
113 ...this.workerOptions
.poolOptions
?.workerOptions
,
117 this.workerOptions
.poolOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
121 this.workerOptions
.poolOptions
?.errorHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
123 worker
.on('error', (error
) => {
124 this.emitter
?.emit(WorkerSetEvents
.error
, error
);
125 if (this.workerOptions
.poolOptions
?.restartWorkerOnError
) {
126 this.addWorkerSetElement();
131 this.workerOptions
.poolOptions
?.onlineHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
135 this.workerOptions
.poolOptions
?.exitHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
137 worker
.once('exit', () =>
138 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
)!),
140 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
141 this.workerSet
.add(workerSetElement
);
142 return workerSetElement
;
/**
 * Removes the given `WorkerSetElement` from the set of workers.
 *
 * @param workerSetElement - The element to delete from the set.
 */
private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
  const { workerSet } = this;
  workerSet.delete(workerSetElement);
}
149 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
150 let chosenWorkerSetElement
: WorkerSetElement
| undefined;
151 for (const workerSetElement
of this.workerSet
) {
152 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
153 chosenWorkerSetElement
= workerSetElement
;
157 if (!chosenWorkerSetElement
) {
158 chosenWorkerSetElement
= this.addWorkerSetElement();
159 // Add worker set element sequentially to optimize memory at startup
160 this.workerOptions
.workerStartDelay
! > 0 &&
161 (await sleep(this.workerOptions
.workerStartDelay
!));
163 return chosenWorkerSetElement
;
166 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
167 let workerSetElt
: WorkerSetElement
| undefined;
168 for (const workerSetElement
of this.workerSet
) {
169 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
170 workerSetElt
= workerSetElement
;