fix: fix fair share algorithm implementation
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
index 7755cc341f6ec97f143ba7253dae95601345de3e..392aafcf3a58a79f6744a37268b01415243a2b97 100644 (file)
@@ -4,7 +4,7 @@ const {
   DynamicThreadPool,
   FixedThreadPool,
   FixedClusterPool
-} = require('../../../lib/index')
+} = require('../../../lib')
 
 describe('Selection strategies test suite', () => {
   const min = 0
@@ -77,20 +77,16 @@ describe('Selection strategies test suite', () => {
           ).nextWorkerNodeId
         ).toBe(0)
       } else if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
-        for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.keys()) {
-          expect(
-            pool.workerChoiceStrategyContext.workerChoiceStrategies
-              .get(workerChoiceStrategy)
-              .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
-          ).toBe(0)
-          expect(
-            pool.workerChoiceStrategyContext.workerChoiceStrategies
-              .get(workerChoiceStrategy)
-              .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
-          ).toBe(0)
-        }
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).workersVirtualTaskTimestamp
+        ).toBeInstanceOf(Array)
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).workersVirtualTaskTimestamp.length
+        ).toBe(0)
       } else if (
         workerChoiceStrategy === WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
       ) {
@@ -104,20 +100,11 @@ describe('Selection strategies test suite', () => {
             workerChoiceStrategy
           ).defaultWorkerWeight
         ).toBeGreaterThan(0)
-        for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(workerChoiceStrategy)
-          .workersTaskRunTime.keys()) {
-          expect(
-            pool.workerChoiceStrategyContext.workerChoiceStrategies
-              .get(workerChoiceStrategy)
-              .workersTaskRunTime.get(workerNodeKey).weight
-          ).toBeGreaterThan(0)
-          expect(
-            pool.workerChoiceStrategyContext.workerChoiceStrategies
-              .get(workerChoiceStrategy)
-              .workersTaskRunTime.get(workerNodeKey).runTime
-          ).toBe(0)
-        }
+        expect(
+          pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+            workerChoiceStrategy
+          ).workerVirtualTaskRunTime
+        ).toBe(0)
       }
     }
     await pool.destroy()
@@ -166,11 +153,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -183,11 +169,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `RoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -301,11 +286,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
     )
     // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -318,11 +302,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_USED }
     )
     // TODO: Create a better test to cover `LessUsedWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -370,11 +353,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
     )
     // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -387,11 +369,10 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.LESS_BUSY }
     )
     // TODO: Create a better test to cover `LessBusyWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
     }
-    await Promise.all(promises)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -439,15 +420,20 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
+    }
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
     }
-    await Promise.all(promises)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).workerLastVirtualTaskTimestamp.size
+      ).workersVirtualTaskTimestamp.length
     ).toBe(pool.workerNodes.length)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -461,19 +447,53 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE }
     )
     // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
-    const promises = []
     const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
+    }
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(pool.workerNodes.length)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify FAIR_SHARE strategy can be run in a dynamic pool with median run time statistic', async () => {
+    const pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      {
+        workerChoiceStrategy: WorkerChoiceStrategies.FAIR_SHARE,
+        workerChoiceStrategyOptions: {
+          medRunTime: true
+        }
+      }
+    )
+    // TODO: Create a better test to cover `FairShareChoiceStrategy#choose`
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
+    }
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
     }
-    await Promise.all(promises)
-    // if (process.platform !== 'win32') {
-    //   expect(
-    //     pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-    //       pool.workerChoiceStrategyContext.workerChoiceStrategy
-    //     ).workerLastVirtualTaskTimestamp.size
-    //   ).toBe(pool.workerNodes.length)
-    // }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(pool.workerNodes.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -487,23 +507,32 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         workerChoiceStrategy
-      ).workerLastVirtualTaskTimestamp
-    ).toBeDefined()
+      ).workersVirtualTaskTimestamp
+    ).toBeInstanceOf(Array)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(0)
+    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+      workerChoiceStrategy
+    ).workersVirtualTaskTimestamp[0] = 0
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(1)
     pool.setWorkerChoiceStrategy(workerChoiceStrategy)
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workerLastVirtualTaskTimestamp.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
-      ).toBe(0)
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
-      ).toBe(0)
-    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp
+    ).toBeInstanceOf(Array)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(0)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -513,23 +542,32 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         workerChoiceStrategy
-      ).workerLastVirtualTaskTimestamp
-    ).toBeDefined()
+      ).workersVirtualTaskTimestamp
+    ).toBeInstanceOf(Array)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(0)
+    pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+      workerChoiceStrategy
+    ).workersVirtualTaskTimestamp[0] = 0
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(1)
     pool.setWorkerChoiceStrategy(workerChoiceStrategy)
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workerLastVirtualTaskTimestamp.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).start
-      ).toBe(0)
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workerLastVirtualTaskTimestamp.get(workerNodeKey).end
-      ).toBe(0)
-    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp
+    ).toBeInstanceOf(Array)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workersVirtualTaskTimestamp.length
+    ).toBe(0)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -577,16 +615,26 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
-    for (let i = 0; i < max * 2; i++) {
-      promises.push(pool.execute())
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
+    }
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
     }
