### Added
- Add `startWorkers` to pool options to whether start the minimum number of workers at pool creation or not.
+- Add `start()` method to pool API to start the minimum number of workers.
- Add `taskStealing` and `tasksStealingOnPressure` to tasks queue options to whether enable task stealing or not and whether enable tasks stealing on back pressure or not.
- Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
import crypto from 'node:crypto'
+import assert from 'node:assert'
import fs from 'node:fs'
+import Benchmark from 'benchmark'
import {
DynamicClusterPool,
DynamicThreadPool,
FixedClusterPool,
FixedThreadPool,
PoolTypes,
+ WorkerChoiceStrategies,
WorkerTypes
} from '../lib/index.mjs'
import { TaskFunctions } from './benchmarks-types.mjs'
}
}
-export const runPoolifierTest = async (
+export const runPoolifierPool = async (
pool,
{ taskExecutions, workerData }
) => {
- return new Promise((resolve, reject) => {
+ return await new Promise((resolve, reject) => {
let executions = 0
for (let i = 1; i <= taskExecutions; i++) {
pool
.then(() => {
++executions
if (executions === taskExecutions) {
- return resolve({ ok: 1 })
+ resolve({ ok: 1 })
}
return null
})
.catch(err => {
console.error(err)
- return reject(err)
+ reject(err)
})
}
})
}
-export const getPoolImplementationName = pool => {
- if (pool instanceof FixedThreadPool) {
- return 'FixedThreadPool'
- } else if (pool instanceof DynamicThreadPool) {
- return 'DynamicThreadPool'
- } else if (pool instanceof FixedClusterPool) {
- return 'FixedClusterPool'
- } else if (pool instanceof DynamicClusterPool) {
- return 'DynamicClusterPool'
- }
+export const runPoolifierPoolBenchmark = async (
+ name,
+ pool,
+ { taskExecutions, workerData }
+) => {
+ return await new Promise((resolve, reject) => {
+ try {
+ const suite = new Benchmark.Suite(name)
+ for (const workerChoiceStrategy of Object.values(
+ WorkerChoiceStrategies
+ )) {
+ for (const enableTasksQueue of [false, true]) {
+ suite.add(
+ `${name}|${workerChoiceStrategy}|${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ pool.enableTasksQueue(enableTasksQueue)
+ assert.strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ await runPoolifierPool(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
+ )
+ }
+ }
+ suite
+ .on('cycle', event => {
+ console.info(event.target.toString())
+ })
+ .on('complete', async function () {
+ console.info(
+ 'Fastest is ' +
+ LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ await pool.destroy()
+ resolve()
+ })
+ .run({ async: true })
+ } catch (error) {
+ reject(error)
+ }
+ })
}
export const LIST_FORMATTER = new Intl.ListFormat('en-US', {
-import assert from 'node:assert'
-import Benchmark from 'benchmark'
import {
PoolTypes,
- WorkerChoiceStrategies,
WorkerTypes,
availableParallelism
} from '../../lib/index.mjs'
import { TaskFunctions } from '../benchmarks-types.mjs'
import {
- LIST_FORMATTER,
buildPoolifierPool,
- getPoolImplementationName,
- runPoolifierTest
+ runPoolifierPoolBenchmark
} from '../benchmarks-utils.mjs'
const poolSize = availableParallelism()
-const fixedThreadPool = buildPoolifierPool(
- WorkerTypes.thread,
- PoolTypes.fixed,
- poolSize
-)
-
const taskExecutions = 1
const workerData = {
function: TaskFunctions.jsonIntegerSerialization,
taskSize: 1000
}
-const poolifierSuite = new Benchmark.Suite('Poolifier')
-
-for (const pool of [fixedThreadPool]) {
- for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
- for (const enableTasksQueue of [false, true]) {
- poolifierSuite.add(
- `${getPoolImplementationName(pool)}|${workerChoiceStrategy}|${
- enableTasksQueue ? 'with' : 'without'
- } tasks queue`,
- async () => {
- pool.setWorkerChoiceStrategy(workerChoiceStrategy)
- pool.enableTasksQueue(enableTasksQueue)
- assert.strictEqual(
- pool.opts.workerChoiceStrategy,
- workerChoiceStrategy
- )
- assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
- await runPoolifierTest(pool, {
- taskExecutions,
- workerData
- })
- }
- )
- }
+// FixedThreadPool
+await runPoolifierPoolBenchmark(
+ 'Poolifier FixedThreadPool',
+ buildPoolifierPool(WorkerTypes.thread, PoolTypes.fixed, poolSize),
+ {
+ taskExecutions,
+ workerData
}
-}
+)
-poolifierSuite
- .on('cycle', event => {
- console.info(event.target.toString())
- })
- .on('complete', async function () {
- console.info(
- 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
- )
- await fixedThreadPool.destroy()
- })
- .run({ async: true })
+// DynamicThreadPool
+await runPoolifierPoolBenchmark(
+ 'Poolifier DynamicThreadPool',
+ buildPoolifierPool(WorkerTypes.thread, PoolTypes.dynamic, poolSize),
+ {
+ taskExecutions,
+ workerData
+ }
+)
const { TaskFunctions } = require('./test-types')
const waitWorkerEvents = async (pool, workerEvent, numberOfEventsToWait) => {
- return new Promise(resolve => {
+ return await new Promise(resolve => {
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
}
const waitPoolEvents = async (pool, poolEvent, numberOfEventsToWait) => {
- return new Promise(resolve => {
+ return await new Promise(resolve => {
let events = 0
if (numberOfEventsToWait === 0) {
resolve(events)
}
const sleep = async ms => {
- return new Promise(resolve => setTimeout(resolve, ms))
+ return await new Promise(resolve => setTimeout(resolve, ms))
}
const sleepTaskFunction = async (
rejection = false,
rejectionMessage = ''
) => {
- return new Promise((resolve, reject) => {
+ return await new Promise((resolve, reject) => {
setTimeout(
() =>
rejection === true