} from '../utils'
import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
import { CircularArray } from '../circular-array'
+import { Queue } from '../queue'
import {
type IPool,
PoolEmitter,
return 0
}
return this.workerNodes.reduce(
- (accumulator, workerNode) => accumulator + workerNode.tasksQueue.length,
+ (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size(),
0
)
}
medRunTime: 0,
error: 0
},
- tasksQueue: []
+ tasksQueue: new Queue<Task<Data>>()
})
}
workerNodeKey: number,
worker: Worker,
tasksUsage: TasksUsage,
- tasksQueue: Array<Task<Data>>
+ tasksQueue: Queue<Task<Data>>
): void {
this.workerNodes[workerNodeKey] = {
worker,
}
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
- return this.workerNodes[workerNodeKey].tasksQueue.push(task)
+ return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
}
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
- return this.workerNodes[workerNodeKey].tasksQueue.shift()
+ return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
}
private tasksQueueSize (workerNodeKey: number): number {
- return this.workerNodes[workerNodeKey].tasksQueue.length
+ return this.workerNodes[workerNodeKey].tasksQueue.size()
}
private flushTasksQueue (workerNodeKey: number): void {
if (this.tasksQueueSize(workerNodeKey) > 0) {
- for (const task of this.workerNodes[workerNodeKey].tasksQueue) {
- this.executeTask(workerNodeKey, task)
+ for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
+ this.executeTask(
+ workerNodeKey,
+ this.dequeueTask(workerNodeKey) as Task<Data>
+ )
}
}
}