From 2a8ae9de21aae859fac0dd35942b1ad0446bfc1e Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 18 Aug 2025 23:11:41 +0200 Subject: [PATCH] refactor: make worker set code more robust MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/charging-station/ChargingStationWorker.ts | 4 +- src/utils/Constants.ts | 8 +- src/worker/WorkerAbstract.ts | 10 ++- src/worker/WorkerConstants.ts | 25 +++--- src/worker/WorkerDynamicPool.ts | 7 +- src/worker/WorkerFixedPool.ts | 7 +- src/worker/WorkerSet.ts | 88 +++++++++++++------ src/worker/WorkerTypes.ts | 2 +- 8 files changed, 99 insertions(+), 52 deletions(-) diff --git a/src/charging-station/ChargingStationWorker.ts b/src/charging-station/ChargingStationWorker.ts index fd9717c4..73c816a3 100644 --- a/src/charging-station/ChargingStationWorker.ts +++ b/src/charging-station/ChargingStationWorker.ts @@ -10,7 +10,9 @@ import { Configuration } from '../utils/index.js' import { type WorkerDataError, type WorkerMessage, WorkerMessageEvents } from '../worker/index.js' import { ChargingStation } from './ChargingStation.js' -export let chargingStationWorker: object +export let chargingStationWorker: + | object + | ThreadWorker if (Configuration.workerPoolInUse()) { chargingStationWorker = new ThreadWorker< ChargingStationWorkerData, diff --git a/src/utils/Constants.ts b/src/utils/Constants.ts index 37025b33..9732d386 100644 --- a/src/utils/Constants.ts +++ b/src/utils/Constants.ts @@ -10,7 +10,7 @@ import { // eslint-disable-next-line @typescript-eslint/no-extraneous-class export class Constants { - static readonly DEFAULT_ATG_CONFIGURATION: AutomaticTransactionGeneratorConfiguration = + static readonly DEFAULT_ATG_CONFIGURATION: Readonly = Object.freeze({ enable: false, maxDelayBetweenTwoTransactions: 30, @@ -27,7 +27,7 @@ export class Constants { static readonly DEFAULT_BOOT_NOTIFICATION_INTERVAL = 60000 // Ms static readonly DEFAULT_CIRCULAR_BUFFER_CAPACITY = 386 - static readonly DEFAULT_CONNECTION_TIMEOUT = 30 + static readonly DEFAULT_CONNECTION_TIMEOUT = 30 // Seconds static readonly DEFAULT_FLUCTUATION_PERCENT = 5 static readonly DEFAULT_HASH_ALGORITHM = 'sha384' @@ -87,11 +87,11 @@ export class Constants { static readonly DEFAULT_UI_SERVER_PORT = 8080 static readonly EMPTY_FROZEN_OBJECT = Object.freeze({}) - static readonly EMPTY_FUNCTION = Object.freeze(() => { + static readonly EMPTY_FUNCTION: () => void = Object.freeze(() => { /* This is intentional */ }) - static readonly MAX_RANDOM_INTEGER = 281474976710655 + static readonly MAX_RANDOM_INTEGER = 281474976710655 // 2^48 - 1 (randomInit() limit) static readonly PERFORMANCE_RECORDS_TABLE = 'performance_records' diff --git a/src/worker/WorkerAbstract.ts b/src/worker/WorkerAbstract.ts index d4282e0c..a38db441 100644 --- a/src/worker/WorkerAbstract.ts +++ b/src/worker/WorkerAbstract.ts @@ -1,7 +1,7 @@ import type { EventEmitterAsyncResource } from 'node:events' import type { PoolInfo } from 'poolifier' -import { existsSync } from 'node:fs' +import { statSync } from 'node:fs' import type { SetInfo, WorkerData, WorkerOptions } from './WorkerTypes.js' @@ -29,8 +29,12 @@ export abstract class WorkerAbstract if (workerScript.trim().length === 0) { throw new Error('Worker script is an empty string') } - if (!existsSync(workerScript)) { - throw new Error('Worker script file does not exist') + const workerScriptStats = statSync(workerScript, { throwIfNoEntry: false }) + if (workerScriptStats == null) { + throw new Error(`Worker script file does not exist: '${workerScript}'`) + } + if (!workerScriptStats.isFile()) { + throw new Error(`Worker script is not a regular file: '${workerScript}'`) } this.workerScript = workerScript this.workerOptions = workerOptions diff --git a/src/worker/WorkerConstants.ts b/src/worker/WorkerConstants.ts index 0d7407dc..5f4084f5 100644 --- a/src/worker/WorkerConstants.ts +++ b/src/worker/WorkerConstants.ts @@ -1,4 +1,4 @@ -import { availableParallelism } from 'poolifier' +import { availableParallelism, type ThreadPoolOptions } from 'poolifier' import type { WorkerOptions } from './WorkerTypes.js' @@ -12,21 +12,26 @@ export const workerSetVersion = '1.0.1' export const DEFAULT_ELEMENT_ADD_DELAY = 0 export const DEFAULT_WORKER_START_DELAY = 500 -export const DEFAULT_POOL_MIN_SIZE = Math.floor(availableParallelism() / 2) -export const DEFAULT_POOL_MAX_SIZE = Math.round(availableParallelism() * 1.5) +export const DEFAULT_POOL_MIN_SIZE = Math.max(1, Math.floor(availableParallelism() / 2)) +export const DEFAULT_POOL_MAX_SIZE = Math.max( + DEFAULT_POOL_MIN_SIZE, + Math.round(availableParallelism() * 1.5) +) export const DEFAULT_ELEMENTS_PER_WORKER = 1 +export const DEFAULT_POOL_OPTIONS: Readonly = Object.freeze({ + enableEvents: true, + errorHandler: defaultErrorHandler, + exitHandler: defaultExitHandler, + restartWorkerOnError: true, + startWorkers: false, +}) + export const DEFAULT_WORKER_OPTIONS: Readonly = Object.freeze({ elementAddDelay: DEFAULT_ELEMENT_ADD_DELAY, elementsPerWorker: DEFAULT_ELEMENTS_PER_WORKER, poolMaxSize: DEFAULT_POOL_MAX_SIZE, poolMinSize: DEFAULT_POOL_MIN_SIZE, - poolOptions: { - enableEvents: true, - errorHandler: defaultErrorHandler, - exitHandler: defaultExitHandler, - restartWorkerOnError: true, - startWorkers: false, - }, + poolOptions: DEFAULT_POOL_OPTIONS, workerStartDelay: DEFAULT_WORKER_START_DELAY, }) diff --git a/src/worker/WorkerDynamicPool.ts b/src/worker/WorkerDynamicPool.ts index e5808d30..f7ed91b1 100644 --- a/src/worker/WorkerDynamicPool.ts +++ b/src/worker/WorkerDynamicPool.ts @@ -48,10 +48,9 @@ export class WorkerDynamicPool exten public async addElement (elementData: D): Promise { const response = await this.pool.execute(elementData) // Start element sequentially to optimize memory at startup - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.workerOptions.elementAddDelay! > 0 && - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - (await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))) + if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) { + await sleep(randomizeDelay(this.workerOptions.elementAddDelay)) + } return response } diff --git a/src/worker/WorkerFixedPool.ts b/src/worker/WorkerFixedPool.ts index 8bddad7d..f5d9b01e 100644 --- a/src/worker/WorkerFixedPool.ts +++ b/src/worker/WorkerFixedPool.ts @@ -47,10 +47,9 @@ export class WorkerFixedPool extends public async addElement (elementData: D): Promise { const response = await this.pool.execute(elementData) // Start element sequentially to optimize memory at startup - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.workerOptions.elementAddDelay! > 0 && - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - (await sleep(randomizeDelay(this.workerOptions.elementAddDelay!))) + if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) { + await sleep(randomizeDelay(this.workerOptions.elementAddDelay)) + } return response } diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index babf6932..b0f29e64 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -51,7 +51,7 @@ export class WorkerSet extends Worke } private readonly promiseResponseMap: Map< - `${string}-${string}-${string}-${string}`, + `${string}-${string}-${string}-${string}-${string}`, ResponseWrapper > @@ -77,7 +77,7 @@ export class WorkerSet extends Worke } this.workerSet = new Set() this.promiseResponseMap = new Map< - `${string}-${string}-${string}-${string}`, + `${string}-${string}-${string}-${string}-${string}`, ResponseWrapper >() if (this.workerOptions.poolOptions?.enableEvents === true) { @@ -99,19 +99,17 @@ export class WorkerSet extends Worke event: WorkerMessageEvents.addWorkerElement, uuid: randomUUID(), } satisfies WorkerMessage - workerSetElement.worker.postMessage(message) this.promiseResponseMap.set(message.uuid, { reject, resolve, workerSetElement, }) + workerSetElement.worker.postMessage(message) }) const response = await sendMessageToWorker // Add element sequentially to optimize memory at startup - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (this.workerOptions.elementAddDelay! > 0) { - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - await sleep(randomizeDelay(this.workerOptions.elementAddDelay!)) + if (this.workerOptions.elementAddDelay != null && this.workerOptions.elementAddDelay > 0) { + await sleep(randomizeDelay(this.workerOptions.elementAddDelay)) } return response } @@ -124,8 +122,8 @@ export class WorkerSet extends Worke this.workerOptions.workerStartDelay! > 0 && // eslint-disable-next-line @typescript-eslint/no-non-null-assertion (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) - this.emitter?.emit(WorkerSetEvents.started, this.info) this.started = true + this.emitter?.emit(WorkerSetEvents.started, this.info) } /** @inheritDoc */ @@ -141,8 +139,20 @@ export class WorkerSet extends Worke await worker.terminate() await waitWorkerExit } - this.emitter?.emit(WorkerSetEvents.stopped, this.info) + for (const [uuid, responseWrapper] of this.promiseResponseMap) { + try { + responseWrapper.reject( + new Error(`WorkerSet stopped before responding request (uuid: ${uuid})`) + ) + } finally { + this.promiseResponseMap.delete(uuid) + } + } + if (this.workerSet.size > 0) { + this.workerSet.clear() + } this.started = false + this.emitter?.emit(WorkerSetEvents.stopped, this.info) this.emitter?.emitDestroy() } @@ -160,6 +170,7 @@ export class WorkerSet extends Worke worker.on('message', (message: WorkerMessage) => { const { data, event, uuid } = message if (this.promiseResponseMap.has(uuid)) { + let error: Error // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const { reject, resolve, workerSetElement } = this.promiseResponseMap.get(uuid)! switch (event) { @@ -173,22 +184,32 @@ export class WorkerSet extends Worke reject(data) break default: - reject( - new Error( - `Unknown worker message event: '${event}' received with data: '${JSON.stringify( - data, - undefined, - 2 - )}'` - ) + error = new Error( + `Unknown worker message event: '${event}' received with data: '${JSON.stringify( + data, + undefined, + 2 + )}'` ) + this.emitter?.emit(WorkerSetEvents.error, error) + reject(error) } this.promiseResponseMap.delete(uuid) + } else { + this.emitter?.emit(WorkerSetEvents.elementError, { + data, + event, + message: `Unknown worker message uuid: '${uuid}'`, + }) } }) worker.on('error', this.workerOptions.poolOptions?.errorHandler ?? EMPTY_FUNCTION) worker.once('error', error => { this.emitter?.emit(WorkerSetEvents.error, error) + const workerSetElement = this.getWorkerSetElementByWorker(worker) + if (workerSetElement != null) { + this.rejectPendingPromiseForWorker(workerSetElement, error) + } if ( this.workerOptions.poolOptions?.restartWorkerOnError === true && this.started && @@ -203,7 +224,11 @@ export class WorkerSet extends Worke worker.on('online', this.workerOptions.poolOptions?.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.workerOptions.poolOptions?.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { - this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)) + const workerSetElement = this.getWorkerSetElementByWorker(worker) + if (workerSetElement != null) { + this.rejectPendingPromiseForWorker(workerSetElement, new Error('Worker exited')) + } + this.removeWorkerSetElement(workerSetElement) }) const workerSetElement: WorkerSetElement = { numberOfWorkerElements: 0, @@ -226,23 +251,36 @@ export class WorkerSet extends Worke if (chosenWorkerSetElement == null) { chosenWorkerSetElement = this.addWorkerSetElement() // Add worker set element sequentially to optimize memory at startup - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - this.workerOptions.workerStartDelay! > 0 && - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - (await sleep(randomizeDelay(this.workerOptions.workerStartDelay!))) + if (this.workerOptions.workerStartDelay != null && this.workerOptions.workerStartDelay > 0) { + await sleep(randomizeDelay(this.workerOptions.workerStartDelay)) + } } return chosenWorkerSetElement } private getWorkerSetElementByWorker (worker: Worker): undefined | WorkerSetElement { - let workerSetElt: undefined | WorkerSetElement + let workerSetElementFound: undefined | WorkerSetElement for (const workerSetElement of this.workerSet) { if (workerSetElement.worker.threadId === worker.threadId) { - workerSetElt = workerSetElement + workerSetElementFound = workerSetElement break } } - return workerSetElt + return workerSetElementFound + } + + private rejectPendingPromiseForWorker (workerSetElement: WorkerSetElement, reason: unknown): void { + for (const [uuid, responseWrapper] of this.promiseResponseMap) { + if (responseWrapper.workerSetElement === workerSetElement) { + try { + responseWrapper.reject( + reason ?? new Error(`Worker failed before completing request (uuid: ${uuid})`) + ) + } finally { + this.promiseResponseMap.delete(uuid) + } + } + } } private removeWorkerSetElement (workerSetElement: undefined | WorkerSetElement): void { diff --git a/src/worker/WorkerTypes.ts b/src/worker/WorkerTypes.ts index c6b5c473..9a800870 100644 --- a/src/worker/WorkerTypes.ts +++ b/src/worker/WorkerTypes.ts @@ -52,7 +52,7 @@ export type WorkerEvents = PoolEvent | WorkerSetEvents export interface WorkerMessage { data: T event: WorkerMessageEvents - uuid: `${string}-${string}-${string}-${string}` + uuid: `${string}-${string}-${string}-${string}-${string}` } export interface WorkerOptions extends Record { -- 2.43.0