import { EventEmitter } from 'node:events';
import { SHARE_ENV, Worker } from 'node:worker_threads';
+import type { ThreadPoolOptions } from 'poolifier';
+
import { WorkerAbstract } from './WorkerAbstract';
import { WorkerConstants } from './WorkerConstants';
import {
} from './WorkerTypes';
import { sleep } from './WorkerUtils';
+const DEFAULT_POOL_OPTIONS: ThreadPoolOptions = {
+ enableEvents: true,
+ restartWorkerOnError: true,
+};
+
export class WorkerSet extends WorkerAbstract<WorkerData> {
- public readonly emitter: EventEmitter;
+ public readonly emitter!: EventEmitter;
private readonly workerSet: Set<WorkerSetElement>;
/**
* @param workerScript -
* @param workerOptions -
*/
- constructor(workerScript: string, workerOptions?: WorkerOptions) {
+ constructor(workerScript: string, workerOptions: WorkerOptions) {
super(workerScript, workerOptions);
this.workerOptions.poolOptions = {
- ...{
- enableEvents: true,
- restartWorkerOnError: true,
- },
+ ...DEFAULT_POOL_OPTIONS,
...this.workerOptions.poolOptions,
};
this.workerSet = new Set<WorkerSetElement>();
- if (this.workerOptions?.poolOptions?.enableEvents) {
+ if (this.workerOptions.poolOptions?.enableEvents) {
this.emitter = new EventEmitter();
}
}
}
const workerSetElement = await this.getWorkerSetElement();
workerSetElement.worker.postMessage({
- id: WorkerMessageEvents.startWorkerElement,
+ event: WorkerMessageEvents.startWorkerElement,
data: elementData,
});
++workerSetElement.numberOfWorkerElements;
});
worker.on(
'message',
- this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
+ this.workerOptions.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION,
);
worker.on(
'error',
- this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
+ this.workerOptions.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION,
);
worker.on('error', (error) => {
this.emitter?.emit(WorkerSetEvents.error, error);
- if (this.workerOptions?.poolOptions?.restartWorkerOnError) {
+ if (this.workerOptions.poolOptions?.restartWorkerOnError) {
this.addWorkerSetElement();
}
});
worker.on(
'online',
- this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
+ this.workerOptions.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION,
);
worker.on(
'exit',
- this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
+ this.workerOptions.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION,
);
worker.once('exit', () =>
this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)!),