Merge branch 'master' into elu-strategy
authorJérôme Benoit <jerome.benoit@sap.com>
Fri, 9 Jun 2023 11:06:38 +0000 (13:06 +0200)
committerGitHub <noreply@github.com>
Fri, 9 Jun 2023 11:06:38 +0000 (13:06 +0200)
README.md
src/pools/selection-strategies/least-elu-worker-choice-strategy.ts [new file with mode: 0644]
src/pools/selection-strategies/selection-strategies-types.ts
src/pools/selection-strategies/worker-choice-strategy-context.ts
tests/pools/selection-strategies/selection-strategies.test.js
tests/pools/selection-strategies/worker-choice-strategy-context.test.js

index ca6d7aff2bd0581808bd8eb54fc1a400d101f9ec..e0f665fe72edb8d8207ac727c593504a4a7eca81 100644 (file)
--- a/README.md
+++ b/README.md
@@ -163,6 +163,7 @@ An object with these properties:
   - `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
   - `WorkerChoiceStrategies.LEAST_USED`: Submit tasks to the worker with the minimum number of executed, executing and queued tasks
   - `WorkerChoiceStrategies.LEAST_BUSY`: Submit tasks to the worker with the minimum tasks total execution and wait time
+  - `WorkerChoiceStrategies.LEAST_ELU`: Submit tasks to the worker with the minimum event loop utilization (ELU) (experimental)
   - `WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using a weighted round robin scheduling algorithm based on tasks execution time
   - `WorkerChoiceStrategies.INTERLEAVED_WEIGHTED_ROUND_ROBIN`: Submit tasks to worker by using an interleaved weighted round robin scheduling algorithm based on tasks execution time (experimental)
   - `WorkerChoiceStrategies.FAIR_SHARE`: Submit tasks to worker by using a fair share tasks scheduling algorithm based on tasks execution time
