import { EventEmitter } from 'node:events';
import { SHARE_ENV, Worker } from 'node:worker_threads';
-import type { ThreadPoolOptions } from 'poolifier';
-
import { WorkerAbstract } from './WorkerAbstract';
-import { WorkerConstants } from './WorkerConstants';
+import { DEFAULT_POOL_OPTIONS, EMPTY_FUNCTION, workerSetVersion } from './WorkerConstants';
import {
type SetInfo,
type WorkerData,
} from './WorkerTypes';
import { sleep } from './WorkerUtils';
-const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
- enableEvents: true,
- restartWorkerOnError: true,
-};
-
export class WorkerSet extends WorkerAbstract<WorkerData> {
public readonly emitter!: EventEmitter;
private readonly workerSet: Set<WorkerSetElement>;
+ private workerStartup: boolean;
/**
* Creates a new `WorkerSet`.
*/
constructor(workerScript: string, workerOptions: WorkerOptions) {
super(workerScript, workerOptions);
+ if (
+ this.workerOptions.elementsPerWorker === null ||
+ this.workerOptions.elementsPerWorker === undefined
+ ) {
+ throw new TypeError('Elements per worker is not defined');
+ }
+ if (!Number.isSafeInteger(this.workerOptions.elementsPerWorker)) {
+ throw new TypeError('Elements per worker must be an integer');
+ }
+ if (this.workerOptions.elementsPerWorker <= 0) {
+ throw new RangeError('Elements per worker must be greater than zero');
+ }
this.workerOptions.poolOptions = {
...DEFAULT_POOL_OPTIONS,
...this.workerOptions.poolOptions,
if (this.workerOptions.poolOptions?.enableEvents) {
this.emitter = new EventEmitter();
}
+ this.workerStartup = false;
}
get info(): SetInfo {
return {
- version: WorkerConstants.version,
+ version: workerSetVersion,
type: 'set',
worker: 'thread',
size: this.size,
/** @inheritDoc */
public async stop(): Promise<void> {
for (const workerSetElement of this.workerSet) {
- const workerExitPromise = new Promise<void>((resolve) => {
- workerSetElement.worker.on('exit', () => {
+ const worker = workerSetElement.worker;
+ const waitWorkerExit = new Promise<void>((resolve) => {
+ worker.on('exit', () => {
resolve();
});
});
- await workerSetElement.worker.terminate();
- await workerExitPromise;
+ await worker.terminate();
+ await waitWorkerExit;
}
- this.workerSet.clear();
}
/** @inheritDoc */
* Adds a new `WorkerSetElement`.
*/
private addWorkerSetElement(): WorkerSetElement {
+ this.workerStartup = true;
const worker = new Worker(this.workerScript, {
env: SHARE_ENV,
...this.workerOptions.poolOptions?.workerOptions,
});
- worker.on(
- 'message',
- this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
- );
+ worker.on('message', this.workerOptions.poolOptions?.messageHandler ?? EMPTY_FUNCTION);
worker.on('message', (message: WorkerMessage<WorkerData>) => {
if (message.event === WorkerMessageEvents.startedWorkerElement) {
- this.emitter?.emit(WorkerSetEvents.elementStarted, message.data);
+ this.emitter?.emit(WorkerSetEvents.elementStarted, this.info);
} else if (message.event === WorkerMessageEvents.startWorkerElementError) {
this.emitter?.emit(WorkerSetEvents.elementError, message.data);
}
});
- worker.on(
- 'error',
- this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
- );
+ worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION);
worker.on('error', (error) => {
this.emitter?.emit(WorkerSetEvents.error, error);
- if (this.workerOptions.poolOptions?.restartWorkerOnError) {
+ if (this.workerOptions.poolOptions?.restartWorkerOnError && !this.workerStartup) {
this.addWorkerSetElement();
}
});
- worker.on(
- 'online',
- this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
- );
- worker.on(
- 'exit',
- this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
- );
+ worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION);
+ worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION);
worker.once('exit', () =>
this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),
);
+ this.workerStartup = false;
const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 };
this.workerSet.add(workerSetElement);
return workerSetElement;