fix: fix task dequeuing from the last bucket
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 28 May 2024 17:42:19 +0000 (19:42 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 28 May 2024 17:42:19 +0000 (19:42 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/fixed-priority-queue.ts
src/pools/worker-node.ts
src/priority-queue.ts
tests/priority-queue.test.mjs

index 53ef6edbd082ffa33184ecf95171ef4fa3d859c7..2424b652033d42ee2dad7538a09b64621b60a4ae 100644 (file)
@@ -40,14 +40,31 @@ export class FixedPriorityQueue<T> {
     this.clear()
   }
 
+  /**
+   * Checks if the fixed priority queue is empty.
+   *
+   * @returns `true` if the fixed priority queue is empty, `false` otherwise.
+   */
   public empty (): boolean {
     return this.size === 0
   }
 
+  /**
+   * Checks if the fixed priority queue is full.
+   *
+   * @returns `true` if the fixed priority queue is full, `false` otherwise.
+   */
   public full (): boolean {
     return this.size === this.capacity
   }
 
+  /**
+   * Enqueue data into the fixed priority queue.
+   *
+   * @param data - Data to enqueue.
+   * @param priority - Priority of the data. Lower values have higher priority.
+   * @returns The new size of the priority queue.
+   */
   public enqueue (data: T, priority?: number): number {
     if (this.full()) {
       throw new Error('Priority queue is full')
@@ -78,6 +95,11 @@ export class FixedPriorityQueue<T> {
     return this.incrementSize()
   }
 
+  /**
+   * Dequeue data from the fixed priority queue.
+   *
+   * @returns The dequeued data or `undefined` if the priority queue is empty.
+   */
   public dequeue (): T | undefined {
     if (this.empty()) {
       return undefined
@@ -144,6 +166,11 @@ export class FixedPriorityQueue<T> {
     return this.size
   }
 
+  /**
+   * Checks the size.
+   *
+   * @param size - The size to check.
+   */
   private checkSize (size: number): void {
     if (!Number.isSafeInteger(size)) {
       throw new TypeError(
index 7ea617fbc4cd3023770699585a83757596efe12f..079557010565cc7a7e129d0f314caf4762eb7700 100644 (file)
@@ -114,7 +114,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public dequeueLastPrioritizedTask (): Task<Data> | undefined {
     // Start from the last empty or partially filled bucket
-    return this.dequeueTask()
+    return this.dequeueTask(this.tasksQueue.buckets + 1)
   }
 
   /** @inheritdoc */
index d1d9307804822bd7a243fe51004ee1bb9b08a280..ed4d63f5fa9cb03b71f1e91b472f84d1978d2573 100644 (file)
@@ -36,11 +36,19 @@ export class PriorityQueue<T> {
    * @returns PriorityQueue.
    */
   public constructor (bucketSize: number = defaultBucketSize) {
+    if (!Number.isSafeInteger(bucketSize)) {
+      throw new TypeError('bucketSize must be an integer')
+    }
+    if (bucketSize < 1) {
+      throw new RangeError('bucketSize must be greater than or equal to 1')
+    }
     this.bucketSize = bucketSize
     this.clear()
   }
 
-  /** The size of the priority queue. */
+  /**
+   * The size of the priority queue.
+   */
   public get size (): number {
     let node: PriorityQueueNode<T> | undefined = this.tail
     let size = 0
@@ -51,6 +59,13 @@ export class PriorityQueue<T> {
     return size
   }
 
+  /**
+   * The number of filled prioritized buckets.
+   */
+  public get buckets (): number {
+    return Math.trunc(this.size / this.bucketSize)
+  }
+
   /**
    * Enqueue data into the priority queue.
    *
index f366dc0fe5e442d3500b035de1f925abb840a1c3..bcd3cf0a169e0a9b0ca1f806f3328a2d3d719131 100644 (file)
@@ -5,7 +5,18 @@ import { defaultBucketSize, PriorityQueue } from '../lib/priority-queue.cjs'
 
 describe('Priority queue test suite', () => {
   it('Verify constructor() behavior', () => {
-    const priorityQueue = new PriorityQueue()
+    expect(() => new PriorityQueue('')).toThrow(
+      new TypeError('bucketSize must be an integer')
+    )
+    expect(() => new PriorityQueue(-1)).toThrow(
+      new RangeError('bucketSize must be greater than or equal to 1')
+    )
+    expect(() => new PriorityQueue(0)).toThrow(
+      new RangeError('bucketSize must be greater than or equal to 1')
+    )
+    let priorityQueue = new PriorityQueue()
+    expect(priorityQueue.bucketSize).toBe(defaultBucketSize)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
     expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
@@ -13,11 +24,23 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.capacity).toBe(defaultBucketSize)
     expect(priorityQueue.tail).toBeInstanceOf(FixedPriorityQueue)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
+    const bucketSize = 2
+    priorityQueue = new PriorityQueue(bucketSize)
+    expect(priorityQueue.bucketSize).toBe(bucketSize)
+    expect(priorityQueue.buckets).toBe(0)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(0)
+    expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.head.next).toBe(undefined)
+    expect(priorityQueue.head.capacity).toBe(bucketSize)
+    expect(priorityQueue.tail).toBeInstanceOf(FixedPriorityQueue)
+    expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
   })
 
   it('Verify default bucket size enqueue() behavior', () => {
     const priorityQueue = new PriorityQueue()
     let rtSize = priorityQueue.enqueue(1)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(1)
     expect(rtSize).toBe(priorityQueue.size)
@@ -27,6 +50,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(2)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(2)
     expect(rtSize).toBe(priorityQueue.size)
@@ -37,6 +61,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtSize).toBe(priorityQueue.size)
@@ -48,6 +73,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(3, -1)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(4)
     expect(rtSize).toBe(priorityQueue.size)
@@ -60,6 +86,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(1, 1)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(5)
     expect(rtSize).toBe(priorityQueue.size)
@@ -77,6 +104,7 @@ describe('Priority queue test suite', () => {
   it('Verify bucketSize=2 enqueue() behavior', () => {
     const priorityQueue = new PriorityQueue(2)
     let rtSize = priorityQueue.enqueue(1)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(1)
     expect(rtSize).toBe(priorityQueue.size)
@@ -86,6 +114,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(2)
     expect(rtSize).toBe(priorityQueue.size)
@@ -96,6 +125,7 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtSize).toBe(priorityQueue.size)
@@ -109,6 +139,7 @@ describe('Priority queue test suite', () => {
     ])
     expect(priorityQueue.tail.next).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(3, -1)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(4)
     expect(rtSize).toBe(priorityQueue.size)
@@ -123,6 +154,7 @@ describe('Priority queue test suite', () => {
     ])
     expect(priorityQueue.tail.next).toStrictEqual(priorityQueue.head)
     rtSize = priorityQueue.enqueue(1, 1)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(5)
     expect(rtSize).toBe(priorityQueue.size)
@@ -140,6 +172,7 @@ describe('Priority queue test suite', () => {
       { data: 3, priority: 0 }
     ])
     rtSize = priorityQueue.enqueue(3, -2)
+    expect(priorityQueue.buckets).toBe(3)
     expect(priorityQueue.size).toBe(6)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtSize).toBe(priorityQueue.size)
@@ -164,23 +197,27 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2, -1)
     priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBe(undefined)
     let rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(2)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBe(undefined)
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(1)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBe(undefined)
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(3)
     expect(rtItem).toBe(3)
@@ -196,41 +233,48 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(3, -1)
     priorityQueue.enqueue(1, 1)
     priorityQueue.enqueue(3, -2)
+    expect(priorityQueue.buckets).toBe(3)
     expect(priorityQueue.size).toBe(6)
     expect(priorityQueue.maxSize).toBe(6)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     let rtItem = priorityQueue.dequeue(3)
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(5)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(2)
     expect(priorityQueue.size).toBe(4)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(1)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(2)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(3)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     rtItem = priorityQueue.dequeue(2)
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(1)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail.next).toBeInstanceOf(FixedPriorityQueue)
     rtItem = priorityQueue.dequeue()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(6)
     expect(rtItem).toBe(2)
@@ -255,12 +299,14 @@ describe('Priority queue test suite', () => {
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2)
     priorityQueue.enqueue(3)
+    expect(priorityQueue.buckets).toBe(1)
     expect(priorityQueue.size).toBe(3)
     expect(priorityQueue.maxSize).toBe(3)
     expect(priorityQueue.head.empty()).toBe(false)
     expect(priorityQueue.tail.empty()).toBe(false)
     expect(priorityQueue.tail).not.toStrictEqual(priorityQueue.head)
     priorityQueue.clear()
+    expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
     expect(priorityQueue.head.empty()).toBe(true)