test: add tasks queuing tests
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Apr 2023 15:13:51 +0000 (17:13 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Fri, 14 Apr 2023 15:13:51 +0000 (17:13 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
tests/pools/abstract/abstract-pool.test.js
tests/pools/cluster/fixed.test.js
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/worker-choice-strategy-context.test.js
tests/pools/thread/fixed.test.js

index 6118897584d8b61c2693e19d8b8363462f266ef8..1a5a75a08df6ab03d84d7fa0aff06f617f59a797 100644 (file)
@@ -263,7 +263,7 @@ describe('Abstract pool test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
-    // The `full` event is triggered when the number of submitted tasks at once reach the number of dynamic pool workers.
+    // The `full` event is triggered when the number of submitted tasks at once reach the max number of workers in the dynamic pool.
     // So in total numberOfWorkers + 1 times for a loop submitting up to numberOfWorkers * 2 tasks to the dynamic pool.
     expect(poolFull).toBe(numberOfWorkers + 1)
     await pool.destroy()
index b9c7d676d0404c7787e2fd8a00560e4167b88290..ddcca10568fe0ee5a82ae18c659a9d23ece43561 100644 (file)
@@ -12,6 +12,17 @@ describe('Fixed cluster pool test suite', () => {
       errorHandler: e => console.error(e)
     }
   )
+  const queuePool = new FixedClusterPool(
+    numberOfWorkers,
+    './tests/worker-files/cluster/testWorker.js',
+    {
+      enableTasksQueue: true,
+      tasksQueueOptions: {
+        concurrency: 2
+      },
+      errorHandler: e => console.error(e)
+    }
+  )
   const emptyPool = new FixedClusterPool(
     numberOfWorkers,
     './tests/worker-files/cluster/emptyWorker.js',
@@ -47,6 +58,7 @@ describe('Fixed cluster pool test suite', () => {
     await errorPool.destroy()
     await asyncErrorPool.destroy()
     await emptyPool.destroy()
+    await queuePool.destroy()
   })
 
   it('Verify that the function is executed in a worker cluster', async () => {
@@ -76,6 +88,31 @@ describe('Fixed cluster pool test suite', () => {
     expect(poolBusy).toBe(numberOfWorkers + 1)
   })
 
+  it('Verify that tasks queuing is working', async () => {
+    const maxMultiplier = 10
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      queuePool.execute()
+    }
+    for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
+        queuePool.opts.tasksQueueOptions.concurrency
+      )
+      expect(workerNode.tasksUsage.run).toBe(0)
+      expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+    }
+    // FIXME: wait for ongoing tasks to be executed
+    const promises = []
+    for (let i = 0; i < numberOfWorkers * maxMultiplier; i++) {
+      promises.push(queuePool.execute())
+    }
+    await Promise.all(promises)
+    for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.length).toBe(0)
+    }
+  })
+
   it('Verify that is possible to have a worker that return undefined', async () => {
     const result = await emptyPool.execute()
     expect(result).toBeUndefined()
index d4d2ec1b233c3cc0eda0c78621733cc9441478d9..3292ce5686a5f6a014250ec3373510940d196cf3 100644 (file)
@@ -33,49 +33,103 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify ROUND_ROBIN strategy is taken at pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
-    )
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    expect(
-      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).nextWorkerNodeId
-    ).toBe(0)
-    // We need to clean up the resources after our test
-    await pool.destroy()
+  it('Verify available strategies are taken at pool creation', async () => {
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      const pool = new FixedThreadPool(
+        max,
+        './tests/worker-files/thread/testWorker.js',
+        { workerChoiceStrategy }
+      )
+      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
+      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+        workerChoiceStrategy
+      )
+      await pool.destroy()
+    }
   })
 
