fix: add maximum tasks queue size to worker usage data
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 25 Jun 2023 17:52:19 +0000 (19:52 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 25 Jun 2023 17:52:19 +0000 (19:52 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/worker.ts
src/queue.ts
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/thread/fixed.test.js
tests/queue.test.js

index 413a1fc0a1246372796230c1f084d0795fa85855..6ebb30595d822c525ef98e226bf3239a1d4c475f 100644 (file)
@@ -263,12 +263,13 @@ export abstract class AbstractPool<
         0
       ),
       queuedTasks: this.workerNodes.reduce(
-        (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
+        (accumulator, workerNode) =>
+          accumulator + workerNode.workerUsage.tasks.queued,
         0
       ),
       maxQueuedTasks: this.workerNodes.reduce(
         (accumulator, workerNode) =>
-          accumulator + workerNode.tasksQueue.maxSize,
+          accumulator + workerNode.workerUsage.tasks.maxQueued,
         0
       ),
       failedTasks: this.workerNodes.reduce(
@@ -881,6 +882,10 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey].tasksQueue.size
   }
 
+  private tasksMaxQueueSize (workerNodeKey: number): number {
+    return this.workerNodes[workerNodeKey].tasksQueue.maxSize
+  }
+
   private flushTasksQueue (workerNodeKey: number): void {
     if (this.tasksQueueSize(workerNodeKey) > 0) {
       for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
@@ -890,6 +895,7 @@ export abstract class AbstractPool<
         )
       }
     }
+    this.workerNodes[workerNodeKey].tasksQueue.clear()
   }
 
   private flushTasksQueues (): void {
@@ -914,6 +920,9 @@ export abstract class AbstractPool<
     const getTasksQueueSize = (workerNodeKey?: number): number => {
       return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
     }
+    const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
+      return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
+    }
     return {
       tasks: {
         executed: 0,
@@ -921,6 +930,9 @@ export abstract class AbstractPool<
         get queued (): number {
           return getTasksQueueSize(workerNodeKey)
         },
+        get maxQueued (): number {
+          return getTasksMaxQueueSize(workerNodeKey)
+        },
         failed: 0
       },
       runTime: {
index 598e6be82ca22588c34f04d6a0c402359956b66d..4e6aea4eef596df89bf828cd24a9d00f335ed7f8 100644 (file)
@@ -108,6 +108,10 @@ export interface TaskStatistics {
    * Number of queued tasks.
    */
   readonly queued: number
+  /**
+   * Maximum number of queued tasks.
+   */
+  readonly maxQueued: number
   /**
    * Number of failed tasks.
    */
index 7aad6d065ce6338129c70032b266ce5c6bcba600..baecaa35b248d98a6cd4d378b444cfe2a4d316f2 100644 (file)
@@ -64,4 +64,14 @@ export class Queue<T> {
     }
     return this.items[this.offset]
   }
+
+  /**
+   * Clear the queue.
+   */
+  public clear (): void {
+    this.items = []
+    this.offset = 0
+    this.size = 0
+    this.maxSize = 0
+  }
 }
index a8cc145419c54826808a1faaf0a0d3e3a98b7c4b..c1e3a323997bd4623089cc4484a0680f4a659f6f 100644 (file)
@@ -455,6 +455,7 @@ describe('Abstract pool test suite', () => {
           executed: 0,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -519,6 +520,7 @@ describe('Abstract pool test suite', () => {
           executed: 0,
           executing: maxMultiplier,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -557,6 +559,7 @@ describe('Abstract pool test suite', () => {
           executed: maxMultiplier,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -609,6 +612,7 @@ describe('Abstract pool test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -651,6 +655,7 @@ describe('Abstract pool test suite', () => {
           executed: 0,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
index d0ddcf044cfb711e19aaa75997633c673f764948..aa90c8ab4194ca4d4a07d06fbde9233f62a73067 100644 (file)
@@ -101,6 +101,7 @@ describe('Fixed cluster pool test suite', () => {
       )
       expect(workerNode.workerUsage.tasks.executed).toBe(0)
       expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0)
     }
     expect(queuePool.info.executingTasks).toBe(numberOfWorkers)
     expect(queuePool.info.queuedTasks).toBe(
@@ -117,6 +118,7 @@ describe('Fixed cluster pool test suite', () => {
         maxMultiplier
       )
       expect(workerNode.workerUsage.tasks.queued).toBe(0)
+      expect(workerNode.workerUsage.tasks.maxQueued).toBe(1)
     }
   })
 
index 6091bf080f327a3218f53522fcfd1780ef543a70..dd5a398fc9a0b101b5a6fe27c3a1c38accaa94b9 100644 (file)
@@ -214,6 +214,7 @@ describe('Selection strategies test suite', () => {
           executed: maxMultiplier,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -274,6 +275,7 @@ describe('Selection strategies test suite', () => {
           executed: maxMultiplier,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -479,6 +481,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -538,6 +541,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -676,6 +680,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -739,6 +744,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -881,6 +887,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -942,6 +949,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1082,6 +1090,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1150,6 +1159,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1223,6 +1233,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1444,6 +1455,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1515,6 +1527,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1591,6 +1604,7 @@ describe('Selection strategies test suite', () => {
           executed: expect.any(Number),
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1824,6 +1838,7 @@ describe('Selection strategies test suite', () => {
           executed: maxMultiplier,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
@@ -1906,6 +1921,7 @@ describe('Selection strategies test suite', () => {
           executed: maxMultiplier,
           executing: 0,
           queued: 0,
+          maxQueued: 0,
           failed: 0
         },
         runTime: {
index f210d52016cac5115efff53991e465aebdd77621..8501ff7e04c808ba5dede16b920aec2cf041e316 100644 (file)
@@ -101,6 +101,7 @@ describe('Fixed thread pool test suite', () => {
       )
       expect(workerNode.workerUsage.tasks.executed).toBe(0)
       expect(workerNode.workerUsage.tasks.queued).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.maxQueued).toBeGreaterThan(0)
     }
     expect(queuePool.info.executingTasks).toBe(numberOfThreads)
     expect(queuePool.info.queuedTasks).toBe(
@@ -117,6 +118,7 @@ describe('Fixed thread pool test suite', () => {
         maxMultiplier
       )
       expect(workerNode.workerUsage.tasks.queued).toBe(0)
+      expect(workerNode.workerUsage.tasks.maxQueued).toBe(1)
     }
   })
 
index 3fd51dc8e4761df6c616424dea76dc7dbcd49d8c..9bb810d3a9d90464e7253384f8be0e25e4423913 100644 (file)
@@ -48,4 +48,18 @@ describe('Queue test suite', () => {
     expect(queue.maxSize).toBe(3)
     expect(queue.items).toStrictEqual([])
   })
+
+  it('Verify clear() behavior', () => {
+    const queue = new Queue()
+    queue.enqueue(1)
+    queue.enqueue(2)
+    queue.enqueue(3)
+    expect(queue.size).toBe(3)
+    expect(queue.maxSize).toBe(3)
+    queue.clear()
+    expect(queue.size).toBe(0)
+    expect(queue.maxSize).toBe(0)
+    expect(queue.items).toStrictEqual([])
+    expect(queue.offset).toBe(0)
+  })
 })