perf: enable prioritized tasks queue only when necessary
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 29 May 2024 16:00:47 +0000 (18:00 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Wed, 29 May 2024 16:00:47 +0000 (18:00 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
14 files changed:
CHANGELOG.md
src/circular-buffer.ts
src/fixed-priority-queue.ts
src/pools/abstract-pool.ts
src/pools/utils.ts
src/pools/worker-node.ts
src/pools/worker.ts
src/priority-queue.ts
tests/fixed-priority-queue.test.mjs
tests/pools/abstract-pool.test.mjs
tests/pools/worker-node.test.mjs
tests/priority-queue.test.mjs
tests/worker-files/cluster/testTaskFunctionObjectsWorker.cjs
tests/worker-files/thread/testTaskFunctionObjectsWorker.mjs

index f7bcd6171809f2d32dbb2d695d5dda887b3a08cb..410f4f178c3ee263a1195ef4b2db24e3ca5bbc0c 100644 (file)
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Optimize tasks queue implementation.
+- Enable prioritized tasks queueing only when necessary.
+
 ## [4.0.12] - 2024-05-25
 
 ### Changed
index 7abb982ea6adb3ebcd2881efbb9d1b7ef8c66682..7d12069fe369b48c72bf592d9b46a8eae0294031 100644 (file)
@@ -11,7 +11,7 @@ export const defaultBufferSize = 2048
 export class CircularBuffer {
   private readIdx: number
   private writeIdx: number
-  private items: Float32Array
+  private readonly items: Float32Array
   private readonly maxArrayIdx: number
   public size: number
 
@@ -84,6 +84,11 @@ export class CircularBuffer {
     return Array.from(this.items.filter(item => item !== -1))
   }
 
+  /**
+   * Checks the buffer size.
+   *
+   * @param size - Buffer size.
+   */
   private checkSize (size: number): void {
     if (!Number.isSafeInteger(size)) {
       throw new TypeError(
index a248aa7e2b3e90541fa8c0f5f03abc00c2ecaa18..792f23d7e1347558cfdb55d30bc1436ad848ecdf 100644 (file)
@@ -25,18 +25,22 @@ export class FixedPriorityQueue<T> {
   private readonly nodeArray: Array<FixedPriorityQueueNode<T>>
   /** The fixed priority queue capacity. */
   public readonly capacity: number
-  /** The fixed priority queue size */
+  /** The fixed priority queue size. */
   public size!: number
+  /** Whether to enable priority. */
+  public enablePriority: boolean
 
   /**
    * Constructs a fixed priority queue.
    *
    * @param size - Fixed priority queue size. @defaultValue defaultQueueSize
+   * @param enablePriority - Whether to enable priority. @defaultValue false
    * @returns FixedPriorityQueue.
    */
-  constructor (size: number = defaultQueueSize) {
+  constructor (size: number = defaultQueueSize, enablePriority = false) {
     this.checkSize(size)
     this.capacity = size
+    this.enablePriority = enablePriority
     this.nodeArray = new Array<FixedPriorityQueueNode<T>>(this.capacity)
     this.clear()
   }
@@ -72,19 +76,21 @@ export class FixedPriorityQueue<T> {
       throw new Error('Priority queue is full')
     }
     priority = priority ?? 0
-    let index = this.start
     let inserted = false
-    for (let i = 0; i < this.size; i++) {
-      if (this.nodeArray[index].priority > priority) {
-        this.nodeArray.splice(index, 0, { data, priority })
-        this.nodeArray.length !== this.capacity &&
-          (this.nodeArray.length = this.capacity)
-        inserted = true
-        break
-      }
-      ++index
-      if (index === this.capacity) {
-        index = 0
+    if (this.enablePriority) {
+      let index = this.start
+      for (let i = 0; i < this.size; i++) {
+        if (this.nodeArray[index].priority > priority) {
+          this.nodeArray.splice(index, 0, { data, priority })
+          this.nodeArray.length !== this.capacity &&
+            (this.nodeArray.length = this.capacity)
+          inserted = true
+          break
+        }
+        ++index
+        if (index === this.capacity) {
+          index = 0
+        }
       }
     }
     if (!inserted) {
@@ -172,9 +178,9 @@ export class FixedPriorityQueue<T> {
   }
 
   /**
-   * Checks the size.
+   * Checks the queue size.
    *
-   * @param size - The size to check.
+   * @param size - Queue size.
    */
   private checkSize (size: number): void {
     if (!Number.isSafeInteger(size)) {
index 2c4fdf5a99f8e3ce2accd39311caa5e8e776a737..8f57a9067e0967bf87dd303efabfe694b00bdbc6 100644 (file)
@@ -2025,6 +2025,12 @@ export abstract class AbstractPool<
     }
   }
 
+  private setTasksQueuePriority (workerNodeKey: number): void {
+    this.workerNodes[workerNodeKey].setTasksQueuePriority(
+      this.getTasksQueuePriority()
+    )
+  }
+
   /**
    * This method is the message listener registered on each worker.
    */
@@ -2043,6 +2049,7 @@ export abstract class AbstractPool<
       if (workerInfo != null) {
         workerInfo.taskFunctionsProperties = taskFunctionsProperties
         this.sendStatisticsMessageToWorker(workerNodeKey)
+        this.setTasksQueuePriority(workerNodeKey)
       }
     } else if (taskId != null) {
       // Task execution response received from worker
@@ -2067,6 +2074,7 @@ export abstract class AbstractPool<
     workerNode.info.ready = ready
     workerNode.info.taskFunctionsProperties = taskFunctionsProperties
     this.sendStatisticsMessageToWorker(workerNodeKey)
+    this.setTasksQueuePriority(workerNodeKey)
     this.checkAndEmitReadyEvent()
   }
 
@@ -2154,6 +2162,12 @@ export abstract class AbstractPool<
     return this.workerNodes[workerNodeKey]?.info
   }
 
+  private getTasksQueuePriority (): boolean {
+    return this.listTaskFunctionsProperties().some(
+      taskFunctionProperties => taskFunctionProperties.priority != null
+    )
+  }
+
   /**
    * Creates a worker node.
    *
@@ -2171,7 +2185,8 @@ export abstract class AbstractPool<
           getDefaultTasksQueueOptions(
             this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
           ).size,
-        tasksQueueBucketSize: defaultBucketSize
+        tasksQueueBucketSize: defaultBucketSize,
+        tasksQueuePriority: this.getTasksQueuePriority()
       }
     )
     // Flag the worker node as ready at pool startup.
index f2ede8b6d2b07d44316cfdaa14abd91cb10204cc..d4649d70a56cc31dbb51ddf62aa3b79eca3ef4ef 100644 (file)
@@ -169,7 +169,7 @@ export const checkWorkerNodeArguments = (
   }
   if (!isPlainObject(opts)) {
     throw new TypeError(
-      'Cannot construct a worker node with invalid options: must be a plain object'
+      'Cannot construct a worker node with invalid worker node options: must be a plain object'
     )
   }
   if (opts.tasksQueueBackPressureSize == null) {
@@ -202,6 +202,16 @@ export const checkWorkerNodeArguments = (
       'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
     )
   }
+  if (opts.tasksQueuePriority == null) {
+    throw new TypeError(
+      'Cannot construct a worker node without a tasks queue priority option'
+    )
+  }
+  if (typeof opts.tasksQueuePriority !== 'boolean') {
+    throw new TypeError(
+      'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+    )
+  }
 }
 
 /**
index 079557010565cc7a7e129d0f314caf4762eb7700..f7a48d3ba43e06cadf9a5da8c855a0b5ce1547bb 100644 (file)
@@ -70,11 +70,19 @@ export class WorkerNode<Worker extends IWorker, Data = unknown>
     }
     // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
     this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
-    this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
+    this.tasksQueue = new PriorityQueue<Task<Data>>(
+      opts.tasksQueueBucketSize,
+      opts.tasksQueuePriority
+    )
     this.setBackPressureFlag = false
     this.taskFunctionsUsage = new Map<string, WorkerUsage>()
   }
 
+  /** @inheritdoc */
+  public setTasksQueuePriority (enablePriority: boolean): void {
+    this.tasksQueue.enablePriority = enablePriority
+  }
+
   /** @inheritdoc */
   public tasksQueueSize (): number {
     return this.tasksQueue.size
index b2e6ba80198480f0edfe72e45c5eee56f80d03f7..c1af1cf79c99d05e7c803e7d072529301b6d678a 100644 (file)
@@ -278,6 +278,7 @@ export interface WorkerNodeOptions {
   env?: Record<string, unknown>
   tasksQueueBackPressureSize: number | undefined
   tasksQueueBucketSize: number | undefined
+  tasksQueuePriority: boolean | undefined
 }
 
 /**
@@ -315,6 +316,12 @@ export interface IWorkerNode<Worker extends IWorker, Data = unknown>
    * This is the number of tasks that can be enqueued before the worker node has back pressure.
    */
   tasksQueueBackPressureSize: number
+  /**
+   * Sets tasks queue priority.
+   *
+   * @param enablePriority - Whether to enable tasks queue priority.
+   */
+  readonly setTasksQueuePriority: (enablePriority: boolean) => void
   /**
    * Tasks queue size.
    *
index 0f75e52b2e7368ca1d7932283972f70f8537a62f..ee3310c3d8f2cb8cf6e4d6287d0a668eb20f4a9c 100644 (file)
@@ -34,9 +34,13 @@ export class PriorityQueue<T> {
    * Constructs a priority queue.
    *
    * @param bucketSize - Prioritized bucket size. @defaultValue defaultBucketSize
+   * @param enablePriority - Whether to enable priority. @defaultValue false
    * @returns PriorityQueue.
    */
-  public constructor (bucketSize: number = defaultBucketSize) {
+  public constructor (
+    bucketSize: number = defaultBucketSize,
+    enablePriority = false
+  ) {
     if (!Number.isSafeInteger(bucketSize)) {
       throw new TypeError(
         `Invalid bucket size: '${bucketSize}' is not an integer`
@@ -46,7 +50,11 @@ export class PriorityQueue<T> {
       throw new RangeError(`Invalid bucket size: ${bucketSize} < 0`)
     }
     this.bucketSize = bucketSize
-    this.clear()
+    this.head = this.tail = new FixedPriorityQueue(
+      this.bucketSize,
+      enablePriority
+    )
+    this.maxSize = 0
   }
 
   /**
@@ -62,6 +70,21 @@ export class PriorityQueue<T> {
     return size
   }
 
+  public get enablePriority (): boolean {
+    return this.head.enablePriority
+  }
+
+  public set enablePriority (enablePriority: boolean) {
+    if (this.head.enablePriority === enablePriority) {
+      return
+    }
+    let node: PriorityQueueNode<T> | undefined = this.tail
+    while (node != null) {
+      node.enablePriority = enablePriority
+      node = node.next
+    }
+  }
+
   /**
    * The number of filled prioritized buckets.
    */
@@ -78,7 +101,10 @@ export class PriorityQueue<T> {
    */
   public enqueue (data: T, priority?: number): number {
     if (this.head.full()) {
-      this.head = this.head.next = new FixedPriorityQueue(this.bucketSize)
+      this.head = this.head.next = new FixedPriorityQueue(
+        this.bucketSize,
+        this.enablePriority
+      )
     }
     this.head.enqueue(data, priority)
     const size = this.size
@@ -136,7 +162,10 @@ export class PriorityQueue<T> {
    * Clears the priority queue.
    */
   public clear (): void {
-    this.head = this.tail = new FixedPriorityQueue(this.bucketSize)
+    this.head = this.tail = new FixedPriorityQueue(
+      this.bucketSize,
+      this.enablePriority
+    )
     this.maxSize = 0
   }
 
index e4f3abd3607ea9485ed7838bc0e85950e72d9cee..99dcb00120a60c4b09a3a9be595919f902686a13 100644 (file)
@@ -18,16 +18,18 @@ describe('Fixed priority queue test suite', () => {
     expect(fixedPriorityQueue.size).toBe(0)
     expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array)
     expect(fixedPriorityQueue.capacity).toBe(defaultQueueSize)
-    fixedPriorityQueue = new FixedPriorityQueue(2)
+    expect(fixedPriorityQueue.enablePriority).toBe(false)
+    fixedPriorityQueue = new FixedPriorityQueue(2, true)
     expect(fixedPriorityQueue.start).toBe(0)
     expect(fixedPriorityQueue.size).toBe(0)
     expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array)
     expect(fixedPriorityQueue.capacity).toBe(2)
+    expect(fixedPriorityQueue.enablePriority).toBe(true)
   })
 
   it('Verify enqueue() behavior', () => {
     const queueSize = 5
-    const fixedPriorityQueue = new FixedPriorityQueue(queueSize)
+    const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true)
     let rtSize = fixedPriorityQueue.enqueue(1)
     expect(fixedPriorityQueue.start).toBe(0)
     expect(fixedPriorityQueue.size).toBe(1)
@@ -84,7 +86,7 @@ describe('Fixed priority queue test suite', () => {
   })
 
   it('Verify get() behavior', () => {
-    const fixedPriorityQueue = new FixedPriorityQueue()
+    const fixedPriorityQueue = new FixedPriorityQueue(defaultQueueSize, true)
     fixedPriorityQueue.enqueue(1)
     fixedPriorityQueue.enqueue(2, -1)
     fixedPriorityQueue.enqueue(3)
@@ -96,7 +98,7 @@ describe('Fixed priority queue test suite', () => {
 
   it('Verify dequeue() behavior', () => {
     const queueSize = 5
-    const fixedPriorityQueue = new FixedPriorityQueue(queueSize)
+    const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true)
     fixedPriorityQueue.enqueue(1)
     fixedPriorityQueue.enqueue(2, -1)
     fixedPriorityQueue.enqueue(3)
index 62df26e3aab52961562e9187a7aafadc6f8c27be..ec8fe516ffbd278d162306920d182be9e76c8427 100644 (file)
@@ -790,6 +790,7 @@ describe('Abstract pool test suite', () => {
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
       expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+      expect(workerNode.tasksQueue.enablePriority).toBe(false)
     }
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -803,6 +804,7 @@ describe('Abstract pool test suite', () => {
       expect(workerNode.tasksQueue.size).toBe(0)
       expect(workerNode.tasksQueue.maxSize).toBe(0)
       expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+      expect(workerNode.tasksQueue.enablePriority).toBe(false)
     }
     await pool.destroy()
   })
@@ -1662,6 +1664,7 @@ describe('Abstract pool test suite', () => {
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
       expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.enablePriority).toBe(false)
       for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
         expect(
           workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
@@ -1727,10 +1730,11 @@ describe('Abstract pool test suite', () => {
         { name: DEFAULT_TASK_NAME },
         { name: 'jsonIntegerSerialization' },
         { name: 'factorial' },
-        { name: 'fibonacci' }
+        { name: 'fibonacci', priority: -5 }
       ])
       expect(workerNode.taskFunctionsUsage.size).toBe(3)
       expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.tasksQueue.enablePriority).toBe(true)
       for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
         expect(
           workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
index 54a69ca458a8566067c2c8af8160e6605214a6cf..e47ba10326d82cce07c5737b2050b0711d21d729 100644 (file)
@@ -13,12 +13,20 @@ describe('Worker node test suite', () => {
   const threadWorkerNode = new WorkerNode(
     WorkerTypes.thread,
     './tests/worker-files/thread/testWorker.mjs',
-    { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+    {
+      tasksQueueBackPressureSize: 12,
+      tasksQueueBucketSize: 6,
+      tasksQueuePriority: true
+    }
   )
   const clusterWorkerNode = new WorkerNode(
     WorkerTypes.cluster,
     './tests/worker-files/cluster/testWorker.cjs',
-    { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+    {
+      tasksQueueBackPressureSize: 12,
+      tasksQueueBucketSize: 6,
+      tasksQueuePriority: true
+    }
   )
 
   it('Worker node instantiation', () => {
@@ -29,8 +37,7 @@ describe('Worker node test suite', () => {
       () =>
         new WorkerNode(
           'invalidWorkerType',
-          './tests/worker-files/thread/testWorker.mjs',
-          { tasksQueueBackPressureSize: 12 }
+          './tests/worker-files/thread/testWorker.mjs'
         )
     ).toThrow(
       new TypeError(
@@ -57,7 +64,7 @@ describe('Worker node test suite', () => {
         )
     ).toThrow(
       new TypeError(
-        'Cannot construct a worker node with invalid options: must be a plain object'
+        'Cannot construct a worker node with invalid worker node options: must be a plain object'
       )
     )
     expect(
@@ -185,6 +192,37 @@ describe('Worker node test suite', () => {
         'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
       )
     )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            tasksQueueBackPressureSize: 12,
+            tasksQueueBucketSize: 6
+          }
+        )
+    ).toThrow(
+      new RangeError(
+        'Cannot construct a worker node without a tasks queue priority option'
+      )
+    )
+    expect(
+      () =>
+        new WorkerNode(
+          WorkerTypes.thread,
+          './tests/worker-files/thread/testWorker.mjs',
+          {
+            tasksQueueBackPressureSize: 12,
+            tasksQueueBucketSize: 6,
+            tasksQueuePriority: 'invalidTasksQueuePriority'
+          }
+        )
+    ).toThrow(
+      new RangeError(
+        'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+      )
+    )
     expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
     expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
     expect(threadWorkerNode.info).toStrictEqual({
@@ -225,6 +263,7 @@ describe('Worker node test suite', () => {
     expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(threadWorkerNode.tasksQueue.size).toBe(0)
     expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
+    expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
     expect(threadWorkerNode.tasksQueueSize()).toBe(
       threadWorkerNode.tasksQueue.size
     )
@@ -271,6 +310,7 @@ describe('Worker node test suite', () => {
     expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
     expect(clusterWorkerNode.tasksQueue.size).toBe(0)
     expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
+    expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
     expect(clusterWorkerNode.tasksQueueSize()).toBe(
       clusterWorkerNode.tasksQueue.size
     )
index ddc8c09d6bdd0422428fe87317c381dd6b8b1fee..5240efd2e6da1f956a5f1a7149036e29c6fa6be9 100644 (file)
@@ -16,17 +16,19 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
+    expect(priorityQueue.enablePriority).toBe(false)
     expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.head.capacity).toBe(defaultBucketSize)
     expect(priorityQueue.tail).toBeInstanceOf(FixedPriorityQueue)
     expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
     const bucketSize = 2
-    priorityQueue = new PriorityQueue(bucketSize)
+    priorityQueue = new PriorityQueue(bucketSize, true)
     expect(priorityQueue.bucketSize).toBe(bucketSize)
     expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(0)
     expect(priorityQueue.maxSize).toBe(0)
+    expect(priorityQueue.enablePriority).toBe(true)
     expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
     expect(priorityQueue.head.next).toBe(undefined)
     expect(priorityQueue.head.capacity).toBe(bucketSize)
@@ -35,7 +37,7 @@ describe('Priority queue test suite', () => {
   })
 
   it('Verify default bucket size enqueue() behavior', () => {
-    const priorityQueue = new PriorityQueue()
+    const priorityQueue = new PriorityQueue(defaultBucketSize, true)
     let rtSize = priorityQueue.enqueue(1)
     expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
@@ -99,7 +101,7 @@ describe('Priority queue test suite', () => {
   })
 
   it('Verify bucketSize=2 enqueue() behavior', () => {
-    const priorityQueue = new PriorityQueue(2)
+    const priorityQueue = new PriorityQueue(2, true)
     let rtSize = priorityQueue.enqueue(1)
     expect(priorityQueue.buckets).toBe(0)
     expect(priorityQueue.size).toBe(1)
@@ -190,7 +192,7 @@ describe('Priority queue test suite', () => {
   })
 
   it('Verify default bucket size dequeue() behavior', () => {
-    const priorityQueue = new PriorityQueue()
+    const priorityQueue = new PriorityQueue(defaultBucketSize, true)
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2, -1)
     priorityQueue.enqueue(3)
@@ -223,7 +225,7 @@ describe('Priority queue test suite', () => {
   })
 
   it('Verify bucketSize=2 dequeue() behavior', () => {
-    const priorityQueue = new PriorityQueue(2)
+    const priorityQueue = new PriorityQueue(2, true)
     priorityQueue.enqueue(1)
     priorityQueue.enqueue(2)
     priorityQueue.enqueue(3)
@@ -279,6 +281,30 @@ describe('Priority queue test suite', () => {
     expect(priorityQueue.tail.next).toBe(undefined)
   })
 
+  it('Verify enablePriority setter behavior', () => {
+    const priorityQueue = new PriorityQueue(2)
+    expect(priorityQueue.enablePriority).toBe(false)
+    priorityQueue.enqueue(1)
+    priorityQueue.enqueue(2)
+    priorityQueue.enqueue(3)
+    priorityQueue.enqueue(4)
+    let buckets = 0
+    let node = priorityQueue.tail
+    while (node != null) {
+      expect(node.enablePriority).toBe(false)
+      node = node.next
+      ++buckets
+    }
+    expect(buckets).toBe(2)
+    priorityQueue.enablePriority = true
+    expect(priorityQueue.enablePriority).toBe(true)
+    node = priorityQueue.tail
+    while (node != null) {
+      expect(node.enablePriority).toBe(true)
+      node = node.next
+    }
+  })
+
   it('Verify iterator behavior', () => {
     const priorityQueue = new PriorityQueue(2)
     priorityQueue.enqueue(1)
index cb81d15b9d988c6fcf5b696fe25fa9edbe08ad9f..fcb6f18aa8086e0123862f6fc7795e76af2d21b6 100644 (file)
@@ -1,18 +1,18 @@
 'use strict'
-const { KillBehaviors, ThreadWorker } = require('../../../lib/index.cjs')
+const { KillBehaviors, ClusterWorker } = require('../../../lib/index.cjs')
 const {
   factorial,
   fibonacci,
   jsonIntegerSerialization
 } = require('../../test-utils.cjs')
 
-module.exports = new ThreadWorker(
+module.exports = new ClusterWorker(
   {
     jsonIntegerSerialization: {
       taskFunction: data => jsonIntegerSerialization(data.n)
     },
     factorial: { taskFunction: data => factorial(data.n) },
-    fibonacci: { taskFunction: data => fibonacci(data.n) }
+    fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 }
   },
   {
     killBehavior: KillBehaviors.HARD,
index 6b5a7eda0f3b6f40f7bfab59947957e0a22eb43f..45f4f9f2eae43037f7516afa1921d4581e9dfe0f 100644 (file)
@@ -11,7 +11,7 @@ export default new ThreadWorker(
       taskFunction: data => jsonIntegerSerialization(data.n)
     },
     factorial: { taskFunction: data => factorial(data.n) },
-    fibonacci: { taskFunction: data => fibonacci(data.n) }
+    fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 }
   },
   {
     killBehavior: KillBehaviors.HARD,