From df593701c4bd494b0e99372fdcc3708412799942 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 25 Jun 2023 19:52:19 +0200 Subject: [PATCH] fix: add maximum tasks queue size to worker usage data MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/pools/abstract-pool.ts | 16 ++++++++++++++-- src/pools/worker.ts | 4 ++++ src/queue.ts | 10 ++++++++++ tests/pools/abstract/abstract-pool.test.js | 5 +++++ tests/pools/cluster/fixed.test.js | 2 ++ .../selection-strategies.test.js | 16 ++++++++++++++++ tests/pools/thread/fixed.test.js | 2 ++ tests/queue.test.js | 14 ++++++++++++++ 8 files changed, 67 insertions(+), 2 deletions(-) diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 413a1fc0..6ebb3059 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -263,12 +263,13 @@ export abstract class AbstractPool< 0 ), queuedTasks: this.workerNodes.reduce( - (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size, + (accumulator, workerNode) => + accumulator + workerNode.workerUsage.tasks.queued, 0 ), maxQueuedTasks: this.workerNodes.reduce( (accumulator, workerNode) => - accumulator + workerNode.tasksQueue.maxSize, + accumulator + workerNode.workerUsage.tasks.maxQueued, 0 ), failedTasks: this.workerNodes.reduce( @@ -881,6 +882,10 @@ export abstract class AbstractPool< return this.workerNodes[workerNodeKey].tasksQueue.size } + private tasksMaxQueueSize (workerNodeKey: number): number { + return this.workerNodes[workerNodeKey].tasksQueue.maxSize + } + private flushTasksQueue (workerNodeKey: number): void { if (this.tasksQueueSize(workerNodeKey) > 0) { for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) { @@ -890,6 +895,7 @@ export abstract class AbstractPool< ) } } + this.workerNodes[workerNodeKey].tasksQueue.clear() } private flushTasksQueues (): void { @@ -914,6 +920,9 @@ export abstract class AbstractPool< const getTasksQueueSize = (workerNodeKey?: number): number => { return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0 } + const getTasksMaxQueueSize = (workerNodeKey?: number): number => { + return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0 + } return { tasks: { executed: 0, @@ -921,6 +930,9 @@ export abstract class AbstractPool< get queued (): number { return getTasksQueueSize(workerNodeKey) }, + get maxQueued (): number { + return getTasksMaxQueueSize(workerNodeKey) + }, failed: 0 }, runTime: { diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 598e6be8..4e6aea4e 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -108,6 +108,10 @@ export interface TaskStatistics { * Number of queued tasks. */ readonly queued: number + /** + * Maximum number of queued tasks. + */ + readonly maxQueued: number /** * Number of failed tasks. */ diff --git a/src/queue.ts b/src/queue.ts index 7aad6d06..baecaa35 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -64,4 +64,14 @@ export class Queue { } return this.items[this.offset] } + + /** + * Clear the queue. + */ + public clear (): void { + this.items = [] + this.offset = 0 + this.size = 0 + this.maxSize = 0 + } } diff --git a/tests/pools/abstract/abstract-pool.test.js b/tests/pools/abstract/abstract-pool.test.js index a8cc1454..c1e3a323 100644 --- a/tests/pools/abstract/abstract-pool.test.js +++ b/tests/pools/abstract/abstract-pool.test.js @@ -455,6 +455,7 @@ describe('Abstract pool test suite', () => { executed: 0, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -519,6 +520,7 @@ describe('Abstract pool test suite', () => { executed: 0, executing: maxMultiplier, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -557,6 +559,7 @@ describe('Abstract pool test suite', () => { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -609,6 +612,7 @@ describe('Abstract pool test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -651,6 +655,7 @@ describe('Abstract pool test suite', () => { executed: 0, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { diff --git a/tests/pools/cluster/fixed.test.js b/tests/pools/cluster/fixed.test.js index d0ddcf04..aa90c8ab 100644 --- a/tests/pools/cluster/fixed.test.js +++ b/tests/pools/cluster/fixed.test.js @@ -101,6 +101,7 @@ describe('Fixed cluster pool test suite', () => { ) expect(workerNode.workerUsage.tasks.executed).toBe(0) expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfWorkers) expect(queuePool.info.queuedTasks).toBe( @@ -117,6 +118,7 @@ describe('Fixed cluster pool test suite', () => { maxMultiplier ) expect(workerNode.workerUsage.tasks.queued).toBe(0) + expect(workerNode.workerUsage.tasks.maxQueued).toBe(1) } }) diff --git a/tests/pools/selection-strategies/selection-strategies.test.js b/tests/pools/selection-strategies/selection-strategies.test.js index 6091bf08..dd5a398f 100644 --- a/tests/pools/selection-strategies/selection-strategies.test.js +++ b/tests/pools/selection-strategies/selection-strategies.test.js @@ -214,6 +214,7 @@ describe('Selection strategies test suite', () => { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -274,6 +275,7 @@ describe('Selection strategies test suite', () => { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -479,6 +481,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -538,6 +541,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -676,6 +680,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -739,6 +744,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -881,6 +887,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -942,6 +949,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1082,6 +1090,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1150,6 +1159,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1223,6 +1233,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1444,6 +1455,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1515,6 +1527,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1591,6 +1604,7 @@ describe('Selection strategies test suite', () => { executed: expect.any(Number), executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1824,6 +1838,7 @@ describe('Selection strategies test suite', () => { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { @@ -1906,6 +1921,7 @@ describe('Selection strategies test suite', () => { executed: maxMultiplier, executing: 0, queued: 0, + maxQueued: 0, failed: 0 }, runTime: { diff --git a/tests/pools/thread/fixed.test.js b/tests/pools/thread/fixed.test.js index f210d520..8501ff7e 100644 --- a/tests/pools/thread/fixed.test.js +++ b/tests/pools/thread/fixed.test.js @@ -101,6 +101,7 @@ describe('Fixed thread pool test suite', () => { ) expect(workerNode.workerUsage.tasks.executed).toBe(0) expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0) + expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0) } expect(queuePool.info.executingTasks).toBe(numberOfThreads) expect(queuePool.info.queuedTasks).toBe( @@ -117,6 +118,7 @@ describe('Fixed thread pool test suite', () => { maxMultiplier ) expect(workerNode.workerUsage.tasks.queued).toBe(0) + expect(workerNode.workerUsage.tasks.maxQueued).toBe(1) } }) diff --git a/tests/queue.test.js b/tests/queue.test.js index 3fd51dc8..9bb810d3 100644 --- a/tests/queue.test.js +++ b/tests/queue.test.js @@ -48,4 +48,18 @@ describe('Queue test suite', () => { expect(queue.maxSize).toBe(3) expect(queue.items).toStrictEqual([]) }) + + it('Verify clear() behavior', () => { + const queue = new Queue() + queue.enqueue(1) + queue.enqueue(2) + queue.enqueue(3) + expect(queue.size).toBe(3) + expect(queue.maxSize).toBe(3) + queue.clear() + expect(queue.size).toBe(0) + expect(queue.maxSize).toBe(0) + expect(queue.items).toStrictEqual([]) + expect(queue.offset).toBe(0) + }) }) -- 2.34.1