X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;ds=sidebyside;f=src%2Fpools%2Fworker-node.ts;h=772a839b1c077d23ae53f73b7a1475daa4c8bddc;hb=2eee72204bc851f616ded11cb5381f96c6dc5cbf;hp=a2878d8980e4c300a7dc0712c7fbbbe971868f27;hpb=31847469b406e46688d8aafb880e250706dd8aee;p=poolifier.git diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index a2878d89..772a839b 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'node:events' import { MessageChannel } from 'node:worker_threads' import { CircularArray } from '../circular-array.js' -import { Deque } from '../deque.js' +import { PriorityQueue } from '../priority-queue.js' import type { Task } from '../utility-types.js' import { DEFAULT_TASK_NAME } from '../utils.js' import { @@ -44,8 +44,8 @@ export class WorkerNode public messageChannel?: MessageChannel /** @inheritdoc */ public tasksQueueBackPressureSize: number - private readonly tasksQueue: Deque> - private onBackPressureStarted: boolean + private readonly tasksQueue: PriorityQueue> + private setBackPressureFlag: boolean private readonly taskFunctionsUsage: Map /** @@ -69,8 +69,8 @@ export class WorkerNode } // eslint-disable-next-line @typescript-eslint/no-non-null-assertion this.tasksQueueBackPressureSize = opts.tasksQueueBackPressureSize! - this.tasksQueue = new Deque>() - this.onBackPressureStarted = false + this.tasksQueue = new PriorityQueue>(opts.tasksQueueBucketSize) + this.setBackPressureFlag = false this.taskFunctionsUsage = new Map() } @@ -81,34 +81,39 @@ export class WorkerNode /** @inheritdoc */ public enqueueTask (task: Task): number { - const tasksQueueSize = this.tasksQueue.push(task) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true + const tasksQueueSize = this.tasksQueue.enqueue(task, task.priority) + if ( + !this.setBackPressureFlag && + this.hasBackPressure() && + !this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = true this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false } + this.setBackPressureFlag = false return tasksQueueSize } /** @inheritdoc */ - public unshiftTask (task: Task): number { - const tasksQueueSize = this.tasksQueue.unshift(task) - if (this.hasBackPressure() && !this.onBackPressureStarted) { - this.onBackPressureStarted = true - this.emit('backPressure', { workerId: this.info.id }) - this.onBackPressureStarted = false + public dequeueTask (bucket?: number): Task | undefined { + const task = this.tasksQueue.dequeue(bucket) + if ( + !this.setBackPressureFlag && + !this.hasBackPressure() && + this.info.backPressure + ) { + this.setBackPressureFlag = true + this.info.backPressure = false } - return tasksQueueSize + this.setBackPressureFlag = false + return task } /** @inheritdoc */ - public dequeueTask (): Task | undefined { - return this.tasksQueue.shift() - } - - /** @inheritdoc */ - public popTask (): Task | undefined { - return this.tasksQueue.pop() + public dequeueLastPrioritizedTask (): Task | undefined { + // Start from the last empty or partially filled bucket + return this.dequeueTask(this.tasksQueue.buckets + 1) } /** @inheritdoc */ @@ -121,12 +126,6 @@ export class WorkerNode return this.tasksQueue.size >= this.tasksQueueBackPressureSize } - /** @inheritdoc */ - public resetUsage (): void { - this.usage = this.initWorkerUsage() - this.taskFunctionsUsage.clear() - } - /** @inheritdoc */ public async terminate (): Promise { const waitWorkerExit = new Promise(resolve => { @@ -213,6 +212,7 @@ export class WorkerNode type: getWorkerType(worker)!, dynamic: false, ready: false, + backPressure: false, stealing: false } }