From 9c16fb4b40484a75e534e055d838f22ac0343009 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 25 Jun 2023 18:29:36 +0200 Subject: [PATCH] fix: fix tasks queued count computation MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + src/pools/abstract-pool.ts | 49 ++++++++++------------ tests/pools/abstract/abstract-pool.test.js | 1 + tests/pools/cluster/fixed.test.js | 4 +- tests/pools/thread/fixed.test.js | 4 +- 5 files changed, 29 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23bb6216..3b544676 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Ensure cluster pool destroy() gracefully shutdowns worker's server. - Ensure pool event is emitted before task error promise rejection. +- Fix queued tasks count computation. ## [2.6.3] - 2023-06-19 diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 058b43b8..07907be9 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -21,13 +21,7 @@ import { type TasksQueueOptions, type WorkerType } from './pool' -import type { - IWorker, - Task, - TaskStatistics, - WorkerNode, - WorkerUsage -} from './worker' +import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker' import { Measurements, WorkerChoiceStrategies, @@ -332,10 +326,10 @@ export abstract class AbstractPool< if (workerChoiceStrategyOptions != null) { this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions) } - for (const workerNode of this.workerNodes) { + for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) { this.setWorkerNodeTasksUsage( workerNode, - this.getWorkerUsage(workerNode.worker) + this.getWorkerUsage(workerNodeKey) ) this.setWorkerStatistics(workerNode.worker) } @@ -823,11 +817,17 @@ export abstract class AbstractPool< * @returns The worker nodes length. */ private pushWorkerNode (worker: Worker): number { - return this.workerNodes.push({ + this.workerNodes.push({ worker, - workerUsage: this.getWorkerUsage(worker), + workerUsage: this.getWorkerUsage(), tasksQueue: new Queue>() }) + const workerNodeKey = this.getWorkerNodeKey(worker) + this.setWorkerNodeTasksUsage( + this.workerNodes[workerNodeKey], + this.getWorkerUsage(workerNodeKey) + ) + return this.workerNodes.length } // /** @@ -910,9 +910,19 @@ export abstract class AbstractPool< }) } - private getWorkerUsage (worker: Worker): WorkerUsage { + private getWorkerUsage (workerNodeKey?: number): WorkerUsage { + const getQueueSize = (workerNodeKey: number): number => { + return this.tasksQueueSize(workerNodeKey) + } return { - tasks: this.getTaskStatistics(worker), + tasks: { + executed: 0, + executing: 0, + get queued (): number { + return workerNodeKey == null ? 0 : getQueueSize(workerNodeKey) + }, + failed: 0 + }, runTime: { aggregate: 0, average: 0, @@ -942,17 +952,4 @@ export abstract class AbstractPool< } } } - - private getTaskStatistics (worker: Worker): TaskStatistics { - const queueSize = - this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size - return { - executed: 0, - executing: 0, - get queued (): number { - return queueSize ?? 0 - }, - failed: 0 - } - } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index b6219ce9..a8cc1454 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -498,6 +498,7 @@ describe('Abstract pool test suite', () => { expect(workerNode.tasksQueue).toBeDefined() expect(workerNode.tasksQueue).toBeInstanceOf(Queue) expect(workerNode.tasksQueue.size).toBe(0) + expect(workerNode.tasksQueue.maxSize).toBe(0) } await pool.destroy() }) diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index cf0be68d..d0ddcf04 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -100,7 +100,7 @@ describe('Fixed cluster pool test suite', () => { queuePool.opts.tasksQueueOptions.concurrency ) expect(workerNode.workerUsage.tasks.executed).toBe(0) - expect(workerNode.tasksQueue.size).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfWorkers) expect(queuePool.info.queuedTasks).toBe( @@ -116,7 +116,7 @@ describe('Fixed cluster pool test suite', () => { expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( maxMultiplier ) - expect(workerNode.tasksQueue.size).toBe(0) + expect(workerNode.workerUsage.tasks.queued).toBe(0) } }) diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index 39ddc67d..f210d520 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -100,7 +100,7 @@ describe('Fixed thread pool test suite', () => { queuePool.opts.tasksQueueOptions.concurrency ) expect(workerNode.workerUsage.tasks.executed).toBe(0) - expect(workerNode.tasksQueue.size).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfThreads) expect(queuePool.info.queuedTasks).toBe( @@ -116,7 +116,7 @@ describe('Fixed thread pool test suite', () => { expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual( maxMultiplier ) - expect(workerNode.tasksQueue.size).toBe(0) + expect(workerNode.workerUsage.tasks.queued).toBe(0) } }) -- 2.34.1