X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fworker%2FWorkerSet.ts;h=d066ce9e7ffb818d6e4190c4ce158a51fdaa9da5;hb=fac8866f9d09e5621d41337fb5f53adba1cfc0e7;hp=b76af5df2d5a2c274de00db35c3d593ee628d846;hpb=361c98f57255e5b91d123d5f2ba43ab533134b1a;p=e-mobility-charging-stations-simulator.git diff --git a/src/worker/WorkerSet.ts b/src/worker/WorkerSet.ts index b76af5df..d066ce9e 100644 --- a/src/worker/WorkerSet.ts +++ b/src/worker/WorkerSet.ts @@ -1,6 +1,6 @@ // Partial Copyright Jerome Benoit. 2021-2023. All Rights Reserved. -import EventEmitterAsyncResource from 'node:events'; +import { EventEmitter } from 'node:events'; import { SHARE_ENV, Worker } from 'node:worker_threads'; import { WorkerAbstract } from './WorkerAbstract'; @@ -16,7 +16,7 @@ import { import { sleep } from './WorkerUtils'; export class WorkerSet extends WorkerAbstract { - public readonly emitter: EventEmitterAsyncResource; + public readonly emitter: EventEmitter; private readonly workerSet: Set; /** @@ -36,18 +36,19 @@ export class WorkerSet extends WorkerAbstract { }; this.workerSet = new Set(); if (this.workerOptions?.poolOptions?.enableEvents) { - this.emitter = new EventEmitterAsyncResource(); + this.emitter = new EventEmitter(); } } get info(): SetInfo { return { + version: WorkerConstants.version, type: 'set', worker: 'thread', size: this.size, elementsExecuting: [...this.workerSet].reduce( (accumulator, workerSetElement) => accumulator + workerSetElement.numberOfWorkerElements, - 0 + 0, ), elementsPerWorker: this.maxElementsPerWorker, }; @@ -71,7 +72,13 @@ export class WorkerSet extends WorkerAbstract { /** @inheritDoc */ public async stop(): Promise { for (const workerSetElement of this.workerSet) { + const workerExitPromise = new Promise((resolve) => { + workerSetElement.worker.on('exit', () => { + resolve(); + }); + }); await workerSetElement.worker.terminate(); + await workerExitPromise; } this.workerSet.clear(); } @@ -103,34 +110,38 @@ export class WorkerSet extends WorkerAbstract { }); worker.on( 'message', - this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION + this.workerOptions?.poolOptions?.messageHandler ?? WorkerConstants.EMPTY_FUNCTION, ); worker.on( 'error', - this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION + this.workerOptions?.poolOptions?.errorHandler ?? WorkerConstants.EMPTY_FUNCTION, ); worker.on('error', (error) => { - if (this.emitter !== undefined) { - this.emitter.emit(WorkerSetEvents.error, error); - } + this.emitter?.emit(WorkerSetEvents.error, error); if (this.workerOptions?.poolOptions?.restartWorkerOnError) { this.addWorkerSetElement(); } }); worker.on( 'online', - this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION + this.workerOptions?.poolOptions?.onlineHandler ?? WorkerConstants.EMPTY_FUNCTION, ); worker.on( 'exit', - this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION + this.workerOptions?.poolOptions?.exitHandler ?? WorkerConstants.EMPTY_FUNCTION, + ); + worker.once('exit', () => + this.removeWorkerSetElement(this.getWorkerSetElementByWorker(worker)), ); - worker.once('exit', () => this.workerSet.delete(this.getWorkerSetElementByWorker(worker))); const workerSetElement: WorkerSetElement = { worker, numberOfWorkerElements: 0 }; this.workerSet.add(workerSetElement); return workerSetElement; } + private removeWorkerSetElement(workerSetElement: WorkerSetElement): void { + this.workerSet.delete(workerSetElement); + } + private async getWorkerSetElement(): Promise { let chosenWorkerSetElement: WorkerSetElement; for (const workerSetElement of this.workerSet) {