fix: ensure newly created worker is used only if needed
authorJérôme Benoit <jerome.benoit@sap.com>
Sun, 11 Jun 2023 22:07:22 +0000 (00:07 +0200)
committerJérôme Benoit <jerome.benoit@sap.com>
Sun, 11 Jun 2023 22:07:22 +0000 (00:07 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
src/pools/abstract-pool.ts
src/pools/pool.ts
src/pools/selection-strategies/abstract-worker-choice-strategy.ts
src/pools/selection-strategies/interleaved-weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/weighted-round-robin-worker-choice-strategy.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/selection-strategies.test.js

index c3565446f254c0292978f1f3eed1c23c1b1ac267..2b59abf27cfa2a37a518610afce2998d2dff1c6a 100644 (file)
@@ -379,6 +379,11 @@ export abstract class AbstractPool<
    */
   protected abstract get busy (): boolean
 
+  /**
+   * Whether worker nodes are executing at least one task.
+   *
+   * @returns Worker nodes busyness boolean status.
+   */
   protected internalBusy (): boolean {
     return (
       this.workerNodes.findIndex(workerNode => {
@@ -434,7 +439,7 @@ export abstract class AbstractPool<
   }
 
   /**
-   * Shutdowns the given worker.
+   * Terminates the given worker.
    *
    * @param worker - A worker within `workerNodes`.
    */
@@ -609,18 +614,28 @@ export abstract class AbstractPool<
   /**
    * Chooses a worker node for the next task.
    *
-   * The default worker choice strategy uses a round robin algorithm to distribute the load.
+   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
    *
    * @returns The worker node key
    */
-  protected chooseWorkerNode (): number {
+  private chooseWorkerNode (): number {
     if (this.shallCreateDynamicWorker()) {
-      return this.getWorkerNodeKey(this.createAndSetupDynamicWorker())
+      const worker = this.createAndSetupDynamicWorker()
+      if (
+        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
+      ) {
+        return this.getWorkerNodeKey(worker)
+      }
     }
     return this.workerChoiceStrategyContext.execute()
   }
 
-  protected shallCreateDynamicWorker (): boolean {
+  /**
+   * Conditions for dynamic worker creation.
+   *
+   * @returns Whether to create a dynamic worker or not.
+   */
+  private shallCreateDynamicWorker (): boolean {
     return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
   }
 
@@ -646,7 +661,9 @@ export abstract class AbstractPool<
   >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
   /**
-   * Returns a newly created worker.
+   * Creates a newly worker.
+   *
+   * @returns Newly created worker.
    */
   protected abstract createWorker (): Worker
 
index 33c3ff3fd779aa63119c75886d35173c0ae390eb..257b0d64b0844d664ba00763b1dcde79c4711ee7 100644 (file)
@@ -188,7 +188,7 @@ export interface IPool<
    */
   execute: (data?: Data, name?: string) => Promise<Response>
   /**
-   * Shutdowns every current worker in this pool.
+   * Terminate every current worker in this pool.
    */
   destroy: () => Promise<void>
   /**
index d9a573417ec2f96c0cbf68bc88a6931aeb68de29..a060cea3a5ba4486059fec1f6b45dac515f155e3 100644 (file)
@@ -4,6 +4,7 @@ import type { IPool } from '../pool'
 import type { IWorker } from '../worker'
 import type {
   IWorkerChoiceStrategy,
+  StrategyPolicy,
   TaskStatisticsRequirements,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
@@ -24,6 +25,12 @@ export abstract class AbstractWorkerChoiceStrategy<
    * Toggles finding the last free worker node key.
    */
   private toggleFindLastFreeWorkerNodeKey: boolean = false
+
+  /** @inheritDoc */
+  public readonly strategyPolicy: StrategyPolicy = {
+    useDynamicWorker: false
+  }
+
   /** @inheritDoc */
   public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
     runTime: {
index 7d77a21db5340bcd68a72e731f07ef01de76898b..deaa48815e97f0b0d9fc8e42ffc845c6fa118982 100644 (file)
@@ -4,6 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  StrategyPolicy,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -21,6 +22,11 @@ export class InterleavedWeightedRoundRobinWorkerChoiceStrategy<
   >
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
+  /** @inheritDoc */
+  public readonly strategyPolicy: StrategyPolicy = {
+    useDynamicWorker: true
+  }
+
   /**
    * Worker node id where the current task will be submitted.
    */
index ea174552685de1ba859a879b893061fe91798de5..936925313f99e9bafa6c3bb8c0a71f7ad0c94fad 100644 (file)
@@ -4,6 +4,7 @@ import type { IWorker } from '../worker'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  StrategyPolicy,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
 
@@ -21,6 +22,11 @@ export class RoundRobinWorkerChoiceStrategy<
   >
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
+  /** @inheritDoc */
+  public readonly strategyPolicy: StrategyPolicy = {
+    useDynamicWorker: true
+  }
+
   /**
    * Id of the next worker node.
    */
index b0b036ee9069893098507e91a6913e1948a2c82a..52e287e72dbf9592b28f181e209e3bb14a586e8d 100644 (file)
@@ -140,10 +140,24 @@ export interface TaskStatisticsRequirements {
   elu: MeasurementStatisticsRequirements
 }
 
+/**
+ * Strategy policy.
+ */
+export interface StrategyPolicy {
+  /**
+   * Expect direct usage of dynamic worker.
+   */
+  useDynamicWorker: boolean
+}
+
 /**
  * Worker choice strategy interface.
  */
 export interface IWorkerChoiceStrategy {
+  /**
+   * Strategy policy.
+   */
+  readonly strategyPolicy: StrategyPolicy
   /**
    * Tasks statistics requirements.
    */
index 1ce6041920cfa634071093561eabc32e405272d2..618b5065e01bd33a91fc9a170388ee063f2e8fcb 100644 (file)
@@ -4,6 +4,7 @@ import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
 import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  StrategyPolicy,
   TaskStatisticsRequirements,
   WorkerChoiceStrategyOptions
 } from './selection-strategies-types'
@@ -23,6 +24,11 @@ export class WeightedRoundRobinWorkerChoiceStrategy<
   >
   extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
   implements IWorkerChoiceStrategy {
+  /** @inheritDoc */
+  public readonly strategyPolicy: StrategyPolicy = {
+    useDynamicWorker: true
+  }
+
   /** @inheritDoc */
   public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
     runTime: {
index a0ba25838dc524ea678c97701e20ac7aa1f7b413..e1332b85f150d30395cff0938371290765032f1f 100644 (file)
@@ -9,6 +9,7 @@ import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
+  StrategyPolicy,
   TaskStatisticsRequirements,
   WorkerChoiceStrategy,
   WorkerChoiceStrategyOptions
@@ -104,6 +105,19 @@ export class WorkerChoiceStrategyContext<
     ])
   }
 
+  /**
+   * Gets the strategy policy in the context.
+   *
+   * @returns The strategy policy.
+   */
+  public getStrategyPolicy (): StrategyPolicy {
+    return (
+      this.workerChoiceStrategies.get(
+        this.workerChoiceStrategy
+      ) as IWorkerChoiceStrategy
+    ).strategyPolicy
+  }
+
   /**
    * Gets the worker choice strategy task statistics requirements in the context.
    *
index e9376281efbe66dec0a5f193b03a9fc02bf039fd..0df558deb71865434c57b2d3029f21222aa28b26 100644 (file)
@@ -115,7 +115,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+  it('Verify ROUND_ROBIN strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify ROUND_ROBIN strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
@@ -356,7 +380,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LEAST_USED strategy default tasks usage statistics requirements', async () => {
+  it('Verify LEAST_USED strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify LEAST_USED strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_USED
     let pool = new FixedThreadPool(
       max,
@@ -521,7 +569,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LEAST_BUSY strategy default tasks usage statistics requirements', async () => {
+  it('Verify LEAST_BUSY strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify LEAST_BUSY strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_BUSY
     let pool = new FixedThreadPool(
       max,
@@ -626,7 +698,7 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
       expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
@@ -700,7 +772,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => {
+  it('Verify LEAST_ELU strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify LEAST_ELU strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
     let pool = new FixedThreadPool(
       max,
@@ -877,7 +973,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
+  it('Verify FAIR_SHARE strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: false
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify FAIR_SHARE strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     let pool = new FixedThreadPool(
       max,
@@ -1013,7 +1133,7 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.workerUsage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           failed: 0
@@ -1046,6 +1166,10 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
       expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
       expect(workerNode.workerUsage.runTime.average).toBeGreaterThan(0)
       expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
@@ -1082,7 +1206,7 @@ describe('Selection strategies test suite', () => {
     for (const workerNode of pool.workerNodes) {
       expect(workerNode.workerUsage).toStrictEqual({
         tasks: {
-          executed: maxMultiplier,
+          executed: expect.any(Number),
           executing: 0,
           queued: 0,
           failed: 0
@@ -1115,6 +1239,10 @@ describe('Selection strategies test suite', () => {
           utilization: expect.any(Number)
         }
       })
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
       expect(workerNode.workerUsage.runTime.aggregate).toBeGreaterThan(0)
       expect(workerNode.workerUsage.runTime.median).toBeGreaterThan(0)
       expect(workerNode.workerUsage.elu.utilization).toBeGreaterThanOrEqual(0)
@@ -1203,7 +1331,31 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+  it('Verify WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN
     let pool = new FixedThreadPool(
       max,
@@ -1379,7 +1531,7 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
@@ -1455,7 +1607,7 @@ describe('Selection strategies test suite', () => {
           utilization: 0
         }
       })
-      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThan(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
       expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
         max * maxMultiplier
       )
@@ -1554,7 +1706,32 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks usage statistics requirements', async () => {
+  it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default policy', async () => {
+    const workerChoiceStrategy =
+      WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(pool.workerChoiceStrategyContext.getStrategyPolicy()).toStrictEqual({
+      useDynamicWorker: true
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify INTERLEAVED_WEIGHTED_ROUND_ROBIN strategy default tasks statistics requirements', async () => {
     const workerChoiceStrategy =
       WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN
     let pool = new FixedThreadPool(