perf: optimize task(s) stealing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 8 May 2024 09:43:09 +0000 (11:43 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 8 May 2024 09:43:09 +0000 (11:43 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/priority-queue.ts
tests/priority-queue.test.mjs

index fa5c772de731c3baa7a95dd99852d9280a72e457..ab0615e167750db1667e38278f191abe0431bd6e 100644 (file)
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Optimize task(s) stealing by dequeuing task(s) from the last prioritized bucket.
+
 ## [4.0.2] - 2024-05-06
 
 ### Fixed
index ad7adce328f2c0c27618d2fd237e412e2a4fa17d..065d39964f793b489e283bf7c2e047b708348300 100644 (file)
@@ -1817,7 +1817,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.dequeueTask(1)!
+      const task = sourceWorkerNode.dequeueLastBucketTask()!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1868,7 +1868,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.dequeueTask(1)!
+        const task = sourceWorkerNode.dequeueLastBucketTask()!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -2105,11 +2105,8 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (
-    workerNodeKey: number,
-    bucket?: number
-  ): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
+  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask()
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
index 64400d9eb1426be9ffe546b45855b606bfec05c0..5d28802c12769c1cbaf0cd519e22708918788016 100644 (file)
@@ -95,6 +95,12 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     return this.tasksQueue.dequeue(bucket)
   }
 
+  /** @inheritdoc */
+  public dequeueLastBucketTask (): Task<Data> | undefined {
+    // Start from the last empty or partially filled bucket
+    return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1)
+  }
+
   /** @inheritdoc */
   public clearTasksQueue (): void {
     this.tasksQueue.clear()
index 35d9f755bc3f9d4d321c8ac7b999261a3e1d613a..1a9973ba8a8ba53bcb9aedb59f4ac246d2e4045e 100644 (file)
@@ -325,6 +325,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * @returns The dequeued task.
    */
   readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
+  /**
+   * Dequeue last bucket task.
+   *
+   * @returns The dequeued task.
+   */
+  readonly dequeueLastBucketTask: () => Task<Data> | undefined
   /**
    * Clears tasks queue.
    */
index 64b5a9248a804130107b810c412bf4c978ae8cd9..1c0d8c929967b2fa52f728f7b395ed013f044fda 100644 (file)
@@ -26,6 +26,13 @@ export class PriorityQueue<T> {
   /** The maximum size of the priority queue. */
   public maxSize!: number
 
+  /**
+   * The number of filled prioritized buckets.
+   */
+  public get buckets (): number {
+    return this.k === Infinity ? 1 : Math.trunc(this.nodeArray.length / this.k)
+  }
+
   /**
    * Constructs a priority queue.
    *
@@ -51,10 +58,7 @@ export class PriorityQueue<T> {
    */
   public enqueue (data: T, priority?: number): number {
     priority = priority ?? 0
-    const startIndex =
-      this.k === Infinity || this.nodeArray.length / this.k < 1
-        ? 0
-        : Math.trunc(this.nodeArray.length / this.k) * this.k
+    const startIndex = this.k === Infinity ? 0 : this.buckets * this.k
     let inserted = false
     for (let index = startIndex; index < this.nodeArray.length; index++) {
       if (this.nodeArray[index].priority > priority) {
index 58931859cb02b7d67f63b6d3c25f0948d199c421..624d577a4c6d0466a1fe18272d3bb4310c7d0985 100644 (file)
@@ -15,11 +15,13 @@ describe('Priority queue test suite', () => {
     )
     let priorityQueue = new PriorityQueue()
     expect(priorityQueue.k).toBe(Infinity)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
     expect(priorityQueue.nodeArray).toStrictEqual([])
     priorityQueue = new PriorityQueue(2)
     expect(priorityQueue.k).toBe(2)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
     expect(priorityQueue.nodeArray).toStrictEqual([])
@@ -28,11 +30,13 @@ describe('Priority queue test suite', () => {
   it('Verify default k enqueue() behavior', () => {
     const priorityQueue = new PriorityQueue()
     let rtSize = priorityQueue.enqueue(1)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(1)
     expect(rtSize).toBe(priorityQueue.size)
     expect(priorityQueue.nodeArray).toStrictEqual([{ data: 1, priority: 0 }])
     rtSize = priorityQueue.enqueue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(2)
     expect(rtSize).toBe(priorityQueue.size)
@@ -41,6 +45,7 @@ describe('Priority queue test suite', () => {
       { data: 2, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtSize).toBe(priorityQueue.size)
@@ -50,6 +55,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(3, -1)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(4)
     expect(rtSize).toBe(priorityQueue.size)
@@ -60,6 +66,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(1, 1)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(5)
     expect(rtSize).toBe(priorityQueue.size)
@@ -75,11 +82,13 @@ describe('Priority queue test suite', () => {
   it('Verify k=2 enqueue() behavior', () => {
     const priorityQueue = new PriorityQueue(2)
     let rtSize = priorityQueue.enqueue(1)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(1)
     expect(rtSize).toBe(priorityQueue.size)
     expect(priorityQueue.nodeArray).toStrictEqual([{ data: 1, priority: 0 }])
     rtSize = priorityQueue.enqueue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(2)
     expect(rtSize).toBe(priorityQueue.size)
@@ -88,6 +97,7 @@ describe('Priority queue test suite', () => {
       { data: 2, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtSize).toBe(priorityQueue.size)
@@ -97,6 +107,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(3, -1)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(4)
     expect(rtSize).toBe(priorityQueue.size)
@@ -107,6 +118,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(1, 1)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(5)
     expect(rtSize).toBe(priorityQueue.size)
@@ -118,6 +130,7 @@ describe('Priority queue test suite', () => {
       { data: 1, priority: 1 }
     ])
     rtSize = priorityQueue.enqueue(3, -2)
+    expect(priorityQueue.buckets).toBe(3)
     expect(priorityQueue.size).toBe(6)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtSize).toBe(priorityQueue.size)
@@ -136,7 +149,11 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2, -1)
     priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
+    expect(priorityQueue.size).toBe(3)
+    expect(priorityQueue.maxSize).toBe(3)
     let rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(2)
@@ -145,11 +162,13 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(1)
     expect(priorityQueue.nodeArray).toStrictEqual([{ data: 3, priority: 0 }])
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(3)
@@ -164,7 +183,11 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(3, -1)
     priorityQueue.enqueue(1, 1)
     priorityQueue.enqueue(3, -2)
+    expect(priorityQueue.buckets).toBe(3)
+    expect(priorityQueue.size).toBe(6)
+    expect(priorityQueue.maxSize).toBe(6)
     let rtItem = priorityQueue.dequeue(3)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
@@ -176,6 +199,7 @@ describe('Priority queue test suite', () => {
       { data: 1, priority: 1 }
     ])
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(1)
@@ -186,6 +210,7 @@ describe('Priority queue test suite', () => {
       { data: 1, priority: 1 }
     ])
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
@@ -195,6 +220,7 @@ describe('Priority queue test suite', () => {
       { data: 1, priority: 1 }
     ])
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(1)
@@ -203,11 +229,13 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: -1 }
     ])
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(2)
     expect(priorityQueue.nodeArray).toStrictEqual([{ data: 3, priority: -1 }])
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
@@ -251,6 +279,7 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2)
     priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(priorityQueue.nodeArray).toStrictEqual([
@@ -259,6 +288,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     priorityQueue.clear()
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
     expect(priorityQueue.nodeArray).toStrictEqual([])