-import crypto from 'crypto'
-import fs from 'fs'
+import { strictEqual } from 'node:assert'
+
+import { bench, clear, group, run } from 'tatami-ng'
+
import {
DynamicClusterPool,
DynamicThreadPool,
FixedClusterPool,
- FixedThreadPool
+ FixedThreadPool,
+ Measurements,
+ PoolTypes,
+ WorkerChoiceStrategies,
+ WorkerTypes,
} from '../lib/index.mjs'
-import { PoolTypes, WorkerFunctions, WorkerTypes } from './benchmarks-types.mjs'
-
-export const runTest = async (pool, { taskExecutions, workerData }) => {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 1; i <= taskExecutions; i++) {
- pool
- .execute(workerData)
- .then(() => {
- ++executions
- if (executions === taskExecutions) {
- return resolve({ ok: 1 })
- }
- return null
- })
- .catch(err => {
- console.error(err)
- return reject(err)
- })
- }
- })
-}
-
-export const generateRandomInteger = (
- max = Number.MAX_SAFE_INTEGER,
- min = 0
-) => {
- if (max < min || max < 0 || min < 0) {
- throw new RangeError('Invalid interval')
- }
- max = Math.floor(max)
- if (min != null && min !== 0) {
- min = Math.ceil(min)
- return Math.floor(Math.random() * (max - min + 1)) + min
- }
- return Math.floor(Math.random() * (max + 1))
-}
-
-const jsonIntegerSerialization = n => {
- for (let i = 0; i < n; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
-}
-
-/**
- * Intentionally inefficient implementation.
- * @param {number} n - The number of fibonacci numbers to generate.
- * @returns {number} - The nth fibonacci number.
- */
-const fibonacci = n => {
- if (n <= 1) return n
- return fibonacci(n - 1) + fibonacci(n - 2)
-}
-
-/**
- * Intentionally inefficient implementation.
- * @param {number} n - The number to calculate the factorial of.
- * @returns {number} - The factorial of n.
- */
-const factorial = n => {
- if (n === 0) {
- return 1
- }
- return factorial(n - 1) * n
-}
-
-const readWriteFiles = (
- n,
- baseDirectory = `/tmp/poolifier-benchmarks/${crypto.randomInt(
- 281474976710655
- )}`
-) => {
- if (fs.existsSync(baseDirectory) === true) {
- fs.rmSync(baseDirectory, { recursive: true })
- }
- fs.mkdirSync(baseDirectory, { recursive: true })
- for (let i = 0; i < n; i++) {
- const filePath = `${baseDirectory}/${i}`
- fs.writeFileSync(filePath, i.toString(), {
- encoding: 'utf8',
- flag: 'a'
- })
- fs.readFileSync(filePath, 'utf8')
- }
- fs.rmSync(baseDirectory, { recursive: true })
-}
-
-export const executeWorkerFunction = data => {
- switch (data.function) {
- case WorkerFunctions.jsonIntegerSerialization:
- return jsonIntegerSerialization(data.taskSize || 1000)
- case WorkerFunctions.fibonacci:
- return fibonacci(data.taskSize || 1000)
- case WorkerFunctions.factorial:
- return factorial(data.taskSize || 1000)
- case WorkerFunctions.readWriteFiles:
- return readWriteFiles(data.taskSize || 1000)
- default:
- throw new Error('Unknown worker function')
- }
-}
+import { executeTaskFunction } from './benchmarks-utils.cjs'
-export const buildPool = (workerType, poolType, poolSize, poolOptions) => {
+/**
+ * Builds a poolifier pool of the given type, worker type and size.
+ * @param {string} workerType - The worker type (WorkerTypes).
+ * @param {string} poolType - The pool type (PoolTypes).
+ * @param {number} poolSize - The pool (maximum) size.
+ * @param {object} [poolOptions] - The pool options.
+ * @returns {object} The built poolifier pool.
+ */
+const buildPoolifierPool = (workerType, poolType, poolSize, poolOptions) => {
   switch (poolType) {
     case PoolTypes.fixed:
       switch (workerType) {
         case WorkerTypes.thread:
           return new FixedThreadPool(
             poolSize,
             './benchmarks/internal/thread-worker.mjs',
             poolOptions
           )
         case WorkerTypes.cluster:
           return new FixedClusterPool(
             poolSize,
-            './benchmarks/internal/cluster-worker.mjs',
+            './benchmarks/internal/cluster-worker.cjs',
             poolOptions
           )
       }
       break
     case PoolTypes.dynamic:
       switch (workerType) {
         case WorkerTypes.thread:
           return new DynamicThreadPool(
             Math.floor(poolSize / 2),
-            poolSize * 3,
+            poolSize,
             './benchmarks/internal/thread-worker.mjs',
             poolOptions
           )
         case WorkerTypes.cluster:
           return new DynamicClusterPool(
             Math.floor(poolSize / 2),
-            poolSize * 3,
-            './benchmarks/internal/cluster-worker.mjs',
+            poolSize,
+            './benchmarks/internal/cluster-worker.cjs',
             poolOptions
           )
       }
       break
   }
 }
+
+/**
+ * Sequentially executes `taskExecutions` tasks on the given pool with the
+ * provided worker data, awaiting each execution before starting the next.
+ * @param {object} pool - The poolifier pool to execute tasks on.
+ * @param {object} options - The execution options.
+ * @param {number} options.taskExecutions - The number of task executions.
+ * @param {*} options.workerData - The data passed to each task execution.
+ */
+const runPoolifierPool = async (pool, { taskExecutions, workerData }) => {
+  for (let i = 1; i <= taskExecutions; i++) {
+    await pool.execute(workerData)
+  }
+}
+
+/**
+ * Benchmarks a poolifier pool with tatami-ng: for every worker choice
+ * strategy, with and without tasks queueing (and, for FAIR_SHARE, for both
+ * runTime and elu measurements), runs `taskExecutions` tasks per bench.
+ * The pool is always destroyed and tatami-ng state cleared, even on error.
+ * @param {string} name - The benchmark group name.
+ * @param {string} workerType - The worker type (WorkerTypes).
+ * @param {string} poolType - The pool type (PoolTypes).
+ * @param {number} poolSize - The pool size.
+ * @param {object} options - The benchmark options.
+ * @param {number} options.taskExecutions - Task executions per benchmark.
+ * @param {*} options.workerData - The data passed to each task execution.
+ * @returns {Promise<object|undefined>} The tatami-ng report, or undefined if
+ * the benchmark failed (the error is logged, not rethrown).
+ */
+export const runPoolifierBenchmarkTatamiNg = async (
+  name,
+  workerType,
+  poolType,
+  poolSize,
+  { taskExecutions, workerData }
+) => {
+  let pool
+  try {
+    pool = buildPoolifierPool(workerType, poolType, poolSize)
+    for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+      for (const enableTasksQueue of [false, true]) {
+        if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
+          // FAIR_SHARE is the only strategy parameterized by a measurement.
+          for (const measurement of [Measurements.runTime, Measurements.elu]) {
+            group(name, () => {
+              bench(
+                `${name} with ${workerChoiceStrategy}, with ${measurement} and ${
+                  enableTasksQueue ? 'with' : 'without'
+                } tasks queue`,
+                async () => {
+                  await runPoolifierPool(pool, {
+                    taskExecutions,
+                    workerData,
+                  })
+                },
+                {
+                  before: () => {
+                    pool.setWorkerChoiceStrategy(workerChoiceStrategy, {
+                      measurement,
+                    })
+                    pool.enableTasksQueue(enableTasksQueue)
+                    strictEqual(
+                      pool.opts.workerChoiceStrategy,
+                      workerChoiceStrategy
+                    )
+                    strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+                    strictEqual(
+                      pool.opts.workerChoiceStrategyOptions.measurement,
+                      measurement
+                    )
+                  },
+                }
+              )
+            })
+          }
+        } else {
+          group(name, () => {
+            bench(
+              `${name} with ${workerChoiceStrategy} and ${
+                enableTasksQueue ? 'with' : 'without'
+              } tasks queue`,
+              async () => {
+                await runPoolifierPool(pool, {
+                  taskExecutions,
+                  workerData,
+                })
+              },
+              {
+                before: () => {
+                  pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+                  pool.enableTasksQueue(enableTasksQueue)
+                  strictEqual(
+                    pool.opts.workerChoiceStrategy,
+                    workerChoiceStrategy
+                  )
+                  strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+                },
+              }
+            )
+          })
+        }
+      }
+    }
+    return await run()
+  } catch (error) {
+    console.error(error)
+  } finally {
+    // Ensure benchmark registrations are cleared and pool workers are
+    // terminated even when registration or run() throws, so a failed run
+    // cannot leak workers or bleed state into the next invocation.
+    clear()
+    await pool?.destroy()
+  }
+}
+
+/**
+ * Converts a tatami-ng report into the Bencher Metric Format (BMF), keyed by
+ * benchmark name. If the report contains duplicate benchmark names, the last
+ * one wins.
+ * @param {object} report - The tatami-ng report.
+ * @returns {object} The BMF-formatted benchmark results.
+ */
+export const convertTatamiNgToBmf = report => {
+  const bmfReport = {}
+  for (const { name, stats } of report.benchmarks) {
+    bmfReport[name] = {
+      latency: {
+        value: stats?.avg,
+        lower_value: stats?.min,
+        upper_value: stats?.max,
+      },
+      throughput: {
+        value: stats?.iter,
+      },
+    }
+  }
+  return bmfReport
+}
+
+export { executeTaskFunction }