perf: add dynamic thread pool to continuous benchmark
authorJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Sep 2023 10:18:37 +0000 (12:18 +0200)
committerJérôme Benoit <jerome.benoit@piment-noir.org>
Sun, 17 Sep 2023 10:18:37 +0000 (12:18 +0200)
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
CHANGELOG.md
benchmarks/benchmarks-utils.mjs
benchmarks/internal/bench.mjs
tests/test-utils.js

index f40bbb4f299608631aeb934920e6d34d5df26510..2dc18b9d3475745bf97fc151a4a936cd25aa9b2e 100644 (file)
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Added
 
 - Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+- Add `start()` method to pool API to start the minimum number of workers.
 - Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
 - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
 
index 762966cf2437974977ec2e70012488a4e541c8ee..2d553580c42e29491d8dc7efbfa4750482f1dc15 100644 (file)
@@ -1,11 +1,14 @@
 import crypto from 'node:crypto'
+import assert from 'node:assert'
 import fs from 'node:fs'
+import Benchmark from 'benchmark'
 import {
   DynamicClusterPool,
   DynamicThreadPool,
   FixedClusterPool,
   FixedThreadPool,
   PoolTypes,
+  WorkerChoiceStrategies,
   WorkerTypes
 } from '../lib/index.mjs'
 import { TaskFunctions } from './benchmarks-types.mjs'
@@ -54,11 +57,11 @@ export const buildPoolifierPool = (
   }
 }
 
