feat: use priority queue for task queueing
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 18:01:23 +0000 (20:01 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Mon, 29 Apr 2024 18:01:23 +0000 (20:01 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
src/deque.ts [deleted file]
src/index.ts
src/pools/abstract-pool.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/priority-queue.ts
src/utility-types.ts
tests/deque.test.mjs [deleted file]
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs
tests/priority-queue.test.mjs

diff --git a/src/deque.ts b/src/deque.ts
deleted file mode 100644 (file)
index 5fb2677..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-// Copyright Jerome Benoit. 2023-2024. All Rights Reserved.
-
-/**
- * Linked list node interface.
- *
- * @typeParam T - Type of linked list node data.
- * @internal
- */
-export interface ILinkedListNode<T> {
-  data: T
-  next?: ILinkedListNode<T>
-  prev?: ILinkedListNode<T>
-}
-
-/**
- * Deque.
- * Implemented with a doubly linked list.
- *
- * @typeParam T - Type of deque data.
- * @internal
- */
-export class Deque<T> {
-  private head?: ILinkedListNode<T>
-  private tail?: ILinkedListNode<T>
-  /** The size of the deque. */
-  public size!: number
-  /** The maximum size of the deque. */
-  public maxSize!: number
-
-  public constructor () {
-    this.clear()
-  }
-
-  /**
-   * Appends data to the deque.
-   *
-   * @param data - Data to append.
-   * @returns The new size of the deque.
-   */
-  public push (data: T): number {
-    const node: ILinkedListNode<T> = { data }
-    if (this.tail == null) {
-      this.head = this.tail = node
-    } else {
-      node.prev = this.tail
-      this.tail = this.tail.next = node
-    }
-    return this.incrementSize()
-  }
-
-  /**
-   * Prepends data to the deque.
-   *
-   * @param data - Data to prepend.
-   * @returns The new size of the deque.
-   */
-  public unshift (data: T): number {
-    const node: ILinkedListNode<T> = { data }
-    if (this.head == null) {
-      this.head = this.tail = node
-    } else {
-      node.next = this.head
-      this.head = this.head.prev = node
-    }
-    return this.incrementSize()
-  }
-
-  /**
-   * Pops data from the deque.
-   *
-   * @returns The popped data or `undefined` if the deque is empty.
-   */
-  public pop (): T | undefined {
-    if (this.head == null) {
-      return
-    }
-    const tail = this.tail
-    this.tail = this.tail?.prev
-    if (this.tail == null) {
-      delete this.head
-    } else {
-      delete this.tail.next
-    }
-    --this.size
-    return tail?.data
-  }
-
-  /**
-   * Shifts data from the deque.
-   *
-   * @returns The shifted data or `undefined` if the deque is empty.
-   */
-  public shift (): T | undefined {
-    if (this.head == null) {
-      return
-    }
-    const head = this.head
-    this.head = this.head.next
-    if (this.head == null) {
-      delete this.tail
-    } else {
-      delete this.head.prev
-    }
-    --this.size
-    return head.data
-  }
-
-  /**
-   * Peeks at the first data.
-   * @returns The first data or `undefined` if the deque is empty.
-   */
-  public peekFirst (): T | undefined {
-    return this.head?.data
-  }
-
-  /**
-   * Peeks at the last data.
-   * @returns The last data or `undefined` if the deque is empty.
-   */
-  public peekLast (): T | undefined {
-    return this.tail?.data
-  }
-
-  /**
-   * Clears the deque.
-   */
-  public clear (): void {
-    delete this.head
-    delete this.tail
-    this.size = 0
-    this.maxSize = 0
-  }
-
-  /**
-   * Returns an iterator for the deque.
-   *
-   * @returns An iterator for the deque.
-   * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
-   */
-  [Symbol.iterator] (): Iterator<T> {
-    let node = this.head
-    return {
-      next: () => {
-        if (node == null) {
-          return {
-            value: undefined,
-            done: true
-          }
-        }
-        const ret = {
-          value: node.data,
-          done: false
-        }
-        node = node.next
-        return ret
-      }
-    }
-  }
-
-  /**
-   * Returns an backward iterator for the deque.
-   *
-   * @returns An backward iterator for the deque.
-   * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
-   */
-  backward (): Iterable<T> {
-    return {
-      [Symbol.iterator]: (): Iterator<T> => {
-        let node = this.tail
-        return {
-          next: () => {
-            if (node == null) {
-              return {
-                value: undefined,
-                done: true
-              }
-            }
-            const ret = {
-              value: node.data,
-              done: false
-            }
-            node = node.prev
-            return ret
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Increments the size of the deque.
-   *
-   * @returns The new size of the deque.
-   */
-  private incrementSize (): number {
-    ++this.size
-    if (this.size > this.maxSize) {
-      this.maxSize = this.size
-    }
-    return this.size
-  }
-}
index 1d778513d8030595decb29b6aae67bbcddf5122d..a0081a4784a9330683b68c09fd1766ef5034cd51 100644 (file)
@@ -1,5 +1,4 @@
 export type { CircularArray } from './circular-array.js'
-export type { Deque, ILinkedListNode } from './deque.js'
 export type { AbstractPool } from './pools/abstract-pool.js'
 export { DynamicClusterPool } from './pools/cluster/dynamic.js'
 export type { ClusterPoolOptions } from './pools/cluster/fixed.js'
@@ -50,7 +49,7 @@ export type {
   WorkerUsage
 } from './pools/worker.js'
 export { WorkerTypes } from './pools/worker.js'
-export type { PriorityQueue } from './priority-queue.js'
+export type { PriorityQueue, PriorityQueueNode } from './priority-queue.js'
 export type {
   MessageValue,
   PromiseResponseWrapper,
index 5e88a7cd3392e98ae9e4757a4d4a00c174b4d471..52829c28163637f97947343b88aab983ab1a101f 100644 (file)
@@ -47,6 +47,7 @@ import {
 import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
 import {
   checkFilePath,
+  checkValidPriority,
   checkValidTasksQueueOptions,
   checkValidWorkerChoiceStrategy,
   getDefaultTasksQueueOptions,
@@ -855,6 +856,8 @@ export abstract class AbstractPool<
     if (typeof fn.taskFunction !== 'function') {
       throw new TypeError('taskFunction property must be a function')
     }
+    checkValidPriority(fn.priority)
+    checkValidWorkerChoiceStrategy(fn.strategy)
     const opResult = await this.sendTaskFunctionOperationToWorkers({
       taskFunctionOperation: 'add',
       taskFunctionProperties: buildTaskFunctionProperties(name, fn),
@@ -919,6 +922,25 @@ export abstract class AbstractPool<
     }
   }
 
+  /**
+   * Gets worker node task function priority, if any.
+   *
+   * @param workerNodeKey - The worker node key.
+   * @param name - The task function name.
+   * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+   */
+  private readonly getWorkerNodeTaskFunctionPriority = (
+    workerNodeKey: number,
+    name?: string
+  ): number | undefined => {
+    if (name != null) {
+      return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+        (taskFunctionProperties: TaskFunctionProperties) =>
+          taskFunctionProperties.name === name
+      )?.priority
+    }
+  }
+
   /**
    * Gets the worker choice strategies registered in this pool.
    *
@@ -998,13 +1020,15 @@ export abstract class AbstractPool<
         return
       }
       const timestamp = performance.now()
-      const workerNodeKey = this.chooseWorkerNode(
+      const taskFunctionStrategy =
         this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
-      )
+      const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
       const task: Task<Data> = {
         name: name ?? DEFAULT_TASK_NAME,
         // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
         data: data ?? ({} as Data),
+        priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+        strategy: taskFunctionStrategy,
         transferList,
         timestamp,
         taskId: randomUUID()
@@ -1742,7 +1766,7 @@ export abstract class AbstractPool<
     )
     if (sourceWorkerNode != null) {
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-      const task = sourceWorkerNode.popTask()!
+      const task = sourceWorkerNode.dequeueTask(1)!
       this.handleTask(workerNodeKey, task)
       this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
       // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -1793,7 +1817,7 @@ export abstract class AbstractPool<
         }
         workerInfo.stealing = true
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
-        const task = sourceWorkerNode.popTask()!
+        const task = sourceWorkerNode.dequeueTask(1)!
         this.handleTask(workerNodeKey, task)
         // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
         this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
@@ -1946,7 +1970,9 @@ export abstract class AbstractPool<
           this.opts.tasksQueueOptions?.size ??
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
-          ).size
+          ).size,
+        tasksQueueBucketSize:
+          (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
       }
     )
     // Flag the worker node as ready at pool startup.
@@ -2027,8 +2053,11 @@ export abstract class AbstractPool<
     return tasksQueueSize
   }
 
-  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
-    return this.workerNodes[workerNodeKey].dequeueTask()
+  private dequeueTask (
+    workerNodeKey: number,
+    bucket?: number
+  ): Task<Data> | undefined {
+    return this.workerNodes[workerNodeKey].dequeueTask(bucket)
   }
 
   private tasksQueueSize (workerNodeKey: number): number {
index e7fbe0e83cc9d74fae0ccfd8338870fb8e0c2907..64400d9eb1426be9ffe546b45855b606bfec05c0 100644 (file)
@@ -2,7 +2,6 @@ import { EventEmitter } from 'node:events'
 import { MessageChannel } from 'node:worker_threads'
 
 import { CircularArray } from '../circular-array.js'
-import { Deque } from '../deque.js'
 import { PriorityQueue } from '../priority-queue.js'
 import type { Task } from '../utility-types.js'
 import { DEFAULT_TASK_NAME } from '../utils.js'
@@ -45,8 +44,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   public messageChannel?: MessageChannel
   /** @inheritdoc */
   public tasksQueueBackPressureSize: number
-  private readonly tasksQueue: Deque<Task<Data>>
-  private readonly tasksQueue2 = new PriorityQueue<Task<Data>>()
+  private readonly tasksQueue: PriorityQueue<Task<Data>>
   private onBackPressureStarted: boolean
   private readonly taskFunctionsUsage: Map<string, WorkerUsage>
 
@@ -71,7 +69,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
-    this.tasksQueue = new Deque<Task<Data>>()
+    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
     this.onBackPressureStarted = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
@@ -83,7 +81,7 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
 
   /** @inheritdoc */
   public enqueueTask (task: Task<Data>): number {
-    const tasksQueueSize = this.tasksQueue.push(task)
+    const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
     if (this.hasBackPressure() && !this.onBackPressureStarted) {
       this.onBackPressureStarted = true
       this.emit('backPressure', { workerId: this.info.id })
@@ -93,24 +91,8 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
   }
 
   /** @inheritdoc */
-  public unshiftTask (task: Task<Data>): number {
-    const tasksQueueSize = this.tasksQueue.unshift(task)
-    if (this.hasBackPressure() && !this.onBackPressureStarted) {
-      this.onBackPressureStarted = true
-      this.emit('backPressure', { workerId: this.info.id })
-      this.onBackPressureStarted = false
-    }
-    return tasksQueueSize
-  }
-
-  /** @inheritdoc */
-  public dequeueTask (): Task<Data> | undefined {
-    return this.tasksQueue.shift()
-  }
-
-  /** @inheritdoc */
-  public popTask (): Task<Data> | undefined {
-    return this.tasksQueue.pop()
+  public dequeueTask (bucket?: number): Task<Data> | undefined {
+    return this.tasksQueue.dequeue(bucket)
   }
 
   /** @inheritdoc */
index ffe92e0cbaac75b7b041e5cf1c632f826450aed1..35d9f755bc3f9d4d321c8ac7b999261a3e1d613a 100644 (file)
@@ -267,6 +267,7 @@ export interface WorkerNodeOptions {
   workerOptions?: WorkerOptions
   env?: Record<string, unknown>
   tasksQueueBackPressureSize: number | undefined
+  tasksQueueBucketSize: number | undefined
 }
 
 /**
@@ -317,25 +318,13 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * @returns The tasks queue size.
    */
   readonly enqueueTask: (task: Task<Data>) => number
-  /**
-   * Prepends a task to the tasks queue.
-   *
-   * @param task - The task to prepend.
-   * @returns The tasks queue size.
-   */
-  readonly unshiftTask: (task: Task<Data>) => number
   /**
    * Dequeue task.
    *
+   * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
    * @returns The dequeued task.
    */
-  readonly dequeueTask: () => Task<Data> | undefined
-  /**
-   * Pops a task from the tasks queue.
-   *
-   * @returns The popped task.
-   */
-  readonly popTask: () => Task<Data> | undefined
+  readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
   /**
    * Clears tasks queue.
    */
index cf4fb150e1e1caba20ff9e9d579128c110c40e73..3c26ff35208b6f0ef599158ff6d446f47a1e8123 100644 (file)
@@ -1,13 +1,18 @@
+// Copyright Jerome Benoit. 2024. All Rights Reserved.
+
 /**
+ * Priority queue node.
+ *
+ * @typeParam T - Type of priority queue node data.
  * @internal
  */
-interface PriorityQueueNode<T> {
+export interface PriorityQueueNode<T> {
   data: T
   priority: number
 }
 
 /**
- * k-priority queue.
+ * Priority queue.
  *
  * @typeParam T - Type of priority queue data.
  * @internal
@@ -22,7 +27,7 @@ export class PriorityQueue<T> {
   public maxSize!: number
 
   /**
-   * Constructs a k-priority queue.
+   * Constructs a priority queue.
    *
    * @param k - Prioritized bucket size.
    */
@@ -67,9 +72,21 @@ export class PriorityQueue<T> {
   /**
    * Dequeue data from the priority queue.
    *
+   * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
    * @returns The dequeued data or `undefined` if the priority queue is empty.
    */
-  public dequeue (): T | undefined {
+  public dequeue (bucket = 0): T | undefined {
+    if (this.k !== Infinity && bucket > 0) {
+      while (bucket > 0) {
+        const index = bucket * this.k
+        // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+        if (this.nodeArray[index] != null) {
+          --this.size
+          return this.nodeArray.splice(index, 1)[0].data
+        }
+        --bucket
+      }
+    }
     if (this.size > 0) {
       --this.size
     }
@@ -102,9 +119,35 @@ export class PriorityQueue<T> {
   }
 
   /**
-   * Increments the size of the deque.
+   * Returns an iterator for the priority queue.
+   *
+   * @returns An iterator for the deque.
+   * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
+   */
+  [Symbol.iterator] (): Iterator<T> {
+    let i = 0
+    return {
+      next: () => {
+        if (i >= this.nodeArray.length) {
+          return {
+            value: undefined,
+            done: true
+          }
+        }
+        const value = this.nodeArray[i].data
+        i++
+        return {
+          value,
+          done: false
+        }
+      }
+    }
+  }
+
+  /**
+   * Increments the size of the priority queue.
    *
-   * @returns The new size of the deque.
+   * @returns The new size of the priority queue.
    */
   private incrementSize (): number {
     ++this.size
index 10d9183ca6f262cd25d475737d0e86613a55d247..aa4e219853caba926bc74867a2089827f6f69765 100644 (file)
@@ -100,6 +100,16 @@ export interface Task<Data = unknown> {
    * Task input data that will be passed to the worker.
    */
   readonly data?: Data
+  /**
+   * Task priority. Lower values have higher priority.
+   *
+   * @defaultValue 0
+   */
+  readonly priority?: number
+  /**
+   * Task worker choice strategy.
+   */
+  readonly strategy?: WorkerChoiceStrategy
   /**
    * Array of transferable objects.
    */
@@ -111,7 +121,7 @@ export interface Task<Data = unknown> {
   /**
    * Task UUID.
    */
-  readonly taskId?: string
+  readonly taskId?: `${string}-${string}-${string}-${string}-${string}`
 }
 
 /**
diff --git a/tests/deque.test.mjs b/tests/deque.test.mjs
deleted file mode 100644 (file)
index f9a58c8..0000000
+++ /dev/null
@@ -1,159 +0,0 @@
-import { expect } from 'expect'
-
-import { Deque } from '../lib/deque.cjs'
-
-describe('Deque test suite', () => {
-  it('Verify push() behavior', () => {
-    const deque = new Deque()
-    let rtSize = deque.push(1)
-    expect(deque.size).toBe(1)
-    expect(deque.maxSize).toBe(1)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 1 })
-    rtSize = deque.push(2)
-    expect(deque.size).toBe(2)
-    expect(deque.maxSize).toBe(2)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 2 })
-    rtSize = deque.push(3)
-    expect(deque.size).toBe(3)
-    expect(deque.maxSize).toBe(3)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 3 })
-  })
-
-  it('Verify unshift() behavior', () => {
-    const deque = new Deque()
-    let rtSize = deque.unshift(1)
-    expect(deque.size).toBe(1)
-    expect(deque.maxSize).toBe(1)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 1 })
-    rtSize = deque.unshift(2)
-    expect(deque.size).toBe(2)
-    expect(deque.maxSize).toBe(2)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 2 })
-    expect(deque.tail).toMatchObject({ data: 1 })
-    rtSize = deque.unshift(3)
-    expect(deque.size).toBe(3)
-    expect(deque.maxSize).toBe(3)
-    expect(rtSize).toBe(deque.size)
-    expect(deque.head).toMatchObject({ data: 3 })
-    expect(deque.tail).toMatchObject({ data: 1 })
-  })
-
-  it('Verify pop() behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    let rtItem = deque.pop()
-    expect(deque.size).toBe(2)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(3)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 2 })
-    rtItem = deque.pop()
-    expect(deque.size).toBe(1)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(2)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 1 })
-    rtItem = deque.pop()
-    expect(deque.size).toBe(0)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(1)
-    expect(deque.head).toBeUndefined()
-    expect(deque.tail).toBeUndefined()
-  })
-
-  it('Verify shift() behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    let rtItem = deque.shift()
-    expect(deque.size).toBe(2)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(1)
-    expect(deque.head).toMatchObject({ data: 2 })
-    expect(deque.tail).toMatchObject({ data: 3 })
-    rtItem = deque.shift()
-    expect(deque.size).toBe(1)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(2)
-    expect(deque.head).toMatchObject({ data: 3 })
-    expect(deque.tail).toMatchObject({ data: 3 })
-    rtItem = deque.shift()
-    expect(deque.size).toBe(0)
-    expect(deque.maxSize).toBe(3)
-    expect(rtItem).toBe(3)
-    expect(deque.head).toBeUndefined()
-    expect(deque.tail).toBeUndefined()
-  })
-
-  it('Verify peekFirst() behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    expect(deque.size).toBe(3)
-    expect(deque.peekFirst()).toBe(1)
-    expect(deque.size).toBe(3)
-  })
-
-  it('Verify peekLast() behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    expect(deque.size).toBe(3)
-    expect(deque.peekLast()).toBe(3)
-    expect(deque.size).toBe(3)
-  })
-
-  it('Verify clear() behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    expect(deque.size).toBe(3)
-    expect(deque.maxSize).toBe(3)
-    expect(deque.head).toMatchObject({ data: 1 })
-    expect(deque.tail).toMatchObject({ data: 3 })
-    deque.clear()
-    expect(deque.size).toBe(0)
-    expect(deque.maxSize).toBe(0)
-    expect(deque.head).toBeUndefined()
-    expect(deque.tail).toBeUndefined()
-  })
-
-  it('Verify iterator behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    let i = 1
-    for (const value of deque) {
-      expect(value).toBe(i)
-      ++i
-    }
-  })
-
-  it('Verify backward() iterator behavior', () => {
-    const deque = new Deque()
-    deque.push(1)
-    deque.push(2)
-    deque.push(3)
-    let i = deque.size
-    for (const value of deque.backward()) {
-      expect(value).toBe(i)
-      --i
-    }
-  })
-})
index e79795188b272fe1842f539673f3f23841dff90f..7740b079cc5b68e7117eb12af70201f6750f6f62 100644 (file)
@@ -9,7 +9,6 @@ import { expect } from 'expect'
 import { restore, stub } from 'sinon'
 
 import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
 import {
   DynamicClusterPool,
   DynamicThreadPool,
@@ -21,6 +20,7 @@ import {
   WorkerTypes
 } from '../../lib/index.cjs'
 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
 import { waitPoolEvents } from '../test-utils.cjs'
 
@@ -786,7 +786,7 @@ describe('Abstract pool test suite', () => {
     )
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
-      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+      expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
     }
@@ -798,7 +798,7 @@ describe('Abstract pool test suite', () => {
     )
     for (const workerNode of pool.workerNodes) {
       expect(workerNode).toBeInstanceOf(WorkerNode)
-      expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+      expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
     }
index 6aa889baa9bc4b1edb48d0dbcbd6d79592398f4b..b77af2fda507fb5401758e479094495dddd805fb 100644 (file)
@@ -4,9 +4,9 @@ import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads'
 import { expect } from 'expect'
 
 import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
 import { WorkerTypes } from '../../lib/index.cjs'
 import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
 import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
 
 describe('Worker node test suite', () => {
@@ -156,7 +156,7 @@ describe('Worker node test suite', () => {
     })
     expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
     expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
-    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(threadWorkerNode.tasksQueue.size).toBe(0)
     expect(threadWorkerNode.tasksQueueSize()).toBe(
       threadWorkerNode.tasksQueue.size
@@ -200,7 +200,7 @@ describe('Worker node test suite', () => {
     })
     expect(clusterWorkerNode.messageChannel).toBeUndefined()
     expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
-    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+    expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(clusterWorkerNode.tasksQueue.size).toBe(0)
     expect(clusterWorkerNode.tasksQueueSize()).toBe(
       clusterWorkerNode.tasksQueue.size
index 6b6c5f409b7d8d85319bcc7647fe4193f7a80045..5014b3a4af02f4e964b7ec38815506ee21beeb09 100644 (file)
@@ -1,9 +1,8 @@
 import { expect } from 'expect'
 
-// eslint-disable-next-line n/no-missing-import, import/no-unresolved
 import { PriorityQueue } from '../lib/priority-queue.cjs'
 
-describe.skip('Priority queue test suite', () => {
+describe('Priority queue test suite', () => {
   it('Verify constructor() behavior', () => {
     expect(() => new PriorityQueue('')).toThrow(
       new TypeError('k must be an integer')
@@ -177,6 +176,18 @@ describe.skip('Priority queue test suite', () => {
     expect(priorityQueue.size).toBe(3)
   })
 
+  it('Verify iterator behavior', () => {
+    const priorityQueue = new PriorityQueue()
+    priorityQueue.enqueue(1)
+    priorityQueue.enqueue(2)
+    priorityQueue.enqueue(3)
+    let i = 1
+    for (const value of priorityQueue) {
+      expect(value).toBe(i)
+      ++i
+    }
+  })
+
   it('Verify clear() behavior', () => {
     const priorityQueue = new PriorityQueue()
     priorityQueue.enqueue(1)