fix: ensure worker removal impact is propated to worker choice strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Apr 2023 17:09:06 +0000 (19:09 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Mon, 3 Apr 2023 17:09:06 +0000 (19:09 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
12 files changed:
CHANGELOG.md
src/pools/abstract-pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/dynamic-pool-worker-choice-strategy.ts
src/pools/selection-strategies/fair-share-worker-choice-strategy.ts
src/pools/selection-strategies/less-busy-worker-choice-strategy.ts
src/pools/selection-strategies/less-used-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/selection-strategies.test.js

index 79788cf7c2cee98d8735eb1fd5efa41111050f8e..55c8239e280308b20465523f8d400e6c78d288c8 100644 (file)
@@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Ensure trimmable characters are checked at pool initialization.
 - Fix message id integer overflow.
+- Fix pool worker removal in worker choice strategy internals.
 
 ## [2.3.10] - 2023-03-18
 
index 172feda0555cac95e453860ebddda218e5a5755d..095a4f7b99840ff1c0ff9074d6cd66dca2efa1d0 100644 (file)
@@ -260,10 +260,7 @@ export abstract class AbstractPool<
     if (message.error != null) {
       ++workerTasksUsage.error
     }
-    if (
-      this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
-    ) {
+    if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
       workerTasksUsage.runTime += message.taskRunTime ?? 0
       if (workerTasksUsage.run !== 0) {
         workerTasksUsage.avgRunTime =
@@ -278,7 +275,9 @@ export abstract class AbstractPool<
    * @param worker - The worker that will be removed.
    */
   protected removeWorker (worker: Worker): void {
-    this.workers.splice(this.getWorkerKey(worker), 1)
+    const workerKey = this.getWorkerKey(worker)
+    this.workers.splice(workerKey, 1)
+    this.workerChoiceStrategyContext.remove(workerKey)
   }
 
   /**
index 5b526331824be72af5048eb8c76fe26b001a373a..b181273375777830ce5c2e5ebf1a504227994dcd 100644 (file)
@@ -41,4 +41,7 @@ export abstract class AbstractWorkerChoiceStrategy<
 
   /** {@inheritDoc} */
   public abstract choose (): number
+
+  /** {@inheritDoc} */
+  public abstract remove (workerKey: number): boolean
 }
index c122c2b11e418aa0da2e2cdef34c344e520be163..1658d176e961eda3295547f55e7050c5a8478143 100644 (file)
@@ -61,4 +61,9 @@ export class DynamicPoolWorkerChoiceStrategy<
     // All workers are busy, create a new worker
     return this.createWorkerCallback()
   }
+
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    return this.workerChoiceStrategy.remove(workerKey)
+  }
 }
index af8eb75d03995d1eb37eeb8daf8d44a8cb6d9d70..dbe10ce7181355fbf56a35c42551721f592efba2 100644 (file)
@@ -60,6 +60,17 @@ export class FairShareWorkerChoiceStrategy<
     return chosenWorkerKey
   }
 
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    const workerDeleted = this.workerLastVirtualTaskTimestamp.delete(workerKey)
+    for (const [key, value] of this.workerLastVirtualTaskTimestamp.entries()) {
+      if (key > workerKey) {
+        this.workerLastVirtualTaskTimestamp.set(key - 1, value)
+      }
+    }
+    return workerDeleted
+  }
+
   /**
    * Computes worker last virtual task timestamp.
    *
index 2bbafb59ec5b07dfca9001f587cb4878af87dca6..f2e401625ddd3764cd646d5cd931d23415f5f648 100644 (file)
@@ -39,4 +39,9 @@ export class LessBusyWorkerChoiceStrategy<
     }
     return lessBusyWorkerKey
   }
+
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    return true
+  }
 }
index 2ed58c96eb3596dfbfafd6416b82b2e51f7e5a31..9cae4a54ae336204e4ab00f29b7789c280101682 100644 (file)
@@ -34,4 +34,9 @@ export class LessUsedWorkerChoiceStrategy<
     }
     return lessUsedWorkerKey
   }
+
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    return true
+  }
 }
index e265172fad11beede47047155d360b38c5cff4a0..3880ca391ac4ed980abda24a8740282e750a9b81 100644 (file)
@@ -33,4 +33,15 @@ export class RoundRobinWorkerChoiceStrategy<
         : this.nextWorkerId + 1
     return chosenWorkerKey
   }
+
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    if (this.nextWorkerId === workerKey) {
+      this.nextWorkerId =
+        this.nextWorkerId > this.pool.workers.length - 1
+          ? this.pool.workers.length - 1
+          : this.nextWorkerId
+    }
+    return true
+  }
 }
index a05fd9b784c9da682cd0e270f09b81ce41325d70..c098768533fcced20246044d5411ad1ebb7bb76c 100644 (file)
@@ -56,4 +56,10 @@ export interface IWorkerChoiceStrategy {
    * Chooses a worker in the pool and returns its key.
    */
   choose: () => number
