repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' into feature/task-functions
[poolifier.git]
/
src
/
pools
/
worker-node.ts
diff --git
a/src/pools/worker-node.ts
b/src/pools/worker-node.ts
index 78756560c0fbaa78fedb96aad7153a21d14e61ff..ed6677027f810474aa9aa01e768fd5f483250679 100644
(file)
--- a/
src/pools/worker-node.ts
+++ b/
src/pools/worker-node.ts
@@
-46,6
+46,7
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
@@
-65,6
+66,7
@@
implements IWorkerNode<Worker, Data> {
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
@@
-77,8
+79,14
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
}
return tasksQueueSize
}
@@
-86,8
+94,14
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
}
return tasksQueueSize
}
@@
-95,7
+109,11
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
@@
-104,7
+122,11
@@
implements IWorkerNode<Worker, Data> {
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
@@
-169,8
+191,8
@@
implements IWorkerNode<Worker, Data> {
this.onEmptyQueueCount = 0
return
}
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}