// Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
-import { EventEmitter } from 'node:events';
+import { EventEmitterAsyncResource } from 'node:events';
import { SHARE_ENV, Worker } from 'node:worker_threads';
import { WorkerAbstract } from './WorkerAbstract';
-import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
+import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
import {
type SetInfo,
type WorkerData,
type WorkerSetElement,
WorkerSetEvents,
} from './WorkerTypes';
-import { sleep } from './WorkerUtils';
+import { randomizeDelay, sleep } from './WorkerUtils';
export class WorkerSet extends WorkerAbstract<WorkerData> {
- public readonly emitter!: EventEmitter;
+ public readonly emitter: EventEmitterAsyncResource | undefined;
private readonly workerSet: Set<WorkerSetElement>;
+ private started: boolean;
private workerStartup: boolean;
/**
if (this.workerOptions.elementsPerWorker <= 0) {
throw new RangeError('Elements per worker must be greater than zero');
}
- this.workerOptions.poolOptions = {
- ...DEFAULT_POOL_OPTIONS,
- ...this.workerOptions.poolOptions,
- };
this.workerSet = new Set<WorkerSetElement>();
if (this.workerOptions.poolOptions?.enableEvents) {
- this.emitter = new EventEmitter();
+ this.emitter = new EventEmitterAsyncResource({ name: 'workerset' });
}
+ this.started = false;
this.workerStartup = false;
}
public async start(): Promise<void> {
this.addWorkerSetElement();
// Add worker set element sequentially to optimize memory at startup
- this.workerOptions.workerStartDelay! > 0 && (await sleep(this.workerOptions.workerStartDelay!));
+ this.workerOptions.workerStartDelay! > 0 &&
+ (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
+ this.emitter?.emit(WorkerSetEvents.started, this.info);
+ this.started = true;
}
/** @inheritDoc */
for (const workerSetElement of this.workerSet) {
const worker = workerSetElement.worker;
const waitWorkerExit = new Promise<void>((resolve) => {
- worker.on('exit', () => {
+ worker.once('exit', () => {
resolve();
});
});
await worker.terminate();
await waitWorkerExit;
+ this.emitter?.emit(WorkerSetEvents.stopped, this.info);
+ this.emitter?.emitDestroy();
+ this.emitter?.removeAllListeners();
+ this.started = false;
}
}
/** @inheritDoc */
public async addElement(elementData: WorkerData): Promise<void> {
- if (!this.workerSet) {
- throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
+ if (!this.started) {
+ throw new Error('Cannot add a WorkerSet element: not started');
+ }
+ if (this.workerSet == null) {
+ throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
}
const workerSetElement = await this.getWorkerSetElement();
workerSetElement.worker.postMessage({
++workerSetElement.numberOfWorkerElements;
// Add element sequentially to optimize memory at startup
if (this.workerOptions.elementStartDelay! > 0) {
- await sleep(this.workerOptions.elementStartDelay!);
+ await sleep(randomizeDelay(this.workerOptions.elementStartDelay!));
}
}
worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
worker.on('error', (error) => {
this.emitter?.emit(WorkerSetEvents.error, error);
- if (this.workerOptions.poolOptions?.restartWorkerOnError && !this.workerStartup) {
+ if (
+ this.workerOptions.poolOptions?.restartWorkerOnError &&
+ this.started &&
+ !this.workerStartup
+ ) {
this.addWorkerSetElement();
}
});
worker.once('exit', () =>
this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
);
- this.workerStartup = false;
const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
this.workerSet.add(workerSetElement);
+ this.workerStartup = false;
return workerSetElement;
}
chosenWorkerSetElement = this.addWorkerSetElement();
// Add worker set element sequentially to optimize memory at startup
this.workerOptions.workerStartDelay! > 0 &&
- (await sleep(this.workerOptions.workerStartDelay!));
+ (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!)));
}
return chosenWorkerSetElement;
}