X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fabstract-pool.ts;h=f32aab0dc838409e2514bf0377c4ba9276180579;hb=71002f09cd4b8ac7b8c6d5704222eb045acd1d7a;hp=4c82ef08a8cdad70602ebf20a64235b1cb3833f0;hpb=542b9b8db671323753d60c910e044ccf7bf93fd5;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 4c82ef08..f32aab0d 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -263,6 +263,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }) } @@ -340,11 +344,13 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { + const submissionTimestamp = performance.now() const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + submissionTimestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { @@ -424,13 +430,21 @@ export abstract class AbstractPool< worker: Worker, message: MessageValue ): void { - const workerNodeKey = this.getWorkerNodeKey(worker) - const workerTasksUsage = this.workerNodes[workerNodeKey].tasksUsage + const workerTasksUsage = + this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage --workerTasksUsage.running ++workerTasksUsage.run if (message.error != null) { ++workerTasksUsage.error } + this.updateRunTimeTasksUsage(workerTasksUsage, message) + this.updateWaitTimeTasksUsage(workerTasksUsage, message) + } + + private updateRunTimeTasksUsage ( + workerTasksUsage: TasksUsage, + message: MessageValue + ): void { if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) { workerTasksUsage.runTime += message.runTime ?? 0 if ( @@ -450,6 +464,29 @@ export abstract class AbstractPool< } } + private updateWaitTimeTasksUsage ( + workerTasksUsage: TasksUsage, + message: MessageValue + ): void { + if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + workerTasksUsage.waitTime += message.waitTime ?? 0 + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + workerTasksUsage.run !== 0 + ) { + workerTasksUsage.avgWaitTime = + workerTasksUsage.waitTime / workerTasksUsage.run + } + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime && + message.waitTime != null + ) { + workerTasksUsage.waitTimeHistory.push(message.waitTime) + workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory) + } + } + } + /** * Chooses a worker node for the next task. * @@ -611,6 +648,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }, tasksQueue: new Queue>()