perf: fine tune continuous task stealing algorithm
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Aug 2023 14:26:55 +0000 (16:26 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sat, 26 Aug 2023 14:26:55 +0000 (16:26 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/worker-node.ts
tests/pools/thread/fixed.test.js
tests/utils.test.js

index 020b4b6194a04cad3e056ed90e386570f4224a7e..941e085e94d0d883f61b22454948f337571fb8d9 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Make continuous tasks stealing algorithm less aggressive.
+
 ## [2.6.35] - 2023-08-25
 
 ### Fixed
index 9ecfd012f9b5402c4676004f24f7c04ecc4bf020..fbd00096a769d53cd5d3d9415fae64c428cfb3ee 100644 (file)
@@ -178,7 +178,11 @@ implements IWorkerNode<Worker, Data> {
   }
 
   private async startOnEmptyQueue (): Promise<void> {
-    if (this.tasksQueue.size > 0) {
+    if (
+      this.onEmptyQueueCount > 0 &&
+      this.usage.tasks.executing > 0 &&
+      this.tasksQueue.size > 0
+    ) {
       this.onEmptyQueueCount = 0
       return
     }
index 8cd337ee6fa1ceb7109e42a7e55255f76690392c..29ccd827102a7afe6b2a099a8ec5937f37a6e30a 100644 (file)
@@ -124,7 +124,9 @@ describe('Fixed thread pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.stolen).toBe(0)
     }
+    expect(queuePool.info.executedTasks).toBe(0)
     expect(queuePool.info.executingTasks).toBe(
       numberOfThreads * queuePool.opts.tasksQueueOptions.concurrency
     )
@@ -137,6 +139,7 @@ describe('Fixed thread pool test suite', () => {
         (maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency)
     )
     expect(queuePool.info.backPressure).toBe(false)
+    expect(queuePool.info.stolenTasks).toBe(0)
     await Promise.all(promises)
     for (const workerNode of queuePool.workerNodes) {
       expect(workerNode.usage.tasks.executing).toBeGreaterThanOrEqual(0)
@@ -148,7 +151,17 @@ describe('Fixed thread pool test suite', () => {
       expect(workerNode.usage.tasks.maxQueued).toBe(
         maxMultiplier - queuePool.opts.tasksQueueOptions.concurrency
       )
+      expect(workerNode.usage.tasks.stolen).toBeGreaterThanOrEqual(0)
+      expect(workerNode.usage.tasks.stolen).toBeLessThanOrEqual(
+        numberOfThreads * maxMultiplier
+      )
     }
+    expect(queuePool.info.executedTasks).toBe(numberOfThreads * maxMultiplier)
+    expect(queuePool.info.backPressure).toBe(false)
+    expect(queuePool.info.stolenTasks).toBeGreaterThanOrEqual(0)
+    expect(queuePool.info.stolenTasks).toBeLessThanOrEqual(
+      numberOfThreads * maxMultiplier
+    )
   })
 
   it('Verify that is possible to have a worker that return undefined', async () => {
index 0f72aa429fc5caae1494231f2bafaf7a3676b51b..6cfe99ae2f1198592d940bd0b1414d4fa038bb36 100644 (file)
@@ -34,13 +34,10 @@ describe('Utils test suite', () => {
   })
 
   it('Verify exponentialDelay() behavior', () => {
-    expect(typeof exponentialDelay(randomInt(1000)) === 'number').toBe(true)
-    expect(exponentialDelay(randomInt(1000))).toBeGreaterThanOrEqual(
-      Number.MIN_VALUE
-    )
-    expect(exponentialDelay(randomInt(1000))).toBeLessThanOrEqual(
-      Number.MAX_VALUE
-    )
+    const delay = exponentialDelay(randomInt(1000))
+    expect(typeof delay === 'number').toBe(true)
+    expect(delay).toBeGreaterThanOrEqual(Number.MIN_VALUE)
+    expect(delay).toBeLessThanOrEqual(Number.MAX_VALUE)
   })
 
   it('Verify average() computation', () => {