"dev": true
},
"mocha": {
- "version": "9.1.0",
- "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz",
- "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==",
+ "version": "9.1.1",
+ "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.1.tgz",
+ "integrity": "sha512-0wE74YMgOkCgBUj8VyIDwmLUjTsS13WV1Pg7l0SHea2qzZzlq7MDnfbPsHKcELBRk3+izEVkRofjmClpycudCA==",
"dev": true,
"requires": {
"@ungap/promise-all-settled": "1.1.2",
"eslint-plugin-jsdoc": "^36.0.8",
"eslint-plugin-node": "^11.1.0",
"expect": "^27.1.0",
- "mocha": "^9.1.0",
+ "mocha": "^9.1.1",
"mochawesome": "^6.2.2",
"npm-check": "^5.9.2",
"nyc": "^15.1.0",
elementsPerWorker: Configuration.getChargingStationsPerWorker(),
poolOptions: {
workerChoiceStrategy: Configuration.getWorkerPoolStrategy()
- }
- // eslint-disable-next-line @typescript-eslint/no-misused-promises
- }, async (msg: WorkerMessage) => {
- if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) {
- await Bootstrap.storage.storePerformanceStatistics(msg.data);
+ },
+ messageHandler: async (msg: WorkerMessage) => {
+ if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) {
+ await Bootstrap.storage.storePerformanceStatistics(msg.data);
+ }
}
});
}
poolMinSize?: number;
elementsPerWorker?: number;
poolOptions?: PoolOptions<Worker>;
+ messageHandler?: (message: any) => void | Promise<void>;
}
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export default abstract class WorkerAbstract {
protected readonly workerScript: string;
protected readonly workerStartDelay: number;
- protected readonly messageListener: (message: any) => void;
public abstract size: number;
public abstract maxElementsPerWorker: number | null;
*
* @param workerScript
* @param workerStartDelay
- * @param messageListenerCallback
*/
- constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY,
- messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
+ constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) {
this.workerScript = workerScript;
this.workerStartDelay = workerStartDelay;
- this.messageListener = messageListenerCallback;
}
public abstract start(): Promise<void>;
* @param max
* @param workerStartDelay
* @param opts
- * @param messageListenerCallback
*/
- constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>,
- messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
- super(workerScript, workerStartDelay, messageListenerCallback);
+ constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions<Worker>) {
+ super(workerScript, workerStartDelay);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new DynamicThreadPool<WorkerData>(min, max, this.workerScript, opts);
}
+import { Worker, isMainThread } from 'worker_threads';
import { WorkerOptions, WorkerProcessType } from '../types/Worker';
import Constants from '../utils/Constants';
+import { PoolOptions } from 'poolifier';
import WorkerAbstract from './WorkerAbstract';
import WorkerDynamicPool from './WorkerDynamicPool';
import WorkerSet from './WorkerSet';
import WorkerStaticPool from './WorkerStaticPool';
-import { isMainThread } from 'worker_threads';
export default class WorkerFactory {
// eslint-disable-next-line @typescript-eslint/no-empty-function
// This is intentional
}
- public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions,
- messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null {
+ public static getWorkerImplementation<T>(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null {
if (!isMainThread) {
throw new Error('Trying to get a worker implementation outside the main thread');
}
options = options ?? {} as WorkerOptions;
options.startDelay = options.startDelay ?? Constants.WORKER_START_DELAY;
+ options.poolOptions = options?.poolOptions ?? {} as PoolOptions<Worker>;
+ // options?.messageHandler && options.poolOptions.messageHandler = options.messageHandler;
let workerImplementation: WorkerAbstract = null;
switch (workerProcessType) {
case WorkerProcessType.WORKER_SET:
options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER;
- workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback);
+ workerImplementation = new WorkerSet<T>(workerScript, options.elementsPerWorker, options.startDelay, options);
break;
case WorkerProcessType.STATIC_POOL:
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
+ workerImplementation = new WorkerStaticPool<T>(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions);
break;
case WorkerProcessType.DYNAMIC_POOL:
options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE;
options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE;
- workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback);
+ workerImplementation = new WorkerDynamicPool<T>(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions);
break;
default:
throw new Error(`Worker implementation type '${workerProcessType}' not found`);
// Partial Copyright Jerome Benoit. 2021. All Rights Reserved.
-import { WorkerMessageEvents, WorkerSetElement } from '../types/Worker';
+import { WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker';
import Utils from '../utils/Utils';
import { Worker } from 'worker_threads';
import { WorkerUtils } from './WorkerUtils';
export default class WorkerSet<T> extends WorkerAbstract {
- public maxElementsPerWorker: number;
+ public readonly maxElementsPerWorker: number;
+ private readonly messageHandler: (message: any) => void | Promise<void>;
private workerSet: Set<WorkerSetElement>;
/**
* @param workerScript
* @param maxElementsPerWorker
* @param workerStartDelay
- * @param messageListenerCallback
+ * @param opts
*/
- constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
- super(workerScript, workerStartDelay, messageListenerCallback);
- this.workerSet = new Set<WorkerSetElement>();
+ constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) {
+ super(workerScript, workerStartDelay);
this.maxElementsPerWorker = maxElementsPerWorker;
+ this.messageHandler = opts?.messageHandler ?? (() => { });
+ this.workerSet = new Set<WorkerSetElement>();
}
get size(): number {
*/
private startWorker(): void {
const worker = new Worker(this.workerScript);
- worker.on('message', this.messageListener);
+ worker.on('message', this.messageHandler);
worker.on('error', () => { /* This is intentional */ });
worker.on('exit', (code) => {
WorkerUtils.defaultExitHandler(code);
* @param numberOfThreads
* @param startWorkerDelay
* @param opts
- * @param messageListenerCallback
*/
- constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>,
- messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) {
- super(workerScript, startWorkerDelay, messageListenerCallback);
+ constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions<Worker>) {
+ super(workerScript, startWorkerDelay);
opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler;
this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts);
}