-  it('Verify ROUND_ROBIN strategy can be set after pool creation', async () => {
-    const pool = new DynamicThreadPool(
-      min,
+  it('Verify available strategies can be set after pool creation', async () => {
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      const pool = new DynamicThreadPool(
+        min,
+        max,
+        './tests/worker-files/thread/testWorker.js',
+        { workerChoiceStrategy }
+      )
+      pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+      expect(pool.opts.workerChoiceStrategy).toBe(workerChoiceStrategy)
+      expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+        workerChoiceStrategy
+      )
+      await pool.destroy()
+    }
+  })
+
+  it('Verify available strategies default internals at pool creation', async () => {
+    const pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js'
     )
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    // We need to clean up the resources after our test
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      if (workerChoiceStrategy === WorkerChoiceStrategies.ROUND_ROBIN) {
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).nextWorkerNodeId
+        ).toBe(0)
+      } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
+        for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+          .get(workerChoiceStrategy)
+          .workerLastVirtualTaskTimestamp.keys()) {
+          expect(
+            pool.workerChoiceStrategyContext.workerChoiceStrategies
+              .get(workerChoiceStrategy)
+              .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
+          ).toBe(0)
+          expect(
+            pool.workerChoiceStrategyContext.workerChoiceStrategies
+              .get(workerChoiceStrategy)
+              .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
+          ).toBe(0)
+        }
+      } else if (
+        workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+      ) {
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).currentWorkerNodeId
+        ).toBe(0)
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).defaultWorkerWeight
+        ).toBeGreaterThan(0)
+        for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
+          .get(workerChoiceStrategy)
+          .workersTaskRunTime.keys()) {
+          expect(
+            pool.workerChoiceStrategyContext.workerChoiceStrategies
+              .get(workerChoiceStrategy)
+              .workersTaskRunTime.get(workerNodeKey).weight
+          ).toBeGreaterThan(0)
+          expect(
+            pool.workerChoiceStrategyContext.workerChoiceStrategies
+              .get(workerChoiceStrategy)
+              .workersTaskRunTime.get(workerNodeKey).runTime
+          ).toBe(0)
+        }
+      }
+    }
     await pool.destroy()
   })
 
   it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -91,7 +145,7 @@ describe('Selection strategies test suite', () => {
       min,
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -140,9 +194,11 @@ describe('Selection strategies test suite', () => {
   })
 
   it('Verify ROUND_ROBIN strategy runtime behavior', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     let pool = new FixedClusterPool(
       max,
-      './tests/worker-files/cluster/testWorker.js'
+      './tests/worker-files/cluster/testWorker.js',
+      { workerChoiceStrategy }
     )
     let results = new Set()
     for (let i = 0; i < max; i++) {
@@ -150,7 +206,11 @@ describe('Selection strategies test suite', () => {
     }
     expect(results.size).toBe(max)
     await pool.destroy()
-    pool = new FixedThreadPool(max, './tests/worker-files/thread/testWorker.js')
+    pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
     results = new Set()
     for (let i = 0; i < max; i++) {
       results.add(pool.chooseWorkerNode()[1].worker.threadId)
@@ -160,6 +220,7 @@ describe('Selection strategies test suite', () => {
   })
 
   it('Verify ROUND_ROBIN strategy internals are resets after setting it', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
@@ -167,10 +228,10 @@ describe('Selection strategies test suite', () => {
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       ).nextWorkerNodeId
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -185,10 +246,10 @@ describe('Selection strategies test suite', () => {
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       ).nextWorkerNodeId
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -198,43 +259,12 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LESS_USED strategy is taken at pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
-    )
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
-  it('Verify LESS_USED strategy can be set after pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js'
-    )
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED)
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
   it('Verify LESS_USED strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -250,7 +280,7 @@ describe('Selection strategies test suite', () => {
       min,
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -298,43 +328,12 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LESS_BUSY strategy is taken at pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
-    )
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
-  it('Verify LESS_BUSY strategy can be set after pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js'
-    )
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
   it('Verify LESS_BUSY strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -350,7 +349,7 @@ describe('Selection strategies test suite', () => {
       min,
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -398,57 +397,12 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify FAIR_SHARE strategy is taken at pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
-    )
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workerLastVirtualTaskTimestamp.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
-      ).toBe(0)
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
-      ).toBe(0)
-    }
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
-  it('Verify FAIR_SHARE strategy can be set after pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js'
-    )
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
   it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -464,7 +418,7 @@ describe('Selection strategies test suite', () => {
       min,
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -526,16 +480,17 @@ describe('Selection strategies test suite', () => {
   })
 
   it('Verify FAIR_SHARE strategy internals are resets after setting it', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js'
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       ).workerLastVirtualTaskTimestamp
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
       .workerLastVirtualTaskTimestamp.keys()) {
@@ -558,10 +513,10 @@ describe('Selection strategies test suite', () => {
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       ).workerLastVirtualTaskTimestamp
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
       .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
       .workerLastVirtualTaskTimestamp.keys()) {
@@ -580,67 +535,12 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify WEIGHTED_ROUND_ROBIN strategy is taken at pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
-    )
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
-    expect(
-      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).currentWorkerNodeId
-    ).toBe(0)
-    expect(
-      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).defaultWorkerWeight
-    ).toBeGreaterThan(0)
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workersTaskRunTime.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workersTaskRunTime.get(workerNodeKey).weight
-      ).toBeGreaterThan(0)
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workersTaskRunTime.get(workerNodeKey).runTime
-      ).toBe(0)
-    }
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
-  it('Verify WEIGHTED_ROUND_ROBIN strategy can be set after pool creation', async () => {
-    const pool = new FixedThreadPool(
-      max,
-      './tests/worker-files/thread/testWorker.js'
-    )
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
-    expect(pool.opts.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
-    expect(pool.workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
-    // We need to clean up the resources after our test
-    await pool.destroy()
-  })
-
   it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -656,7 +556,7 @@ describe('Selection strategies test suite', () => {
       min,
       max,
       './tests/worker-files/thread/testWorker.js',
-      { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
+      { workerChoiceStrategy }
     )
     expect(
       pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
@@ -721,26 +621,27 @@ describe('Selection strategies test suite', () => {
   })
 
   it('Verify WEIGHTED_ROUND_ROBIN strategy internals are resets after setting it', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
       './tests/worker-files/thread/testWorker.js'
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).currentWorkerNodeId
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).defaultWorkerWeight
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).workersTaskRunTime
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
@@ -768,20 +669,20 @@ describe('Selection strategies test suite', () => {
     )
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).currentWorkerNodeId
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).defaultWorkerWeight
     ).toBeDefined()
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).workersTaskRunTime
     ).toBeDefined()
