From: Jérôme Benoit Date: Tue, 17 Oct 2023 14:42:34 +0000 (+0200) Subject: fix: ensure a dynamic scheduled for removal can't be used X-Git-Tag: v3.0.2~1^2~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=ae3ab61d14adc1f0511e8dea1f69a1d44e7effdf;p=poolifier.git fix: ensure a dynamic scheduled for removal can't be used closes #1468 and #1468 Signed-off-by: Jérôme Benoit --- diff --git a/README.md b/README.md index 6db467fd..9e441e23 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ [![Javascript Standard Style Guide]()](https://standardjs.com) [![Discord](https://badgen.net/discord/online-members/vXxZhyb3b6?icon=discord&label=discord&color=green)](https://discord.gg/vXxZhyb3b6) [![Open Collective](https://opencollective.com/poolifier/tiers/badge.svg)](https://opencollective.com/poolifier) -[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](http://makeapullrequest.com) +[![PRs Welcome](https://badgen.net/static/PRs/welcome/green)](https://makeapullrequest.com) [![No Dependencies]()]() diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 8e0a259a..9601564f 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1276,6 +1276,8 @@ export abstract class AbstractPool< workerUsage.tasks.executing === 0 && this.tasksQueueSize(localWorkerNodeKey) === 0))) ) { + // Flag the worker as not ready immediately + this.flagWorkerNodeAsNotReady(localWorkerNodeKey) this.destroyWorkerNode(localWorkerNodeKey).catch(error => { this.emitter?.emit(PoolEvents.error, error) }) @@ -1641,6 +1643,10 @@ export abstract class AbstractPool< } } + protected flagWorkerNodeAsNotReady (workerNodeKey: number): void { + this.getWorkerInfo(workerNodeKey).ready = false + } + /** @inheritDoc */ public hasWorkerNodeBackPressure (workerNodeKey: number): boolean { return ( diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 68ae5005..5f067bd7 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -61,6 +61,7 @@ export class FixedClusterPool< /** @inheritDoc */ protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] diff --git a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts index 68e09238..75c7450d 100644 --- a/src/pools/selection-strategies/abstract-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/abstract-worker-choice-strategy.ts @@ -122,8 +122,8 @@ export abstract class AbstractWorkerChoiceStrategy< * @param workerNodeKey - The worker node key. * @returns Whether the worker node is ready or not. */ - private isWorkerNodeReady (workerNodeKey: number): boolean { - return this.pool.workerNodes[workerNodeKey]?.info.ready + protected isWorkerNodeReady (workerNodeKey: number): boolean { + return this.pool.workerNodes[workerNodeKey]?.info.ready ?? false } /** @@ -132,26 +132,10 @@ export abstract class AbstractWorkerChoiceStrategy< * @param workerNodeKey - The worker node key. * @returns `true` if the worker node has back pressure, `false` otherwise. */ - private hasWorkerNodeBackPressure (workerNodeKey: number): boolean { + protected hasWorkerNodeBackPressure (workerNodeKey: number): boolean { return this.pool.hasWorkerNodeBackPressure(workerNodeKey) } - /** - * Whether the worker node is eligible or not. - * A worker node is eligible if it is ready and does not have back pressure. - * - * @param workerNodeKey - The worker node key. - * @returns `true` if the worker node is eligible, `false` otherwise. - * @see {@link isWorkerNodeReady} - * @see {@link hasWorkerNodeBackPressure} - */ - protected isWorkerNodeEligible (workerNodeKey: number): boolean { - return ( - this.isWorkerNodeReady(workerNodeKey) && - !this.hasWorkerNodeBackPressure(workerNodeKey) - ) - } - /** * Gets the worker node task runtime. * If the task statistics require the average runtime, the average runtime is returned. @@ -203,15 +187,6 @@ export abstract class AbstractWorkerChoiceStrategy< this.previousWorkerNodeKey = workerNodeKey ?? this.previousWorkerNodeKey } - /** - * Check the next worker node eligibility. - */ - protected checkNextWorkerNodeEligibility (): void { - if (!this.isWorkerNodeEligible(this.nextWorkerNodeKey as number)) { - delete this.nextWorkerNodeKey - } - } - protected computeDefaultWorkerWeight (): number { let cpusCycleTimeWeight = 0 for (const cpu of cpus()) { diff --git a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts index a337278b..f47a1e6e 100644 --- a/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/fair-share-worker-choice-strategy.ts @@ -89,9 +89,10 @@ export class FairShareWorkerChoiceStrategy< this.computeWorkerNodeVirtualTaskEndTimestamp(workerNodeKey) } } - return (workerNode.strategyData.virtualTaskEndTimestamp as number) < - ((workerNodes[minWorkerNodeKey].strategyData as StrategyData) - .virtualTaskEndTimestamp as number) + return this.isWorkerNodeReady(workerNodeKey) && + (workerNode.strategyData.virtualTaskEndTimestamp as number) < + ((workerNodes[minWorkerNodeKey].strategyData as StrategyData) + .virtualTaskEndTimestamp as number) ? workerNodeKey : minWorkerNodeKey }, diff --git a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts index 984340b5..81cadde1 100644 --- a/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-busy-worker-choice-strategy.ts @@ -74,10 +74,11 @@ export class LeastBusyWorkerChoiceStrategy< private leastBusyNextWorkerNodeKey (): number | undefined { return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return (workerNode.usage.runTime.aggregate ?? 0) + - (workerNode.usage.waitTime.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) + - (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) + return this.isWorkerNodeReady(workerNodeKey) && + (workerNode.usage.runTime.aggregate ?? 0) + + (workerNode.usage.waitTime.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.runTime.aggregate ?? 0) + + (workerNodes[minWorkerNodeKey].usage.waitTime.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts index 0fe9cbec..2ad4e90d 100644 --- a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts @@ -70,8 +70,9 @@ export class LeastEluWorkerChoiceStrategy< private leastEluNextWorkerNodeKey (): number | undefined { return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return (workerNode.usage.elu.active.aggregate ?? 0) < - (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) + return this.isWorkerNodeReady(workerNodeKey) && + (workerNode.usage.elu.active.aggregate ?? 0) < + (workerNodes[minWorkerNodeKey].usage.elu.active.aggregate ?? 0) ? workerNodeKey : minWorkerNodeKey }, diff --git a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts index dc249e65..1bd8d059 100644 --- a/src/pools/selection-strategies/least-used-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/least-used-worker-choice-strategy.ts @@ -55,12 +55,13 @@ export class LeastUsedWorkerChoiceStrategy< private leastUsedNextWorkerNodeKey (): number | undefined { return this.pool.workerNodes.reduce( (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => { - return workerNode.usage.tasks.executed + - workerNode.usage.tasks.executing + - workerNode.usage.tasks.queued < - workerNodes[minWorkerNodeKey].usage.tasks.executed + - workerNodes[minWorkerNodeKey].usage.tasks.executing + - workerNodes[minWorkerNodeKey].usage.tasks.queued + return this.isWorkerNodeReady(workerNodeKey) && + workerNode.usage.tasks.executed + + workerNode.usage.tasks.executing + + workerNode.usage.tasks.queued < + workerNodes[minWorkerNodeKey].usage.tasks.executed + + workerNodes[minWorkerNodeKey].usage.tasks.executing + + workerNodes[minWorkerNodeKey].usage.tasks.queued ? workerNodeKey : minWorkerNodeKey }, diff --git a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts index aa3ee108..19dd3872 100644 --- a/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/round-robin-worker-choice-strategy.ts @@ -70,10 +70,12 @@ export class RoundRobinWorkerChoiceStrategy< } private roundRobinNextWorkerNodeKey (): number | undefined { - this.nextWorkerNodeKey = - this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 - ? 0 - : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + do { + this.nextWorkerNodeKey = + this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 + ? 0 + : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey)) return this.nextWorkerNodeKey } } diff --git a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts index b9b74b6c..22bf458f 100644 --- a/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts +++ b/src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts @@ -95,23 +95,25 @@ export class WeightedRoundRobinWorkerChoiceStrategy< } private weightedRoundRobinNextWorkerNodeKey (): number | undefined { - const workerWeight = - this.opts.weights?.[ - this.nextWorkerNodeKey ?? this.previousWorkerNodeKey - ] ?? this.defaultWorkerWeight - if (this.workerNodeVirtualTaskRunTime < workerWeight) { - this.workerNodeVirtualTaskRunTime = - this.workerNodeVirtualTaskRunTime + - this.getWorkerNodeTaskRunTime( + do { + const workerWeight = + this.opts.weights?.[ this.nextWorkerNodeKey ?? this.previousWorkerNodeKey - ) - } else { - this.nextWorkerNodeKey = - this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 - ? 0 - : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 - this.workerNodeVirtualTaskRunTime = 0 - } + ] ?? this.defaultWorkerWeight + if (this.workerNodeVirtualTaskRunTime < workerWeight) { + this.workerNodeVirtualTaskRunTime = + this.workerNodeVirtualTaskRunTime + + this.getWorkerNodeTaskRunTime( + this.nextWorkerNodeKey ?? this.previousWorkerNodeKey + ) + } else { + this.nextWorkerNodeKey = + this.nextWorkerNodeKey === this.pool.workerNodes.length - 1 + ? 0 + : (this.nextWorkerNodeKey ?? this.previousWorkerNodeKey) + 1 + this.workerNodeVirtualTaskRunTime = 0 + } + } while (!this.isWorkerNodeReady(this.nextWorkerNodeKey as number)) return this.nextWorkerNodeKey } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 7d47e81a..bf111d39 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -58,6 +58,7 @@ export class FixedThreadPool< /** @inheritDoc */ protected async destroyWorkerNode (workerNodeKey: number): Promise { + this.flagWorkerNodeAsNotReady(workerNodeKey) this.flushTasksQueue(workerNodeKey) // FIXME: wait for tasks to be finished const workerNode = this.workerNodes[workerNodeKey] @@ -80,7 +81,7 @@ export class FixedThreadPool< transferList?: TransferListItem[] ): void { ( - this.workerNodes[workerNodeKey].messageChannel as MessageChannel + this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel ).port1.postMessage( { ...message, workerId: this.getWorkerInfo(workerNodeKey).id }, transferList @@ -108,7 +109,7 @@ export class FixedThreadPool< listener: (message: MessageValue) => void ): void { ( - this.workerNodes[workerNodeKey].messageChannel as MessageChannel + this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel ).port1.on('message', listener) } @@ -118,7 +119,7 @@ export class FixedThreadPool< listener: (message: MessageValue) => void ): void { ( - this.workerNodes[workerNodeKey].messageChannel as MessageChannel + this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel ).port1.once('message', listener) } @@ -128,7 +129,7 @@ export class FixedThreadPool< listener: (message: MessageValue) => void ): void { ( - this.workerNodes[workerNodeKey].messageChannel as MessageChannel + this.workerNodes[workerNodeKey]?.messageChannel as MessageChannel ).port1.off('message', listener) } diff --git a/src/pools/worker.ts b/src/pools/worker.ts index 43e7fc60..03cecce8 100644 --- a/src/pools/worker.ts +++ b/src/pools/worker.ts @@ -240,7 +240,7 @@ export interface IWorkerNode readonly usage: WorkerUsage /** * Worker choice strategy data. - * This is used to store data that is specific to the worker choice strategy. + * This is used to store data that are specific to the worker choice strategy. */ strategyData?: StrategyData /**