X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fworker-node.ts;h=c1c1e0a99b2bc3200e58e85d7c8d1b29aacaa893;hb=40d04d55ce29490e98226cfe968df9502755782d;hp=5d28802c12769c1cbaf0cd519e22708918788016;hpb=0d4e88b32dcc9af05423c40e049fb2693012b6d8;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 5d28802c..c1c1e0a9 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -45,7 +45,7 @@ export class WorkerNode /** @inheritdoc */ public tasksQueueBackPressureSize: number private readonly tasksQueue: PriorityQueue> - private onBackPressureStarted: boolean + private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -70,7 +70,7 @@ export class WorkerNode // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) - this.onBackPressureStarted = false + this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -82,23 +82,38 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true + if ( + !this.setBackPressureFlag && + this.hasBackPressure() && + !this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = true this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false + this.setBackPressureFlag = false } return tasksQueueSize } /** @inheritdoc */ public dequeueTask (bucket?: number): Task | undefined { - return this.tasksQueue.dequeue(bucket) + const task = this.tasksQueue.dequeue(bucket) + if ( + !this.setBackPressureFlag && + !this.hasBackPressure() && + this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = false + this.setBackPressureFlag = false + } + return task } /** @inheritdoc */ - public dequeueLastBucketTask (): Task | undefined { + public dequeueLastPrioritizedTask (): Task | undefined { // Start from the last empty or partially filled bucket - return this.tasksQueue.dequeue(this.tasksQueue.buckets + 1) + return this.dequeueTask(this.tasksQueue.buckets + 1) } /** @inheritdoc */ @@ -197,7 +212,8 @@ export class WorkerNode type: getWorkerType(worker)!, dynamic: false, ready: false, - stealing: false + stealing: false, + backPressure: false } }