X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=ce885ddee31abfec7f77916f458b5c4f741482e2;hb=f20f344f672374fbb1d8885f74516d30ff201f7f;hp=b9a7cd398920fc77a754d3d4047414beae94ba46;hpb=3fa4cdd2951436bbb60a46b9d0ac151a4aa45ce1;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index b9a7cd39..ce885dde 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -263,6 +263,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }) } @@ -340,18 +344,20 @@ export abstract class AbstractPool< /** @inheritDoc */ public async execute (data?: Data, name?: string): Promise { - const [workerNodeKey, workerNode] = this.chooseWorkerNode() + const submissionTimestamp = performance.now() + const workerNodeKey = this.chooseWorkerNode() const submittedTask: Task = { name, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + submissionTimestamp, id: crypto.randomUUID() } const res = new Promise((resolve, reject) => { this.promiseResponseMap.set(submittedTask.id as string, { resolve, reject, - worker: workerNode.worker + worker: this.workerNodes[workerNodeKey].worker }) }) if ( @@ -448,16 +454,33 @@ export abstract class AbstractPool< workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory) } } + if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) { + workerTasksUsage.waitTime += message.waitTime ?? 0 + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime && + workerTasksUsage.run !== 0 + ) { + workerTasksUsage.avgWaitTime = + workerTasksUsage.waitTime / workerTasksUsage.run + } + if ( + this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime && + message.waitTime != null + ) { + workerTasksUsage.waitTimeHistory.push(message.waitTime) + workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory) + } + } } /** * Chooses a worker node for the next task. * - * The default uses a round robin algorithm to distribute the load. + * The default worker choice strategy uses a round robin algorithm to distribute the load. * - * @returns [worker node key, worker node]. + * @returns The worker node key */ - protected chooseWorkerNode (): [number, WorkerNode] { + protected chooseWorkerNode (): number { let workerNodeKey: number if (this.type === PoolType.DYNAMIC && !this.full && this.internalBusy()) { const workerCreated = this.createAndSetupWorker() @@ -477,7 +500,7 @@ export abstract class AbstractPool< } else { workerNodeKey = this.workerChoiceStrategyContext.execute() } - return [workerNodeKey, this.workerNodes[workerNodeKey]] + return workerNodeKey } /** @@ -611,6 +634,10 @@ export abstract class AbstractPool< runTimeHistory: new CircularArray(), avgRunTime: 0, medRunTime: 0, + waitTime: 0, + waitTimeHistory: new CircularArray(), + avgWaitTime: 0, + medWaitTime: 0, error: 0 }, tasksQueue: new Queue>() @@ -650,7 +677,7 @@ export abstract class AbstractPool< } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey) + this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) }