public tasksQueueBackPressureSize: number
private readonly tasksQueue: Deque<Task<Data>>
private onBackPressureStarted: boolean
- private onEmptyQueueCount: number
+ private onIdleWorkerNodeCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
/**
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
this.onBackPressureStarted = false
- this.onEmptyQueueCount = 0
+ this.onIdleWorkerNodeCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
- this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+ if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+ this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
}
return task
}
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.tasksQueue.size === 0 && this.onEmptyQueueCount === 0) {
- this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
+ if (this.isIdle() && this.onIdleWorkerNodeCount === 0) {
+ this.startOnIdleWorkerNode().catch(EMPTY_FUNCTION)
}
return task
}
return this.taskFunctionsUsage.delete(name)
}
- private async startOnEmptyQueue (): Promise<void> {
+ private async startOnIdleWorkerNode (): Promise<void> {
if (
- this.onEmptyQueueCount > 0 &&
+ this.onIdleWorkerNodeCount > 0 &&
(this.usage.tasks.executing > 0 || this.tasksQueue.size > 0)
) {
- this.onEmptyQueueCount = 0
+ this.onIdleWorkerNodeCount = 0
return
}
- ++this.onEmptyQueueCount
+ ++this.onIdleWorkerNodeCount
this.dispatchEvent(
- new CustomEvent<WorkerNodeEventDetail>('emptyQueue', {
+ new CustomEvent<WorkerNodeEventDetail>('idleWorkerNode', {
detail: { workerId: this.info.id as number }
})
)
- await sleep(exponentialDelay(this.onEmptyQueueCount))
- await this.startOnEmptyQueue()
+ await sleep(exponentialDelay(this.onIdleWorkerNodeCount))
+ await this.startOnIdleWorkerNode()
+ }
+
+ private isIdle (): boolean {
+ return this.usage.tasks.executing === 0 && this.tasksQueue.size === 0
}
private initWorkerInfo (worker: Worker): WorkerInfo {