-    pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
+    pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
index c571478cbba66e3810e7cb66c740dd681cc95353..23d954c9f13159940e542c8774a3d22feefeaaad 100644 (file)
@@ -112,238 +112,228 @@ describe('Worker choice strategy context test suite', () => {
   })
 
   it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(false)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.ROUND_ROBIN
+      workerChoiceStrategy
     )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with ROUND_ROBIN and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(true)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
-    )
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.ROUND_ROBIN
+      workerChoiceStrategy
     )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(RoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.ROUND_ROBIN
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with LESS_USED and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_USED
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(false)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.LESS_USED
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_USED
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(LessUsedWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with LESS_USED and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_USED
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_USED
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(true)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.LESS_USED
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_USED
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(LessUsedWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_USED
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_BUSY
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(false)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_BUSY
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with LESS_BUSY and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LESS_BUSY
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_BUSY
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(true)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.LESS_BUSY
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.LESS_BUSY
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(LessBusyWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.LESS_BUSY
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(false)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(FairShareWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(true)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.FAIR_SHARE
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.FAIR_SHARE
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(FairShareWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.FAIR_SHARE
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       fixedPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(false)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+      workerChoiceStrategy
     )
   })
 
   it('Verify that setWorkerChoiceStrategy() works with WEIGHTED_ROUND_ROBIN and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
       dynamicPool
     )
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       ).isDynamicPool
     ).toBe(true)
-    workerChoiceStrategyContext.setWorkerChoiceStrategy(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
-    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
       workerChoiceStrategyContext.workerChoiceStrategies.get(
-        WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+        workerChoiceStrategy
       )
     ).toBeInstanceOf(WeightedRoundRobinWorkerChoiceStrategy)
     expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
-      WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+      workerChoiceStrategy
     )
   })
 })
index cf5c3175df42d9a5c94732e46d440af5a8f5deb1..7b4b9bf536acd131c3dcc1a9e729f90e86ff05e4 100644 (file)
@@ -12,6 +12,17 @@ describe('Fixed thread pool test suite', () => {
       errorHandler: e => console.error(e)
     }
   )
+  const queuePool = new FixedThreadPool(
+    numberOfThreads,
+    './tests/worker-files/thread/testWorker.js',
+    {
+      enableTasksQueue: true,
+      tasksQueueOptions: {
+        concurrency: 2
+      },
+      errorHandler: e => console.error(e)
+    }
+  )
   const emptyPool = new FixedThreadPool(
     numberOfThreads,
     './tests/worker-files/thread/emptyWorker.js',
@@ -47,6 +58,7 @@ describe('Fixed thread pool test suite', () => {
     await errorPool.destroy()
     await asyncErrorPool.destroy()
     await emptyPool.destroy()
+    await queuePool.destroy()
   })
 
   it('Verify that the function is executed in a worker thread', async () => {
@@ -76,6 +88,31 @@ describe('Fixed thread pool test suite', () => {
     expect(poolBusy).toBe(numberOfThreads + 1)
   })
 
+  it('Verify that tasks queuing is working', async () => {
+    const maxMultiplier = 10
+    for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
+      queuePool.execute()
+    }
+    for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.tasksUsage.running).toBeLessThanOrEqual(
+        queuePool.opts.tasksQueueOptions.concurrency
+      )
+      expect(workerNode.tasksUsage.run).toBe(0)
+      expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+    }
+    // FIXME: wait for ongoing tasks to be executed
+    const promises = []
+    for (let i = 0; i < numberOfThreads * maxMultiplier; i++) {
+      promises.push(queuePool.execute())
+    }
+    await Promise.all(promises)
+    for (const workerNode of queuePool.workerNodes) {
+      expect(workerNode.tasksUsage.running).toBe(0)
+      expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.length).toBe(0)
+    }
+  })
+
   it('Verify that is possible to have a worker that return undefined', async () => {
     const result = await emptyPool.execute()
     expect(result).toBeUndefined()