feat: continuous task stealing
[poolifier.git] / tests / pools / selection-strategies / selection-strategies.test.js
index 8894ba3126102faef6ab1dbea23ef20676c21a37..77713b6f03f20f8dc883cad07820b292a16d47de 100644 (file)
@@ -3,11 +3,9 @@ const {
   DynamicThreadPool,
   FixedClusterPool,
   FixedThreadPool,
-  PoolEvents,
   WorkerChoiceStrategies
 } = require('../../../lib')
 const { CircularArray } = require('../../../lib/circular-array')
-const { waitPoolEvents } = require('../../test-utils')
 
 describe('Selection strategies test suite', () => {
   const min = 0
@@ -125,7 +123,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -135,7 +134,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -217,6 +217,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -261,10 +262,11 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -282,6 +284,10 @@ describe('Selection strategies test suite', () => {
           }
         }
       })
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -366,7 +372,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -376,7 +383,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -458,6 +466,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -505,6 +514,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -539,7 +549,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -549,7 +560,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -631,6 +643,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -688,6 +701,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -732,7 +746,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -742,7 +757,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -824,6 +840,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -833,12 +850,12 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -877,6 +894,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -886,12 +904,12 @@ describe('Selection strategies test suite', () => {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -917,7 +935,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -927,7 +946,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: false
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -1009,21 +1029,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1077,21 +1098,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1150,21 +1172,22 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: expect.objectContaining({
+        runTime: {
           history: expect.any(CircularArray)
-        }),
+        },
         waitTime: {
           history: expect.any(CircularArray)
         },
         elu: {
-          idle: expect.objectContaining({
+          idle: {
             history: expect.any(CircularArray)
-          }),
-          active: expect.objectContaining({
+          },
+          active: {
             history: expect.any(CircularArray)
-          })
+          }
         }
       })
       expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
@@ -1279,7 +1302,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -1289,7 +1313,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -1371,6 +1396,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: expect.objectContaining({
@@ -1438,15 +1464,12 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
-          average: expect.any(Number),
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         waitTime: {
           history: expect.any(CircularArray)
         },
@@ -1463,8 +1486,16 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
-      expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+      if (workerNode.usage.runTime.aggregate == null) {
+        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      }
+      if (workerNode.usage.runTime.average == null) {
+        expect(workerNode.usage.runTime.average).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.average).toBeGreaterThan(0)
+      }
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1506,15 +1537,12 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
-        runTime: {
-          aggregate: expect.any(Number),
-          maximum: expect.any(Number),
-          minimum: expect.any(Number),
-          median: expect.any(Number),
+        runTime: expect.objectContaining({
           history: expect.any(CircularArray)
-        },
+        }),
         waitTime: {
           history: expect.any(CircularArray)
         },
@@ -1531,8 +1559,16 @@ describe('Selection strategies test suite', () => {
       expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
-      expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
-      expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
+      if (workerNode.usage.runTime.aggregate == null) {
+        expect(workerNode.usage.runTime.aggregate).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.aggregate).toBeGreaterThan(0)
+      }
+      if (workerNode.usage.runTime.median == null) {
+        expect(workerNode.usage.runTime.median).toBeUndefined()
+      } else {
+        expect(workerNode.usage.runTime.median).toBeGreaterThan(0)
+      }
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(
@@ -1635,7 +1671,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -1645,7 +1682,8 @@ describe('Selection strategies test suite', () => {
       { workerChoiceStrategy }
     )
     expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
-      useDynamicWorker: true
+      dynamicWorkerUsage: false,
+      dynamicWorkerReady: true
     })
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -1717,8 +1755,6 @@ describe('Selection strategies test suite', () => {
           WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
       }
     )
-    // FIXME: shall not be needed
-    await waitPoolEvents(pool, PoolEvents.ready, 1)
     // TODO: Create a better test to cover `InterleavedWeightedRoundRobinWorkerChoiceStrategy#choose`
     const promises = new Set()
     const maxMultiplier = 2
@@ -1733,6 +1769,7 @@ describe('Selection strategies test suite', () => {
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -1799,10 +1836,11 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.usage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           maxQueued: 0,
+          stolen: 0,
           failed: 0
         },
         runTime: {
@@ -1820,6 +1858,10 @@ describe('Selection strategies test suite', () => {
           }
         }
       })
+      expect(workerNode.usage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
     }
     expect(
       pool.workerChoiceStrategyContext.workerChoiceStrategies.get(