+  /**
+   * Removes a worker reference from strategy internals.
+   *
+   * @param workerKey - The worker key.
+   */
+  remove: (workerKey: number) => boolean
 }
index e3d2be3f3e1c20d158d70dc0e85943e59bfeac4c..24a25b4d30de7fa553f500615b5b5e8e06efd972 100644 (file)
@@ -93,6 +93,23 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
     return chosenWorkerKey
   }
 
+  /** {@inheritDoc} */
+  public remove (workerKey: number): boolean {
+    if (this.currentWorkerId === workerKey) {
+      this.currentWorkerId =
+        this.currentWorkerId > this.pool.workers.length - 1
+          ? this.pool.workers.length - 1
+          : this.currentWorkerId
+    }
+    const workerDeleted = this.workersTaskRunTime.delete(workerKey)
+    for (const [key, value] of this.workersTaskRunTime) {
+      if (key > workerKey) {
+        this.workersTaskRunTime.set(key - 1, value)
+      }
+    }
+    return workerDeleted
+  }
+
   private initWorkersTaskRunTime (): void {
     for (const [index] of this.pool.workers.entries()) {
       this.initWorkerTaskRunTime(index)
index c149cc2e9b48a7a6dae62466aea312fd9054e6dc..011e00db2060b08dffa193b28be907b35b2eb64c 100644 (file)
@@ -4,6 +4,7 @@ import type { IPoolWorker } from '../pool-worker'
 import { DynamicPoolWorkerChoiceStrategy } from './dynamic-pool-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  RequiredStatistics,
   WorkerChoiceStrategy
 } from './selection-strategies-types'
 import { WorkerChoiceStrategies } from './selection-strategies-types'
@@ -61,11 +62,21 @@ export class WorkerChoiceStrategyContext<
    * Gets the worker choice strategy used in the context.
    *
    * @returns The worker choice strategy.
+   * @deprecated Scheduled removal.
    */
   public getWorkerChoiceStrategy (): IWorkerChoiceStrategy {
     return this.workerChoiceStrategy
   }
 
+  /**
+   * Gets the worker choice strategy required statistics.
+   *
+   * @returns The required statistics.
+   */
+  public getRequiredStatistics (): RequiredStatistics {
+    return this.workerChoiceStrategy.requiredStatistics
+  }
+
   /**
    * Sets the worker choice strategy to use in the context.
    *
@@ -87,4 +98,14 @@ export class WorkerChoiceStrategyContext<
   public execute (): number {
     return this.workerChoiceStrategy.choose()
   }
+
+  /**
+   * Removes a worker in the underlying selection strategy internals.
+   *
+   * @param workerKey - The key of the worker to remove.
+   * @returns `true` if the removal is successful, `false` otherwise.
+   */
+  public remove (workerKey: number): boolean {
+    return this.workerChoiceStrategy.remove(workerKey)
+  }
 }
index a62571797c08010ef6da62b947c5a81e67af1e84..d09dde6f636f29cb3839888889cd0e9a0dd5979f 100644 (file)
@@ -70,8 +70,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -81,8 +80,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.ROUND_ROBIN)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -213,8 +211,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(false)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -224,8 +221,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_USED)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(false)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -297,8 +293,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -308,8 +303,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.LESS_BUSY)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -395,8 +389,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -406,8 +399,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.FAIR_SHARE)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -425,6 +417,10 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .workerLastVirtualTaskTimestamp.size
+    ).toBe(pool.workers.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -442,6 +438,10 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    // expect(
+    //   pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+    //     .workerChoiceStrategy.workerLastVirtualTaskTimestamp.size
+    // ).toBe(pool.workers.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -555,8 +555,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     await pool.destroy()
     pool = new DynamicThreadPool(
@@ -566,8 +565,7 @@ describe('Selection strategies test suite', () => {
     )
     pool.setWorkerChoiceStrategy(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN)
     expect(
-      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
-        .requiredStatistics.runTime
+      pool.workerChoiceStrategyContext.getRequiredStatistics().runTime
     ).toBe(true)
     // We need to clean up the resources after our test
     await pool.destroy()
@@ -585,6 +583,10 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    expect(
+      pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+        .workersTaskRunTime.size
+    ).toBe(pool.workers.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })
@@ -602,6 +604,10 @@ describe('Selection strategies test suite', () => {
       promises.push(pool.execute())
     }
     await Promise.all(promises)
+    // expect(
+    //   pool.workerChoiceStrategyContext.getWorkerChoiceStrategy()
+    //     .workerChoiceStrategy.workersTaskRunTime.size
+    // ).toBe(pool.workers.length)
     // We need to clean up the resources after our test
     await pool.destroy()
   })