diff --git a/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts b/src/pools/selection-strategies/least-elu-worker-choice-strategy.ts
new file mode 100644 (file)
index 0000000..5cc02bf
--- /dev/null
@@ -0,0 +1,80 @@
+import { DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS } from '../../utils'
+import type { IPool } from '../pool'
+import type { IWorker } from '../worker'
+import { AbstractWorkerChoiceStrategy } from './abstract-worker-choice-strategy'
+import type {
+  IWorkerChoiceStrategy,
+  TaskStatisticsRequirements,
+  WorkerChoiceStrategyOptions
+} from './selection-strategies-types'
+
+/**
+ * Selects the worker with the least ELU.
+ *
+ * @typeParam Worker - Type of worker which manages the strategy.
+ * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
+ * @typeParam Response - Type of execution response. This can only be serializable data.
+ */
+export class LeastEluWorkerChoiceStrategy<
+    Worker extends IWorker,
+    Data = unknown,
+    Response = unknown
+  >
+  extends AbstractWorkerChoiceStrategy<Worker, Data, Response>
+  implements IWorkerChoiceStrategy {
+  /** @inheritDoc */
+  public readonly taskStatisticsRequirements: TaskStatisticsRequirements = {
+    runTime: {
+      aggregate: false,
+      average: false,
+      median: false
+    },
+    waitTime: {
+      aggregate: false,
+      average: false,
+      median: false
+    },
+    elu: true
+  }
+
+  /** @inheritDoc */
+  public constructor (
+    pool: IPool<Worker, Data, Response>,
+    opts: WorkerChoiceStrategyOptions = DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
+  ) {
+    super(pool, opts)
+    this.setTaskStatisticsRequirements(this.opts)
+  }
+
+  /** @inheritDoc */
+  public reset (): boolean {
+    return true
+  }
+
+  /** @inheritDoc */
+  public update (): boolean {
+    return true
+  }
+
+  /** @inheritDoc */
+  public choose (): number {
+    let minWorkerElu = Infinity
+    let leastEluWorkerNodeKey!: number
+    for (const [workerNodeKey, workerNode] of this.pool.workerNodes.entries()) {
+      const workerUsage = workerNode.workerUsage
+      const workerElu = workerUsage.elu?.utilization ?? 0
+      if (workerElu === 0) {
+        return workerNodeKey
+      } else if (workerElu < minWorkerElu) {
+        minWorkerElu = workerElu
+        leastEluWorkerNodeKey = workerNodeKey
+      }
+    }
+    return leastEluWorkerNodeKey
+  }
+
+  /** @inheritDoc */
+  public remove (): boolean {
+    return true
+  }
+}
index 26fe436d7a8e587b651458e8359a0b4d0a2e27f9..0fbb4d4d43165737d3bd3c0c7378b2c55d7e03d2 100644 (file)
@@ -14,6 +14,12 @@ export const WorkerChoiceStrategies = Object.freeze({
    * Least busy worker selection strategy.
    */
   LEAST_BUSY: 'LEAST_BUSY',
+  /**
+   * Least ELU worker selection strategy.
+   *
+   * @experimental
+   */
+  LEAST_ELU: 'LEAST_ELU',
   /**
    * Fair share worker selection strategy.
    */
index f6e132c88514e3012dcf732cb01cde69a4b26410..a0ba25838dc524ea678c97701e20ac7aa1f7b413 100644 (file)
@@ -5,6 +5,7 @@ import { FairShareWorkerChoiceStrategy } from './fair-share-worker-choice-strate
 import { InterleavedWeightedRoundRobinWorkerChoiceStrategy } from './interleaved-weighted-round-robin-worker-choice-strategy'
 import { LeastBusyWorkerChoiceStrategy } from './least-busy-worker-choice-strategy'
 import { LeastUsedWorkerChoiceStrategy } from './least-used-worker-choice-strategy'
+import { LeastEluWorkerChoiceStrategy } from './least-elu-worker-choice-strategy'
 import { RoundRobinWorkerChoiceStrategy } from './round-robin-worker-choice-strategy'
 import type {
   IWorkerChoiceStrategy,
@@ -70,6 +71,13 @@ export class WorkerChoiceStrategyContext<
           opts
         )
       ],
+      [
+        WorkerChoiceStrategies.LEAST_ELU,
+        new (LeastEluWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
+          pool,
+          opts
+        )
+      ],
       [
         WorkerChoiceStrategies.FAIR_SHARE,
         new (FairShareWorkerChoiceStrategy.bind(this))<Worker, Data, Response>(
index d43023b302f3923fc87218cbc8799721ca7026fc..8e226860c2e3a018cda341a26959deb11ae88ed5 100644 (file)
@@ -6,6 +6,7 @@ const {
   FixedClusterPool
 } = require('../../../lib')
 const { CircularArray } = require('../../../lib/circular-array')
+const TestUtils = require('../../test-utils')
 
 describe('Selection strategies test suite', () => {
   const min = 0
@@ -15,6 +16,7 @@ describe('Selection strategies test suite', () => {
     expect(WorkerChoiceStrategies.ROUND_ROBIN).toBe('ROUND_ROBIN')
     expect(WorkerChoiceStrategies.LEAST_USED).toBe('LEAST_USED')
     expect(WorkerChoiceStrategies.LEAST_BUSY).toBe('LEAST_BUSY')
+    expect(WorkerChoiceStrategies.LEAST_ELU).toBe('LEAST_ELU')
     expect(WorkerChoiceStrategies.FAIR_SHARE).toBe('FAIR_SHARE')
     expect(WorkerChoiceStrategies.WEIGHTED_ROUND_ROBIN).toBe(
       'WEIGHTED_ROUND_ROBIN'
@@ -592,6 +594,111 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify LEAST_ELU strategy default tasks usage statistics requirements', async () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+    let pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(
+      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+    ).toStrictEqual({
+      runTime: {
+        aggregate: false,
+        average: false,
+        median: false
+      },
+      waitTime: {
+        aggregate: false,
+        average: false,
+        median: false
+      },
+      elu: true
+    })
+    await pool.destroy()
+    pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy }
+    )
+    expect(
+      pool.workerChoiceStrategyContext.getTaskStatisticsRequirements()
+    ).toStrictEqual({
+      runTime: {
+        aggregate: false,
+        average: false,
+        median: false
+      },
+      waitTime: {
+        aggregate: false,
+        average: false,
+        median: false
+      },
+      elu: true
+    })
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
+  it('Verify LEAST_ELU strategy can be run in a fixed pool', async () => {
+    const pool = new FixedThreadPool(
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.LEAST_ELU }
+    )
+    // TODO: Create a better test to cover `LeastEluWorkerChoiceStrategy#choose`
+    const maxMultiplier = 2
+    for (let i = 0; i < max * maxMultiplier; i++) {
+      await pool.execute()
+      if (i !== max * maxMultiplier - 1) await TestUtils.sleep(500)
+    }
+    for (const workerNode of pool.workerNodes) {
+      const expectedWorkerUsage = {
+        tasks: {
+          executed: expect.any(Number),
+          executing: 0,
+          queued: 0,
+          failed: 0
+        },
+        runTime: {
+          aggregate: 0,
+          average: 0,
+          median: 0,
+          history: expect.any(CircularArray)
+        },
+        waitTime: {
+          aggregate: 0,
+          average: 0,
+          median: 0,
+          history: expect.any(CircularArray)
+        }
+      }
+      if (workerNode.workerUsage.elu === undefined) {
+        expect(workerNode.workerUsage).toStrictEqual({
+          ...expectedWorkerUsage,
+          elu: undefined
+        })
+      } else {
+        expect(workerNode.workerUsage).toStrictEqual({
+          ...expectedWorkerUsage,
+          elu: {
+            active: expect.any(Number),
+            idle: 0,
+            utilization: 1
+          }
+        })
+      }
+      expect(workerNode.workerUsage.tasks.executed).toBeGreaterThanOrEqual(0)
+      expect(workerNode.workerUsage.tasks.executed).toBeLessThanOrEqual(
+        max * maxMultiplier
+      )
+    }
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify FAIR_SHARE strategy default tasks usage statistics requirements', async () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     let pool = new FixedThreadPool(
index e39b1484ddb53a2a97cf22721ada61b565349fc6..2070f8e03301aca85ac8215c52e96e11bb065acd 100644 (file)
@@ -17,6 +17,9 @@ const {
 const {
   LeastBusyWorkerChoiceStrategy
 } = require('../../../lib/pools/selection-strategies/least-busy-worker-choice-strategy')
+const {
+  LeastEluWorkerChoiceStrategy
+} = require('../../../lib/pools/selection-strategies/least-elu-worker-choice-strategy')
 const {
   FairShareWorkerChoiceStrategy
 } = require('../../../lib/pools/selection-strategies/fair-share-worker-choice-strategy')
@@ -261,6 +264,38 @@ describe('Worker choice strategy context test suite', () => {
     )
   })
 
+  it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and fixed pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+    const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+      fixedPool
+    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+    expect(
+      workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      )
+    ).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
+    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+      workerChoiceStrategy
+    )
+  })
+
+  it('Verify that setWorkerChoiceStrategy() works with LEAST_ELU and dynamic pool', () => {
+    const workerChoiceStrategy = WorkerChoiceStrategies.LEAST_ELU
+    const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
+      dynamicPool
+    )
+    workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
+    expect(
+      workerChoiceStrategyContext.workerChoiceStrategies.get(
+        workerChoiceStrategy
+      )
+    ).toBeInstanceOf(LeastEluWorkerChoiceStrategy)
+    expect(workerChoiceStrategyContext.workerChoiceStrategy).toBe(
+      workerChoiceStrategy
+    )
+  })
+
   it('Verify that setWorkerChoiceStrategy() works with FAIR_SHARE and fixed pool', () => {
     const workerChoiceStrategy = WorkerChoiceStrategies.FAIR_SHARE
     const workerChoiceStrategyContext = new WorkerChoiceStrategyContext(