Cleanups on bechmarking and strategies code: (#227)
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Tue, 23 Feb 2021 09:38:00 +0000 (10:38 +0100)
committerGitHub <noreply@github.com>
Tue, 23 Feb 2021 09:38:00 +0000 (10:38 +0100)
+ Factor out common code
+ Renaming
+ Add benchmarking on strategies
+ Add microtime to improve benchmark time resolution

Signed-off-by: Jérôme Benoit <jerome.benoit@sap.com>
16 files changed:
benchmarks/internal/bench.js
benchmarks/internal/benchmark-utils.js [new file with mode: 0644]
benchmarks/internal/choose-worker.js
benchmarks/internal/cluster/dynamic.js
benchmarks/internal/cluster/fixed.js
benchmarks/internal/cluster/worker.js
benchmarks/internal/thread/dynamic.js
benchmarks/internal/thread/fixed.js
package-lock.json
package.json
src/pools/abstract-pool.ts
src/pools/cluster/dynamic.ts
src/pools/pool-internal.ts
src/pools/selection-strategies.ts
src/pools/thread/dynamic.ts
tests/pools/selection-strategies.test.js

index 2ad7f22c5e47ec7827366ed3c95b21a3402c06cf..c3db71651a1e84b333490ded1bcd10f29650ff41 100644 (file)
@@ -1,10 +1,16 @@
 const Benchmark = require('benchmark')
-const { dynamicClusterTest } = require('./cluster/dynamic')
+const {
+  dynamicClusterTest,
+  dynamicClusterTestLessRecentlyUsed
+} = require('./cluster/dynamic')
 const { fixedClusterTest } = require('./cluster/fixed')
-const { dynamicThreadTest } = require('./thread/dynamic')
+const {
+  dynamicThreadTest,
+  dynamicThreadTestLessRecentlyUsed
+} = require('./thread/dynamic')
 const { fixedThreadTest } = require('./thread/fixed')
 
-const suite = new Benchmark.Suite()
+const suite = new Benchmark.Suite('poolifier')
 
 const LIST_FORMATTER = new Intl.ListFormat('en-US', {
   style: 'long',
@@ -19,18 +25,24 @@ setTimeout(async () => {
 async function test () {
   // Add tests
   suite
-    .add('Pioardi:Static:ThreadPool', async function () {
+    .add('Poolifier:Static:ThreadPool', async function () {
       await fixedThreadTest()
     })
-    .add('Pioardi:Dynamic:ThreadPool', async function () {
+    .add('Poolifier:Dynamic:ThreadPool', async function () {
       await dynamicThreadTest()
     })
-    .add('Pioardi:Static:ClusterPool', async function () {
+    .add('Poolifier:Dynamic:ThreadPool:LessRecentlyUsed', async function () {
+      await dynamicThreadTestLessRecentlyUsed()
+    })
+    .add('Poolifier:Static:ClusterPool', async function () {
       await fixedClusterTest()
     })
-    .add('Pioardi:Dynamic:ClusterPool', async function () {
+    .add('Poolifier:Dynamic:ClusterPool', async function () {
       await dynamicClusterTest()
     })
+    .add('Poolifier:Dynamic:ClusterPool:LessRecentlyUsed', async function () {
+      await dynamicClusterTestLessRecentlyUsed()
+    })
     // Add listeners
     .on('cycle', function (event) {
       console.log(event.target.toString())
@@ -43,5 +55,5 @@ async function test () {
       // eslint-disable-next-line no-process-exit
       process.exit()
     })
-    .run()
+    .run({ async: true, queued: true })
 }
diff --git a/benchmarks/internal/benchmark-utils.js b/benchmarks/internal/benchmark-utils.js
new file mode 100644 (file)
index 0000000..f84227d
--- /dev/null
@@ -0,0 +1,19 @@
+async function runTest (pool, { tasks, workerData }) {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      pool
+        .execute(workerData)
+        .then(res => {
+          executions++
+          if (executions === tasks) {
+            return resolve('FINISH')
+          }
+          return null
+        })
+        .catch(err => console.error(err))
+    }
+  })
+}
+
+module.exports = { runTest }
index fe98eff1e0123ad143d2e394ffa59c53ee8afe4a..92eadfe4bd4bacced81ffa3af45faced420e3e88 100644 (file)
@@ -11,7 +11,7 @@ const workers = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
 
 let nextWorkerIndex = 0
 
-function chooseWorkerTernary () {
+function chooseWorkerTernaryOffByOne () {
   nextWorkerIndex =
     workers.length - 1 === nextWorkerIndex ? 0 : nextWorkerIndex + 1
   return workers[nextWorkerIndex]
@@ -40,9 +40,9 @@ function chooseWorkerIncrementModulo () {
 }
 
 suite
-  .add('Ternary', function () {
+  .add('Ternary off by one', function () {
     nextWorkerIndex = 0
-    chooseWorkerTernary()
+    chooseWorkerTernaryOffByOne()
   })
   .add('Ternary with negation', function () {
     nextWorkerIndex = 0
index 7e607c87059229431490d29e1cee0b56b5e2d51c..7ad9dd282d5e27a7afcdcd10e9a0d4c860e4e75c 100644 (file)
@@ -1,4 +1,8 @@
-const { DynamicClusterPool } = require('../../../lib/index')
+const {
+  DynamicClusterPool,
+  WorkerChoiceStrategies
+} = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
 
 const size = 30
 
@@ -8,24 +12,26 @@ const dynamicPool = new DynamicClusterPool(
   './benchmarks/internal/cluster/worker.js'
 )
 
+const dynamicPoolLessRecentlyUsed = new DynamicClusterPool(
+  size / 2,
+  size * 3,
+  './benchmarks/internal/cluster/worker.js',
+  { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+)
+
 async function dynamicClusterTest (
   { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
 ) {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      dynamicPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => console.error(err))
-    }
-  })
+  return runTest(dynamicPool, { tasks, workerData })
 }
 
-module.exports = { dynamicClusterTest }
+async function dynamicClusterTestLessRecentlyUsed (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
+}
+
+module.exports = {
+  dynamicClusterTest,
+  dynamicClusterTestLessRecentlyUsed
+}
index 1fd8d6797c62e1db75e362225e07958d4c5f9dd7..60a5b938f8a5a6f46fc474a1e0e115152dba05e0 100644 (file)
@@ -1,4 +1,5 @@
 const { FixedClusterPool } = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
 
 const size = 30
 
@@ -10,23 +11,7 @@ const fixedPool = new FixedClusterPool(
 async function fixedClusterTest (
   { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
 ) {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      fixedPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => {
-          console.error(err)
-        })
-    }
-  })
+  return runTest(fixedPool, { tasks, workerData })
 }
 
 module.exports = { fixedClusterTest }
index c7211b7bfd2b9fe5883059ef9bba8771d614381d..1692c284b218c1decb5b5b451e80b1ff81e0283f 100644 (file)
@@ -8,7 +8,7 @@ function yourFunction (data) {
     }
     JSON.stringify(o)
   }
-  // console.log('This is the main thread ' + isMainThread)
+  // console.log('This is the main thread ' + isMaster)
   return { ok: 1 }
 }
 
index f3f0e04977e3ee718180bd3122b5d08b39ef3497..bc9b3d8194e91e14305419eb2265bc4bced929b3 100644 (file)
@@ -1,27 +1,37 @@
-const { DynamicThreadPool } = require('../../../lib/index')
+const {
+  DynamicThreadPool,
+  WorkerChoiceStrategies
+} = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
 
 const size = 30
 
-const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './worker.js')
+const dynamicPool = new DynamicThreadPool(
+  size / 2,
+  size * 3,
+  './benchmarks/internal/thread/worker.js'
+)
+
+const dynamicPoolLessRecentlyUsed = new DynamicThreadPool(
+  size / 2,
+  size * 3,
+  './benchmarks/internal/thread/worker.js',
+  { workerChoiceStrategy: DynamicThreadPool.LESS_RECENTLY_USED }
+)
 
 async function dynamicThreadTest (
   { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
 ) {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      dynamicPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => console.error(err))
-    }
-  })
+  return runTest(dynamicPool, { tasks, workerData })
+}
+
+async function dynamicThreadTestLessRecentlyUsed (
+  { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
+) {
+  return runTest(dynamicPoolLessRecentlyUsed, { tasks, workerData })
 }
 
-module.exports = { dynamicThreadTest }
+module.exports = {
+  dynamicThreadTest,
+  dynamicThreadTestLessRecentlyUsed
+}
index 8168a6dc20ca113dad6c8ac532b182ce94ac9ac7..7ad60eeb3cf0b5b5ff1b3601b725f4adf58a6469 100644 (file)
@@ -1,29 +1,17 @@
 const { FixedThreadPool } = require('../../../lib/index')
+const { runTest } = require('../benchmark-utils')
 
 const size = 30
 
-const fixedPool = new FixedThreadPool(size, './worker.js')
+const fixedPool = new FixedThreadPool(
+  size,
+  './benchmarks/internal/thread/worker.js'
+)
 
 async function fixedThreadTest (
   { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
 ) {
-  return new Promise((resolve, reject) => {
-    let executions = 0
-    for (let i = 0; i <= tasks; i++) {
-      fixedPool
-        .execute(workerData)
-        .then(res => {
-          executions++
-          if (executions === tasks) {
-            return resolve('FINISH')
-          }
-          return null
-        })
-        .catch(err => {
-          console.error(err)
-        })
-    }
-  })
+  return runTest(fixedPool, { tasks, workerData })
 }
 
 module.exports = { fixedThreadTest }
index 3596c88e476bd5d44938ba29ab5050f97ef8f119..cba840580728ce0499bbdab7ee1774d777d2223f 100644 (file)
         "picomatch": "^2.0.5"
       }
     },
+    "microtime": {
+      "version": "3.0.0",
+      "resolved": "https://registry.npmjs.org/microtime/-/microtime-3.0.0.tgz",
+      "integrity": "sha512-SirJr7ZL4ow2iWcb54bekS4aWyBQNVcEDBiwAz9D/sTgY59A+uE8UJU15cp5wyZmPBwg/3zf8lyCJ5NUe1nVlQ==",
+      "dev": true,
+      "requires": {
+        "node-addon-api": "^1.2.0",
+        "node-gyp-build": "^3.8.0"
+      }
+    },
     "mimic-fn": {
       "version": "3.1.0",
       "resolved": "https://registry.npmjs.org/mimic-fn/-/mimic-fn-3.1.0.tgz",
       "integrity": "sha512-Yd3UES5mWCSqR+qNT93S3UoYUkqAZ9lLg8a7g9rimsWmYGK8cVToA4/sF3RrshdyV3sAGMXVUmpMYOw+dLpOuw==",
       "dev": true
     },
+    "node-addon-api": {
+      "version": "1.7.2",
+      "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-1.7.2.tgz",
+      "integrity": "sha512-ibPK3iA+vaY1eEjESkQkM0BbCqFOaZMiXRTtdB0u7b4djtY6JnsjvPdUHVMg6xQt3B8fpTTWHI9A+ADjM9frzg==",
+      "dev": true
+    },
+    "node-gyp-build": {
+      "version": "3.9.0",
+      "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-3.9.0.tgz",
+      "integrity": "sha512-zLcTg6P4AbcHPq465ZMFNXx7XpKKJh+7kkN699NiQWisR2uWYOWNWqRHAmbnmKiL4e9aLSlmy5U7rEMUXV59+A==",
+      "dev": true
+    },
     "node-preload": {
       "version": "0.2.1",
       "resolved": "https://registry.npmjs.org/node-preload/-/node-preload-0.2.1.tgz",
index ad26b2568fad787ae9b00e51f365f9a08669e7a7..0a08c94ace9d67435ebbd303ff7908473b61cf6c 100644 (file)
@@ -74,6 +74,7 @@
     "eslint-plugin-promise": "^4.3.1",
     "eslint-plugin-spellcheck": "0.0.17",
     "expect": "^26.6.2",
+    "microtime": "^3.0.0",
     "mocha": "^8.3.0",
     "mocha-lcov-reporter": "^1.3.0",
     "nyc": "^15.1.0",
index 9d7e4f0c2306ad2dd9970cc7ac7cd9094959b136..7024f0998797936be1fe8d43710e1e2f1c827633 100644 (file)
@@ -187,13 +187,13 @@ export abstract class AbstractPool<
       throw new Error(
         'Cannot instantiate a pool with a negative number of workers'
       )
-    } else if (!this.isDynamic() && numberOfWorkers === 0) {
+    } else if (!this.dynamic && numberOfWorkers === 0) {
       throw new Error('Cannot instantiate a fixed pool with no worker')
     }
   }
 
   /** @inheritdoc */
-  public isDynamic (): boolean {
+  public get dynamic (): boolean {
     return false
   }
 
index 4a5f720aca0a315af076cfacd3bb17725f27a3a8..f0c3c0e40cf21cceb0e2048d1b7931417d972c96 100644 (file)
@@ -35,7 +35,7 @@ export class DynamicClusterPool<
   }
 
   /** @inheritdoc */
-  public isDynamic (): boolean {
+  public get dynamic (): boolean {
     return true
   }
 }
index ddee27eac0ed21bf43ca4f61b8378ee3a3146b6a..d641fcf9d9fec246ee26a92a67a78d348b3dacf3 100644 (file)
@@ -42,17 +42,17 @@ export interface IPoolInternal<
    */
   readonly emitter: PoolEmitter
 
-  /**
-   * Maximum number of workers that can be created by this pool.
-   */
-  readonly max?: number
-
   /**
    * Whether the pool is dynamic or not.
    *
    * If it is dynamic, it provides the `max` property.
    */
-  isDynamic(): boolean
+  readonly dynamic: boolean
+
+  /**
+   * Maximum number of workers that can be created by this pool.
+   */
+  readonly max?: number
 
   /**
    * Creates a new worker for this pool and sets it up completely.
index 5c40a71b404e3dadb74a468e282f9e5accb4ac95..f86442b0f5fc836fd9cacf77333d1bab69f1fae2 100644 (file)
@@ -90,11 +90,12 @@ class LessRecentlyUsedWorkerChoiceStrategy<
 
   /** @inheritdoc */
   public choose (): Worker {
+    const isPoolDynamic = this.pool.dynamic
     let minNumberOfTasks = Infinity
     // A worker is always found because it picks the one with fewer tasks
     let lessRecentlyUsedWorker!: Worker
     for (const [worker, numberOfTasks] of this.pool.tasks) {
-      if (numberOfTasks === 0) {
+      if (!isPoolDynamic && numberOfTasks === 0) {
         return worker
       } else if (numberOfTasks < minNumberOfTasks) {
         minNumberOfTasks = numberOfTasks
@@ -105,29 +106,6 @@ class LessRecentlyUsedWorkerChoiceStrategy<
   }
 }
 
-/**
- * Get the worker choice strategy instance.
- *
- * @param pool The pool instance.
- * @param workerChoiceStrategy The worker choice strategy.
- * @returns The worker choice strategy instance.
- */
-function getWorkerChoiceStrategy<Worker extends IWorker, Data, Response> (
-  pool: IPoolInternal<Worker, Data, Response>,
-  workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
-): IWorkerChoiceStrategy<Worker> {
-  switch (workerChoiceStrategy) {
-    case WorkerChoiceStrategies.ROUND_ROBIN:
-      return new RoundRobinWorkerChoiceStrategy(pool)
-    case WorkerChoiceStrategies.LESS_RECENTLY_USED:
-      return new LessRecentlyUsedWorkerChoiceStrategy(pool)
-    default:
-      throw new Error(
-        `Worker choice strategy '${workerChoiceStrategy}' not found`
-      )
-  }
-}
-
 /**
  * Dynamically choose a worker.
  *
@@ -149,34 +127,17 @@ class DynamicPoolWorkerChoiceStrategy<Worker extends IWorker, Data, Response>
     private readonly pool: IPoolInternal<Worker, Data, Response>,
     workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
   ) {
-    this.workerChoiceStrategy = getWorkerChoiceStrategy(
+    this.workerChoiceStrategy = SelectionStrategiesUtils.getWorkerChoiceStrategy(
       this.pool,
       workerChoiceStrategy
     )
   }
 
-  /**
-   * Find a free worker based on number of tasks the worker has applied.
-   *
-   * If a worker was found that has `0` tasks, it is detected as free and will be returned.
-   *
-   * If no free worker was found, `null` will be returned.
-   *
-   * @returns A free worker if there was one, otherwise `null`.
-   */
-  private findFreeWorkerBasedOnTasks (): Worker | null {
-    for (const [worker, numberOfTasks] of this.pool.tasks) {
-      if (numberOfTasks === 0) {
-        // A worker is free, use it
-        return worker
-      }
-    }
-    return null
-  }
-
   /** @inheritdoc */
   public choose (): Worker {
-    const freeWorker = this.findFreeWorkerBasedOnTasks()
+    const freeWorker = SelectionStrategiesUtils.findFreeWorkerBasedOnTasks(
+      this.pool
+    )
     if (freeWorker) {
       return freeWorker
     }
@@ -239,13 +200,16 @@ export class WorkerChoiceStrategyContext<
   private getPoolWorkerChoiceStrategy (
     workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
   ): IWorkerChoiceStrategy<Worker> {
-    if (this.pool.isDynamic()) {
+    if (this.pool.dynamic) {
       return new DynamicPoolWorkerChoiceStrategy(
         this.pool,
         workerChoiceStrategy
       )
     }
-    return getWorkerChoiceStrategy(this.pool, workerChoiceStrategy)
+    return SelectionStrategiesUtils.getWorkerChoiceStrategy(
+      this.pool,
+      workerChoiceStrategy
+    )
   }
 
   /**
@@ -270,3 +234,59 @@ export class WorkerChoiceStrategyContext<
     return this.workerChoiceStrategy.choose()
   }
 }
+
+/**
+ * Worker selection strategies helpers.
+ */
+class SelectionStrategiesUtils {
+  /**
+   * Find a free worker based on number of tasks the worker has applied.
+   *
+   * If a worker was found that has `0` tasks, it is detected as free and will be returned.
+   *
+   * If no free worker was found, `null` will be returned.
+   *
+   * @param pool The pool instance.
+   * @returns A free worker if there was one, otherwise `null`.
+   */
+  public static findFreeWorkerBasedOnTasks<
+    Worker extends IWorker,
+    Data,
+    Response
+  > (pool: IPoolInternal<Worker, Data, Response>): Worker | null {
+    for (const [worker, numberOfTasks] of pool.tasks) {
+      if (numberOfTasks === 0) {
+        // A worker is free, use it
+        return worker
+      }
+    }
+    return null
+  }
+
+  /**
+   * Get the worker choice strategy instance.
+   *
+   * @param pool The pool instance.
+   * @param workerChoiceStrategy The worker choice strategy.
+   * @returns The worker choice strategy instance.
+   */
+  public static getWorkerChoiceStrategy<
+    Worker extends IWorker,
+    Data,
+    Response
+  > (
+    pool: IPoolInternal<Worker, Data, Response>,
+    workerChoiceStrategy: WorkerChoiceStrategy = WorkerChoiceStrategies.ROUND_ROBIN
+  ): IWorkerChoiceStrategy<Worker> {
+    switch (workerChoiceStrategy) {
+      case WorkerChoiceStrategies.ROUND_ROBIN:
+        return new RoundRobinWorkerChoiceStrategy(pool)
+      case WorkerChoiceStrategies.LESS_RECENTLY_USED:
+        return new LessRecentlyUsedWorkerChoiceStrategy(pool)
+      default:
+        throw new Error(
+          `Worker choice strategy '${workerChoiceStrategy}' not found`
+        )
+    }
+  }
+}
index 04125854e2f24e7bb6c1d058bb57fa953105eb35..b4fd46c1c401d5bde0fd990fe35a05e1e4129bb4 100644 (file)
@@ -36,7 +36,7 @@ export class DynamicThreadPool<
   }
 
   /** @inheritdoc */
-  public isDynamic (): boolean {
+  public get dynamic (): boolean {
     return true
   }
 }
index 604dda3d52f9ed46165e8943d145b4bc99c1e8e0..d631afc6ed30644aa01378bf9534f7cd730ff149 100644 (file)
@@ -39,7 +39,7 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
-  it('Verify LESS_RECENTLY_USED strategy can be run in a pool', async () => {
+  it('Verify LESS_RECENTLY_USED strategy can be run in a fixed pool', async () => {
     const max = 3
     const pool = new FixedThreadPool(
       max,
@@ -57,6 +57,26 @@ describe('Selection strategies test suite', () => {
     await pool.destroy()
   })
 
+  it('Verify LESS_RECENTLY_USED strategy can be run in a dynamic pool', async () => {
+    const min = 0
+    const max = 3
+    const pool = new DynamicThreadPool(
+      min,
+      max,
+      './tests/worker-files/thread/testWorker.js',
+      { workerChoiceStrategy: WorkerChoiceStrategies.LESS_RECENTLY_USED }
+    )
+    // TODO: Create a better test to cover `LessRecentlyUsedWorkerChoiceStrategy#choose`
+    const promises = []
+    for (let i = 0; i < max * 2; i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    await Promise.all(promises)
+
+    // We need to clean up the resources after our test
+    await pool.destroy()
+  })
+
   it('Verify unknown strategies throw error', () => {
     const min = 1
     const max = 3