From 8604aaabd5aef29ed9b824a2d1dcdb53490ea8ad Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Thu, 8 Jun 2023 21:14:29 +0200 Subject: [PATCH] fix: fix 'queued' value in worker usage MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 136 ++++++++++++++++++------------------- src/pools/worker.ts | 2 +- 2 files changed, 66 insertions(+), 72 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index e941fabb..db0d5455 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,7 +21,13 @@ import { type TasksQueueOptions, type WorkerType } from './pool' -import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker' +import type { + IWorker, + Task, + TaskStatistics, + WorkerNode, + WorkerUsage +} from './worker' import { WorkerChoiceStrategies, type WorkerChoiceStrategy, @@ -309,30 +315,10 @@ export abstract class AbstractPool< this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } for (const workerNode of this.workerNodes) { - this.setWorkerNodeTasksUsage(workerNode, { - tasks: { - executed: 0, - executing: 0, - queued: - this.opts.enableTasksQueue === true - ? workerNode.tasksQueue.size - : 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }) + this.setWorkerNodeTasksUsage( + workerNode, + this.getWorkerUsage(workerNode.worker) + ) this.setWorkerStatistics(workerNode.worker) } } @@ -479,10 +465,6 @@ export abstract class AbstractPool< */ protected beforeTaskExecutionHook (workerNodeKey: number): void { ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing - if (this.opts.enableTasksQueue === true) { - this.workerNodes[workerNodeKey].workerUsage.tasks.queued = - this.tasksQueueSize(workerNodeKey) - } } /** @@ -759,52 +741,31 @@ export abstract class AbstractPool< private pushWorkerNode (worker: Worker): number { return this.workerNodes.push({ worker, - workerUsage: { - tasks: { - executed: 0, - executing: 0, - queued: 0, - failed: 0 - }, - runTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - - waitTime: { - aggregation: 0, - average: 0, - median: 0, - history: new CircularArray() - }, - elu: undefined - }, + workerUsage: this.getWorkerUsage(worker), tasksQueue: new Queue>() }) } - /** - * Sets the given worker in the pool worker nodes. - * - * @param workerNodeKey - The worker node key. - * @param worker - The worker. - * @param workerUsage - The worker usage. - * @param tasksQueue - The worker task queue. - */ - private setWorkerNode ( - workerNodeKey: number, - worker: Worker, - workerUsage: WorkerUsage, - tasksQueue: Queue> - ): void { - this.workerNodes[workerNodeKey] = { - worker, - workerUsage, - tasksQueue - } - } + // /** + // * Sets the given worker in the pool worker nodes. + // * + // * @param workerNodeKey - The worker node key. + // * @param worker - The worker. + // * @param workerUsage - The worker usage. + // * @param tasksQueue - The worker task queue. + // */ + // private setWorkerNode ( + // workerNodeKey: number, + // worker: Worker, + // workerUsage: WorkerUsage, + // tasksQueue: Queue> + // ): void { + // this.workerNodes[workerNodeKey] = { + // worker, + // workerUsage, + // tasksQueue + // } + // } /** * Removes the given worker from the pool worker nodes. @@ -867,4 +828,37 @@ export abstract class AbstractPool< } }) } + + private getWorkerUsage (worker: Worker): WorkerUsage { + return { + tasks: this.getTaskStatistics(this, worker), + runTime: { + aggregation: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + waitTime: { + aggregation: 0, + average: 0, + median: 0, + history: new CircularArray() + }, + elu: undefined + } + } + + private getTaskStatistics ( + self: AbstractPool, + worker: Worker + ): TaskStatistics { + return { + executed: 0, + executing: 0, + get queued (): number { + return self.tasksQueueSize(self.getWorkerNodeKey(worker)) + }, + failed: 0 + } + } } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index ba9393ab..b6dddd1d 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -97,7 +97,7 @@ export interface TaskStatistics { /** * Number of tasks queued. */ - queued: number + readonly queued: number /** * Number of tasks failed. */ -- 2.34.1