1 // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
3 import { EventEmitter
} from
'node:events';
4 import { SHARE_ENV
, Worker
} from
'node:worker_threads';
6 import type { ThreadPoolOptions
} from
'poolifier';
8 import { WorkerAbstract
} from
'./WorkerAbstract';
9 import { WorkerConstants
} from
'./WorkerConstants';
16 type WorkerSetElement
,
18 } from
'./WorkerTypes';
19 import { sleep
} from
'./WorkerUtils';
21 const DEFAULT_POOL_OPTIONS
: ThreadPoolOptions
= {
23 restartWorkerOnError
: true,
26 export class WorkerSet
extends WorkerAbstract
<WorkerData
> {
27 public readonly emitter
!: EventEmitter
;
28 private readonly workerSet
: Set
<WorkerSetElement
>;
/**
 * Creates a new `WorkerSet`.
 *
 * Validates `elementsPerWorker`, merges the caller's pool options over
 * `DEFAULT_POOL_OPTIONS`, initializes the empty set of worker set elements and,
 * when `poolOptions.enableEvents` is truthy, creates the event emitter.
 *
 * @param workerScript - Worker script, forwarded to the parent constructor.
 * @param workerOptions - Worker options, forwarded to the parent constructor.
 * @throws TypeError if `elementsPerWorker` is `null`, `undefined` or not a safe integer.
 * @throws RangeError if `elementsPerWorker` is lower than or equal to zero.
 */
constructor(workerScript: string, workerOptions: WorkerOptions) {
  super(workerScript, workerOptions);
  if (
    this.workerOptions.elementsPerWorker === null ||
    this.workerOptions.elementsPerWorker === undefined
  ) {
    throw new TypeError('Elements per worker is not defined');
  }
  // Reject anything that is NOT a safe integer (fractions, NaN, Infinity, ...).
  // The previous condition lacked the negation and therefore threw on every
  // valid integer while letting non-integers through.
  if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
    throw new TypeError('Elements per worker must be an integer');
  }
  if (this.workerOptions.elementsPerWorker <= 0) {
    throw new RangeError('Elements per worker must be greater than zero');
  }
  // Caller-supplied pool options take precedence over the defaults.
  this.workerOptions.poolOptions = {
    ...DEFAULT_POOL_OPTIONS,
    ...this.workerOptions.poolOptions,
  };
  this.workerSet = new Set<WorkerSetElement>();
  // The emitter is only instantiated when events are enabled; other members
  // access it through optional chaining.
  if (this.workerOptions.poolOptions?.enableEvents) {
    this.emitter = new EventEmitter();
  }
}
62 version
: WorkerConstants
.version
,
66 elementsExecuting
: [...this.workerSet
].reduce(
67 (accumulator
, workerSetElement
) => accumulator
+ workerSetElement
.numberOfWorkerElements
,
70 elementsPerWorker
: this.maxElementsPerWorker
!,
75 return this.workerSet
.size
;
/**
 * Maximum number of elements a single worker may hold, as configured in the
 * worker options (`elementsPerWorker`).
 *
 * @returns The configured `elementsPerWorker` value, or `undefined` when unset.
 */
get maxElementsPerWorker(): number | undefined {
  const { elementsPerWorker } = this.workerOptions;
  return elementsPerWorker;
}
/**
 * Starts the worker set by adding a first worker set element, then waits for
 * the configured `workerStartDelay` when it is greater than zero.
 *
 * @returns A promise resolved once the element is added and the optional delay has elapsed.
 */
public async start(): Promise<void> {
  this.addWorkerSetElement();
  // Worker set elements are added sequentially to optimize memory at startup:
  // pause for the configured delay, if any, before returning.
  const startDelay = this.workerOptions.workerStartDelay ?? 0;
  if (startDelay > 0) {
    await sleep(startDelay);
  }
}
90 public async stop(): Promise
<void> {
91 for (const workerSetElement
of this.workerSet
) {
92 const worker
= workerSetElement
.worker
;
93 const waitWorkerExit
= new Promise
<void>((resolve
) => {
94 worker
.on('exit', () => {
98 await worker
.terminate();
104 public async addElement(elementData
: WorkerData
): Promise
<void> {
105 if (!this.workerSet
) {
106 throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
108 const workerSetElement
= await this.getWorkerSetElement();
109 workerSetElement
.worker
.postMessage({
110 event
: WorkerMessageEvents
.startWorkerElement
,
113 ++workerSetElement
.numberOfWorkerElements
;
114 // Add element sequentially to optimize memory at startup
115 if (this.workerOptions
.elementStartDelay
! > 0) {
116 await sleep(this.workerOptions
.elementStartDelay
!);
121 * Adds a new `WorkerSetElement`.
123 private addWorkerSetElement(): WorkerSetElement
{
124 const worker
= new Worker(this.workerScript
, {
126 ...this.workerOptions
.poolOptions
?.workerOptions
,
130 this.workerOptions
.poolOptions
?.messageHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
132 worker
.on('message', (message
: WorkerMessage
<WorkerData
>) => {
133 if (message
.event
=== WorkerMessageEvents
.startedWorkerElement
) {
134 this.emitter
?.emit(WorkerSetEvents
.elementStarted
, this.info
);
135 } else if (message
.event
=== WorkerMessageEvents
.startWorkerElementError
) {
136 this.emitter
?.emit(WorkerSetEvents
.elementError
, message
.data
);
141 this.workerOptions
.poolOptions
?.errorHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
143 worker
.on('error', (error
) => {
144 this.emitter
?.emit(WorkerSetEvents
.error
, error
);
145 if (this.workerOptions
.poolOptions
?.restartWorkerOnError
) {
146 this.addWorkerSetElement();
151 this.workerOptions
.poolOptions
?.onlineHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
155 this.workerOptions
.poolOptions
?.exitHandler
?? WorkerConstants
.EMPTY_FUNCTION
,
157 worker
.once('exit', () =>
158 this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker
)!),
160 const workerSetElement
: WorkerSetElement
= { worker
, numberOfWorkerElements
: 0 };
161 this.workerSet
.add(workerSetElement
);
162 return workerSetElement
;
/**
 * Removes a worker set element from the internal set of worker set elements.
 *
 * @param workerSetElement - Element to delete from the set.
 */
private removeWorkerSetElement(workerSetElement: WorkerSetElement): void {
  const elements = this.workerSet;
  elements.delete(workerSetElement);
}
169 private async getWorkerSetElement(): Promise
<WorkerSetElement
> {
170 let chosenWorkerSetElement
: WorkerSetElement
| undefined;
171 for (const workerSetElement
of this.workerSet
) {
172 if (workerSetElement
.numberOfWorkerElements
< this.workerOptions
.elementsPerWorker
!) {
173 chosenWorkerSetElement
= workerSetElement
;
177 if (!chosenWorkerSetElement
) {
178 chosenWorkerSetElement
= this.addWorkerSetElement();
179 // Add worker set element sequentially to optimize memory at startup
180 this.workerOptions
.workerStartDelay
! > 0 &&
181 (await sleep(this.workerOptions
.workerStartDelay
!));
183 return chosenWorkerSetElement
;
186 private getWorkerSetElementByWorker(worker
: Worker
): WorkerSetElement
| undefined {
187 let workerSetElt
: WorkerSetElement
| undefined;
188 for (const workerSetElement
of this.workerSet
) {
189 if (workerSetElement
.worker
.threadId
=== worker
.threadId
) {
190 workerSetElt
= workerSetElement
;