From 97256a852ff3c5a6f85542fe749b831a8c600e30 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Tue, 17 Oct 2023 17:23:26 +0200 Subject: [PATCH] feat: handle worker node readyness in IWRR strategy MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 4 +++ .../abstract-worker-choice-strategy.ts | 10 ------- ...hted-round-robin-worker-choice-strategy.ts | 27 ++++++++++--------- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57cfceb4..2509e78a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix race condition at dynamic worker node task assignment and scheduled removal. See issue [#1468](https://github.com/poolifier/poolifier/issues/1468) and [#1496](https://github.com/poolifier/poolifier/issues/1496) + ## [3.0.1] - 2023-10-16 ### Fixed diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 75c7450d..7a64a7e5 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -126,16 +126,6 @@ export abstract class AbstractWorkerChoiceStrategy< return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false } - /** - * Whether the worker node has back pressure or not (i.e. its tasks queue is full). - * - * @param workerNodeKey - The worker node key. - * @returns `true` if the worker node has back pressure, `false` otherwise. - */ - protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean { - return this.pool.hasWorkerNodeBackPressure(workerNodeKey) - } - /** * Gets the worker node task runtime. * If the task statistics require the average runtime, the average runtime is returned. diff --git a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts index b034cba1..15a6c7f4 100644 --- a/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts @@ -105,6 +105,7 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< const workerWeight = this.opts.weights?.[workerNodeKey] ?? this.defaultWorkerWeight if ( + this.isWorkerNodeReady(workerNodeKey) && workerWeight >= this.roundWeights[roundIndex] && this.workerNodeVirtualTaskRunTime < workerWeight ) { @@ -121,18 +122,20 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy< } private interleavedWeightedRoundRobinNextWorkerNodeId (): void { - if ( - this.roundId === this.roundWeights.length - 1 && - this.workerNodeId === this.pool.workerNodes.length - 1 - ) { - this.roundId = 0 - this.workerNodeId = 0 - } else if (this.workerNodeId === this.pool.workerNodes.length - 1) { - this.roundId = this.roundId + 1 - this.workerNodeId = 0 - } else { - this.workerNodeId = this.workerNodeId + 1 - } + do { + if ( + this.roundId === this.roundWeights.length - 1 && + this.workerNodeId === this.pool.workerNodes.length - 1 + ) { + this.roundId = 0 + this.workerNodeId = 0 + } else if (this.workerNodeId === this.pool.workerNodes.length - 1) { + this.roundId = this.roundId + 1 + this.workerNodeId = 0 + } else { + this.workerNodeId = this.workerNodeId + 1 + } + } while (!this.isWorkerNodeReady(this.workerNodeId)) } /** @inheritDoc */ -- 2.34.1