## [Unreleased]
+### Changed
+
+- Optimize tasks queue implementation.
+- Enable prioritized tasks queueing only when necessary.
+
## [4.0.12] - 2024-05-25
### Changed
export class CircularBuffer {
private readIdx: number
private writeIdx: number
- private items: Float32Array
+ private readonly items: Float32Array
private readonly maxArrayIdx: number
public size: number
return Array.from(this.items.filter(item => item !== -1))
}
+ /**
+ * Checks the buffer size.
+ *
+ * @param size - Buffer size.
+ */
private checkSize (size: number): void {
if (!Number.isSafeInteger(size)) {
throw new TypeError(
private readonly nodeArray: Array<FixedPriorityQueueNode<T>>
/** The fixed priority queue capacity. */
public readonly capacity: number
- /** The fixed priority queue size */
+ /** The fixed priority queue size. */
public size!: number
+ /** Whether to enable priority. */
+ public enablePriority: boolean
/**
* Constructs a fixed priority queue.
*
* @param size - Fixed priority queue size. @defaultValue defaultQueueSize
+ * @param enablePriority - Whether to enable priority. @defaultValue false
* @returns FixedPriorityQueue.
*/
- constructor (size: number = defaultQueueSize) {
+ constructor (size: number = defaultQueueSize, enablePriority = false) {
this.checkSize(size)
this.capacity = size
+ this.enablePriority = enablePriority
this.nodeArray = new Array<FixedPriorityQueueNode<T>>(this.capacity)
this.clear()
}
throw new Error('Priority queue is full')
}
priority = priority ?? 0
- let index = this.start
let inserted = false
- for (let i = 0; i < this.size; i++) {
- if (this.nodeArray[index].priority > priority) {
- this.nodeArray.splice(index, 0, { data, priority })
- this.nodeArray.length !== this.capacity &&
- (this.nodeArray.length = this.capacity)
- inserted = true
- break
- }
- ++index
- if (index === this.capacity) {
- index = 0
+ if (this.enablePriority) {
+ let index = this.start
+ for (let i = 0; i < this.size; i++) {
+ if (this.nodeArray[index].priority > priority) {
+ this.nodeArray.splice(index, 0, { data, priority })
+ this.nodeArray.length !== this.capacity &&
+ (this.nodeArray.length = this.capacity)
+ inserted = true
+ break
+ }
+ ++index
+ if (index === this.capacity) {
+ index = 0
+ }
}
}
if (!inserted) {
}
/**
- * Checks the size.
+ * Checks the queue size.
*
- * @param size - The size to check.
+ * @param size - Queue size.
*/
private checkSize (size: number): void {
if (!Number.isSafeInteger(size)) {
}
}
+ private setTasksQueuePriority (workerNodeKey: number): void {
+ this.workerNodes[workerNodeKey].setTasksQueuePriority(
+ this.getTasksQueuePriority()
+ )
+ }
+
/**
* This method is the message listener registered on each worker.
*/
if (workerInfo != null) {
workerInfo.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
}
} else if (taskId != null) {
// Task execution response received from worker
workerNode.info.ready = ready
workerNode.info.taskFunctionsProperties = taskFunctionsProperties
this.sendStatisticsMessageToWorker(workerNodeKey)
+ this.setTasksQueuePriority(workerNodeKey)
this.checkAndEmitReadyEvent()
}
return this.workerNodes[workerNodeKey]?.info
}
+ private getTasksQueuePriority (): boolean {
+ return this.listTaskFunctionsProperties().some(
+ taskFunctionProperties => taskFunctionProperties.priority != null
+ )
+ }
+
/**
* Creates a worker node.
*
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
).size,
- tasksQueueBucketSize: defaultBucketSize
+ tasksQueueBucketSize: defaultBucketSize,
+ tasksQueuePriority: this.getTasksQueuePriority()
}
)
// Flag the worker node as ready at pool startup.
}
if (!isPlainObject(opts)) {
throw new TypeError(
- 'Cannot construct a worker node with invalid options: must be a plain object'
+ 'Cannot construct a worker node with invalid worker node options: must be a plain object'
)
}
if (opts.tasksQueueBackPressureSize == null) {
'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
)
}
+ if (opts.tasksQueuePriority == null) {
+ throw new TypeError(
+ 'Cannot construct a worker node without a tasks queue priority option'
+ )
+ }
+ if (typeof opts.tasksQueuePriority !== 'boolean') {
+ throw new TypeError(
+ 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+ )
+ }
}
/**
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
- this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
+ this.tasksQueue = new PriorityQueue<Task<Data>>(
+ opts.tasksQueueBucketSize,
+ opts.tasksQueuePriority
+ )
this.setBackPressureFlag = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
+ /** @inheritdoc */
+ public setTasksQueuePriority (enablePriority: boolean): void {
+ this.tasksQueue.enablePriority = enablePriority
+ }
+
/** @inheritdoc */
public tasksQueueSize (): number {
return this.tasksQueue.size
env?: Record<string, unknown>
tasksQueueBackPressureSize: number | undefined
tasksQueueBucketSize: number | undefined
+ tasksQueuePriority: boolean | undefined
}
/**
* This is the number of tasks that can be enqueued before the worker node has back pressure.
*/
tasksQueueBackPressureSize: number
+ /**
+ * Sets tasks queue priority.
+ *
+ * @param enablePriority - Whether to enable tasks queue priority.
+ */
+ readonly setTasksQueuePriority: (enablePriority: boolean) => void
/**
* Tasks queue size.
*
* Constructs a priority queue.
*
* @param bucketSize - Prioritized bucket size. @defaultValue defaultBucketSize
+ * @param enablePriority - Whether to enable priority. @defaultValue false
* @returns PriorityQueue.
*/
- public constructor (bucketSize: number = defaultBucketSize) {
+ public constructor (
+ bucketSize: number = defaultBucketSize,
+ enablePriority = false
+ ) {
if (!Number.isSafeInteger(bucketSize)) {
throw new TypeError(
`Invalid bucket size: '${bucketSize}' is not an integer`
throw new RangeError(`Invalid bucket size: ${bucketSize} < 0`)
}
this.bucketSize = bucketSize
- this.clear()
+ this.head = this.tail = new FixedPriorityQueue(
+ this.bucketSize,
+ enablePriority
+ )
+ this.maxSize = 0
}
/**
return size
}
+ public get enablePriority (): boolean {
+ return this.head.enablePriority
+ }
+
+ public set enablePriority (enablePriority: boolean) {
+ if (this.head.enablePriority === enablePriority) {
+ return
+ }
+ let node: PriorityQueueNode<T> | undefined = this.tail
+ while (node != null) {
+ node.enablePriority = enablePriority
+ node = node.next
+ }
+ }
+
/**
* The number of filled prioritized buckets.
*/
*/
public enqueue (data: T, priority?: number): number {
if (this.head.full()) {
- this.head = this.head.next = new FixedPriorityQueue(this.bucketSize)
+ this.head = this.head.next = new FixedPriorityQueue(
+ this.bucketSize,
+ this.enablePriority
+ )
}
this.head.enqueue(data, priority)
const size = this.size
* Clears the priority queue.
*/
public clear (): void {
- this.head = this.tail = new FixedPriorityQueue(this.bucketSize)
+ this.head = this.tail = new FixedPriorityQueue(
+ this.bucketSize,
+ this.enablePriority
+ )
this.maxSize = 0
}
expect(fixedPriorityQueue.size).toBe(0)
expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array)
expect(fixedPriorityQueue.capacity).toBe(defaultQueueSize)
- fixedPriorityQueue = new FixedPriorityQueue(2)
+ expect(fixedPriorityQueue.enablePriority).toBe(false)
+ fixedPriorityQueue = new FixedPriorityQueue(2, true)
expect(fixedPriorityQueue.start).toBe(0)
expect(fixedPriorityQueue.size).toBe(0)
expect(fixedPriorityQueue.nodeArray).toBeInstanceOf(Array)
expect(fixedPriorityQueue.capacity).toBe(2)
+ expect(fixedPriorityQueue.enablePriority).toBe(true)
})
it('Verify enqueue() behavior', () => {
const queueSize = 5
- const fixedPriorityQueue = new FixedPriorityQueue(queueSize)
+ const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true)
let rtSize = fixedPriorityQueue.enqueue(1)
expect(fixedPriorityQueue.start).toBe(0)
expect(fixedPriorityQueue.size).toBe(1)
})
it('Verify get() behavior', () => {
- const fixedPriorityQueue = new FixedPriorityQueue()
+ const fixedPriorityQueue = new FixedPriorityQueue(defaultQueueSize, true)
fixedPriorityQueue.enqueue(1)
fixedPriorityQueue.enqueue(2, -1)
fixedPriorityQueue.enqueue(3)
it('Verify dequeue() behavior', () => {
const queueSize = 5
- const fixedPriorityQueue = new FixedPriorityQueue(queueSize)
+ const fixedPriorityQueue = new FixedPriorityQueue(queueSize, true)
fixedPriorityQueue.enqueue(1)
fixedPriorityQueue.enqueue(2, -1)
fixedPriorityQueue.enqueue(3)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
pool = new DynamicThreadPool(
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
expect(workerNode.tasksQueue.bucketSize).toBe(defaultBucketSize)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
}
await pool.destroy()
})
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(false)
for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
{ name: DEFAULT_TASK_NAME },
{ name: 'jsonIntegerSerialization' },
{ name: 'factorial' },
- { name: 'fibonacci' }
+ { name: 'fibonacci', priority: -5 }
])
expect(workerNode.taskFunctionsUsage.size).toBe(3)
expect(workerNode.usage.tasks.executed).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.enablePriority).toBe(true)
for (const taskFunctionProperties of pool.listTaskFunctionsProperties()) {
expect(
workerNode.getTaskFunctionWorkerUsage(taskFunctionProperties.name)
const threadWorkerNode = new WorkerNode(
WorkerTypes.thread,
'./tests/worker-files/thread/testWorker.mjs',
- { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+ {
+ tasksQueueBackPressureSize: 12,
+ tasksQueueBucketSize: 6,
+ tasksQueuePriority: true
+ }
)
const clusterWorkerNode = new WorkerNode(
WorkerTypes.cluster,
'./tests/worker-files/cluster/testWorker.cjs',
- { tasksQueueBackPressureSize: 12, tasksQueueBucketSize: 6 }
+ {
+ tasksQueueBackPressureSize: 12,
+ tasksQueueBucketSize: 6,
+ tasksQueuePriority: true
+ }
)
it('Worker node instantiation', () => {
() =>
new WorkerNode(
'invalidWorkerType',
- './tests/worker-files/thread/testWorker.mjs',
- { tasksQueueBackPressureSize: 12 }
+ './tests/worker-files/thread/testWorker.mjs'
)
).toThrow(
new TypeError(
)
).toThrow(
new TypeError(
- 'Cannot construct a worker node with invalid options: must be a plain object'
+ 'Cannot construct a worker node with invalid worker node options: must be a plain object'
)
)
expect(
'Cannot construct a worker node with a tasks queue bucket size option that is not a positive integer'
)
)
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ tasksQueueBackPressureSize: 12,
+ tasksQueueBucketSize: 6
+ }
+ )
+ ).toThrow(
+ new RangeError(
+ 'Cannot construct a worker node without a tasks queue priority option'
+ )
+ )
+ expect(
+ () =>
+ new WorkerNode(
+ WorkerTypes.thread,
+ './tests/worker-files/thread/testWorker.mjs',
+ {
+ tasksQueueBackPressureSize: 12,
+ tasksQueueBucketSize: 6,
+ tasksQueuePriority: 'invalidTasksQueuePriority'
+ }
+ )
+ ).toThrow(
+ new RangeError(
+ 'Cannot construct a worker node with a tasks queue priority option that is not a boolean'
+ )
+ )
expect(threadWorkerNode).toBeInstanceOf(WorkerNode)
expect(threadWorkerNode.worker).toBeInstanceOf(ThreadWorker)
expect(threadWorkerNode.info).toStrictEqual({
expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(threadWorkerNode.tasksQueue.size).toBe(0)
expect(threadWorkerNode.tasksQueue.bucketSize).toBe(6)
+ expect(threadWorkerNode.tasksQueue.enablePriority).toBe(true)
expect(threadWorkerNode.tasksQueueSize()).toBe(
threadWorkerNode.tasksQueue.size
)
expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(clusterWorkerNode.tasksQueue.size).toBe(0)
expect(clusterWorkerNode.tasksQueue.bucketSize).toBe(6)
+ expect(clusterWorkerNode.tasksQueue.enablePriority).toBe(true)
expect(clusterWorkerNode.tasksQueueSize()).toBe(
clusterWorkerNode.tasksQueue.size
)
expect(priorityQueue.buckets).toBe(0)
expect(priorityQueue.size).toBe(0)
expect(priorityQueue.maxSize).toBe(0)
+ expect(priorityQueue.enablePriority).toBe(false)
expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
expect(priorityQueue.head.next).toBe(undefined)
expect(priorityQueue.head.capacity).toBe(defaultBucketSize)
expect(priorityQueue.tail).toBeInstanceOf(FixedPriorityQueue)
expect(priorityQueue.tail).toStrictEqual(priorityQueue.head)
const bucketSize = 2
- priorityQueue = new PriorityQueue(bucketSize)
+ priorityQueue = new PriorityQueue(bucketSize, true)
expect(priorityQueue.bucketSize).toBe(bucketSize)
expect(priorityQueue.buckets).toBe(0)
expect(priorityQueue.size).toBe(0)
expect(priorityQueue.maxSize).toBe(0)
+ expect(priorityQueue.enablePriority).toBe(true)
expect(priorityQueue.head).toBeInstanceOf(FixedPriorityQueue)
expect(priorityQueue.head.next).toBe(undefined)
expect(priorityQueue.head.capacity).toBe(bucketSize)
})
it('Verify default bucket size enqueue() behavior', () => {
- const priorityQueue = new PriorityQueue()
+ const priorityQueue = new PriorityQueue(defaultBucketSize, true)
let rtSize = priorityQueue.enqueue(1)
expect(priorityQueue.buckets).toBe(0)
expect(priorityQueue.size).toBe(1)
})
it('Verify bucketSize=2 enqueue() behavior', () => {
- const priorityQueue = new PriorityQueue(2)
+ const priorityQueue = new PriorityQueue(2, true)
let rtSize = priorityQueue.enqueue(1)
expect(priorityQueue.buckets).toBe(0)
expect(priorityQueue.size).toBe(1)
})
it('Verify default bucket size dequeue() behavior', () => {
- const priorityQueue = new PriorityQueue()
+ const priorityQueue = new PriorityQueue(defaultBucketSize, true)
priorityQueue.enqueue(1)
priorityQueue.enqueue(2, -1)
priorityQueue.enqueue(3)
})
it('Verify bucketSize=2 dequeue() behavior', () => {
- const priorityQueue = new PriorityQueue(2)
+ const priorityQueue = new PriorityQueue(2, true)
priorityQueue.enqueue(1)
priorityQueue.enqueue(2)
priorityQueue.enqueue(3)
expect(priorityQueue.tail.next).toBe(undefined)
})
+ it('Verify enablePriority setter behavior', () => {
+ const priorityQueue = new PriorityQueue(2)
+ expect(priorityQueue.enablePriority).toBe(false)
+ priorityQueue.enqueue(1)
+ priorityQueue.enqueue(2)
+ priorityQueue.enqueue(3)
+ priorityQueue.enqueue(4)
+ let buckets = 0
+ let node = priorityQueue.tail
+ while (node != null) {
+ expect(node.enablePriority).toBe(false)
+ node = node.next
+ ++buckets
+ }
+ expect(buckets).toBe(2)
+ priorityQueue.enablePriority = true
+ expect(priorityQueue.enablePriority).toBe(true)
+ node = priorityQueue.tail
+ while (node != null) {
+ expect(node.enablePriority).toBe(true)
+ node = node.next
+ }
+ })
+
it('Verify iterator behavior', () => {
const priorityQueue = new PriorityQueue(2)
priorityQueue.enqueue(1)
'use strict'
-const { KillBehaviors, ThreadWorker } = require('../../../lib/index.cjs')
+const { KillBehaviors, ClusterWorker } = require('../../../lib/index.cjs')
const {
factorial,
fibonacci,
jsonIntegerSerialization
} = require('../../test-utils.cjs')
-module.exports = new ThreadWorker(
+module.exports = new ClusterWorker(
{
jsonIntegerSerialization: {
taskFunction: data => jsonIntegerSerialization(data.n)
},
factorial: { taskFunction: data => factorial(data.n) },
- fibonacci: { taskFunction: data => fibonacci(data.n) }
+ fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 }
},
{
killBehavior: KillBehaviors.HARD,
taskFunction: data => jsonIntegerSerialization(data.n)
},
factorial: { taskFunction: data => factorial(data.n) },
- fibonacci: { taskFunction: data => fibonacci(data.n) }
+ fibonacci: { taskFunction: data => fibonacci(data.n), priority: -5 }
},
{
killBehavior: KillBehaviors.HARD,