-    await Promise.all(promises)
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).workersTaskRunTime.size
-    ).toBe(pool.workerNodes.length)
+      ).defaultWorkerWeight
+    ).toBeGreaterThan(0)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workerVirtualTaskRunTime
+    ).toBeGreaterThanOrEqual(0)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -599,22 +647,63 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN }
     )
     // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
-    const promises = []
-    const maxMultiplier =
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
+    }
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBeGreaterThanOrEqual(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBe(0)
+    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).defaultWorkerWeight
+    ).toBeGreaterThan(0)
+    expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         pool.workerChoiceStrategyContext.workerChoiceStrategy
-      ).defaultWorkerWeight * 50
+      ).workerVirtualTaskRunTime
+    ).toBeGreaterThanOrEqual(0)
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify WEIGHTED_ROUND_ROBIN strategy can be run in a dynamic pool with median run time statistic', async () => {
+    const pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      {
+        workerChoiceStrategy: WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN,
+        workerChoiceStrategyOptions: {
+          medRunTime: true
+        }
+      }
+    )
+    // TODO: Create a better test to cover `WeightedRoundRobinWorkerChoiceStrategy#choose`
+    const maxMultiplier = 2
     for (let i = 0; i < max * maxMultiplier; i++) {
-      promises.push(pool.execute())
+      await pool.execute()
     }
-    await Promise.all(promises)
-    if (process.platform !== 'win32') {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
-          pool.workerChoiceStrategyContext.workerChoiceStrategy
-        ).workersTaskRunTime.size
-      ).toBe(pool.workerNodes.length)
+    for (const workerNode of pool.workerNodes) {
+      expect(workerNode.tasksUsage.avgRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.avgRunTime).toBe(0)
+      expect(workerNode.tasksUsage.medRunTime).toBeDefined()
+      expect(workerNode.tasksUsage.medRunTime).toBeGreaterThanOrEqual(0)
     }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).defaultWorkerWeight
+    ).toBeGreaterThan(0)
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        pool.workerChoiceStrategyContext.workerChoiceStrategy
+      ).workerVirtualTaskRunTime
+    ).toBeGreaterThanOrEqual(0)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -638,7 +727,7 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         workerChoiceStrategy
-      ).workersTaskRunTime
+      ).workerVirtualTaskRunTime
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
@@ -651,15 +740,11 @@ describe('Selection strategies test suite', () => {
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workersTaskRunTime.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workersTaskRunTime.get(workerNodeKey).runTime
-      ).toBe(0)
-    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workerVirtualTaskRunTime
+    ).toBe(0)
     await pool.destroy()
     pool = new DynamicThreadPool(
       min,
@@ -679,7 +764,7 @@ describe('Selection strategies test suite', () => {
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
         workerChoiceStrategy
-      ).workersTaskRunTime
+      ).workerVirtualTaskRunTime
     ).toBeDefined()
     pool.setWorkerChoiceStrategy(workerChoiceStrategy)
     expect(
@@ -692,15 +777,11 @@ describe('Selection strategies test suite', () => {
         pool.workerChoiceStrategyContext.workerChoiceStrategy
       ).defaultWorkerWeight
     ).toBeGreaterThan(0)
-    for (const workerNodeKey of pool.workerChoiceStrategyContext.workerChoiceStrategies
-      .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-      .workersTaskRunTime.keys()) {
-      expect(
-        pool.workerChoiceStrategyContext.workerChoiceStrategies
-          .get(pool.workerChoiceStrategyContext.workerChoiceStrategy)
-          .workersTaskRunTime.get(workerNodeKey).runTime
-      ).toBe(0)
-    }
+    expect(
+      pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      ).workerVirtualTaskRunTime
+    ).toBe(0)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -714,8 +795,6 @@ describe('Selection strategies test suite', () => {
           './tests/worker-files/thread/testWorker.js',
           { workerChoiceStrategy: 'UNKNOWN_STRATEGY' }
         )
-    ).toThrowError(
-      new Error("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
-    )
+    ).toThrowError("Invalid worker choice strategy 'UNKNOWN_STRATEGY'")
   })
 })