/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}