+++ /dev/null
-// Copyright Jerome Benoit. 2023-2024. All Rights Reserved.
-
-/**
- * Linked list node interface.
- *
- * @typeParam T - Type of linked list node data.
- * @internal
- */
-export interface ILinkedListNode<T> {
- data: T
- next?: ILinkedListNode<T>
- prev?: ILinkedListNode<T>
-}
-
-/**
- * Deque.
- * Implemented with a doubly linked list.
- *
- * @typeParam T - Type of deque data.
- * @internal
- */
-export class Deque<T> {
- private head?: ILinkedListNode<T>
- private tail?: ILinkedListNode<T>
- /** The size of the deque. */
- public size!: number
- /** The maximum size of the deque. */
- public maxSize!: number
-
- public constructor () {
- this.clear()
- }
-
- /**
- * Appends data to the deque.
- *
- * @param data - Data to append.
- * @returns The new size of the deque.
- */
- public push (data: T): number {
- const node: ILinkedListNode<T> = { data }
- if (this.tail == null) {
- this.head = this.tail = node
- } else {
- node.prev = this.tail
- this.tail = this.tail.next = node
- }
- return this.incrementSize()
- }
-
- /**
- * Prepends data to the deque.
- *
- * @param data - Data to prepend.
- * @returns The new size of the deque.
- */
- public unshift (data: T): number {
- const node: ILinkedListNode<T> = { data }
- if (this.head == null) {
- this.head = this.tail = node
- } else {
- node.next = this.head
- this.head = this.head.prev = node
- }
- return this.incrementSize()
- }
-
- /**
- * Pops data from the deque.
- *
- * @returns The popped data or `undefined` if the deque is empty.
- */
- public pop (): T | undefined {
- if (this.head == null) {
- return
- }
- const tail = this.tail
- this.tail = this.tail?.prev
- if (this.tail == null) {
- delete this.head
- } else {
- delete this.tail.next
- }
- --this.size
- return tail?.data
- }
-
- /**
- * Shifts data from the deque.
- *
- * @returns The shifted data or `undefined` if the deque is empty.
- */
- public shift (): T | undefined {
- if (this.head == null) {
- return
- }
- const head = this.head
- this.head = this.head.next
- if (this.head == null) {
- delete this.tail
- } else {
- delete this.head.prev
- }
- --this.size
- return head.data
- }
-
- /**
- * Peeks at the first data.
- * @returns The first data or `undefined` if the deque is empty.
- */
- public peekFirst (): T | undefined {
- return this.head?.data
- }
-
- /**
- * Peeks at the last data.
- * @returns The last data or `undefined` if the deque is empty.
- */
- public peekLast (): T | undefined {
- return this.tail?.data
- }
-
- /**
- * Clears the deque.
- */
- public clear (): void {
- delete this.head
- delete this.tail
- this.size = 0
- this.maxSize = 0
- }
-
- /**
- * Returns an iterator for the deque.
- *
- * @returns An iterator for the deque.
- * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
- */
- [Symbol.iterator] (): Iterator<T> {
- let node = this.head
- return {
- next: () => {
- if (node == null) {
- return {
- value: undefined,
- done: true
- }
- }
- const ret = {
- value: node.data,
- done: false
- }
- node = node.next
- return ret
- }
- }
- }
-
- /**
- * Returns an backward iterator for the deque.
- *
- * @returns An backward iterator for the deque.
- * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
- */
- backward (): Iterable<T> {
- return {
- [Symbol.iterator]: (): Iterator<T> => {
- let node = this.tail
- return {
- next: () => {
- if (node == null) {
- return {
- value: undefined,
- done: true
- }
- }
- const ret = {
- value: node.data,
- done: false
- }
- node = node.prev
- return ret
- }
- }
- }
- }
- }
-
- /**
- * Increments the size of the deque.
- *
- * @returns The new size of the deque.
- */
- private incrementSize (): number {
- ++this.size
- if (this.size > this.maxSize) {
- this.maxSize = this.size
- }
- return this.size
- }
-}
export type { CircularArray } from './circular-array.js'
-export type { Deque, ILinkedListNode } from './deque.js'
export type { AbstractPool } from './pools/abstract-pool.js'
export { DynamicClusterPool } from './pools/cluster/dynamic.js'
export type { ClusterPoolOptions } from './pools/cluster/fixed.js'
WorkerUsage
} from './pools/worker.js'
export { WorkerTypes } from './pools/worker.js'
-export type { PriorityQueue } from './priority-queue.js'
+export type { PriorityQueue, PriorityQueueNode } from './priority-queue.js'
export type {
MessageValue,
PromiseResponseWrapper,
import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
import {
checkFilePath,
+ checkValidPriority,
checkValidTasksQueueOptions,
checkValidWorkerChoiceStrategy,
getDefaultTasksQueueOptions,
if (typeof fn.taskFunction !== 'function') {
throw new TypeError('taskFunction property must be a function')
}
+ checkValidPriority(fn.priority)
+ checkValidWorkerChoiceStrategy(fn.strategy)
const opResult = await this.sendTaskFunctionOperationToWorkers({
taskFunctionOperation: 'add',
taskFunctionProperties: buildTaskFunctionProperties(name, fn),
}
}
+ /**
+ * Gets worker node task function priority, if any.
+ *
+ * @param workerNodeKey - The worker node key.
+ * @param name - The task function name.
+ * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
+ */
+ private readonly getWorkerNodeTaskFunctionPriority = (
+ workerNodeKey: number,
+ name?: string
+ ): number | undefined => {
+ if (name != null) {
+ return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
+ (taskFunctionProperties: TaskFunctionProperties) =>
+ taskFunctionProperties.name === name
+ )?.priority
+ }
+ }
+
/**
* Gets the worker choice strategies registered in this pool.
*
return
}
const timestamp = performance.now()
- const workerNodeKey = this.chooseWorkerNode(
+ const taskFunctionStrategy =
this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
- )
+ const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
const task: Task<Data> = {
name: name ?? DEFAULT_TASK_NAME,
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
data: data ?? ({} as Data),
+ priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
+ strategy: taskFunctionStrategy,
transferList,
timestamp,
taskId: randomUUID()
)
if (sourceWorkerNode != null) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
}
workerInfo.stealing = true
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const task = sourceWorkerNode.popTask()!
+ const task = sourceWorkerNode.dequeueTask(1)!
this.handleTask(workerNodeKey, task)
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
this.opts.tasksQueueOptions?.size ??
getDefaultTasksQueueOptions(
this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
- ).size
+ ).size,
+ tasksQueueBucketSize:
+ (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
}
)
// Flag the worker node as ready at pool startup.
return tasksQueueSize
}
- private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].dequeueTask()
+ private dequeueTask (
+ workerNodeKey: number,
+ bucket?: number
+ ): Task<Data> | undefined {
+ return this.workerNodes[workerNodeKey].dequeueTask(bucket)
}
private tasksQueueSize (workerNodeKey: number): number {
import { MessageChannel } from 'node:worker_threads'
import { CircularArray } from '../circular-array.js'
-import { Deque } from '../deque.js'
import { PriorityQueue } from '../priority-queue.js'
import type { Task } from '../utility-types.js'
import { DEFAULT_TASK_NAME } from '../utils.js'
public messageChannel?: MessageChannel
/** @inheritdoc */
public tasksQueueBackPressureSize: number
- private readonly tasksQueue: Deque<Task<Data>>
- private readonly tasksQueue2 = new PriorityQueue<Task<Data>>()
+ private readonly tasksQueue: PriorityQueue<Task<Data>>
private onBackPressureStarted: boolean
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize!
- this.tasksQueue = new Deque<Task<Data>>()
+ this.tasksQueue = new PriorityQueue<Task<Data>>(opts.tasksQueueBucketSize)
this.onBackPressureStarted = false
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
- const tasksQueueSize = this.tasksQueue.push(task)
+ const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority)
if (this.hasBackPressure() && !this.onBackPressureStarted) {
this.onBackPressureStarted = true
this.emit('backPressure', { workerId: this.info.id })
}
/** @inheritdoc */
- public unshiftTask (task: Task<Data>): number {
- const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.hasBackPressure() && !this.onBackPressureStarted) {
- this.onBackPressureStarted = true
- this.emit('backPressure', { workerId: this.info.id })
- this.onBackPressureStarted = false
- }
- return tasksQueueSize
- }
-
- /** @inheritdoc */
- public dequeueTask (): Task<Data> | undefined {
- return this.tasksQueue.shift()
- }
-
- /** @inheritdoc */
- public popTask (): Task<Data> | undefined {
- return this.tasksQueue.pop()
+ public dequeueTask (bucket?: number): Task<Data> | undefined {
+ return this.tasksQueue.dequeue(bucket)
}
/** @inheritdoc */
workerOptions?: WorkerOptions
env?: Record<string, unknown>
tasksQueueBackPressureSize: number | undefined
+ tasksQueueBucketSize: number | undefined
}
/**
* @returns The tasks queue size.
*/
readonly enqueueTask: (task: Task<Data>) => number
- /**
- * Prepends a task to the tasks queue.
- *
- * @param task - The task to prepend.
- * @returns The tasks queue size.
- */
- readonly unshiftTask: (task: Task<Data>) => number
/**
* Dequeue task.
*
+ * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
* @returns The dequeued task.
*/
- readonly dequeueTask: () => Task<Data> | undefined
- /**
- * Pops a task from the tasks queue.
- *
- * @returns The popped task.
- */
- readonly popTask: () => Task<Data> | undefined
+ readonly dequeueTask: (bucket?: number) => Task<Data> | undefined
/**
* Clears tasks queue.
*/
+// Copyright Jerome Benoit. 2024. All Rights Reserved.
+
/**
+ * Priority queue node.
+ *
+ * @typeParam T - Type of priority queue node data.
* @internal
*/
-interface PriorityQueueNode<T> {
+export interface PriorityQueueNode<T> {
data: T
priority: number
}
/**
- * k-priority queue.
+ * Priority queue.
*
* @typeParam T - Type of priority queue data.
* @internal
public maxSize!: number
/**
- * Constructs a k-priority queue.
+ * Constructs a priority queue.
*
* @param k - Prioritized bucket size.
*/
/**
* Dequeue data from the priority queue.
*
+ * @param bucket - The prioritized bucket to dequeue from. @defaultValue 0
* @returns The dequeued data or `undefined` if the priority queue is empty.
*/
- public dequeue (): T | undefined {
+ public dequeue (bucket = 0): T | undefined {
+ if (this.k !== Infinity && bucket > 0) {
+ while (bucket > 0) {
+ const index = bucket * this.k
+ // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
+ if (this.nodeArray[index] != null) {
+ --this.size
+ return this.nodeArray.splice(index, 1)[0].data
+ }
+ --bucket
+ }
+ }
if (this.size > 0) {
--this.size
}
}
/**
- * Increments the size of the deque.
+ * Returns an iterator for the priority queue.
+ *
+ * @returns An iterator for the deque.
+ * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols
+ */
+ [Symbol.iterator] (): Iterator<T> {
+ let i = 0
+ return {
+ next: () => {
+ if (i >= this.nodeArray.length) {
+ return {
+ value: undefined,
+ done: true
+ }
+ }
+ const value = this.nodeArray[i].data
+ i++
+ return {
+ value,
+ done: false
+ }
+ }
+ }
+ }
+
+ /**
+ * Increments the size of the priority queue.
*
- * @returns The new size of the deque.
+ * @returns The new size of the priority queue.
*/
private incrementSize (): number {
++this.size
* Task input data that will be passed to the worker.
*/
readonly data?: Data
+ /**
+ * Task priority. Lower values have higher priority.
+ *
+ * @defaultValue 0
+ */
+ readonly priority?: number
+ /**
+ * Task worker choice strategy.
+ */
+ readonly strategy?: WorkerChoiceStrategy
/**
* Array of transferable objects.
*/
/**
* Task UUID.
*/
- readonly taskId?: string
+ readonly taskId?: `${string}-${string}-${string}-${string}-${string}`
}
/**
+++ /dev/null
-import { expect } from 'expect'
-
-import { Deque } from '../lib/deque.cjs'
-
-describe('Deque test suite', () => {
- it('Verify push() behavior', () => {
- const deque = new Deque()
- let rtSize = deque.push(1)
- expect(deque.size).toBe(1)
- expect(deque.maxSize).toBe(1)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 1 })
- rtSize = deque.push(2)
- expect(deque.size).toBe(2)
- expect(deque.maxSize).toBe(2)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 2 })
- rtSize = deque.push(3)
- expect(deque.size).toBe(3)
- expect(deque.maxSize).toBe(3)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 3 })
- })
-
- it('Verify unshift() behavior', () => {
- const deque = new Deque()
- let rtSize = deque.unshift(1)
- expect(deque.size).toBe(1)
- expect(deque.maxSize).toBe(1)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 1 })
- rtSize = deque.unshift(2)
- expect(deque.size).toBe(2)
- expect(deque.maxSize).toBe(2)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 2 })
- expect(deque.tail).toMatchObject({ data: 1 })
- rtSize = deque.unshift(3)
- expect(deque.size).toBe(3)
- expect(deque.maxSize).toBe(3)
- expect(rtSize).toBe(deque.size)
- expect(deque.head).toMatchObject({ data: 3 })
- expect(deque.tail).toMatchObject({ data: 1 })
- })
-
- it('Verify pop() behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- let rtItem = deque.pop()
- expect(deque.size).toBe(2)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(3)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 2 })
- rtItem = deque.pop()
- expect(deque.size).toBe(1)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(2)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 1 })
- rtItem = deque.pop()
- expect(deque.size).toBe(0)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(1)
- expect(deque.head).toBeUndefined()
- expect(deque.tail).toBeUndefined()
- })
-
- it('Verify shift() behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- let rtItem = deque.shift()
- expect(deque.size).toBe(2)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(1)
- expect(deque.head).toMatchObject({ data: 2 })
- expect(deque.tail).toMatchObject({ data: 3 })
- rtItem = deque.shift()
- expect(deque.size).toBe(1)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(2)
- expect(deque.head).toMatchObject({ data: 3 })
- expect(deque.tail).toMatchObject({ data: 3 })
- rtItem = deque.shift()
- expect(deque.size).toBe(0)
- expect(deque.maxSize).toBe(3)
- expect(rtItem).toBe(3)
- expect(deque.head).toBeUndefined()
- expect(deque.tail).toBeUndefined()
- })
-
- it('Verify peekFirst() behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- expect(deque.size).toBe(3)
- expect(deque.peekFirst()).toBe(1)
- expect(deque.size).toBe(3)
- })
-
- it('Verify peekLast() behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- expect(deque.size).toBe(3)
- expect(deque.peekLast()).toBe(3)
- expect(deque.size).toBe(3)
- })
-
- it('Verify clear() behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- expect(deque.size).toBe(3)
- expect(deque.maxSize).toBe(3)
- expect(deque.head).toMatchObject({ data: 1 })
- expect(deque.tail).toMatchObject({ data: 3 })
- deque.clear()
- expect(deque.size).toBe(0)
- expect(deque.maxSize).toBe(0)
- expect(deque.head).toBeUndefined()
- expect(deque.tail).toBeUndefined()
- })
-
- it('Verify iterator behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- let i = 1
- for (const value of deque) {
- expect(value).toBe(i)
- ++i
- }
- })
-
- it('Verify backward() iterator behavior', () => {
- const deque = new Deque()
- deque.push(1)
- deque.push(2)
- deque.push(3)
- let i = deque.size
- for (const value of deque.backward()) {
- expect(value).toBe(i)
- --i
- }
- })
-})
import { restore, stub } from 'sinon'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import {
DynamicClusterPool,
DynamicThreadPool,
WorkerTypes
} from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
import { waitPoolEvents } from '../test-utils.cjs'
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
)
for (const workerNode of pool.workerNodes) {
expect(workerNode).toBeInstanceOf(WorkerNode)
- expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(workerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
}
import { expect } from 'expect'
import { CircularArray } from '../../lib/circular-array.cjs'
-import { Deque } from '../../lib/deque.cjs'
import { WorkerTypes } from '../../lib/index.cjs'
import { WorkerNode } from '../../lib/pools/worker-node.cjs'
+import { PriorityQueue } from '../../lib/priority-queue.cjs'
import { DEFAULT_TASK_NAME } from '../../lib/utils.cjs'
describe('Worker node test suite', () => {
})
expect(threadWorkerNode.messageChannel).toBeInstanceOf(MessageChannel)
expect(threadWorkerNode.tasksQueueBackPressureSize).toBe(12)
- expect(threadWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(threadWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(threadWorkerNode.tasksQueue.size).toBe(0)
expect(threadWorkerNode.tasksQueueSize()).toBe(
threadWorkerNode.tasksQueue.size
})
expect(clusterWorkerNode.messageChannel).toBeUndefined()
expect(clusterWorkerNode.tasksQueueBackPressureSize).toBe(12)
- expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(Deque)
+ expect(clusterWorkerNode.tasksQueue).toBeInstanceOf(PriorityQueue)
expect(clusterWorkerNode.tasksQueue.size).toBe(0)
expect(clusterWorkerNode.tasksQueueSize()).toBe(
clusterWorkerNode.tasksQueue.size
import { expect } from 'expect'
-// eslint-disable-next-line n/no-missing-import, import/no-unresolved
import { PriorityQueue } from '../lib/priority-queue.cjs'
-describe.skip('Priority queue test suite', () => {
+describe('Priority queue test suite', () => {
it('Verify constructor() behavior', () => {
expect(() => new PriorityQueue('')).toThrow(
new TypeError('k must be an integer')
expect(priorityQueue.size).toBe(3)
})
+ it('Verify iterator behavior', () => {
+ const priorityQueue = new PriorityQueue()
+ priorityQueue.enqueue(1)
+ priorityQueue.enqueue(2)
+ priorityQueue.enqueue(3)
+ let i = 1
+ for (const value of priorityQueue) {
+ expect(value).toBe(i)
+ ++i
+ }
+ })
+
it('Verify clear() behavior', () => {
const priorityQueue = new PriorityQueue()
priorityQueue.enqueue(1)