-import crypto from 'crypto'
-import fs from 'fs'
+import { strictEqual } from 'node:assert'
+
+import Benchmark from 'benchmark'
+import { bench, group } from 'mitata'
+
import {
DynamicClusterPool,
DynamicThreadPool,
FixedClusterPool,
- FixedThreadPool
+ FixedThreadPool,
+ Measurements,
+ PoolTypes,
+ WorkerChoiceStrategies,
+ WorkerTypes
} from '../lib/index.mjs'
-import { PoolTypes, WorkerFunctions, WorkerTypes } from './benchmarks-types.mjs'
-
-export async function runTest (pool, { taskExecutions, workerData }) {
- return new Promise((resolve, reject) => {
- let executions = 0
- for (let i = 1; i <= taskExecutions; i++) {
- pool
- .execute(workerData)
- .then(() => {
- ++executions
- if (executions === taskExecutions) {
- return resolve({ ok: 1 })
- }
- return null
- })
- .catch(err => {
- console.error(err)
- return reject(err)
- })
- }
- })
-}
-
-export function generateRandomInteger (max = Number.MAX_SAFE_INTEGER, min = 0) {
- if (max < min || max < 0 || min < 0) {
- throw new RangeError('Invalid interval')
- }
- max = Math.floor(max)
- if (min != null && min !== 0) {
- min = Math.ceil(min)
- return Math.floor(Math.random() * (max - min + 1)) + min
- }
- return Math.floor(Math.random() * (max + 1))
-}
+import { executeTaskFunction } from './benchmarks-utils.cjs'
-function jsonIntegerSerialization (n) {
- for (let i = 0; i < n; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
-}
-
-/**
- * Intentionally inefficient implementation.
- * @param {number} n - The number of fibonacci numbers to generate.
- * @returns {number} - The nth fibonacci number.
- */
-export function fibonacci (n) {
- if (n <= 1) return n
- return fibonacci(n - 1) + fibonacci(n - 2)
-}
-
-/**
- * Intentionally inefficient implementation.
- * @param {number} n - The number to calculate the factorial of.
- * @returns {number} - The factorial of n.
- */
-export function factorial (n) {
- if (n === 0) {
- return 1
- }
- return factorial(n - 1) * n
-}
-
-export function readWriteFiles (
- n,
- baseDirectory = `/tmp/poolifier-benchmarks/${crypto.randomInt(
- 281474976710655
- )}`
-) {
- if (fs.existsSync(baseDirectory) === true) {
- fs.rmSync(baseDirectory, { recursive: true })
- }
- fs.mkdirSync(baseDirectory, { recursive: true })
- for (let i = 0; i < n; i++) {
- const filePath = `${baseDirectory}/${i}`
- fs.writeFileSync(filePath, i.toString(), {
- encoding: 'utf8',
- flag: 'a'
- })
- fs.readFileSync(filePath, 'utf8')
- }
- fs.rmSync(baseDirectory, { recursive: true })
-}
-
-export function executeWorkerFunction (data) {
- switch (data.function) {
- case WorkerFunctions.jsonIntegerSerialization:
- return jsonIntegerSerialization(data.taskSize || 1000)
- case WorkerFunctions.fibonacci:
- return fibonacci(data.taskSize || 1000)
- case WorkerFunctions.factorial:
- return factorial(data.taskSize || 1000)
- case WorkerFunctions.readWriteFiles:
- return readWriteFiles(data.taskSize || 1000)
- default:
- throw new Error('Unknown worker function')
- }
-}
-
-export function buildPool (workerType, poolType, poolSize, poolOptions) {
+const buildPoolifierPool = (workerType, poolType, poolSize, poolOptions) => {
switch (poolType) {
case PoolTypes.fixed:
switch (workerType) {
case WorkerTypes.cluster:
return new FixedClusterPool(
poolSize,
- './benchmarks/internal/cluster-worker.mjs',
+ './benchmarks/internal/cluster-worker.cjs',
poolOptions
)
}
switch (workerType) {
case WorkerTypes.thread:
return new DynamicThreadPool(
- poolSize / 2,
- poolSize * 3,
+ Math.floor(poolSize / 2),
+ poolSize,
'./benchmarks/internal/thread-worker.mjs',
poolOptions
)
case WorkerTypes.cluster:
return new DynamicClusterPool(
- poolSize / 2,
- poolSize * 3,
- './benchmarks/internal/cluster-worker.mjs',
+ Math.floor(poolSize / 2),
+ poolSize,
+ './benchmarks/internal/cluster-worker.cjs',
poolOptions
)
}
break
}
}
+
+const runPoolifierPool = async (pool, { taskExecutions, workerData }) => {
+ return await new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 1; i <= taskExecutions; i++) {
+ pool
+ .execute(workerData)
+ .then(() => {
+ ++executions
+ if (executions === taskExecutions) {
+ resolve({ ok: 1 })
+ }
+ return undefined
+ })
+ .catch(err => {
+ console.error(err)
+ reject(err)
+ })
+ }
+ })
+}
+
+export const runPoolifierBenchmarkBenchmarkJs = async (
+ name,
+ workerType,
+ poolType,
+ poolSize,
+ poolOptions,
+ { taskExecutions, workerData }
+) => {
+ return await new Promise((resolve, reject) => {
+ const pool = buildPoolifierPool(workerType, poolType, poolSize, poolOptions)
+ let workerChoiceStrategy
+ let enableTasksQueue
+ let workerChoiceStrategyOptions
+ if (poolOptions != null) {
+ ({
+ workerChoiceStrategy,
+ enableTasksQueue,
+ workerChoiceStrategyOptions
+ } = poolOptions)
+ }
+ const measurement = workerChoiceStrategyOptions?.measurement
+ new Benchmark(
+ `${name} with ${workerChoiceStrategy ?? pool.opts.workerChoiceStrategy}${
+ measurement != null ? `, with ${measurement}` : ''
+ } and ${enableTasksQueue ? 'with' : 'without'} tasks queue`,
+ async () => {
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ },
+ {
+ onStart: () => {
+ if (workerChoiceStrategy != null) {
+ strictEqual(pool.opts.workerChoiceStrategy, workerChoiceStrategy)
+ }
+ if (enableTasksQueue != null) {
+ strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ }
+ if (measurement != null) {
+ strictEqual(
+ pool.opts.workerChoiceStrategyOptions.measurement,
+ measurement
+ )
+ }
+ },
+ onComplete: event => {
+ console.info(event.target.toString())
+ if (pool.started && !pool.destroying) {
+ pool.destroy().then(resolve).catch(reject)
+ } else {
+ resolve()
+ }
+ },
+ onError: event => {
+ if (pool.started && !pool.destroying) {
+ pool
+ .destroy()
+ .then(() => {
+ return reject(event.target.error)
+ })
+ .catch(() => {})
+ } else {
+ reject(event.target.error)
+ }
+ }
+ }
+ ).run({ async: true })
+ })
+}
+
+export const runPoolifierBenchmarkBenchmarkJsSuite = async (
+ name,
+ workerType,
+ poolType,
+ poolSize,
+ { taskExecutions, workerData }
+) => {
+ return await new Promise((resolve, reject) => {
+ const pool = buildPoolifierPool(workerType, poolType, poolSize)
+ const suite = new Benchmark.Suite(name, {
+ onComplete: () => {
+ if (pool.started && !pool.destroying) {
+ pool.destroy().then(resolve).catch(reject)
+ } else {
+ resolve()
+ }
+ },
+ onCycle: event => {
+ console.info(event.target.toString())
+ },
+ onError: event => {
+ if (pool.started && !pool.destroying) {
+ pool
+ .destroy()
+ .then(() => {
+ return reject(event.target.error)
+ })
+ .catch(() => {})
+ } else {
+ reject(event.target.error)
+ }
+ }
+ })
+ for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ for (const enableTasksQueue of [false, true]) {
+ if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
+ for (const measurement of [Measurements.runTime, Measurements.elu]) {
+ suite.add(
+ `${name} with ${workerChoiceStrategy}, with ${measurement} and ${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ },
+ {
+ onStart: () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy, {
+ measurement
+ })
+ pool.enableTasksQueue(enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategyOptions.measurement,
+ measurement
+ )
+ }
+ }
+ )
+ }
+ } else {
+ suite.add(
+ `${name} with ${workerChoiceStrategy} and ${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ },
+ {
+ onStart: () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ pool.enableTasksQueue(enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ }
+ }
+ )
+ }
+ }
+ }
+ suite
+ .on('complete', function () {
+ console.info(
+ 'Fastest is ' +
+ LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ })
+ .run({ async: true })
+ })
+}
+
+export const buildPoolifierBenchmarkMitata = (
+ name,
+ workerType,
+ poolType,
+ poolSize,
+ { taskExecutions, workerData }
+) => {
+ try {
+ const pool = buildPoolifierPool(workerType, poolType, poolSize)
+ for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ for (const enableTasksQueue of [false, true]) {
+ if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
+ for (const measurement of [Measurements.runTime, Measurements.elu]) {
+ group(name, () => {
+ bench(
+ `${name} with ${workerChoiceStrategy}, with ${measurement} and ${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy, {
+ measurement
+ })
+ pool.enableTasksQueue(enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategyOptions.measurement,
+ measurement
+ )
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
+ )
+ })
+ }
+ } else {
+ group(name, () => {
+ bench(
+ `${name} with ${workerChoiceStrategy} and ${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ pool.enableTasksQueue(enableTasksQueue)
+ strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
+ )
+ })
+ }
+ }
+ }
+ return pool
+ } catch (error) {
+ console.error(error)
+ }
+}
+
+const LIST_FORMATTER = new Intl.ListFormat('en-US', {
+ style: 'long',
+ type: 'conjunction'
+})
+
+export { executeTaskFunction }