X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=a558ef504568ad95abcf91d8f413968a2f07bf8f;hb=70353024c44987c467dadcec306c699ce4ae1f06;hp=81012fa0b4f736559fca73eb395bf7c622bce48a;hpb=09a6305fb250c17cb2565f8cbe3d9afbb33f307c;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 81012fa0..a558ef50 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -149,6 +149,7 @@ export abstract class AbstractPool< this.checkValidWorkerChoiceStrategyOptions( this.opts.workerChoiceStrategyOptions ) + this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true this.opts.enableEvents = opts.enableEvents ?? true this.opts.enableTasksQueue = opts.enableTasksQueue ?? false if (this.opts.enableTasksQueue) { @@ -415,10 +416,7 @@ export abstract class AbstractPool< * * @param workerNodeKey - The worker node key. */ - protected beforeTaskExecutionHook ( - workerNodeKey: number, - task: Task - ): void { + protected beforeTaskExecutionHook (workerNodeKey: number): void { ++this.workerNodes[workerNodeKey].tasksUsage.running } @@ -433,13 +431,21 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerNodeKey = this.getWorkerNodeKey(worker) - const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage + const workerTasksUsage = + this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage --workerTasksUsage.running ++workerTasksUsage.run if (message.error != null) { ++workerTasksUsage.error } + this.updateRunTimeTasksUsage(workerTasksUsage, message) + this.updateWaitTimeTasksUsage(workerTasksUsage, message) + } + + private updateRunTimeTasksUsage ( + workerTasksUsage: TasksUsage, + message: MessageValue + ): void { if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) { workerTasksUsage.runTime += message.runTime ?? 0 if ( @@ -457,6 +463,12 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } + } + + private updateWaitTimeTasksUsage ( + workerTasksUsage: TasksUsage, + message: MessageValue + ): void { if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { workerTasksUsage.waitTime += message.waitTime ?? 0 if ( @@ -551,6 +563,16 @@ export abstract class AbstractPool< worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION) worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION) + worker.on('error', error => { + if (this.emitter != null) { + this.emitter.emit(PoolEvents.error, error) + } + }) + if (this.opts.restartWorkerOnError === true) { + worker.on('error', () => { + this.createAndSetupWorker() + }) + } worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION) worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION) worker.once('exit', () => { @@ -598,7 +620,7 @@ export abstract class AbstractPool< } private checkAndEmitEvents (): void { - if (this.opts.enableEvents === true) { + if (this.emitter != null) { if (this.busy) { this.emitter?.emit(PoolEvents.busy) } @@ -675,12 +697,14 @@ export abstract class AbstractPool< */ private removeWorkerNode (worker: Worker): void { const workerNodeKey = this.getWorkerNodeKey(worker) - this.workerNodes.splice(workerNodeKey, 1) - this.workerChoiceStrategyContext.remove(workerNodeKey) + if (workerNodeKey !== -1) { + this.workerNodes.splice(workerNodeKey, 1) + this.workerChoiceStrategyContext.remove(workerNodeKey) + } } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey, task) + this.beforeTaskExecutionHook(workerNodeKey) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) }