X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=src%2Fpools%2Fworker-node.ts;h=079557010565cc7a7e129d0f314caf4762eb7700;hb=0cd1f28c613a876bfedd3b70b987e37f91888ed0;hp=64400d9eb1426be9ffe546b45855b606bfec05c0;hpb=5d9133ae9ef83c5f69e26daf8dcfea584884d9c4;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 64400d9e..07955701 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'node:events' import { MessageChannel } from 'node:worker_threads' -import { CircularArray } from '../circular-array.js' +import { CircularBuffer } from '../circular-buffer.js' import { PriorityQueue } from '../priority-queue.js' import type { Task } from '../utility-types.js' import { DEFAULT_TASK_NAME } from '../utils.js' @@ -15,6 +15,7 @@ import { type EventHandler, type IWorker, type IWorkerNode, + MeasurementHistorySize, type StrategyData, type WorkerInfo, type WorkerNodeOptions, @@ -45,7 +46,7 @@ export class WorkerNode /** @inheritdoc */ public tasksQueueBackPressureSize: number private readonly tasksQueue: PriorityQueue> - private onBackPressureStarted: boolean + private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -70,7 +71,7 @@ export class WorkerNode // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) - this.onBackPressureStarted = false + this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -82,17 +83,38 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true + if ( + !this.setBackPressureFlag && + this.hasBackPressure() && + !this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = true this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false + this.setBackPressureFlag = false } return tasksQueueSize } /** @inheritdoc */ public dequeueTask (bucket?: number): Task | undefined { - return this.tasksQueue.dequeue(bucket) + const task = this.tasksQueue.dequeue(bucket) + if ( + !this.setBackPressureFlag && + !this.hasBackPressure() && + this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = false + this.setBackPressureFlag = false + } + return task + } + + /** @inheritdoc */ + public dequeueLastPrioritizedTask (): Task | undefined { + // Start from the last empty or partially filled bucket + return this.dequeueTask(this.tasksQueue.buckets + 1) } /** @inheritdoc */ @@ -191,7 +213,8 @@ export class WorkerNode type: getWorkerType(worker)!, dynamic: false, ready: false, - stealing: false + stealing: false, + backPressure: false } } @@ -217,17 +240,17 @@ export class WorkerNode failed: 0 }, runTime: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, waitTime: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, elu: { idle: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, active: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) } } } @@ -260,17 +283,17 @@ export class WorkerNode failed: 0 }, runTime: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, waitTime: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, elu: { idle: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) }, active: { - history: new CircularArray() + history: new CircularBuffer(MeasurementHistorySize) } } }