X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fworker-node.ts;h=e9680261b591e26707c11c5ca868873f3ca5a6e4;hb=f7d15c1217381b306300beb7926b42fde93bc579;hp=78756560c0fbaa78fedb96aad7153a21d14e61ff;hpb=c172526cd82999efbace45402e0bd6d9ae173963;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 78756560..e9680261 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -46,6 +46,7 @@ implements IWorkerNode { /** @inheritdoc */ public onEmptyQueue?: WorkerNodeEventCallback private readonly tasksQueue: Deque> + private onBackPressureStarted: boolean private onEmptyQueueCount: number private readonly taskFunctionsUsage: Map @@ -65,6 +66,7 @@ implements IWorkerNode { } this.tasksQueueBackPressureSize = tasksQueueBackPressureSize this.tasksQueue = new Deque>() + this.onBackPressureStarted = false this.onEmptyQueueCount = 0 this.taskFunctionsUsage = new Map() } @@ -77,8 +79,14 @@ implements IWorkerNode { /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.push(task) - if (this.onBackPressure != null && this.hasBackPressure()) { + if ( + this.onBackPressure != null && + this.hasBackPressure() && + !this.onBackPressureStarted + ) { + this.onBackPressureStarted = true this.onBackPressure(this.info.id as number) + this.onBackPressureStarted = false } return tasksQueueSize } @@ -86,8 +94,14 @@ implements IWorkerNode { /** @inheritdoc */ public unshiftTask (task: Task): number { const tasksQueueSize = this.tasksQueue.unshift(task) - if (this.onBackPressure != null && this.hasBackPressure()) { + if ( + this.onBackPressure != null && + this.hasBackPressure() && + !this.onBackPressureStarted + ) { + this.onBackPressureStarted = true this.onBackPressure(this.info.id as number) + this.onBackPressureStarted = false } return tasksQueueSize } @@ -95,7 +109,11 @@ implements IWorkerNode { /** @inheritdoc */ public dequeueTask (): Task | undefined { const task = this.tasksQueue.shift() - if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + if ( + this.onEmptyQueue != null && + this.tasksQueue.size === 0 && + this.onEmptyQueueCount === 0 + ) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -104,7 +122,11 @@ implements IWorkerNode { /** @inheritdoc */ public popTask (): Task | undefined { const task = this.tasksQueue.pop() - if (this.onEmptyQueue != null && this.tasksQueue.size === 0) { + if ( + this.onEmptyQueue != null && + this.tasksQueue.size === 0 && + this.onEmptyQueueCount === 0 + ) { this.startOnEmptyQueue().catch(EMPTY_FUNCTION) } return task @@ -161,6 +183,11 @@ implements IWorkerNode { return this.taskFunctionsUsage.get(name) } + /** @inheritdoc */ + public deleteTaskFunctionWorkerUsage (name: string): boolean { + return this.taskFunctionsUsage.delete(name) + } + private async startOnEmptyQueue (): Promise { if ( this.onEmptyQueueCount > 0 && @@ -169,8 +196,8 @@ implements IWorkerNode { this.onEmptyQueueCount = 0 return } - (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number) ++this.onEmptyQueueCount + this.onEmptyQueue?.(this.info.id as number) await sleep(exponentialDelay(this.onEmptyQueueCount)) await this.startOnEmptyQueue() }