From cde5b54ed8ffa2a288b697c3ab921a6cf7231694 Mon Sep 17 00:00:00 2001 From: =?utf8?q?J=C3=A9r=C3=B4me=20Benoit?= Date: Sun, 17 Sep 2023 12:18:37 +0200 Subject: [PATCH] perf: add dynamic thread pool to continuous benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Jérôme Benoit --- CHANGELOG.md | 1 + benchmarks/benchmarks-utils.mjs | 70 ++++++++++++++++++++++++++------- benchmarks/internal/bench.mjs | 66 +++++++++---------------------- tests/test-utils.js | 8 ++-- 4 files changed, 79 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f40bbb4f..2dc18b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not. +- Add `start()` method to pool API to start the minimum number of workers. - Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not. - Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench. diff --git a/benchmarks/benchmarks-utils.mjs b/benchmarks/benchmarks-utils.mjs index 762966cf..2d553580 100644 --- a/benchmarks/benchmarks-utils.mjs +++ b/benchmarks/benchmarks-utils.mjs @@ -1,11 +1,14 @@ import crypto from 'node:crypto' +import assert from 'node:assert' import fs from 'node:fs' +import Benchmark from 'benchmark' import { DynamicClusterPool, DynamicThreadPool, FixedClusterPool, FixedThreadPool, PoolTypes, + WorkerChoiceStrategies, WorkerTypes } from '../lib/index.mjs' import { TaskFunctions } from './benchmarks-types.mjs' @@ -54,11 +57,11 @@ export const buildPoolifierPool = ( } } -export const runPoolifierTest = async ( +export const runPoolifierPool = async ( pool, { taskExecutions, workerData } ) => { - return new Promise((resolve, reject) => { + return await new Promise((resolve, reject) => { let executions = 0 for (let i = 1; i <= taskExecutions; i++) { pool @@ -66,28 +69,67 @@ export const runPoolifierTest = async ( .then(() => { ++executions if (executions === taskExecutions) { - return resolve({ ok: 1 }) + resolve({ ok: 1 }) } return null }) .catch(err => { console.error(err) - return reject(err) + reject(err) }) } }) } -export const getPoolImplementationName = pool => { - if (pool instanceof FixedThreadPool) { - return 'FixedThreadPool' - } else if (pool instanceof DynamicThreadPool) { - return 'DynamicThreadPool' - } else if (pool instanceof FixedClusterPool) { - return 'FixedClusterPool' - } else if (pool instanceof DynamicClusterPool) { - return 'DynamicClusterPool' - } +export const runPoolifierPoolBenchmark = async ( + name, + pool, + { taskExecutions, workerData } +) => { + return await new Promise((resolve, reject) => { + try { + const suite = new Benchmark.Suite(name) + for (const workerChoiceStrategy of Object.values( + WorkerChoiceStrategies + )) { + for (const enableTasksQueue of [false, true]) { + suite.add( + `${name}|${workerChoiceStrategy}|${ + enableTasksQueue ? 'with' : 'without' + } tasks queue`, + async () => { + pool.setWorkerChoiceStrategy(workerChoiceStrategy) + pool.enableTasksQueue(enableTasksQueue) + assert.strictEqual( + pool.opts.workerChoiceStrategy, + workerChoiceStrategy + ) + assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue) + await runPoolifierPool(pool, { + taskExecutions, + workerData + }) + } + ) + } + } + suite + .on('cycle', event => { + console.info(event.target.toString()) + }) + .on('complete', async function () { + console.info( + 'Fastest is ' + + LIST_FORMATTER.format(this.filter('fastest').map('name')) + ) + await pool.destroy() + resolve() + }) + .run({ async: true }) + } catch (error) { + reject(error) + } + }) } export const LIST_FORMATTER = new Intl.ListFormat('en-US', { diff --git a/benchmarks/internal/bench.mjs b/benchmarks/internal/bench.mjs index 891ff9ec..3484951c 100644 --- a/benchmarks/internal/bench.mjs +++ b/benchmarks/internal/bench.mjs @@ -1,67 +1,37 @@ -import assert from 'node:assert' -import Benchmark from 'benchmark' import { PoolTypes, - WorkerChoiceStrategies, WorkerTypes, availableParallelism } from '../../lib/index.mjs' import { TaskFunctions } from '../benchmarks-types.mjs' import { - LIST_FORMATTER, buildPoolifierPool, - getPoolImplementationName, - runPoolifierTest + runPoolifierPoolBenchmark } from '../benchmarks-utils.mjs' const poolSize = availableParallelism() -const fixedThreadPool = buildPoolifierPool( - WorkerTypes.thread, - PoolTypes.fixed, - poolSize -) - const taskExecutions = 1 const workerData = { function: TaskFunctions.jsonIntegerSerialization, taskSize: 1000 } -const poolifierSuite = new Benchmark.Suite('Poolifier') - -for (const pool of [fixedThreadPool]) { - for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) { - for (const enableTasksQueue of [false, true]) { - poolifierSuite.add( - `${getPoolImplementationName(pool)}|${workerChoiceStrategy}|${ - enableTasksQueue ? 'with' : 'without' - } tasks queue`, - async () => { - pool.setWorkerChoiceStrategy(workerChoiceStrategy) - pool.enableTasksQueue(enableTasksQueue) - assert.strictEqual( - pool.opts.workerChoiceStrategy, - workerChoiceStrategy - ) - assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue) - await runPoolifierTest(pool, { - taskExecutions, - workerData - }) - } - ) - } +// FixedThreadPool +await runPoolifierPoolBenchmark( + 'Poolifier FixedThreadPool', + buildPoolifierPool(WorkerTypes.thread, PoolTypes.fixed, poolSize), + { + taskExecutions, + workerData } -} +) -poolifierSuite - .on('cycle', event => { - console.info(event.target.toString()) - }) - .on('complete', async function () { - console.info( - 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name')) - ) - await fixedThreadPool.destroy() - }) - .run({ async: true }) +// DynamicThreadPool +await runPoolifierPoolBenchmark( + 'Poolifier DynamicThreadPool', + buildPoolifierPool(WorkerTypes.thread, PoolTypes.dynamic, poolSize), + { + taskExecutions, + workerData + } +) diff --git a/tests/test-utils.js b/tests/test-utils.js index b55f7621..4478d290 100644 --- a/tests/test-utils.js +++ b/tests/test-utils.js @@ -1,7 +1,7 @@ const { TaskFunctions } = require('./test-types') const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => { - return new Promise(resolve => { + return await new Promise(resolve => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) @@ -18,7 +18,7 @@ const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => { } const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => { - return new Promise(resolve => { + return await new Promise(resolve => { let events = 0 if (numberOfEventsToWait === 0) { resolve(events) @@ -33,7 +33,7 @@ const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => { } const sleep = async ms => { - return new Promise(resolve => setTimeout(resolve, ms)) + return await new Promise(resolve => setTimeout(resolve, ms)) } const sleepTaskFunction = async ( @@ -42,7 +42,7 @@ const sleepTaskFunction = async ( rejection = false, rejectionMessage = '' ) => { - return new Promise((resolve, reject) => { + return await new Promise((resolve, reject) => { setTimeout( () => rejection === true -- 2.34.1