From 95d1a734d57942c892202df7c0fcaf2fb5ab89ab Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Mon, 29 Apr 2024 20:01:23 +0200 Subject: [PATCH] feat: use priority queue for task queueing MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- src/deque.ts | 202 ----------------------------- src/index.ts | 3 +- src/pools/abstract-pool.ts | 43 +++++- src/pools/worker-node.ts | 28 +--- src/pools/worker.ts | 17 +-- src/priority-queue.ts | 55 +++++++- src/utility-types.ts | 12 +- tests/deque.test.mjs | 159 ----------------------- tests/pools/abstract-pool.test.mjs | 6 +- tests/pools/worker-node.test.mjs | 6 +- tests/priority-queue.test.mjs | 15 ++- 11 files changed, 124 insertions(+), 422 deletions(-) delete mode 100644 src/deque.ts delete mode 100644 tests/deque.test.mjs diff --git a/src/deque.ts b/src/deque.ts deleted file mode 100644 index 5fb26770..00000000 --- a/src/deque.ts +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright Jerome Benoit. 2023-2024. All Rights Reserved. - -/** - * Linked list node interface. - * - * @typeParam T - Type of linked list node data. - * @internal - */ -export interface ILinkedListNode { - data: T - next?: ILinkedListNode - prev?: ILinkedListNode -} - -/** - * Deque. - * Implemented with a doubly linked list. - * - * @typeParam T - Type of deque data. - * @internal - */ -export class Deque { - private head?: ILinkedListNode - private tail?: ILinkedListNode - /** The size of the deque. */ - public size!: number - /** The maximum size of the deque. */ - public maxSize!: number - - public constructor () { - this.clear() - } - - /** - * Appends data to the deque. - * - * @param data - Data to append. - * @returns The new size of the deque. - */ - public push (data: T): number { - const node: ILinkedListNode = { data } - if (this.tail == null) { - this.head = this.tail = node - } else { - node.prev = this.tail - this.tail = this.tail.next = node - } - return this.incrementSize() - } - - /** - * Prepends data to the deque. - * - * @param data - Data to prepend. - * @returns The new size of the deque. - */ - public unshift (data: T): number { - const node: ILinkedListNode = { data } - if (this.head == null) { - this.head = this.tail = node - } else { - node.next = this.head - this.head = this.head.prev = node - } - return this.incrementSize() - } - - /** - * Pops data from the deque. - * - * @returns The popped data or `undefined` if the deque is empty. - */ - public pop (): T | undefined { - if (this.head == null) { - return - } - const tail = this.tail - this.tail = this.tail?.prev - if (this.tail == null) { - delete this.head - } else { - delete this.tail.next - } - --this.size - return tail?.data - } - - /** - * Shifts data from the deque. - * - * @returns The shifted data or `undefined` if the deque is empty. - */ - public shift (): T | undefined { - if (this.head == null) { - return - } - const head = this.head - this.head = this.head.next - if (this.head == null) { - delete this.tail - } else { - delete this.head.prev - } - --this.size - return head.data - } - - /** - * Peeks at the first data. - * @returns The first data or `undefined` if the deque is empty. - */ - public peekFirst (): T | undefined { - return this.head?.data - } - - /** - * Peeks at the last data. - * @returns The last data or `undefined` if the deque is empty. - */ - public peekLast (): T | undefined { - return this.tail?.data - } - - /** - * Clears the deque. - */ - public clear (): void { - delete this.head - delete this.tail - this.size = 0 - this.maxSize = 0 - } - - /** - * Returns an iterator for the deque. - * - * @returns An iterator for the deque. - * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols - */ - [Symbol.iterator] (): Iterator { - let node = this.head - return { - next: () => { - if (node == null) { - return { - value: undefined, - done: true - } - } - const ret = { - value: node.data, - done: false - } - node = node.next - return ret - } - } - } - - /** - * Returns an backward iterator for the deque. - * - * @returns An backward iterator for the deque. - * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols - */ - backward (): Iterable { - return { - [Symbol.iterator]: (): Iterator => { - let node = this.tail - return { - next: () => { - if (node == null) { - return { - value: undefined, - done: true - } - } - const ret = { - value: node.data, - done: false - } - node = node.prev - return ret - } - } - } - } - } - - /** - * Increments the size of the deque. - * - * @returns The new size of the deque. - */ - private incrementSize (): number { - ++this.size - if (this.size > this.maxSize) { - this.maxSize = this.size - } - return this.size - } -} diff --git a/src/index.ts b/src/index.ts index 1d778513..a0081a47 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,4 @@ export type { CircularArray } from './circular-array.js' -export type { Deque, ILinkedListNode } from './deque.js' export type { AbstractPool } from './pools/abstract-pool.js' export { DynamicClusterPool } from './pools/cluster/dynamic.js' export type { ClusterPoolOptions } from './pools/cluster/fixed.js' @@ -50,7 +49,7 @@ export type { WorkerUsage } from './pools/worker.js' export { WorkerTypes } from './pools/worker.js' -export type { PriorityQueue } from './priority-queue.js' +export type { PriorityQueue, PriorityQueueNode } from './priority-queue.js' export type { MessageValue, PromiseResponseWrapper, diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 5e88a7cd..52829c28 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -47,6 +47,7 @@ import { import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js' import { checkFilePath, + checkValidPriority, checkValidTasksQueueOptions, checkValidWorkerChoiceStrategy, getDefaultTasksQueueOptions, @@ -855,6 +856,8 @@ export abstract class AbstractPool< if (typeof fn.taskFunction !== 'function') { throw new TypeError('taskFunction property must be a function') } + checkValidPriority(fn.priority) + checkValidWorkerChoiceStrategy(fn.strategy) const opResult = await this.sendTaskFunctionOperationToWorkers({ taskFunctionOperation: 'add', taskFunctionProperties: buildTaskFunctionProperties(name, fn), @@ -919,6 +922,25 @@ export abstract class AbstractPool< } } + /** + * Gets worker node task function priority, if any. + * + * @param workerNodeKey - The worker node key. + * @param name - The task function name. + * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise. + */ + private readonly getWorkerNodeTaskFunctionPriority = ( + workerNodeKey: number, + name?: string + ): number | undefined => { + if (name != null) { + return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find( + (taskFunctionProperties: TaskFunctionProperties) => + taskFunctionProperties.name === name + )?.priority + } + } + /** * Gets the worker choice strategies registered in this pool. * @@ -998,13 +1020,15 @@ export abstract class AbstractPool< return } const timestamp = performance.now() - const workerNodeKey = this.chooseWorkerNode( + const taskFunctionStrategy = this.getTaskFunctionWorkerWorkerChoiceStrategy(name) - ) + const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy) const task: Task = { name: name ?? DEFAULT_TASK_NAME, // eslint-disable-next-line @typescript-eslint/consistent-type-assertions data: data ?? ({} as Data), + priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name), + strategy: taskFunctionStrategy, transferList, timestamp, taskId: randomUUID() @@ -1742,7 +1766,7 @@ export abstract class AbstractPool< ) if (sourceWorkerNode != null) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion @@ -1793,7 +1817,7 @@ export abstract class AbstractPool< } workerInfo.stealing = true // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const task = sourceWorkerNode.popTask()! + const task = sourceWorkerNode.dequeueTask(1)! this.handleTask(workerNodeKey, task) // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!) @@ -1946,7 +1970,9 @@ export abstract class AbstractPool< this.opts.tasksQueueOptions?.size ?? getDefaultTasksQueueOptions( this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers - ).size + ).size, + tasksQueueBucketSize: + (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2 } ) // Flag the worker node as ready at pool startup. @@ -2027,8 +2053,11 @@ export abstract class AbstractPool< return tasksQueueSize } - private dequeueTask (workerNodeKey: number): Task | undefined { - return this.workerNodes[workerNodeKey].dequeueTask() + private dequeueTask ( + workerNodeKey: number, + bucket?: number + ): Task | undefined { + return this.workerNodes[workerNodeKey].dequeueTask(bucket) } private tasksQueueSize (workerNodeKey: number): number { diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index e7fbe0e8..64400d9e 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -2,7 +2,6 @@ import { EventEmitter } from 'node:events' import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array.js' -import { Deque } from '../deque.js' import { PriorityQueue } from '../priority-queue.js' import type { Task } from '../utility-types.js' import { DEFAULT_TASK_NAME } from '../utils.js' @@ -45,8 +44,7 @@ export class WorkerNode public messageChannel?: MessageChannel /** @inheritdoc */ public tasksQueueBackPressureSize: number - private readonly tasksQueue: Deque> - private readonly tasksQueue2 = new PriorityQueue>() + private readonly tasksQueue: PriorityQueue> private onBackPressureStarted: boolean private readonly taskFunctionsUsage: Map @@ -71,7 +69,7 @@ export class WorkerNode } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! - this.tasksQueue = new Deque>() + this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) this.onBackPressureStarted = false this.taskFunctionsUsage = new Map() } @@ -83,7 +81,7 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { - const tasksQueueSize = this.tasksQueue.push(task) + const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) if (this.hasBackPressure() && !this.onBackPressureStarted) { this.onBackPressureStarted = true this.emit('backPressure', { workerId: this.info.id }) @@ -93,24 +91,8 @@ export class WorkerNode } /** @inheritdoc */ - public unshiftTask (task: Task): number { - const tasksQueueSize = this.tasksQueue.unshift(task) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true - this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false - } - return tasksQueueSize - } - - /** @inheritdoc */ - public dequeueTask (): Task | undefined { - return this.tasksQueue.shift() - } - - /** @inheritdoc */ - public popTask (): Task | undefined { - return this.tasksQueue.pop() + public dequeueTask (bucket?: number): Task | undefined { + return this.tasksQueue.dequeue(bucket) } /** @inheritdoc */ diff --git a/src/pools/worker.ts b/src/pools/worker.ts index ffe92e0c..35d9f755 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -267,6 +267,7 @@ export interface WorkerNodeOptions { workerOptions?: WorkerOptions env?: Record tasksQueueBackPressureSize: number | undefined + tasksQueueBucketSize: number | undefined } /** @@ -317,25 +318,13 @@ export interface IWorkerNode * @returns The tasks queue size. */ readonly enqueueTask: (task: Task) => number - /** - * Prepends a task to the tasks queue. - * - * @param task - The task to prepend. - * @returns The tasks queue size. - */ - readonly unshiftTask: (task: Task) => number /** * Dequeue task. * + * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0 * @returns The dequeued task. */ - readonly dequeueTask: () => Task | undefined - /** - * Pops a task from the tasks queue. - * - * @returns The popped task. - */ - readonly popTask: () => Task | undefined + readonly dequeueTask: (bucket?: number) => Task | undefined /** * Clears tasks queue. */ diff --git a/src/priority-queue.ts b/src/priority-queue.ts index cf4fb150..3c26ff35 100644 --- a/src/priority-queue.ts +++ b/src/priority-queue.ts @@ -1,13 +1,18 @@ +// Copyright Jerome Benoit. 2024. All Rights Reserved. + /** + * Priority queue node. + * + * @typeParam T - Type of priority queue node data. * @internal */ -interface PriorityQueueNode { +export interface PriorityQueueNode { data: T priority: number } /** - * k-priority queue. + * Priority queue. * * @typeParam T - Type of priority queue data. * @internal @@ -22,7 +27,7 @@ export class PriorityQueue { public maxSize!: number /** - * Constructs a k-priority queue. + * Constructs a priority queue. * * @param k - Prioritized bucket size. */ @@ -67,9 +72,21 @@ export class PriorityQueue { /** * Dequeue data from the priority queue. * + * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0 * @returns The dequeued data or `undefined` if the priority queue is empty. */ - public dequeue (): T | undefined { + public dequeue (bucket = 0): T | undefined { + if (this.k !== Infinity && bucket > 0) { + while (bucket > 0) { + const index = bucket * this.k + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (this.nodeArray[index] != null) { + --this.size + return this.nodeArray.splice(index, 1)[0].data + } + --bucket + } + } if (this.size > 0) { --this.size } @@ -102,9 +119,35 @@ export class PriorityQueue { } /** - * Increments the size of the deque. + * Returns an iterator for the priority queue. + * + * @returns An iterator for the deque. + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols + */ + [Symbol.iterator] (): Iterator { + let i = 0 + return { + next: () => { + if (i >= this.nodeArray.length) { + return { + value: undefined, + done: true + } + } + const value = this.nodeArray[i].data + i++ + return { + value, + done: false + } + } + } + } + + /** + * Increments the size of the priority queue. * - * @returns The new size of the deque. + * @returns The new size of the priority queue. */ private incrementSize (): number { ++this.size diff --git a/src/utility-types.ts b/src/utility-types.ts index 10d9183c..aa4e2198 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -100,6 +100,16 @@ export interface Task { * Task input data that will be passed to the worker. */ readonly data?: Data + /** + * Task priority. Lower values have higher priority. + * + * @defaultValue 0 + */ + readonly priority?: number + /** + * Task worker choice strategy. + */ + readonly strategy?: WorkerChoiceStrategy /** * Array of transferable objects. */ @@ -111,7 +121,7 @@ export interface Task { /** * Task UUID. */ - readonly taskId?: string + readonly taskId?: `${string}-${string}-${string}-${string}-${string}` } /** diff --git a/tests/deque.test.mjs b/tests/deque.test.mjs deleted file mode 100644 index f9a58c85..00000000 --- a/tests/deque.test.mjs +++ /dev/null @@ -1,159 +0,0 @@ -import { expect } from 'expect' - -import { Deque } from '../lib/deque.cjs' - -describe('Deque test suite', () => { - it('Verify push() behavior', () => { - const deque = new Deque() - let rtSize = deque.push(1) - expect(deque.size).toBe(1) - expect(deque.maxSize).toBe(1) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 1 }) - rtSize = deque.push(2) - expect(deque.size).toBe(2) - expect(deque.maxSize).toBe(2) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 2 }) - rtSize = deque.push(3) - expect(deque.size).toBe(3) - expect(deque.maxSize).toBe(3) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 3 }) - }) - - it('Verify unshift() behavior', () => { - const deque = new Deque() - let rtSize = deque.unshift(1) - expect(deque.size).toBe(1) - expect(deque.maxSize).toBe(1) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 1 }) - rtSize = deque.unshift(2) - expect(deque.size).toBe(2) - expect(deque.maxSize).toBe(2) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 2 }) - expect(deque.tail).toMatchObject({ data: 1 }) - rtSize = deque.unshift(3) - expect(deque.size).toBe(3) - expect(deque.maxSize).toBe(3) - expect(rtSize).toBe(deque.size) - expect(deque.head).toMatchObject({ data: 3 }) - expect(deque.tail).toMatchObject({ data: 1 }) - }) - - it('Verify pop() behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - let rtItem = deque.pop() - expect(deque.size).toBe(2) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(3) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 2 }) - rtItem = deque.pop() - expect(deque.size).toBe(1) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(2) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 1 }) - rtItem = deque.pop() - expect(deque.size).toBe(0) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(1) - expect(deque.head).toBeUndefined() - expect(deque.tail).toBeUndefined() - }) - - it('Verify shift() behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - let rtItem = deque.shift() - expect(deque.size).toBe(2) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(1) - expect(deque.head).toMatchObject({ data: 2 }) - expect(deque.tail).toMatchObject({ data: 3 }) - rtItem = deque.shift() - expect(deque.size).toBe(1) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(2) - expect(deque.head).toMatchObject({ data: 3 }) - expect(deque.tail).toMatchObject({ data: 3 }) - rtItem = deque.shift() - expect(deque.size).toBe(0) - expect(deque.maxSize).toBe(3) - expect(rtItem).toBe(3) - expect(deque.head).toBeUndefined() - expect(deque.tail).toBeUndefined() - }) - - it('Verify peekFirst() behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - expect(deque.size).toBe(3) - expect(deque.peekFirst()).toBe(1) - expect(deque.size).toBe(3) - }) - - it('Verify peekLast() behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - expect(deque.size).toBe(3) - expect(deque.peekLast()).toBe(3) - expect(deque.size).toBe(3) - }) - - it('Verify clear() behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - expect(deque.size).toBe(3) - expect(deque.maxSize).toBe(3) - expect(deque.head).toMatchObject({ data: 1 }) - expect(deque.tail).toMatchObject({ data: 3 }) - deque.clear() - expect(deque.size).toBe(0) - expect(deque.maxSize).toBe(0) - expect(deque.head).toBeUndefined() - expect(deque.tail).toBeUndefined() - }) - - it('Verify iterator behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - let i = 1 - for (const value of deque) { - expect(value).toBe(i) - ++i - } - }) - - it('Verify backward() iterator behavior', () => { - const deque = new Deque() - deque.push(1) - deque.push(2) - deque.push(3) - let i = deque.size - for (const value of deque.backward()) { - expect(value).toBe(i) - --i - } - }) -}) diff --git a/tests/pools/abstract-pool.test.mjs b/tests/pools/abstract-pool.test.mjs index e7979518..7740b079 100644 --- a/tests/pools/abstract-pool.test.mjs +++ b/tests/pools/abstract-pool.test.mjs @@ -9,7 +9,6 @@ import { expect } from 'expect' import { restore, stub } from 'sinon' import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' import { DynamicClusterPool, DynamicThreadPool, @@ -21,6 +20,7 @@ import { WorkerTypes } from '../../lib/index.cjs' import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' import { waitPoolEvents } from '../test-utils.cjs' @@ -786,7 +786,7 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } @@ -798,7 +798,7 @@ describe('Abstract pool test suite', () => { ) for (const workerNode of pool.workerNodes) { expect(workerNode).toBeInstanceOf(WorkerNode) - expect(workerNode.tasksQueue).toBeInstanceOf(Deque) + expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(workerNode.tasksQueue.size).toBe(0) expect(workerNode.tasksQueue.maxSize).toBe(0) } diff --git a/tests/pools/worker-node.test.mjs b/tests/pools/worker-node.test.mjs index 6aa889ba..b77af2fd 100644 --- a/tests/pools/worker-node.test.mjs +++ b/tests/pools/worker-node.test.mjs @@ -4,9 +4,9 @@ import { MessageChannel, Worker as ThreadWorker } from 'node:worker_threads' import { expect } from 'expect' import { CircularArray } from '../../lib/circular-array.cjs' -import { Deque } from '../../lib/deque.cjs' import { WorkerTypes } from '../../lib/index.cjs' import { WorkerNode } from '../../lib/pools/worker-node.cjs' +import { PriorityQueue } from '../../lib/priority-queue.cjs' import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs' describe('Worker node test suite', () => { @@ -156,7 +156,7 @@ describe('Worker node test suite', () => { }) expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel) expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(threadWorkerNode.tasksQueue.size).toBe(0) expect(threadWorkerNode.tasksQueueSize()).toBe( threadWorkerNode.tasksQueue.size @@ -200,7 +200,7 @@ describe('Worker node test suite', () => { }) expect(clusterWorkerNode.messageChannel).toBeUndefined() expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12) - expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque) + expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue) expect(clusterWorkerNode.tasksQueue.size).toBe(0) expect(clusterWorkerNode.tasksQueueSize()).toBe( clusterWorkerNode.tasksQueue.size diff --git a/tests/priority-queue.test.mjs b/tests/priority-queue.test.mjs index 6b6c5f40..5014b3a4 100644 --- a/tests/priority-queue.test.mjs +++ b/tests/priority-queue.test.mjs @@ -1,9 +1,8 @@ import { expect } from 'expect' -// eslint-disable-next-line n/no-missing-import, import/no-unresolved import { PriorityQueue } from '../lib/priority-queue.cjs' -describe.skip('Priority queue test suite', () => { +describe('Priority queue test suite', () => { it('Verify constructor() behavior', () => { expect(() => new PriorityQueue('')).toThrow( new TypeError('k must be an integer') @@ -177,6 +176,18 @@ describe.skip('Priority queue test suite', () => { expect(priorityQueue.size).toBe(3) }) + it('Verify iterator behavior', () => { + const priorityQueue = new PriorityQueue() + priorityQueue.enqueue(1) + priorityQueue.enqueue(2) + priorityQueue.enqueue(3) + let i = 1 + for (const value of priorityQueue) { + expect(value).toBe(i) + ++i + } + }) + it('Verify clear() behavior', () => { const priorityQueue = new PriorityQueue() priorityQueue.enqueue(1) -- 2.34.1