-export const runPoolifierTest = async (
+export const runPoolifierPool = async (
   pool,
   { taskExecutions, workerData }
 ) => {
-  return new Promise((resolve, reject) => {
+  return await new Promise((resolve, reject) => {
     let executions = 0
     for (let i = 1; i <= taskExecutions; i++) {
       pool
@@ -66,28 +69,67 @@ export const runPoolifierTest = async (
         .then(() => {
           ++executions
           if (executions === taskExecutions) {
-            return resolve({ ok: 1 })
+            resolve({ ok: 1 })
           }
           return null
         })
         .catch(err => {
           console.error(err)
-          return reject(err)
+          reject(err)
         })
     }
   })
 }
 
-export const getPoolImplementationName = pool => {
-  if (pool instanceof FixedThreadPool) {
-    return 'FixedThreadPool'
-  } else if (pool instanceof DynamicThreadPool) {
-    return 'DynamicThreadPool'
-  } else if (pool instanceof FixedClusterPool) {
-    return 'FixedClusterPool'
-  } else if (pool instanceof DynamicClusterPool) {
-    return 'DynamicClusterPool'
-  }
+export const runPoolifierPoolBenchmark = async (
+  name,
+  pool,
+  { taskExecutions, workerData }
+) => {
+  return await new Promise((resolve, reject) => {
+    try {
+      const suite = new Benchmark.Suite(name)
+      for (const workerChoiceStrategy of Object.values(
+        WorkerChoiceStrategies
+      )) {
+        for (const enableTasksQueue of [false, true]) {
+          suite.add(
+            `${name}|${workerChoiceStrategy}|${
+              enableTasksQueue ? 'with' : 'without'
+            } tasks queue`,
+            async () => {
+              pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+              pool.enableTasksQueue(enableTasksQueue)
+              assert.strictEqual(
+                pool.opts.workerChoiceStrategy,
+                workerChoiceStrategy
+              )
+              assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+              await runPoolifierPool(pool, {
+                taskExecutions,
+                workerData
+              })
+            }
+          )
+        }
+      }
+      suite
+        .on('cycle', event => {
+          console.info(event.target.toString())
+        })
+        .on('complete', async function () {
+          console.info(
+            'Fastest is ' +
+              LIST_FORMATTER.format(this.filter('fastest').map('name'))
+          )
+          await pool.destroy()
+          resolve()
+        })
+        .run({ async: true })
+    } catch (error) {
+      reject(error)
+    }
+  })
 }
 
 export const LIST_FORMATTER = new Intl.ListFormat('en-US', {
index 891ff9eca7118d5bb530e43d23315dcbdfa7c59f..3484951cfb187a42bf1af5f325cd1c436adb6baf 100644 (file)
@@ -1,67 +1,37 @@
-import assert from 'node:assert'
-import Benchmark from 'benchmark'
 import {
   PoolTypes,
-  WorkerChoiceStrategies,
   WorkerTypes,
   availableParallelism
 } from '../../lib/index.mjs'
 import { TaskFunctions } from '../benchmarks-types.mjs'
 import {
-  LIST_FORMATTER,
   buildPoolifierPool,
-  getPoolImplementationName,
-  runPoolifierTest
+  runPoolifierPoolBenchmark
 } from '../benchmarks-utils.mjs'
 
 const poolSize = availableParallelism()
-const fixedThreadPool = buildPoolifierPool(
-  WorkerTypes.thread,
-  PoolTypes.fixed,
-  poolSize
-)
-
 const taskExecutions = 1
 const workerData = {
   function: TaskFunctions.jsonIntegerSerialization,
   taskSize: 1000
 }
 
-const poolifierSuite = new Benchmark.Suite('Poolifier')
-
-for (const pool of [fixedThreadPool]) {
-  for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
-    for (const enableTasksQueue of [false, true]) {
-      poolifierSuite.add(
-        `${getPoolImplementationName(pool)}|${workerChoiceStrategy}|${
-          enableTasksQueue ? 'with' : 'without'
-        } tasks queue`,
-        async () => {
-          pool.setWorkerChoiceStrategy(workerChoiceStrategy)
-          pool.enableTasksQueue(enableTasksQueue)
-          assert.strictEqual(
-            pool.opts.workerChoiceStrategy,
-            workerChoiceStrategy
-          )
-          assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
-          await runPoolifierTest(pool, {
-            taskExecutions,
-            workerData
-          })
-        }
-      )
-    }
+// FixedThreadPool
+await runPoolifierPoolBenchmark(
+  'Poolifier FixedThreadPool',
+  buildPoolifierPool(WorkerTypes.thread, PoolTypes.fixed, poolSize),
+  {
+    taskExecutions,
+    workerData
   }
-}
+)
 
-poolifierSuite
-  .on('cycle', event => {
-    console.info(event.target.toString())
-  })
-  .on('complete', async function () {
-    console.info(
-      'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
-    )
-    await fixedThreadPool.destroy()
-  })
-  .run({ async: true })
+// DynamicThreadPool
+await runPoolifierPoolBenchmark(
+  'Poolifier DynamicThreadPool',
+  buildPoolifierPool(WorkerTypes.thread, PoolTypes.dynamic, poolSize),
+  {
+    taskExecutions,
+    workerData
+  }
+)
index b55f76215e3d8f651281148eb8c57be59e7cc9fb..4478d29015ede00b981e8e66262204e2dc686a04 100644 (file)
@@ -1,7 +1,7 @@
 const { TaskFunctions } = require('./test-types')
 
 const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
-  return new Promise(resolve => {
+  return await new Promise(resolve => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
@@ -18,7 +18,7 @@ const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
 }
 
 const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
-  return new Promise(resolve => {
+  return await new Promise(resolve => {
     let events = 0
     if (numberOfEventsToWait === 0) {
       resolve(events)
@@ -33,7 +33,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
 }
 
 const sleep = async ms => {
-  return new Promise(resolve => setTimeout(resolve, ms))
+  return await new Promise(resolve => setTimeout(resolve, ms))
 }
 
 const sleepTaskFunction = async (
@@ -42,7 +42,7 @@ const sleepTaskFunction = async (
   rejection = false,
   rejectionMessage = ''
 ) => {
-  return new Promise((resolve, reject) => {
+  return await new Promise((resolve, reject) => {
     setTimeout(
       () =>
         rejection === true