From: Jérôme Benoit Date: Fri, 18 Aug 2023 12:24:14 +0000 (+0200) Subject: refactor: cleanup worker node back pressure API X-Git-Tag: v2.6.29~18 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=e2b31e32498626103ef3c737bdffb285087b13e6;p=poolifier.git refactor: cleanup worker node back pressure API Signed-off-by: Jérôme Benoit --- diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index eb2c2f78..768114e4 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1242,6 +1242,17 @@ export abstract class AbstractPool< } } + /** @inheritDoc */ + public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + if ( + this.opts.enableTasksQueue === true && + this.workerNodes[workerNodeKey].hasBackPressure() + ) { + return true + } + return false + } + /** * Executes the given task on the worker given its worker node key. * @@ -1254,16 +1265,14 @@ export abstract class AbstractPool< } private enqueueTask (workerNodeKey: number, task: Task): number { - if ( - this.opts.enableTasksQueue === true && - this.workerNodes[workerNodeKey].hasBackPressure() - ) { + const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task) + if (this.hasWorkerNodeBackPressure(workerNodeKey)) { this.emitter?.emit(PoolEvents.backPressure, { workerId: this.getWorkerInfo(workerNodeKey).id, ...this.info }) } - return this.workerNodes[workerNodeKey].enqueueTask(task) + return tasksQueueSize } private dequeueTask (workerNodeKey: number): Task | undefined { diff --git a/src/pools/pool.ts b/src/pools/pool.ts index 6f3d9561..a0d3a941 100644 --- a/src/pools/pool.ts +++ b/src/pools/pool.ts @@ -180,6 +180,13 @@ export interface IPool< * Pool worker nodes. */ readonly workerNodes: Array> + /** + * Whether the worker node has back pressure (i.e. its tasks queue is full). + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node has back pressure, `false` otherwise. + */ + readonly hasWorkerNodeBackPressure: (workerNodeKey: number) => boolean /** * Emitter on which events can be listened to. * diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 9c20c5be..d4fcd8d0 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -114,6 +114,16 @@ export abstract class AbstractWorkerChoiceStrategy< return this.pool.workerNodes[workerNodeKey].info.ready } + /** + * Whether the worker node has back pressure or not (i.e. its tasks queue is full). + * + * @param workerNodeKey - The worker node key. + * @returns `true` if the worker node has back pressure, `false` otherwise. + */ + protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + return this.pool.hasWorkerNodeBackPressure(workerNodeKey) + } + /** * Gets the worker task runtime. * If the task statistics require the average runtime, the average runtime is returned. diff --git a/src/pools/worker-node.ts b/src/pools/worker-node.ts index 64880fcb..812d1444 100644 --- a/src/pools/worker-node.ts +++ b/src/pools/worker-node.ts @@ -30,7 +30,7 @@ implements IWorkerNode { public usage: WorkerUsage private readonly tasksUsage: Map private readonly tasksQueue: Queue> - private readonly tasksQueueBackPressureMaxSize: number + private readonly tasksQueueBackPressureSize: number /** * Constructs a new worker node. @@ -48,7 +48,7 @@ implements IWorkerNode { this.usage = this.initWorkerUsage() this.tasksUsage = new Map() this.tasksQueue = new Queue>() - this.tasksQueueBackPressureMaxSize = Math.pow(poolMaxSize, 2) + this.tasksQueueBackPressureSize = Math.pow(poolMaxSize, 2) } /** @inheritdoc */ @@ -82,7 +82,7 @@ implements IWorkerNode { /** @inheritdoc */ public hasBackPressure (): boolean { - return this.tasksQueueSize() >= this.tasksQueueBackPressureMaxSize + return this.tasksQueueSize() >= this.tasksQueueBackPressureSize } /** @inheritdoc */ diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 3c44f05e..d6c23659 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -243,7 +243,7 @@ export interface IWorkerNode { */ readonly clearTasksQueue: () => void /** - * Whether the worker node has back pressure. + * Whether the worker node has back pressure (i.e. its tasks queue is full). * * @returns `true` if the worker node has back pressure, `false` otherwise. */