-import { type ThreadPoolOptions, availableParallelism } from 'poolifier';
+import { availableParallelism } from 'poolifier';
import type { WorkerOptions } from './WorkerTypes';
poolMinSize: DEFAULT_POOL_MIN_SIZE,
poolMaxSize: DEFAULT_POOL_MAX_SIZE,
elementsPerWorker: DEFAULT_ELEMENTS_PER_WORKER,
- poolOptions: {},
+ poolOptions: {
+ enableEvents: true,
+ restartWorkerOnError: true,
+ },
});
-
-export const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
- enableEvents: true,
- restartWorkerOnError: true,
-};
import { SHARE_ENV, Worker } from 'node:worker_threads';
import { WorkerAbstract } from './WorkerAbstract';
-import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
+import { EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
import {
type SetInfo,
type WorkerData,
export class WorkerSet extends WorkerAbstract<WorkerData> {
public readonly emitter!: EventEmitter;
private readonly workerSet: Set<WorkerSetElement>;
+ private started: boolean;
private workerStartup: boolean;
/**
if (this.workerOptions.elementsPerWorker <= 0) {
throw new RangeError('Elements per worker must be greater than zero');
}
- this.workerOptions.poolOptions = {
- ...DEFAULT_POOL_OPTIONS,
- ...this.workerOptions.poolOptions,
- };
this.workerSet = new Set<WorkerSetElement>();
if (this.workerOptions.poolOptions?.enableEvents) {
this.emitter = new EventEmitter();
}
+ this.started = false;
this.workerStartup = false;
}
this.addWorkerSetElement();
// Add worker set element sequentially to optimize memory at startup
this.workerOptions.workerStartDelay! > 0 && (await sleep(this.workerOptions.workerStartDelay!));
+ this.started = true;
}
/** @inheritDoc */
});
await worker.terminate();
await waitWorkerExit;
+ this.started = false;
}
}
/** @inheritDoc */
public async addElement(elementData: WorkerData): Promise<void> {
+ if (!this.started) {
+ throw new Error('Cannot add a WorkerSet element: not started');
+ }
if (!this.workerSet) {
- throw new Error("Cannot add a WorkerSet element: workers' set does not exist");
+ throw new Error("Cannot add a WorkerSet element: 'workerSet' property does not exist");
}
const workerSetElement = await this.getWorkerSetElement();
workerSetElement.worker.postMessage({
worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
worker.on('error', (error) => {
this.emitter?.emit(WorkerSetEvents.error, error);
- if (this.workerOptions.poolOptions?.restartWorkerOnError && !this.workerStartup) {
+ if (
+ this.workerOptions.poolOptions?.restartWorkerOnError &&
+ this.started &&
+ !this.workerStartup
+ ) {
this.addWorkerSetElement();
}
});