From ffd71f2c31025fcec6d5a95e1fba5d32c6d28e5b Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sat, 28 Aug 2021 16:12:43 +0200 Subject: [PATCH] Move worker message handler to the options argument MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- package-lock.json | 6 +++--- package.json | 2 +- src/charging-station/Bootstrap.ts | 10 +++++----- src/types/Worker.ts | 1 + src/worker/WorkerAbstract.ts | 6 +----- src/worker/WorkerDynamicPool.ts | 6 ++---- src/worker/WorkerFactory.ts | 14 ++++++++------ src/worker/WorkerSet.ts | 16 +++++++++------- src/worker/WorkerStaticPool.ts | 6 ++---- 9 files changed, 32 insertions(+), 35 deletions(-) diff --git a/package-lock.json b/package-lock.json index 30024ef1..202fc838 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9181,9 +9181,9 @@ "dev": true }, "mocha": { - "version": "9.1.0", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.0.tgz", - "integrity": "sha512-Kjg/XxYOFFUi0h/FwMOeb6RoroiZ+P1yOfya6NK7h3dNhahrJx1r2XIT3ge4ZQvJM86mdjNA+W5phqRQh7DwCg==", + "version": "9.1.1", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-9.1.1.tgz", + "integrity": "sha512-0wE74YMgOkCgBUj8VyIDwmLUjTsS13WV1Pg7l0SHea2qzZzlq7MDnfbPsHKcELBRk3+izEVkRofjmClpycudCA==", "dev": true, "requires": { "@ungap/promise-all-settled": "1.1.2", diff --git a/package.json b/package.json index b8cb11c2..d965b838 100644 --- a/package.json +++ b/package.json @@ -98,7 +98,7 @@ "eslint-plugin-jsdoc": "^36.0.8", "eslint-plugin-node": "^11.1.0", "expect": "^27.1.0", - "mocha": "^9.1.0", + "mocha": "^9.1.1", "mochawesome": "^6.2.2", "npm-check": "^5.9.2", "nyc": "^15.1.0", diff --git a/src/charging-station/Bootstrap.ts b/src/charging-station/Bootstrap.ts index 6bcc8511..a982cb69 100644 --- a/src/charging-station/Bootstrap.ts +++ b/src/charging-station/Bootstrap.ts @@ -95,11 +95,11 @@ export default class Bootstrap { elementsPerWorker: Configuration.getChargingStationsPerWorker(), poolOptions: { workerChoiceStrategy: Configuration.getWorkerPoolStrategy() - } - // eslint-disable-next-line @typescript-eslint/no-misused-promises - }, async (msg: WorkerMessage) => { - if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) { - await Bootstrap.storage.storePerformanceStatistics(msg.data); + }, + messageHandler: async (msg: WorkerMessage) => { + if (msg.id === WorkerMessageEvents.PERFORMANCE_STATISTICS) { + await Bootstrap.storage.storePerformanceStatistics(msg.data); + } } }); } diff --git a/src/types/Worker.ts b/src/types/Worker.ts index c4ad7052..0c795b2e 100644 --- a/src/types/Worker.ts +++ b/src/types/Worker.ts @@ -13,6 +13,7 @@ export interface WorkerOptions { poolMinSize?: number; elementsPerWorker?: number; poolOptions?: PoolOptions; + messageHandler?: (message: any) => void | Promise; } // eslint-disable-next-line @typescript-eslint/no-empty-interface diff --git a/src/worker/WorkerAbstract.ts b/src/worker/WorkerAbstract.ts index d5f9a58e..a18a085e 100644 --- a/src/worker/WorkerAbstract.ts +++ b/src/worker/WorkerAbstract.ts @@ -4,7 +4,6 @@ import { WorkerData } from '../types/Worker'; export default abstract class WorkerAbstract { protected readonly workerScript: string; protected readonly workerStartDelay: number; - protected readonly messageListener: (message: any) => void; public abstract size: number; public abstract maxElementsPerWorker: number | null; @@ -13,13 +12,10 @@ export default abstract class WorkerAbstract { * * @param workerScript * @param workerStartDelay - * @param messageListenerCallback */ - constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY, - messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { + constructor(workerScript: string, workerStartDelay: number = Constants.WORKER_START_DELAY) { this.workerScript = workerScript; this.workerStartDelay = workerStartDelay; - this.messageListener = messageListenerCallback; } public abstract start(): Promise; diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts index 0e1cd4cd..044938d4 100644 --- a/src/worker/WorkerDynamicPool.ts +++ b/src/worker/WorkerDynamicPool.ts @@ -17,11 +17,9 @@ export default class WorkerDynamicPool extends WorkerAbstract { * @param max * @param workerStartDelay * @param opts - * @param messageListenerCallback */ - constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions, - messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { - super(workerScript, workerStartDelay, messageListenerCallback); + constructor(workerScript: string, min: number, max: number, workerStartDelay?: number, opts?: PoolOptions) { + super(workerScript, workerStartDelay); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new DynamicThreadPool(min, max, this.workerScript, opts); } diff --git a/src/worker/WorkerFactory.ts b/src/worker/WorkerFactory.ts index fdfd0cb4..65d2f4d4 100644 --- a/src/worker/WorkerFactory.ts +++ b/src/worker/WorkerFactory.ts @@ -1,11 +1,12 @@ +import { Worker, isMainThread } from 'worker_threads'; import { WorkerOptions, WorkerProcessType } from '../types/Worker'; import Constants from '../utils/Constants'; +import { PoolOptions } from 'poolifier'; import WorkerAbstract from './WorkerAbstract'; import WorkerDynamicPool from './WorkerDynamicPool'; import WorkerSet from './WorkerSet'; import WorkerStaticPool from './WorkerStaticPool'; -import { isMainThread } from 'worker_threads'; export default class WorkerFactory { // eslint-disable-next-line @typescript-eslint/no-empty-function @@ -13,27 +14,28 @@ export default class WorkerFactory { // This is intentional } - public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions, - messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }): WorkerAbstract | null { + public static getWorkerImplementation(workerScript: string, workerProcessType: WorkerProcessType, options?: WorkerOptions): WorkerAbstract | null { if (!isMainThread) { throw new Error('Trying to get a worker implementation outside the main thread'); } options = options ?? {} as WorkerOptions; options.startDelay = options.startDelay ?? Constants.WORKER_START_DELAY; + options.poolOptions = options?.poolOptions ?? {} as PoolOptions; + // options?.messageHandler && options.poolOptions.messageHandler = options.messageHandler; let workerImplementation: WorkerAbstract = null; switch (workerProcessType) { case WorkerProcessType.WORKER_SET: options.elementsPerWorker = options.elementsPerWorker ?? Constants.DEFAULT_CHARGING_STATIONS_PER_WORKER; - workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, messageListenerCallback); + workerImplementation = new WorkerSet(workerScript, options.elementsPerWorker, options.startDelay, options); break; case WorkerProcessType.STATIC_POOL: options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback); + workerImplementation = new WorkerStaticPool(workerScript, options.poolMaxSize, options.startDelay, options.poolOptions); break; case WorkerProcessType.DYNAMIC_POOL: options.poolMinSize = options.poolMinSize ?? Constants.DEFAULT_WORKER_POOL_MIN_SIZE; options.poolMaxSize = options.poolMaxSize ?? Constants.DEFAULT_WORKER_POOL_MAX_SIZE; - workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions, messageListenerCallback); + workerImplementation = new WorkerDynamicPool(workerScript, options.poolMinSize, options.poolMaxSize, options.startDelay, options.poolOptions); break; default: throw new Error(`Worker implementation type '${workerProcessType}' not found`); diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index 9b9ad332..6ab08b92 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,6 +1,6 @@ // Partial Copyright Jerome Benoit. 2021. All Rights Reserved. -import { WorkerMessageEvents, WorkerSetElement } from '../types/Worker'; +import { WorkerMessageEvents, WorkerOptions, WorkerSetElement } from '../types/Worker'; import Utils from '../utils/Utils'; import { Worker } from 'worker_threads'; @@ -8,7 +8,8 @@ import WorkerAbstract from './WorkerAbstract'; import { WorkerUtils } from './WorkerUtils'; export default class WorkerSet extends WorkerAbstract { - public maxElementsPerWorker: number; + public readonly maxElementsPerWorker: number; + private readonly messageHandler: (message: any) => void | Promise; private workerSet: Set; /** @@ -17,12 +18,13 @@ export default class WorkerSet extends WorkerAbstract { * @param workerScript * @param maxElementsPerWorker * @param workerStartDelay - * @param messageListenerCallback + * @param opts */ - constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { - super(workerScript, workerStartDelay, messageListenerCallback); - this.workerSet = new Set(); + constructor(workerScript: string, maxElementsPerWorker = 1, workerStartDelay?: number, opts?: WorkerOptions) { + super(workerScript, workerStartDelay); this.maxElementsPerWorker = maxElementsPerWorker; + this.messageHandler = opts?.messageHandler ?? (() => { }); + this.workerSet = new Set(); } get size(): number { @@ -77,7 +79,7 @@ export default class WorkerSet extends WorkerAbstract { */ private startWorker(): void { const worker = new Worker(this.workerScript); - worker.on('message', this.messageListener); + worker.on('message', this.messageHandler); worker.on('error', () => { /* This is intentional */ }); worker.on('exit', (code) => { WorkerUtils.defaultExitHandler(code); diff --git a/src/worker/WorkerStaticPool.ts b/src/worker/WorkerStaticPool.ts index d5e92226..e4a8910b 100644 --- a/src/worker/WorkerStaticPool.ts +++ b/src/worker/WorkerStaticPool.ts @@ -16,11 +16,9 @@ export default class WorkerStaticPool extends WorkerAbstract { * @param numberOfThreads * @param startWorkerDelay * @param opts - * @param messageListenerCallback */ - constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions, - messageListenerCallback: (message: any) => void = () => { /* This is intentional */ }) { - super(workerScript, startWorkerDelay, messageListenerCallback); + constructor(workerScript: string, numberOfThreads: number, startWorkerDelay?: number, opts?: PoolOptions) { + super(workerScript, startWorkerDelay); opts.exitHandler = opts?.exitHandler ?? WorkerUtils.defaultExitHandler; this.pool = new FixedThreadPool(numberOfThreads, this.workerScript, opts); } -- 2.34.1