feat: implement k-priority queue
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 15:32:59 +0000 (17:32 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 15:32:59 +0000 (17:32 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/pools/worker-node.ts
src/priority-queue.ts
tests/priority-queue.test.mjs

index 99f4ab2006734ce5fce3a82df59a436115c3fc7a..e7fbe0e83cc9d74fae0ccfd8338870fb8e0c2907 100644 (file)
@@ -3,6 +3,7 @@ import { MessageChannel } from 'node:worker_threads'
 
 import { CircularArray } from '../circular-array.js'
 import { Deque } from '../deque.js'
+import { PriorityQueue } from '../priority-queue.js'
 import type { Task } from '../utility-types.js'
 import { DEFAULT_TASK_NAME } from '../utils.js'
 import {
@@ -45,6 +46,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   /** @inheritdoc */
   public tasksQueueBackPressureSize: number
   private readonly tasksQueue: Deque<Task<Data>>
+  private readonly tasksQueue2 = new PriorityQueue<Task<Data>>()
   private onBackPressureStarted: boolean
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
index 168e6c3455ca9fe82a864ad93fb61e3e894397a5..cf4fb150e1e1caba20ff9e9d579128c110c40e73 100644 (file)
@@ -27,8 +27,14 @@ export class PriorityQueue<T> {
    * @param k - Prioritized bucket size.
    */
   public constructor (k = Infinity) {
-    this.clear()
+    if (k !== Infinity && !Number.isSafeInteger(k)) {
+      throw new TypeError('k must be an integer')
+    }
+    if (k < 1) {
+      throw new RangeError('k must be greater than or equal to 1')
+    }
     this.k = k
+    this.clear()
   }
 
   /**
@@ -40,9 +46,13 @@ export class PriorityQueue<T> {
    */
   public enqueue (data: T, priority?: number): number {
     priority = priority ?? 0
+    const startIndex =
+      this.k === Infinity || this.nodeArray.length / this.k < 1
+        ? 0
+        : Math.trunc(this.nodeArray.length / this.k) * this.k
     let inserted = false
-    for (const [index, node] of this.nodeArray.entries()) {
-      if (node.priority > priority) {
+    for (let index = startIndex; index < this.nodeArray.length; index++) {
+      if (this.nodeArray[index].priority > priority) {
         this.nodeArray.splice(index, 0, { data, priority })
         inserted = true
         break
index 0c39e7461b2c6c4ef5cb1fa4e8ac4614e682932e..6b6c5f409b7d8d85319bcc7647fe4193f7a80045 100644 (file)
@@ -4,7 +4,29 @@ import { expect } from 'expect'
 import { PriorityQueue } from '../lib/priority-queue.cjs'
 
 describe.skip('Priority queue test suite', () => {
-  it('Verify enqueue() behavior', () => {
+  it('Verify constructor() behavior', () => {
+    expect(() => new PriorityQueue('')).toThrow(
+      new TypeError('k must be an integer')
+    )
+    expect(() => new PriorityQueue(-1)).toThrow(
+      new RangeError('k must be greater than or equal to 1')
+    )
+    expect(() => new PriorityQueue(0)).toThrow(
+      new RangeError('k must be greater than or equal to 1')
+    )
+    let priorityQueue = new PriorityQueue()
+    expect(priorityQueue.k).toBe(Infinity)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(0)
+    expect(priorityQueue.nodeArray).toStrictEqual([])
+    priorityQueue = new PriorityQueue(2)
+    expect(priorityQueue.k).toBe(2)
+    expect(priorityQueue.size).toBe(0)
+    expect(priorityQueue.maxSize).toBe(0)
+    expect(priorityQueue.nodeArray).toStrictEqual([])
+  })
+
+  it('Verify default k enqueue() behavior', () => {
     const priorityQueue = new PriorityQueue()
     let rtSize = priorityQueue.enqueue(1)
     expect(priorityQueue.size).toBe(1)
@@ -51,6 +73,65 @@ describe.skip('Priority queue test suite', () => {
     ])
   })
 
+  it('Verify k=2 enqueue() behavior', () => {
+    const priorityQueue = new PriorityQueue(2)
+    let rtSize = priorityQueue.enqueue(1)
+    expect(priorityQueue.size).toBe(1)
+    expect(priorityQueue.maxSize).toBe(1)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([{ data: 1, priority: 0 }])
+    rtSize = priorityQueue.enqueue(2)
+    expect(priorityQueue.size).toBe(2)
+    expect(priorityQueue.maxSize).toBe(2)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([
+      { data: 1, priority: 0 },
+      { data: 2, priority: 0 }
+    ])
+    rtSize = priorityQueue.enqueue(3)
+    expect(priorityQueue.size).toBe(3)
+    expect(priorityQueue.maxSize).toBe(3)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([
+      { data: 1, priority: 0 },
+      { data: 2, priority: 0 },
+      { data: 3, priority: 0 }
+    ])
+    rtSize = priorityQueue.enqueue(3, -1)
+    expect(priorityQueue.size).toBe(4)
+    expect(priorityQueue.maxSize).toBe(4)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([
+      { data: 1, priority: 0 },
+      { data: 2, priority: 0 },
+      { data: 3, priority: -1 },
+      { data: 3, priority: 0 }
+    ])
+    rtSize = priorityQueue.enqueue(1, 1)
+    expect(priorityQueue.size).toBe(5)
+    expect(priorityQueue.maxSize).toBe(5)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([
+      { data: 1, priority: 0 },
+      { data: 2, priority: 0 },
+      { data: 3, priority: -1 },
+      { data: 3, priority: 0 },
+      { data: 1, priority: 1 }
+    ])
+    rtSize = priorityQueue.enqueue(2, -2)
+    expect(priorityQueue.size).toBe(6)
+    expect(priorityQueue.maxSize).toBe(6)
+    expect(rtSize).toBe(priorityQueue.size)
+    expect(priorityQueue.nodeArray).toStrictEqual([
+      { data: 1, priority: 0 },
+      { data: 2, priority: 0 },
+      { data: 3, priority: -1 },
+      { data: 3, priority: 0 },
+      { data: 2, priority: -2 },
+      { data: 1, priority: 1 }
+    ])
+  })
+
   it('Verify dequeue() behavior', () => {
     const priorityQueue = new PriorityQueue()
     priorityQueue.enqueue(1)