## [Unreleased]
+### Changed
+
+- Use O(1) queue implementation for tasks queueing.
+
## [2.4.11] - 2023-04-23
### Changed
} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
return 0
}
return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size(),
0
)
}
medRunTime: 0,
error: 0
},
- tasksQueue: []
+ tasksQueue: new Queue<Task<Data>>()
})
}
workerNodeKey: number,
worker: Worker,
tasksUsage: TasksUsage,
- tasksQueue: Array<Task<Data>>
+ tasksQueue: Queue<Task<Data>>
): void {
this.workerNodes[workerNodeKey] = {
worker,
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.push(task)
+ return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.shift()
+ return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.length
+ return this.workerNodes[workerNodeKey].tasksQueue.size()
}
private flushTasksQueue (workerNodeKey: number): void {
if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
- this.executeTask(workerNodeKey, task)
+ for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
}
}
import type { CircularArray } from '../circular-array'
+import type { Queue } from '../queue'
/**
* Callback invoked if the worker has received a message.
/**
* Worker node tasks queue.
*/
- readonly tasksQueue: Array<Task<Data>>
+ readonly tasksQueue: Queue<Task<Data>>
}
--- /dev/null
+/**
+ * Queue
+ */
+export class Queue<T> {
+ private items: Record<number, T>
+ private head: number
+ private tail: number
+
+ constructor () {
+ this.items = {}
+ this.head = 0
+ this.tail = 0
+ }
+
+ enqueue (item: T): number {
+ this.items[this.tail] = item
+ this.tail++
+ return this.size()
+ }
+
+ dequeue (): T | undefined {
+ if (this.size() <= 0) return undefined
+ const item = this.items[this.head]
+ // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
+ delete this.items[this.head]
+ this.head++
+ if (this.head === this.tail) {
+ this.head = 0
+ this.tail = 0
+ }
+ return item
+ }
+
+ size (): number {
+ return this.tail - this.head
+ }
+}
WorkerChoiceStrategies
} = require('../../../lib')
const { CircularArray } = require('../../../lib/circular-array')
+const { Queue } = require('../../../lib/queue')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 1
)
for (const workerNode of pool.workerNodes) {
expect(workerNode.tasksQueue).toBeDefined()
- expect(workerNode.tasksQueue).toBeInstanceOf(Array)
- expect(workerNode.tasksQueue.length).toBe(0)
+ expect(workerNode.tasksQueue).toBeInstanceOf(Queue)
+ expect(workerNode.tasksQueue.size()).toBe(0)
}
await pool.destroy()
})
queuePool.opts.tasksQueueOptions.concurrency
)
expect(workerNode.tasksUsage.run).toBe(0)
- expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.size()).toBeGreaterThan(0)
}
expect(queuePool.numberOfRunningTasks).toBe(numberOfWorkers)
expect(queuePool.numberOfQueuedTasks).toBe(
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksQueue.length).toBe(0)
+ expect(workerNode.tasksQueue.size()).toBe(0)
}
promises.clear()
})
queuePool.opts.tasksQueueOptions.concurrency
)
expect(workerNode.tasksUsage.run).toBe(0)
- expect(workerNode.tasksQueue.length).toBeGreaterThan(0)
+ expect(workerNode.tasksQueue.size()).toBeGreaterThan(0)
}
expect(queuePool.numberOfRunningTasks).toBe(numberOfThreads)
expect(queuePool.numberOfQueuedTasks).toBe(
for (const workerNode of queuePool.workerNodes) {
expect(workerNode.tasksUsage.running).toBe(0)
expect(workerNode.tasksUsage.run).toBeGreaterThan(0)
- expect(workerNode.tasksQueue.length).toBe(0)
+ expect(workerNode.tasksQueue.size()).toBe(0)
}
promises.clear()
})