'cpus',
'ctx',
'deprecations',
+ 'deque',
'dequeue',
'dequeued',
'ecma',
'piscina',
'pnpm',
'poolifier',
- 'poolify',
+ 'prepend',
+ 'prepends',
'readdir',
'readonly',
'req',
"codeql",
"commitlint",
"Dependabot",
+ "deque",
"eventloop",
"Fastify",
"FOSS",
### Added
- Add `queueMaxSize` option to tasks queue options.
+- Add O(1) deque implementation implemented with doubly linked list and use it for tasks queueing.
## [2.6.31] - 2023-08-20
MIT License
-Copyright (c) 2019-2021 Alessandro Pio Ardizio
+Copyright (c) 2019-2023 Alessandro Pio Ardizio
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
--- /dev/null
+// Copyright Jerome Benoit. 2023. All Rights Reserved.
+
+class Node<T> {
+ public value: T
+ public next?: Node<T>
+ public prev?: Node<T>
+
+ public constructor (value: T) {
+ this.value = value
+ }
+}
+
+/**
+ * Deque.
+ * Implemented with a doubly linked list.
+ *
+ * @typeParam T - Type of deque values.
+ */
+export class Deque<T> {
+ private head?: Node<T>
+ private tail?: Node<T>
+ /** The size of the deque. */
+ public size!: number
+ /** The maximum size of the deque. */
+ public maxSize!: number
+
+ public constructor () {
+ this.clear()
+ }
+
+ /**
+ * Appends a value to the deque.
+ *
+ * @param value - Value to append.
+ * @returns The new size of the queue.
+ */
+ public push (value: T): number {
+ const node = new Node(value)
+ if (this.tail == null) {
+ this.head = this.tail = node
+ } else {
+ node.prev = this.tail
+ this.tail = this.tail.next = node
+ }
+ return this.incrementSize()
+ }
+
+ /**
+ * Prepends a value to the deque.
+ *
+ * @param value - Value to prepend.
+ * @returns The new size of the queue.
+ */
+ public unshift (value: T): number {
+ const node = new Node(value)
+ if (this.head == null) {
+ this.head = this.tail = node
+ } else {
+ node.next = this.head
+ this.head = this.head.prev = node
+ }
+ return this.incrementSize()
+ }
+
+ /**
+ * Pops a value from the deque.
+ */
+ public pop (): T | undefined {
+ if (this.head == null) {
+ return undefined
+ }
+ const tail = this.tail
+ this.tail = (this.tail as Node<T>).prev
+ if (this.tail == null) {
+ this.head = undefined
+ } else {
+ this.tail.next = undefined
+ }
+ --this.size
+ return tail?.value
+ }
+
+ /**
+ * Shifts a value from the deque.
+ *
+ * @returns The shifted value or `undefined` if the deque is empty.
+ */
+ public shift (): T | undefined {
+ if (this.head == null) {
+ return undefined
+ }
+ const head = this.head
+ this.head = this.head.next
+ if (this.head == null) {
+ this.tail = undefined
+ } else {
+ this.head.prev = undefined
+ }
+ --this.size
+ return head?.value
+ }
+
+ /**
+ * Peeks at the first value.
+ * @returns The first value or `undefined` if the queue is empty.
+ */
+ public peekFirst (): T | undefined {
+ return this.head?.value
+ }
+
+ /**
+ * Peeks at the last value.
+ * @returns The last value or `undefined` if the queue is empty.
+ */
+ public peekLast (): T | undefined {
+ return this.tail?.value
+ }
+
+ /**
+ * Clears the deque.
+ */
+ public clear (): void {
+ this.head = undefined
+ this.tail = undefined
+ this.size = 0
+ this.maxSize = 0
+ }
+
+ /**
+ * Returns an iterator for the deque.
+ *
+ * @returns An iterator for the deque.
+ * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
+ */
+ [Symbol.iterator] (): Iterator<T> {
+ let node = this.head
+ return {
+ next: () => {
+ if (node == null) {
+ return {
+ value: undefined,
+ done: true
+ }
+ }
+ const ret = {
+ value: node.value,
+ done: false
+ }
+ node = node.next as Node<T>
+ return ret
+ }
+ }
+ }
+
+ private incrementSize (): number {
+ ++this.size
+ if (this.size > this.maxSize) {
+ this.maxSize = this.size
+ }
+ return this.size
+ }
+}
import { MessageChannel } from 'node:worker_threads'
import { CircularArray } from '../circular-array'
-import { Queue } from '../queue'
import type { Task } from '../utility-types'
import { DEFAULT_TASK_NAME } from '../utils'
+import { Deque } from '../deque'
import {
type IWorker,
type IWorkerNode,
/** @inheritdoc */
public tasksQueueBackPressureSize: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
- private readonly tasksQueue: Queue<Task<Data>>
+ private readonly tasksQueue: Deque<Task<Data>>
/**
* Constructs a new worker node.
}
this.usage = this.initWorkerUsage()
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
- this.tasksQueue = new Queue<Task<Data>>()
+ this.tasksQueue = new Deque<Task<Data>>()
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
- return this.tasksQueue.enqueue(task)
+ return this.tasksQueue.push(task)
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
- return this.tasksQueue.dequeue()
+ return this.tasksQueue.shift()
}
/** @inheritdoc */
-// Copyright Jerome Benoit. 2021-2023. All Rights Reserved.
+// Copyright Jerome Benoit. 2022-2023. All Rights Reserved.
/**
- * Queue
+ * Queue.
*
* @typeParam T - Type of queue items.
*/
--- /dev/null
+const { expect } = require('expect')
+const { Deque } = require('../lib/deque')
+
+describe('Deque test suite', () => {
+ it('Verify push() behavior', () => {
+ const deque = new Deque()
+ let rtSize = deque.push(1)
+ expect(deque.size).toBe(1)
+ expect(deque.maxSize).toBe(1)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 1 })
+ rtSize = deque.push(2)
+ expect(deque.size).toBe(2)
+ expect(deque.maxSize).toBe(2)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 2 })
+ rtSize = deque.push(3)
+ expect(deque.size).toBe(3)
+ expect(deque.maxSize).toBe(3)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 3 })
+ })
+
+ it('Verify unshift() behavior', () => {
+ const deque = new Deque()
+ let rtSize = deque.unshift(1)
+ expect(deque.size).toBe(1)
+ expect(deque.maxSize).toBe(1)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 1 })
+ rtSize = deque.unshift(2)
+ expect(deque.size).toBe(2)
+ expect(deque.maxSize).toBe(2)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 2 })
+ expect(deque.tail).toMatchObject({ value: 1 })
+ rtSize = deque.unshift(3)
+ expect(deque.size).toBe(3)
+ expect(deque.maxSize).toBe(3)
+ expect(rtSize).toBe(deque.size)
+ expect(deque.head).toMatchObject({ value: 3 })
+ expect(deque.tail).toMatchObject({ value: 1 })
+ })
+
+ it('Verify pop() behavior', () => {
+ const deque = new Deque()
+ deque.push(1)
+ deque.push(2)
+ deque.push(3)
+ let rtItem = deque.pop()
+ expect(deque.size).toBe(2)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(3)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 2 })
+ rtItem = deque.pop()
+ expect(deque.size).toBe(1)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(2)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 1 })
+ rtItem = deque.pop()
+ expect(deque.size).toBe(0)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(1)
+ expect(deque.head).toBeUndefined()
+ expect(deque.tail).toBeUndefined()
+ })
+
+ it('Verify shift() behavior', () => {
+ const deque = new Deque()
+ deque.push(1)
+ deque.push(2)
+ deque.push(3)
+ let rtItem = deque.shift()
+ expect(deque.size).toBe(2)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(1)
+ expect(deque.head).toMatchObject({ value: 2 })
+ expect(deque.tail).toMatchObject({ value: 3 })
+ rtItem = deque.shift()
+ expect(deque.size).toBe(1)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(2)
+ expect(deque.head).toMatchObject({ value: 3 })
+ expect(deque.tail).toMatchObject({ value: 3 })
+ rtItem = deque.shift()
+ expect(deque.size).toBe(0)
+ expect(deque.maxSize).toBe(3)
+ expect(rtItem).toBe(3)
+ expect(deque.head).toBeUndefined()
+ expect(deque.tail).toBeUndefined()
+ })
+
+ it('Verify clear() behavior', () => {
+ const deque = new Deque()
+ deque.push(1)
+ deque.push(2)
+ deque.push(3)
+ expect(deque.size).toBe(3)
+ expect(deque.maxSize).toBe(3)
+ expect(deque.head).toMatchObject({ value: 1 })
+ expect(deque.tail).toMatchObject({ value: 3 })
+ deque.clear()
+ expect(deque.size).toBe(0)
+ expect(deque.maxSize).toBe(0)
+ expect(deque.head).toBeUndefined()
+ expect(deque.tail).toBeUndefined()
+ })
+
+ it('Verify iterator behavior', () => {
+ const deque = new Deque()
+ deque.push(1)
+ deque.push(2)
+ deque.push(3)
+ let i = 1
+ for (const value of deque) {
+ expect(value).toBe(i)
+ ++i
+ }
+ })
+})
WorkerTypes
} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
-const { Queue } = require('../../../lib/queue')
+const { Deque } = require('../../../lib/deque')
const { version } = require('../../../package.json')
const { waitPoolEvents } = require('../../test-utils')
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueue).toBeDefined()
- expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+ expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueue).toBeDefined()
- expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+ expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
-const { expect } = require('expect')
-const { Queue } = require('../lib/queue')
+// const { expect } = require('expect')
+// const { Queue } = require('../lib/queue')
-describe('Queue test suite', () => {
- it('Verify enqueue() behavior', () => {
- const queue = new Queue()
- let rtSize = queue.enqueue(1)
- expect(queue.size).toBe(1)
- expect(rtSize).toBe(queue.size)
- expect(queue.offset).toBe(0)
- expect(queue.maxSize).toBe(1)
- expect(queue.items).toStrictEqual([1])
- rtSize = queue.enqueue(2)
- expect(queue.size).toBe(2)
- expect(rtSize).toBe(queue.size)
- expect(queue.offset).toBe(0)
- expect(queue.maxSize).toBe(2)
- expect(queue.items).toStrictEqual([1, 2])
- rtSize = queue.enqueue(3)
- expect(queue.size).toBe(3)
- expect(rtSize).toBe(queue.size)
- expect(queue.offset).toBe(0)
- expect(queue.maxSize).toBe(3)
- expect(queue.items).toStrictEqual([1, 2, 3])
- })
+// describe('Queue test suite', () => {
+// it('Verify enqueue() behavior', () => {
+// const queue = new Queue()
+// let rtSize = queue.enqueue(1)
+// expect(queue.size).toBe(1)
+// expect(rtSize).toBe(queue.size)
+// expect(queue.offset).toBe(0)
+// expect(queue.maxSize).toBe(1)
+// expect(queue.items).toStrictEqual([1])
+// rtSize = queue.enqueue(2)
+// expect(queue.size).toBe(2)
+// expect(rtSize).toBe(queue.size)
+// expect(queue.offset).toBe(0)
+// expect(queue.maxSize).toBe(2)
+// expect(queue.items).toStrictEqual([1, 2])
+// rtSize = queue.enqueue(3)
+// expect(queue.size).toBe(3)
+// expect(rtSize).toBe(queue.size)
+// expect(queue.offset).toBe(0)
+// expect(queue.maxSize).toBe(3)
+// expect(queue.items).toStrictEqual([1, 2, 3])
+// })
- it('Verify dequeue() behavior', () => {
- const queue = new Queue()
- queue.enqueue(1)
- queue.enqueue(2)
- queue.enqueue(3)
- let rtItem = queue.dequeue()
- expect(queue.size).toBe(2)
- expect(rtItem).toBe(1)
- expect(queue.offset).toBe(1)
- expect(queue.maxSize).toBe(3)
- expect(queue.items).toStrictEqual([1, 2, 3])
- rtItem = queue.dequeue()
- expect(queue.size).toBe(1)
- expect(rtItem).toBe(2)
- expect(queue.offset).toBe(0)
- expect(queue.maxSize).toBe(3)
- expect(queue.items).toStrictEqual([3])
- rtItem = queue.dequeue()
- expect(queue.size).toBe(0)
- expect(rtItem).toBe(3)
- expect(queue.offset).toBe(0)
- expect(queue.maxSize).toBe(3)
- expect(queue.items).toStrictEqual([])
- })
+// it('Verify dequeue() behavior', () => {
+// const queue = new Queue()
+// queue.enqueue(1)
+// queue.enqueue(2)
+// queue.enqueue(3)
+// let rtItem = queue.dequeue()
+// expect(queue.size).toBe(2)
+// expect(rtItem).toBe(1)
+// expect(queue.offset).toBe(1)
+// expect(queue.maxSize).toBe(3)
+// expect(queue.items).toStrictEqual([1, 2, 3])
+// rtItem = queue.dequeue()
+// expect(queue.size).toBe(1)
+// expect(rtItem).toBe(2)
+// expect(queue.offset).toBe(0)
+// expect(queue.maxSize).toBe(3)
+// expect(queue.items).toStrictEqual([3])
+// rtItem = queue.dequeue()
+// expect(queue.size).toBe(0)
+// expect(rtItem).toBe(3)
+// expect(queue.offset).toBe(0)
+// expect(queue.maxSize).toBe(3)
+// expect(queue.items).toStrictEqual([])
+// })
- it('Verify clear() behavior', () => {
- const queue = new Queue()
- queue.enqueue(1)
- queue.enqueue(2)
- queue.enqueue(3)
- expect(queue.size).toBe(3)
- expect(queue.maxSize).toBe(3)
- queue.clear()
- expect(queue.size).toBe(0)
- expect(queue.maxSize).toBe(0)
- expect(queue.items).toStrictEqual([])
- expect(queue.offset).toBe(0)
- })
+// it('Verify clear() behavior', () => {
+// const queue = new Queue()
+// queue.enqueue(1)
+// queue.enqueue(2)
+// queue.enqueue(3)
+// expect(queue.size).toBe(3)
+// expect(queue.maxSize).toBe(3)
+// expect(queue.items).toStrictEqual([1, 2, 3])
+// expect(queue.offset).toBe(0)
+// queue.clear()
+// expect(queue.size).toBe(0)
+// expect(queue.maxSize).toBe(0)
+// expect(queue.items).toStrictEqual([])
+// expect(queue.offset).toBe(0)
+// })
- it('Verify iterator behavior', () => {
- const queue = new Queue()
- queue.enqueue(1)
- queue.enqueue(2)
- queue.enqueue(3)
- let i = 1
- for (const item of queue) {
- expect(item).toBe(i)
- ++i
- }
- })
-})
+// it('Verify iterator behavior', () => {
+// const queue = new Queue()
+// queue.enqueue(1)
+// queue.enqueue(2)
+// queue.enqueue(3)
+// let i = 1
+// for (const item of queue) {
+// expect(item).toBe(i)
+// ++i
+// }
+// })
+// })