// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
+import EventEmitterAsyncResource from 'node:events';
import { Worker } from 'node:worker_threads';
import { WorkerAbstract } from './WorkerAbstract';
import { defaultErrorHandler, defaultExitHandler, sleep } from './WorkerUtils';
export class WorkerSet extends WorkerAbstract<WorkerData> {
+ public readonly emitter: EventEmitterAsyncResource;
private readonly workerSet: Set<WorkerSetElement>;
/**
constructor(workerScript: string, workerOptions?: WorkerOptions) {
super(workerScript, workerOptions);
this.workerSet = new Set<WorkerSetElement>();
+ this.emitter = new EventEmitterAsyncResource();
}
get size(): number {
if (!this.workerSet) {
throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
}
- let lastWorkerSetElement = this.getLastWorkerSetElement();
- if (
- this.workerSet.size === 0 ||
- lastWorkerSetElement.numberOfWorkerElements >= this.workerOptions.elementsPerWorker
- ) {
- this.startWorker();
- // Start worker sequentially to optimize memory at startup
- this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
- lastWorkerSetElement = this.getLastWorkerSetElement();
- }
- lastWorkerSetElement.worker.postMessage({
+ const workerSetElement = await this.getWorkerSetElement();
+ workerSetElement.worker.postMessage({
id: WorkerMessageEvents.startWorkerElement,
data: elementData,
});
- ++lastWorkerSetElement.numberOfWorkerElements;
- // Start element sequentially to optimize memory at startup
+ ++workerSetElement.numberOfWorkerElements;
+ // Add element sequentially to optimize memory at startup
if (this.workerOptions.elementStartDelay > 0) {
await sleep(this.workerOptions.elementStartDelay);
}
/** @inheritDoc */
public async start(): Promise<void> {
- this.startWorker();
- // Start worker sequentially to optimize memory at startup
+ this.addWorkerSetElement();
+ // Add worker set element sequentially to optimize memory at startup
this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
}
}
/**
- * Start a new `Worker`.
+ * Add a new `WorkerSetElement`.
*/
- private startWorker(): void {
+ private addWorkerSetElement(): WorkerSetElement {
const worker = new Worker(this.workerScript);
worker.on(
'message',
) as MessageHandler<Worker>
);
worker.on('error', defaultErrorHandler.bind(this) as (err: Error) => void);
- worker.on('error', () => this.startWorker());
+ worker.on('error', (error) => {
+ this.emitter.emit('error', error);
+ this.addWorkerSetElement();
+ });
worker.on('exit', defaultExitHandler.bind(this) as (exitCode: number) => void);
worker.on('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker)));
- this.workerSet.add({ worker, numberOfWorkerElements: 0 });
+ const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
+ this.workerSet.add(workerSetElement);
+ return workerSetElement;
}
- private getLastWorkerSetElement(): WorkerSetElement {
- let workerSetElement: WorkerSetElement;
- for (workerSetElement of this.workerSet) {
- /* This is intentional */
+ private async getWorkerSetElement(): Promise<WorkerSetElement> {
+ let chosenWorkerSetElement: WorkerSetElement;
+ for (const workerSetElement of this.workerSet) {
+ if (workerSetElement.numberOfWorkerElements < this.workerOptions.elementsPerWorker) {
+ chosenWorkerSetElement = workerSetElement;
+ break;
+ }
}
- return workerSetElement;
+ if (!chosenWorkerSetElement) {
+ chosenWorkerSetElement = this.addWorkerSetElement();
+ // Add worker set element sequentially to optimize memory at startup
+ this.workerOptions.workerStartDelay > 0 && (await sleep(this.workerOptions.workerStartDelay));
+ }
+ return chosenWorkerSetElement;
}
- private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement {
+ private getWorkerSetElementByWorker(worker: Worker): WorkerSetElement | undefined {
let workerSetElt: WorkerSetElement;
for (const workerSetElement of this.workerSet) {
if (workerSetElement.worker.threadId === worker.threadId) {