X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fabstract-pool.ts;h=3c690fb819e9c718163e86badcde28786a5dd849;hb=230fcf18603f74badb3446048ed9feaf24afa54a;hp=e941fabb772690f0797c204226430f113ea97d43;hpb=87de9ff55a7ad494b9e9500208b9b7319c094ea6;p=poolifier.git diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e941fabb..3c690fb8 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,7 +21,13 @@ import { type TasksQueueOptions, type WorkerType } from './pool' -import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker' +import type { + IWorker, + Task, + TaskStatistics, + WorkerNode, + WorkerUsage +} from './worker' import { WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -309,30 +315,10 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage(workerNode, { - tasks: { - executed: 0, - executing: 0, - queued: - this.opts.enableTasksQueue === true - ? workerNode.tasksQueue.size - : 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }) + this.setWorkerNodeTasksUsage( + workerNode, + this.getWorkerUsage(workerNode.worker) + ) this.setWorkerStatistics(workerNode.worker) } } @@ -476,13 +462,15 @@ export abstract class AbstractPool< * Can be overridden. * * @param workerNodeKey - The worker node key. + * @param task - The task to execute. */ - protected beforeTaskExecutionHook (workerNodeKey: number): void { - ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing - if (this.opts.enableTasksQueue === true) { - this.workerNodes[workerNodeKey].workerUsage.tasks.queued = - this.tasksQueueSize(workerNodeKey) - } + protected beforeTaskExecutionHook ( + workerNodeKey: number, + task: Task + ): void { + const workerUsage = this.workerNodes[workerNodeKey].workerUsage + ++workerUsage.tasks.executing + this.updateWaitTimeWorkerUsage(workerUsage, task) } /** @@ -504,9 +492,7 @@ export abstract class AbstractPool< if (message.taskError != null) { ++workerTaskStatistics.failed } - this.updateRunTimeWorkerUsage(workerUsage, message) - this.updateWaitTimeWorkerUsage(workerUsage, message) this.updateEluWorkerUsage(workerUsage, message) } @@ -516,19 +502,20 @@ export abstract class AbstractPool< ): void { if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .aggregate ) { - workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0 + workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0 if ( - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .avgRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .average && workerUsage.tasks.executed !== 0 ) { workerUsage.runTime.average = - workerUsage.runTime.aggregation / workerUsage.tasks.executed + workerUsage.runTime.aggregate / workerUsage.tasks.executed } if ( - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .medRunTime && + this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime + .median && message.taskPerformance?.runTime != null ) { workerUsage.runTime.history.push(message.taskPerformance.runTime) @@ -539,26 +526,29 @@ export abstract class AbstractPool< private updateWaitTimeWorkerUsage ( workerUsage: WorkerUsage, - message: MessageValue + task: Task ): void { + const timestamp = performance.now() + const taskWaitTime = timestamp - (task.timestamp ?? timestamp) if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime + .aggregate ) { - workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0 + workerUsage.waitTime.aggregate += taskWaitTime ?? 0 if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .avgWaitTime && + .waitTime.average && workerUsage.tasks.executed !== 0 ) { workerUsage.waitTime.average = - workerUsage.waitTime.aggregation / workerUsage.tasks.executed + workerUsage.waitTime.aggregate / workerUsage.tasks.executed } if ( this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .medWaitTime && - message.taskPerformance?.waitTime != null + .waitTime.median && + taskWaitTime != null ) { - workerUsage.waitTime.history.push(message.taskPerformance.waitTime) + workerUsage.waitTime.history.push(taskWaitTime) workerUsage.waitTime.median = median(workerUsage.waitTime.history) } } @@ -759,52 +749,31 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { return this.workerNodes.push({ worker, - workerUsage: { - tasks: { - executed: 0, - executing: 0, - queued: 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }, + workerUsage: this.getWorkerUsage(worker), tasksQueue: new Queue>() }) } - /** - * Sets the given worker in the pool worker nodes. - * - * @param workerNodeKey - The worker node key. - * @param worker - The worker. - * @param workerUsage - The worker usage. - * @param tasksQueue - The worker task queue. - */ - private setWorkerNode ( - workerNodeKey: number, - worker: Worker, - workerUsage: WorkerUsage, - tasksQueue: Queue> - ): void { - this.workerNodes[workerNodeKey] = { - worker, - workerUsage, - tasksQueue - } - } + // /** + // * Sets the given worker in the pool worker nodes. + // * + // * @param workerNodeKey - The worker node key. + // * @param worker - The worker. + // * @param workerUsage - The worker usage. + // * @param tasksQueue - The worker task queue. + // */ + // private setWorkerNode ( + // workerNodeKey: number, + // worker: Worker, + // workerUsage: WorkerUsage, + // tasksQueue: Queue> + // ): void { + // this.workerNodes[workerNodeKey] = { + // worker, + // workerUsage, + // tasksQueue + // } + // } /** * Removes the given worker from the pool worker nodes. @@ -820,7 +789,7 @@ export abstract class AbstractPool< } private executeTask (workerNodeKey: number, task: Task): void { - this.beforeTaskExecutionHook(workerNodeKey) + this.beforeTaskExecutionHook(workerNodeKey, task) this.sendToWorker(this.workerNodes[workerNodeKey].worker, task) } @@ -858,13 +827,42 @@ export abstract class AbstractPool< statistics: { runTime: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .runTime, - waitTime: - this.workerChoiceStrategyContext.getTaskStatisticsRequirements() - .waitTime, + .runTime.aggregate, elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements() .elu } }) } + + private getWorkerUsage (worker: Worker): WorkerUsage { + return { + tasks: this.getTaskStatistics(worker), + runTime: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + waitTime: { + aggregate: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + elu: undefined + } + } + + private getTaskStatistics (worker: Worker): TaskStatistics { + const queueSize = + this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size + return { + executed: 0, + executing: 0, + get queued (): number { + return queueSize ?? 0 + }, + failed: 0 + } + } }