"lines": 94,
"statements": 94,
"functions": 95,
- "branches": 93
+ "branches": 92
}
skipWords: [
'axios',
'benoit',
- 'benny',
'browserslist',
'builtins',
'christopher',
name: Benchmark
on:
- workflow_dispatch:
+ push:
+ branches:
+ - master
+
+permissions:
+ contents: write
+ deployments: write
jobs:
internal-benchmark:
- strategy:
- matrix:
- os: [windows-latest, macos-latest, ubuntu-latest]
- node: ['16.x', '18.x', '20.x']
-
- name: Internal benchmark with Node.js ${{ matrix.node }} on ${{ matrix.os }}
-
- runs-on: ${{ matrix.os }}
-
+ if: github.repository == 'poolifier/poolifier'
+ name: Internal benchmark
+ runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
-
- name: Setup pnpm
uses: pnpm/action-setup@v2
with:
version: 8
-
- - name: Setup Node.js ${{ matrix.node }}
+ - name: Setup Node.js
uses: actions/setup-node@v3
with:
- node-version: ${{ matrix.node }}
+ node-version: '18.x'
cache: 'pnpm'
-
- - name: Install
+ - name: Install Dependencies
run: pnpm install --ignore-scripts
-
- - name: Production Benchmark
- run: pnpm benchmark:prod
+ - name: Run production benchmark
+ run: pnpm benchmark:prod | tee ./benchmarks/internal/output.txt
+ - name: Store production benchmark result
+ uses: benchmark-action/github-action-benchmark@v1
+ with:
+ name: Internal benchmark
+ tool: 'benchmarkjs'
+ output-file-path: ./benchmarks/internal/output.txt
+ github-token: ${{ secrets.BENCHMARK_RESULTS_TOKEN }}
+ auto-push: true
+ alert-threshold: '200%'
+ comment-on-alert: true
+ fail-on-alert: true
+ gh-repository: 'github.com/poolifier/benchmark-results'
node-version: ${{ matrix.node }}
cache: 'pnpm'
- - name: Install
+ - name: Install Dependencies
run: pnpm install --ignore-scripts
- name: Build
-name: Node.js Package
+name: Publish package
on:
release:
node-version: '18.x'
cache: 'pnpm'
- - name: Install
+ - name: Install Dependencies
run: pnpm install --ignore-scripts
- name: Tests & Coverage
registry-url: https://registry.npmjs.org/
cache: 'pnpm'
- - name: Install
+ - name: Install Dependencies
run: pnpm install --ignore-scripts
- name: Read package.json version
# registry-url: https://npm.pkg.github.com
# cache: 'pnpm'
- # - name: Install
+ # - name: Install Dependencies
# run: pnpm install --ignore-scripts
# - name: Read package.json version
dist
tmp
reports/
-benchmarks/internal/results/
- Add `addTaskFunction()`, `removeTaskFunction()`, `setDefaultTaskFunction()` methods to pool API.
+### Added
+
+- Add `startWorkers` to pool options to specify whether to start the minimum number of workers at pool creation or not.
+- Add `taskStealing` and `tasksStealingOnBackPressure` to tasks queue options to specify whether to enable task stealing and whether to enable tasks stealing on back pressure or not.
+- Continuous internal benchmarking: https://poolifier.github.io/benchmark-results/dev/bench.
+
## [2.6.44] - 2023-09-08
### Fixed
- Optimize worker alive status check.
- BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
- Optimize `LESS_USED` worker choice strategy.
-- Update benchmarks versus external threads pools.
+- Update benchmark versus external threads pools.
- Optimize tasks usage statistics requirements for worker choice strategy.
### Fixed
- Optimize worker alive status check.
- BREAKING CHANGE: Rename worker choice strategy `LESS_RECENTLY_USED` to `LESS_USED`.
- Optimize `LESS_USED` worker choice strategy.
-- Update benchmarks versus external threads pools.
+- Update benchmark versus external threads pools.
### Fixed
### Changed
- Optimize fair share task scheduling algorithm implementation.
-- Update benchmarks versus external pools results with latest version.
+- Update benchmark versus external pools results with latest version.
## [2.3.3] - 2022-10-15
## Folder Structure
-The [internal](./internal) folder contains poolifier internal benchmarks.
-The [versus-external-pools](./versus-external-pools) folder contains benchmarks versus other Node.js pools.
+The [internal](./internal) folder contains poolifier internal benchmark.
+The [versus-external-pools](./versus-external-pools) folder contains benchmark versus other Node.js pools.
## Poolifier vs other pools benchmark
- [microjob](https://github.com/wilk/microjob): removed because unmaintained since more than 5 years.
- [threads.js](https://github.com/andywer/threads.js/): removed because not a threads pool.
-> :warning: **We would need funds to run our benchmarks more often and on Cloud VMs, please consider to sponsor this project**
+> :warning: **We would need funds to run our benchmark more often and on Cloud VMs, please consider to sponsor this project**
-Read the [README.md](./versus-external-pools/README.md) to know how to run these benchmarks.
+Read the [README.md](./versus-external-pools/README.md) to know how to run the benchmark.
## Poolifier internal benchmark
-To run the internal benchmarks, you just need to navigate to the root of poolifier project and run `pnpm benchmark`
+### Usage
+
+To run the internal benchmark, you just need to navigate to the root of the poolifier project and run `pnpm benchmark`.
+
+### [Results](https://poolifier.github.io/benchmark-results/dev/bench)
})
}
+export const getPoolImplementationName = pool => {
+ if (pool instanceof FixedThreadPool) {
+ return 'FixedThreadPool'
+ } else if (pool instanceof DynamicThreadPool) {
+ return 'DynamicThreadPool'
+ } else if (pool instanceof FixedClusterPool) {
+ return 'FixedClusterPool'
+ } else if (pool instanceof DynamicClusterPool) {
+ return 'DynamicClusterPool'
+ }
+}
+
+export const LIST_FORMATTER = new Intl.ListFormat('en-US', {
+ style: 'long',
+ type: 'conjunction'
+})
+
export const executeAsyncFn = async fn => {
try {
await fn()
-import { add, complete, cycle, save, suite } from 'benny'
+import assert from 'node:assert'
+import Benchmark from 'benchmark'
import {
- Measurements,
PoolTypes,
WorkerChoiceStrategies,
WorkerTypes,
availableParallelism
} from '../../lib/index.mjs'
import { TaskFunctions } from '../benchmarks-types.mjs'
-import { buildPoolifierPool, runPoolifierTest } from '../benchmarks-utils.mjs'
+import {
+ LIST_FORMATTER,
+ buildPoolifierPool,
+ getPoolImplementationName,
+ runPoolifierTest
+} from '../benchmarks-utils.mjs'
const poolSize = availableParallelism()
-const pools = []
-for (const poolType of Object.values(PoolTypes)) {
- for (const workerType of Object.values(WorkerTypes)) {
- if (workerType === WorkerTypes.cluster) {
- continue
- }
- for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
- for (const enableTasksQueue of [false, true]) {
- if (workerChoiceStrategy === WorkerChoiceStrategies.FAIR_SHARE) {
- for (const measurement of [Measurements.runTime, Measurements.elu]) {
- pools.push([
- `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}|measurement:${measurement}`,
- buildPoolifierPool(workerType, poolType, poolSize, {
- workerChoiceStrategy,
- workerChoiceStrategyOptions: {
- measurement
- },
- enableTasksQueue
- })
- ])
- }
- } else {
- pools.push([
- `${poolType}|${workerType}|${workerChoiceStrategy}|tasks queue:${enableTasksQueue}`,
- buildPoolifierPool(workerType, poolType, poolSize, {
- workerChoiceStrategy,
- enableTasksQueue
- })
- ])
- }
- }
- }
- }
-}
+const fixedThreadPool = buildPoolifierPool(
+ WorkerTypes.thread,
+ PoolTypes.fixed,
+ poolSize
+)
const taskExecutions = 1
const workerData = {
function: TaskFunctions.jsonIntegerSerialization,
taskSize: 1000
}
-const addPools = pools =>
- pools.map(([name, pool]) => {
- return add(name, async () => {
- await runPoolifierTest(pool, {
- taskExecutions,
- workerData
- })
- })
- })
-const resultsFile = 'poolifier'
-const resultsFolder = 'benchmarks/internal/results'
-suite(
- 'Poolifier',
- ...addPools(pools),
- cycle(),
- complete(),
- save({
- file: resultsFile,
- folder: resultsFolder,
- format: 'json',
- details: true
- }),
- save({
- file: resultsFile,
- folder: resultsFolder,
- format: 'chart.html',
- details: true
- }),
- save({
- file: resultsFile,
- folder: resultsFolder,
- format: 'table.html',
- details: true
+const poolifierSuite = new Benchmark.Suite('Poolifier')
+
+for (const pool of [fixedThreadPool]) {
+ for (const workerChoiceStrategy of Object.values(WorkerChoiceStrategies)) {
+ for (const enableTasksQueue of [false, true]) {
+ poolifierSuite.add(
+ `${getPoolImplementationName(pool)}|${workerChoiceStrategy}|${
+ enableTasksQueue ? 'with' : 'without'
+ } tasks queue`,
+ async () => {
+ pool.setWorkerChoiceStrategy(workerChoiceStrategy)
+ pool.enableTasksQueue(enableTasksQueue)
+ assert.strictEqual(
+ pool.opts.workerChoiceStrategy,
+ workerChoiceStrategy
+ )
+ assert.strictEqual(pool.opts.enableTasksQueue, enableTasksQueue)
+ await runPoolifierTest(pool, {
+ taskExecutions,
+ workerData
+ })
+ }
+ )
+ }
+ }
+}
+
+poolifierSuite
+ .on('cycle', event => {
+ console.info(event.target.toString())
})
-)
- .then(() => {
- // eslint-disable-next-line n/no-process-exit
- return process.exit()
+ .on('complete', async function () {
+ console.info(
+ 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ await fixedThreadPool.destroy()
})
- .catch(err => console.error(err))
+ .run({ async: true })
import { executeTaskFunction } from '../benchmarks-utils.mjs'
import { TaskFunctions } from '../benchmarks-types.mjs'
-const debug = false
-
const taskFunction = data => {
data = data || {}
data.function = data.function || TaskFunctions.jsonIntegerSerialization
+ data.debug = data.debug || false
const res = executeTaskFunction(data)
- debug === true && console.debug(`This is the main thread ${isPrimary}`)
+ data.debug === true && console.debug(`This is the main thread ${isPrimary}`)
return res
}
import { executeTaskFunction } from '../benchmarks-utils.mjs'
import { TaskFunctions } from '../benchmarks-types.mjs'
-const debug = false
-
const taskFunction = data => {
data = data || {}
data.function = data.function || TaskFunctions.jsonIntegerSerialization
+ data.debug = data.debug || false
const res = executeTaskFunction(data)
- debug === true && console.debug(`This is the main thread ${isMainThread}`)
+ data.debug === true &&
+ console.debug(`This is the main thread ${isMainThread}`)
return res
}
-import Benchmark from 'benny'
-import { generateRandomInteger } from '../benchmarks-utils.mjs'
+import Benchmark from 'benchmark'
+import { LIST_FORMATTER, generateRandomInteger } from '../benchmarks-utils.mjs'
function generateRandomTasksMap (
numberOfWorkers,
)
}
-Benchmark.suite(
- 'Least used worker tasks distribution',
- Benchmark.add('Loop select', () => {
+new Benchmark.Suite('Least used worker tasks distribution')
+ .add('Loop select', () => {
loopSelect(tasksMap)
- }),
- Benchmark.add('Array sort select', () => {
+ })
+ .add('Array sort select', () => {
arraySortSelect(tasksMap)
- }),
- Benchmark.add('Quick select loop', () => {
+ })
+ .add('Quick select loop', () => {
quickSelectLoop(tasksMap)
- }),
- Benchmark.add('Quick select loop with random pivot', () => {
+ })
+ .add('Quick select loop with random pivot', () => {
quickSelectLoopRandomPivot(tasksMap)
- }),
- Benchmark.add('Quick select recursion', () => {
+ })
+ .add('Quick select recursion', () => {
quickSelectRecursion(tasksMap)
- }),
- Benchmark.add('Quick select recursion with random pivot', () => {
+ })
+ .add('Quick select recursion with random pivot', () => {
quickSelectRecursionRandomPivot(tasksMap)
- }),
- Benchmark.cycle(),
- Benchmark.complete()
-)
+ })
+ .on('cycle', event => {
+ console.info(event.target.toString())
+ })
+ .on('complete', function () {
+ console.info(
+ 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ })
+ .run()
-import Benchmark from 'benny'
+import Benchmark from 'benchmark'
+import { LIST_FORMATTER } from '../benchmarks-utils.mjs'
function generateWorkersArray (numberOfWorkers) {
return [...Array(numberOfWorkers).keys()]
return chosenWorker
}
-Benchmark.suite(
- 'Round robin tasks distribution',
- Benchmark.add('Ternary off by one', () => {
+new Benchmark.Suite('Round robin tasks distribution')
+ .add('Ternary off by one', () => {
nextWorkerIndex = 0
roundRobinTernaryOffByOne()
- }),
- Benchmark.add('Ternary with negation', () => {
+ })
+ .add('Ternary with negation', () => {
nextWorkerIndex = 0
roundRobinTernaryWithNegation()
- }),
- Benchmark.add('Ternary with pre-choosing', () => {
+ })
+ .add('Ternary with pre-choosing', () => {
nextWorkerIndex = 0
roundRobinTernaryWithPreChoosing()
- }),
- Benchmark.add('Increment+Modulo', () => {
+ })
+ .add('Increment+Modulo', () => {
nextWorkerIndex = 0
roundRobinIncrementModulo()
- }),
- Benchmark.cycle(),
- Benchmark.complete()
-)
+ })
+ .on('cycle', event => {
+ console.info(event.target.toString())
+ })
+ .on('complete', function () {
+ console.info(
+ 'Fastest is ' + LIST_FORMATTER.format(this.filter('fastest').map('name'))
+ )
+ })
+ .run()
{
- "$schema": "https://biomejs.dev/schemas/1.2.1/schema.json",
+ "$schema": "https://biomejs.dev/schemas/1.2.2/schema.json",
"organizeImports": {
"enabled": false
},
- [`pool = new FixedThreadPool/FixedClusterPool(numberOfThreads/numberOfWorkers, filePath, opts)`](#pool--new-fixedthreadpoolfixedclusterpoolnumberofthreadsnumberofworkers-filepath-opts)
- [`pool = new DynamicThreadPool/DynamicClusterPool(min, max, filePath, opts)`](#pool--new-dynamicthreadpooldynamicclusterpoolmin-max-filepath-opts)
- [`pool.execute(data, name, transferList)`](#poolexecutedata-name-transferlist)
+ - [`pool.start()`](#poolstart)
- [`pool.destroy()`](#pooldestroy)
- [`pool.listTaskFunctionNames()`](#poollisttaskfunctionnames)
- [`PoolOptions`](#pooloptions)
This method is available on both pool implementations and returns a promise with the task function execution response.
+### `pool.start()`
+
+This method is available on both pool implementations and will start the minimum number of workers.
+
### `pool.destroy()`
This method is available on both pool implementations and will call the terminate method on each worker.
An object with these properties:
-- `onlineHandler` (optional) - A function that will listen for online event on each worker
-- `messageHandler` (optional) - A function that will listen for message event on each worker
-- `errorHandler` (optional) - A function that will listen for error event on each worker
-- `exitHandler` (optional) - A function that will listen for exit event on each worker
+- `onlineHandler` (optional) - A function that will listen for online event on each worker.
+ Default: `() => {}`
+- `messageHandler` (optional) - A function that will listen for message event on each worker.
+ Default: `() => {}`
+- `errorHandler` (optional) - A function that will listen for error event on each worker.
+ Default: `() => {}`
+- `exitHandler` (optional) - A function that will listen for exit event on each worker.
+ Default: `() => {}`
+
- `workerChoiceStrategy` (optional) - The worker choice strategy to use in this pool:
- `WorkerChoiceStrategies.ROUND_ROBIN`: Submit tasks to worker in a round robin fashion
Default: `{ retries: 6, runTime: { median: false }, waitTime: { median: false }, elu: { median: false } }`
+- `startWorkers` (optional) - Start the minimum number of workers at pool creation.
+ Default: `true`
- `restartWorkerOnError` (optional) - Restart worker on uncaught error in this pool.
Default: `true`
- `enableEvents` (optional) - Events emission enablement in this pool.
- `size` (optional) - The maximum number of tasks that can be queued on a worker before flagging it as back pressured. It must be a positive integer.
- `concurrency` (optional) - The maximum number of tasks that can be executed concurrently on a worker. It must be a positive integer.
+ - `taskStealing` (optional) - Task stealing enablement.
+ - `tasksStealingOnBackPressure` (optional) - Tasks stealing enablement on back pressure.
- Default: `{ size: (pool maximum size)^2, concurrency: 1 }`
+ Default: `{ size: (pool maximum size)^2, concurrency: 1, taskStealing: true, tasksStealingOnBackPressure: true }`
#### `ThreadPoolOptions extends PoolOptions`
}
},
"devDependencies": {
- "@biomejs/biome": "^1.2.1",
+ "@biomejs/biome": "^1.2.2",
"@commitlint/cli": "^17.7.1",
"@commitlint/config-conventional": "^17.7.0",
"@release-it/bumper": "^5.1.0",
"@release-it/keep-a-changelog": "^4.0.0",
"@rollup/plugin-terser": "^0.4.3",
"@rollup/plugin-typescript": "^11.1.3",
- "@types/node": "^20.6.1",
+ "@types/node": "^20.6.2",
"@typescript-eslint/eslint-plugin": "^6.7.0",
"@typescript-eslint/parser": "^6.7.0",
- "benny": "^3.7.1",
+ "benchmark": "^2.1.4",
"c8": "^8.0.1",
"eslint": "^8.49.0",
"eslint-config-standard": "^17.1.0",
devDependencies:
'@biomejs/biome':
- specifier: ^1.2.1
- version: 1.2.1
+ specifier: ^1.2.2
+ version: 1.2.2
'@commitlint/cli':
specifier: ^17.7.1
version: 17.7.1
specifier: ^11.1.3
version: 11.1.3(rollup@3.29.2)(typescript@5.2.2)
'@types/node':
- specifier: ^20.6.1
- version: 20.6.1
+ specifier: ^20.6.2
+ version: 20.6.2
'@typescript-eslint/eslint-plugin':
specifier: ^6.7.0
version: 6.7.0(@typescript-eslint/parser@6.7.0)(eslint@8.49.0)(typescript@5.2.2)
'@typescript-eslint/parser':
specifier: ^6.7.0
version: 6.7.0(eslint@8.49.0)(typescript@5.2.2)
- benny:
- specifier: ^3.7.1
- version: 3.7.1
+ benchmark:
+ specifier: ^2.1.4
+ version: 2.1.4
c8:
specifier: ^8.0.1
version: 8.0.1
engines: {node: '>=0.10.0'}
dev: true
- /@arrows/array@1.4.1:
- resolution: {integrity: sha512-MGYS8xi3c4tTy1ivhrVntFvufoNzje0PchjEz6G/SsWRgUKxL4tKwS6iPdO8vsaJYldagAeWMd5KRD0aX3Q39g==}
- dependencies:
- '@arrows/composition': 1.2.2
- dev: true
-
- /@arrows/composition@1.2.2:
- resolution: {integrity: sha512-9fh1yHwrx32lundiB3SlZ/VwuStPB4QakPsSLrGJFH6rCXvdrd060ivAZ7/2vlqPnEjBkPRRXOcG1YOu19p2GQ==}
- dev: true
-
- /@arrows/dispatch@1.0.3:
- resolution: {integrity: sha512-v/HwvrFonitYZM2PmBlAlCqVqxrkIIoiEuy5bQgn0BdfvlL0ooSBzcPzTMrtzY8eYktPyYcHg8fLbSgyybXEqw==}
- dependencies:
- '@arrows/composition': 1.2.2
- dev: true
-
- /@arrows/error@1.0.2:
- resolution: {integrity: sha512-yvkiv1ay4Z3+Z6oQsUkedsQm5aFdyPpkBUQs8vejazU/RmANABx6bMMcBPPHI4aW43VPQmXFfBzr/4FExwWTEA==}
- dev: true
-
- /@arrows/multimethod@1.4.1:
- resolution: {integrity: sha512-AZnAay0dgPnCJxn3We5uKiB88VL+1ZIF2SjZohLj6vqY2UyvB/sKdDnFP+LZNVsTC5lcnGPmLlRRkAh4sXkXsQ==}
- dependencies:
- '@arrows/array': 1.4.1
- '@arrows/composition': 1.2.2
- '@arrows/error': 1.0.2
- fast-deep-equal: 3.1.3
- dev: true
-
/@babel/code-frame@7.22.13:
resolution: {integrity: sha512-XktuhWlJ5g+3TJXc5upd9Ks1HutSArik6jf2eAjYFyIOf4ej3RN+184cZbzDvbPnuTJIUhPKKJE3cIsYTiAT3w==}
engines: {node: '>=6.9.0'}
resolution: {integrity: sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==}
dev: true
- /@biomejs/biome@1.2.1:
- resolution: {integrity: sha512-olT0ldjncEQx+mtVLJ1LwJikrDtMYZakJ8dIMYXEoX9t10xjKpeY4OyKBa9Os8/EUHah9YmpYL64hQooza3WrA==}
+ /@biomejs/biome@1.2.2:
+ resolution: {integrity: sha512-fXwXi56ZdaKO/N3rTmhWw41UxstoviODk+wia4WWNSlm23r8xJ/NxjaZ88scV2IsmsFHqc8rmwb2dkrStAdIEw==}
engines: {node: '>=14.*'}
hasBin: true
requiresBuild: true
optionalDependencies:
- '@biomejs/cli-darwin-arm64': 1.2.1
- '@biomejs/cli-darwin-x64': 1.2.1
- '@biomejs/cli-linux-arm64': 1.2.1
- '@biomejs/cli-linux-x64': 1.2.1
- '@biomejs/cli-win32-arm64': 1.2.1
- '@biomejs/cli-win32-x64': 1.2.1
+ '@biomejs/cli-darwin-arm64': 1.2.2
+ '@biomejs/cli-darwin-x64': 1.2.2
+ '@biomejs/cli-linux-arm64': 1.2.2
+ '@biomejs/cli-linux-x64': 1.2.2
+ '@biomejs/cli-win32-arm64': 1.2.2
+ '@biomejs/cli-win32-x64': 1.2.2
dev: true
- /@biomejs/cli-darwin-arm64@1.2.1:
- resolution: {integrity: sha512-lz/Hn/isGnnZqILhnPiwO3Hy4mhGr1APrjXkCBolONyYG67x1OT3l8T5yaNW62GsIEeblabWkwLl/MkoPJJXZQ==}
+ /@biomejs/cli-darwin-arm64@1.2.2:
+ resolution: {integrity: sha512-Fx1IURKhoqH6wPawtKLT6wcfMSjRRcNK8+VWau0iDOjXvNtjJpSmICbU89B7Vt/gZRwPqkfDMBkFwm6V5vFTSQ==}
engines: {node: '>=14.*'}
cpu: [arm64]
os: [darwin]
dev: true
optional: true
- /@biomejs/cli-darwin-x64@1.2.1:
- resolution: {integrity: sha512-mWJE2+sPiHJk0kHkuHby9ssTm2WR/R7USwJbbNmy7PM07DZdcciF2XbyLofO2ZGh0QI0LEW59OcjMwYChQnZbA==}
+ /@biomejs/cli-darwin-x64@1.2.2:
+ resolution: {integrity: sha512-JNaAFOI/ZisnmzvcFNd73geJxaFaN2L4YsWM6cgBeKyLY/ycl9C/PBTFfEmeB1c7f5XIIal8P2cj47kLJpN5Ig==}
engines: {node: '>=14.*'}
cpu: [x64]
os: [darwin]
dev: true
optional: true
- /@biomejs/cli-linux-arm64@1.2.1:
- resolution: {integrity: sha512-M81if0mY66Feq3nsOoNRa+o57k6YecCeH4EX2EqkU/ObqYfVLmWnIvFsgqEZE/e/bguNmqBoAIJaIV26PvyyJg==}
+ /@biomejs/cli-linux-arm64@1.2.2:
+ resolution: {integrity: sha512-JHXRnfhOLx8UO/Fcyn2c5pFRri0XKqRZm2wf5oH5GSfLVpckDw2X15dYGbu3nmfM/3pcAaTV46pUpjrCnaAieg==}
engines: {node: '>=14.*'}
cpu: [arm64]
os: [linux]
dev: true
optional: true
- /@biomejs/cli-linux-x64@1.2.1:
- resolution: {integrity: sha512-gzTpmpvBmSFu6oeUeFKP8C34WpV91rha4gS+3swAuw5G+C4PosOTpAVPKnElbgl9iS0+stfP73tXARQUXySDPA==}
+ /@biomejs/cli-linux-x64@1.2.2:
+ resolution: {integrity: sha512-5Zr+iM7lUKsw81p9PkXRESuH2/AhRZ6RCWkgE+FSLcxMhXy/4RDR+o2YQDsJM6cWKIzOJM05vDHTGrDq7vXE4A==}
engines: {node: '>=14.*'}
cpu: [x64]
os: [linux]
dev: true
optional: true
- /@biomejs/cli-win32-arm64@1.2.1:
- resolution: {integrity: sha512-SveEeHYjiXzRZhTE3HyURQ+CyZ3yLeKHUrggH4bSDQ1+b7rgDEF/XIEgMl+/3SWFlc+HdEpgbJWWZQCfSCqxww==}
+ /@biomejs/cli-win32-arm64@1.2.2:
+ resolution: {integrity: sha512-HvUcG2p++RvYP0zfOqh+DgiUUH+JI/uETr0kzWlOJ9F3lsG525pkywg4RSd4OvJd7Wpd3wt3UpN/A4IEJaVmbA==}
engines: {node: '>=14.*'}
cpu: [arm64]
os: [win32]
dev: true
optional: true
- /@biomejs/cli-win32-x64@1.2.1:
- resolution: {integrity: sha512-ooEjE+A6kbQhf0cPNvC8bXje1jk2uKWgJ8S3DgHZRBvr6DlBiUsU8C1ycURdhDiHZ5d6nOI98LFrj3WWR1ODzA==}
+ /@biomejs/cli-win32-x64@1.2.2:
+ resolution: {integrity: sha512-bfaFJwqJ9ApFga2o88OaROSd3pasYRzRGXHJWAE9VUUKdSNSTYxHOqVrNvV54yYPtL6Kt9xkuZa4HNu9it3TaA==}
engines: {node: '>=14.*'}
cpu: [x64]
os: [win32]
lodash.merge: 4.6.2
lodash.uniq: 4.5.0
resolve-from: 5.0.0
- ts-node: 10.9.1(@types/node@20.6.1)(typescript@5.2.2)
+ ts-node: 10.9.1(@types/node@20.6.2)(typescript@5.2.2)
typescript: 5.2.2
transitivePeerDependencies:
- '@swc/core'
'@jest/schemas': 29.6.3
'@types/istanbul-lib-coverage': 2.0.4
'@types/istanbul-reports': 3.0.1
- '@types/node': 20.6.1
+ '@types/node': 20.6.2
'@types/yargs': 17.0.24
chalk: 4.1.2
dev: true
optional: true
dependencies:
'@rollup/pluginutils': 5.0.4(rollup@3.29.2)
- resolve: 1.22.5
+ resolve: 1.22.6
rollup: 3.29.2
typescript: 5.2.2
dev: true
resolution: {integrity: sha512-ZUxbzKl0IfJILTS6t7ip5fQQM/J3TJYubDm3nMbgubNNYS62eXeUpoLUC8/7fJNiFYHTrGPQn7hspDUzIHX3UA==}
dependencies:
'@types/minimatch': 5.1.2
- '@types/node': 20.6.1
+ '@types/node': 20.6.2
dev: true
- /@types/http-cache-semantics@4.0.1:
- resolution: {integrity: sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ==}
+ /@types/http-cache-semantics@4.0.2:
+ resolution: {integrity: sha512-FD+nQWA2zJjh4L9+pFXqWOi0Hs1ryBCfI+985NjluQ1p8EYtoLvjLOKidXBtZ4/IcxDX4o8/E8qDS3540tNliw==}
dev: true
/@types/istanbul-lib-coverage@2.0.4:
'@types/istanbul-lib-report': 3.0.0
dev: true
- /@types/json-schema@7.0.12:
- resolution: {integrity: sha512-Hr5Jfhc9eYOQNPYO5WLDq/n4jqijdHNlDXjuAQkkt+mWdQR+XJToOHrsD4cPaMXpn6KO7y2+wM8AZEs8VpBLVA==}
+ /@types/json-schema@7.0.13:
+ resolution: {integrity: sha512-RbSSoHliUbnXj3ny0CNFOoxrIDV6SUGyStHsvDqosw6CkdPV8TtWGlfecuK4ToyMEAql6pzNxgCFKanovUzlgQ==}
dev: true
/@types/json5@0.0.29:
resolution: {integrity: sha512-bUBrPjEry2QUTsnuEjzjbS7voGWCc30W0qzgMf90GPeDGFRakvrz47ju+oqDAKCXLUCe39u57/ORMl/O/04/9g==}
dev: true
- /@types/node@20.6.1:
- resolution: {integrity: sha512-4LcJvuXQlv4lTHnxwyHQZ3uR9Zw2j7m1C9DfuwoTFQQP4Pmu04O6IfLYgMmHoOCt0nosItLLZAH+sOrRE0Bo8g==}
+ /@types/node@20.6.2:
+ resolution: {integrity: sha512-Y+/1vGBHV/cYk6OI1Na/LHzwnlNCAfU3ZNGrc1LdRe/LAIbdDPTTv/HU3M7yXN448aTVDq3eKRm2cg7iKLb8gw==}
dev: true
/@types/normalize-package-data@2.4.1:
eslint: ^6.0.0 || ^7.0.0 || ^8.0.0
dependencies:
'@eslint-community/eslint-utils': 4.4.0(eslint@8.49.0)
- '@types/json-schema': 7.0.12
+ '@types/json-schema': 7.0.13
'@types/semver': 7.5.2
'@typescript-eslint/scope-manager': 5.62.0
'@typescript-eslint/types': 5.62.0
eslint: ^7.0.0 || ^8.0.0
dependencies:
'@eslint-community/eslint-utils': 4.4.0(eslint@8.49.0)
- '@types/json-schema': 7.0.12
+ '@types/json-schema': 7.0.13
'@types/semver': 7.5.2
'@typescript-eslint/scope-manager': 6.7.0
'@typescript-eslint/types': 6.7.0
tslib: 2.6.2
dev: true
- /astral-regex@2.0.0:
- resolution: {integrity: sha512-Z7tMw1ytTXt5jqMcOP+OQteU1VuNK9Y02uuJtKQ1Sv69jXQKKg5cibLwGJow8yzZP+eAc18EmLGPal0bp36rvQ==}
- engines: {node: '>=8'}
- dev: true
-
/async-retry@1.3.3:
resolution: {integrity: sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==}
dependencies:
platform: 1.3.6
dev: true
- /benny@3.7.1:
- resolution: {integrity: sha512-USzYxODdVfOS7JuQq/L0naxB788dWCiUgUTxvN+WLPt/JfcDURNNj8kN/N+uK6PDvuR67/9/55cVKGPleFQINA==}
- engines: {node: '>=12'}
- dependencies:
- '@arrows/composition': 1.2.2
- '@arrows/dispatch': 1.0.3
- '@arrows/multimethod': 1.4.1
- benchmark: 2.1.4
- common-tags: 1.8.2
- fs-extra: 10.1.0
- json2csv: 5.0.7
- kleur: 4.1.5
- log-update: 4.0.0
- dev: true
-
/big-integer@1.6.51:
resolution: {integrity: sha512-GPEid2Y9QU1Exl1rpO9B2IPJGHPSupF5GnVIP0blYvNOMer2bTvSWs1jGOUg04hTmu67nmLsQ9TBo1puaotBHg==}
engines: {node: '>=0.6'}
resolution: {integrity: sha512-3SD4rrMu1msNGEtNSt8Od6enwdo//U9s4ykmXfA2TD58kcLkCobtCDiby7kNyj7a/Q7lz/mAesAFI54rTdnvBA==}
engines: {node: '>=14.16'}
dependencies:
- '@types/http-cache-semantics': 4.0.1
+ '@types/http-cache-semantics': 4.0.2
get-stream: 6.0.1
http-cache-semantics: 4.1.1
keyv: 4.5.3
resolution: {integrity: sha512-GpVkmM8vF2vQUkj2LvZmD35JxeJOLCwJ9cUkugyk2nuhbv3+mJvpLYYt+0+USMxE+oj+ey/lJEnhZw75x/OMcQ==}
dev: true
- /commander@6.2.1:
- resolution: {integrity: sha512-U7VdrJFnJgo4xjrHpTzu0yrHPGImdsmD95ZlgYSEajAn2JKzDhDTPG9kBTefmObL2w/ngeZnilk+OV9CG3d7UA==}
- engines: {node: '>= 6'}
- dev: true
-
/comment-parser@1.4.0:
resolution: {integrity: sha512-QLyTNiZ2KDOibvFPlZ6ZngVsZ/0gYnE6uTXi5aoDg8ed3AkJAz4sEje3Y8a29hQ1s6A99MZXe47fLAXQ1rTqaw==}
engines: {node: '>= 12.0.0'}
dev: true
- /common-tags@1.8.2:
- resolution: {integrity: sha512-gk/Z852D2Wtb//0I+kRFNKKE9dIIVirjoqPoA1wJU+XePVXZfGeBpk45+A1rKO4Q43prqWBNY/MiIeRLbPWUaA==}
- engines: {node: '>=4.0.0'}
- dev: true
-
/compare-func@2.0.0:
resolution: {integrity: sha512-zHig5N+tPWARooBnb0Zx1MFcdfpyJrfTJ3Y5L+IFvUm8rM74hHz66z0gw0x4tijh5CorKkKUCnW82R2vmpeCRA==}
dependencies:
dependencies:
'@types/node': 20.4.7
cosmiconfig: 8.3.6(typescript@5.2.2)
- ts-node: 10.9.1(@types/node@20.6.1)(typescript@5.2.2)
+ ts-node: 10.9.1(@types/node@20.6.2)(typescript@5.2.2)
typescript: 5.2.2
dev: true
dependencies:
debug: 3.2.7
is-core-module: 2.13.0
- resolve: 1.22.5
+ resolve: 1.22.6
transitivePeerDependencies:
- supports-color
dev: true
ignore: 5.2.4
is-core-module: 2.13.0
minimatch: 3.1.2
- resolve: 1.22.5
+ resolve: 1.22.6
semver: 7.5.4
dev: true
ignore: 5.2.4
is-core-module: 2.13.0
minimatch: 3.1.2
- resolve: 1.22.5
+ resolve: 1.22.6
semver: 7.5.4
dev: true
resolution: {integrity: sha512-OHx4Qwrrt0E4jEIcI5/Xb+f+QmJYNj2rrK8wiIdQOIrB9WrrJL8cjZvXdXuBTkkEwEqLycb5BeZDV1o2i9bTew==}
engines: {node: '>=12.0.0'}
dependencies:
- flatted: 3.2.7
+ flatted: 3.2.8
keyv: 4.5.3
rimraf: 3.0.2
dev: true
hasBin: true
dev: true
- /flatted@3.2.7:
- resolution: {integrity: sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==}
+ /flatted@3.2.8:
+ resolution: {integrity: sha512-6qu0W+A94UKNJRs3ffE8s/fWSHQbjqdNx8elGAe95IqnJA77P68TFz4+2cwC28ouAibiZdGBeV6DsvvMg+4vhQ==}
dev: true
/for-each@0.3.3:
engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
dependencies:
'@jest/types': 29.6.3
- '@types/node': 20.6.1
+ '@types/node': 20.6.2
chalk: 4.1.2
ci-info: 3.8.0
graceful-fs: 4.2.11
resolution: {integrity: sha512-ZClg6AaYvamvYEE82d3Iyd3vSSIjQ+odgjaTzRuO3s7toCdFKczob2i0zCh7JE8kWn17yvAWhUVxvqGwUalsRA==}
dev: true
- /json2csv@5.0.7:
- resolution: {integrity: sha512-YRZbUnyaJZLZUJSRi2G/MqahCyRv9n/ds+4oIetjDF3jWQA7AG7iSeKTiZiCNqtMZM7HDyt0e/W6lEnoGEmMGA==}
- engines: {node: '>= 10', npm: '>= 6.13.0'}
- deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info.
- hasBin: true
- dependencies:
- commander: 6.2.1
- jsonparse: 1.3.1
- lodash.get: 4.4.2
- dev: true
-
/json5@1.0.2:
resolution: {integrity: sha512-g1MWMLBiz8FKi1e4w0UyVL3w+iJceWAFBAaBnnGKOpNa5f8TLktkbre1+s6oICydWAm+HRUGTmI+//xv2hvXYA==}
hasBin: true
engines: {node: '>=0.10.0'}
dev: true
- /kleur@4.1.5:
- resolution: {integrity: sha512-o+NO+8WrRiQEE4/7nwRJhN1HWpVmJm511pBHUxPLtp0BUISzlBplORYSmTclCnJvQq2tKu/sgl3xVpkc7ZWuQQ==}
- engines: {node: '>=6'}
- dev: true
-
/latest-version@7.0.0:
resolution: {integrity: sha512-KvNT4XqAMzdcL6ka6Tl3i2lYeFDgXNCuIX+xNx6ZMVR1dFq+idXd9FLKNMOIx0t9mJ9/HudyX4oZWXZQ0UJHeg==}
engines: {node: '>=14.16'}
is-unicode-supported: 1.3.0
dev: true
- /log-update@4.0.0:
- resolution: {integrity: sha512-9fkkDevMefjg0mmzWFBW8YkFP91OrizzkW3diF7CpG+S2EYdy4+TVfGwz1zeF8x7hCx1ovSPTOE9Ngib74qqUg==}
- engines: {node: '>=10'}
- dependencies:
- ansi-escapes: 4.3.2
- cli-cursor: 3.1.0
- slice-ansi: 4.0.0
- wrap-ansi: 6.2.0
- dev: true
-
/log-update@5.0.1:
resolution: {integrity: sha512-5UtUDQ/6edw4ofyljDNcOVJQ4c7OjDro4h3y8e1GQL5iYElYclVHJ3zeWchylvMaKnDbDilC8irOVyexnA/Slw==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
resolution: {integrity: sha512-/5CMN3T0R4XTj4DcGaexo+roZSdSFW/0AOOTROrjxzCG1wrWXEsGbRKevjlIL+ZDE4sZlJr5ED4YW0yqmkK+eA==}
dependencies:
hosted-git-info: 2.8.9
- resolve: 1.22.5
+ resolve: 1.22.6
semver: 7.5.4
validate-npm-package-license: 3.0.4
dev: true
resolution: {integrity: sha512-HFM8rkZ+i3zrV+4LQjwQ0W+ez98pApMGM3HUrN04j3CqzPOzl9nmP15Y8YXNm8QHGv/eacOVEjqhmWpkRV0NAw==}
engines: {node: '>= 0.10'}
dependencies:
- resolve: 1.22.5
+ resolve: 1.22.6
dev: true
/redent@3.0.0:
path-parse: 1.0.7
dev: true
- /resolve@1.22.5:
- resolution: {integrity: sha512-qWhv7PF1V95QPvRoUGHxOtnAlEvlXBylMZcjUR9pAumMmveFtcHJRXGIr+TkjfNJVQypqv2qcDiiars2y1PsSg==}
+ /resolve@1.22.6:
+ resolution: {integrity: sha512-njhxM7mV12JfufShqGy3Rz8j11RPdLy4xi15UurGJeoHLfJpVXKdh3ueuOqbYUcDZnffr6X739JBo5LzyahEsw==}
hasBin: true
dependencies:
is-core-module: 2.13.0
engines: {node: '>=12'}
dev: true
- /slice-ansi@4.0.0:
- resolution: {integrity: sha512-qMCMfhY040cVHT43K9BFygqYbUPFZKHOg7K73mtTWJRb8pyP3fzf4Ixd5SzdEJQ6MRUg/WBnOLxghZtKKurENQ==}
- engines: {node: '>=10'}
- dependencies:
- ansi-styles: 4.3.0
- astral-regex: 2.0.0
- is-fullwidth-code-point: 3.0.0
- dev: true
-
/slice-ansi@5.0.0:
resolution: {integrity: sha512-FC+lgizVPfie0kkhqUScwRu1O/lF6NOgJmlCgK+/LYxDCTk8sGelYaHDhFcDN+Sn3Cv+3VSa4Byeo+IMCzpMgQ==}
engines: {node: '>=12'}
typescript: 5.2.2
dev: true
- /ts-node@10.9.1(@types/node@20.6.1)(typescript@5.2.2):
+ /ts-node@10.9.1(@types/node@20.6.2)(typescript@5.2.2):
resolution: {integrity: sha512-NtVysVPkxxrwFGUUxGYhfux8k78pQB3JqYBXlLRZgdGUqTO5wU/UyHop5p70iEbGhB7q5KmiZiU0Y3KlJrScEw==}
hasBin: true
peerDependencies:
'@tsconfig/node12': 1.0.11
'@tsconfig/node14': 1.0.3
'@tsconfig/node16': 1.0.4
- '@types/node': 20.6.1
+ '@types/node': 20.6.2
acorn: 8.10.0
acorn-walk: 8.2.0
arg: 4.1.3
*/
private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
- /**
- * Whether the pool is starting or not.
- */
- private readonly starting: boolean
/**
* Whether the pool is started or not.
*/
private started: boolean
+ /**
+ * Whether the pool is starting or not.
+ */
+ private starting: boolean
/**
* The start timestamp of the pool.
*/
this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
- this.starting = true
- this.startPool()
+ this.started = false
this.starting = false
- this.started = true
+ if (this.opts.startWorkers === true) {
+ this.start()
+ }
this.startTimestamp = performance.now()
}
private checkPoolOptions (opts: PoolOptions<Worker>): void {
if (isPlainObject(opts)) {
+ this.opts.startWorkers = opts.startWorkers ?? true
this.opts.workerChoiceStrategy =
opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
`Invalid worker node tasks concurrency: ${tasksQueueOptions?.concurrency} is a negative integer or zero`
)
}
- if (tasksQueueOptions?.queueMaxSize != null) {
- throw new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- }
if (
tasksQueueOptions?.size != null &&
!Number.isSafeInteger(tasksQueueOptions?.size)
}
}
- private startPool (): void {
- while (
- this.workerNodes.reduce(
- (accumulator, workerNode) =>
- !workerNode.info.dynamic ? accumulator + 1 : accumulator,
- 0
- ) < this.numberOfWorkers
- ) {
- this.createAndSetupWorkerNode()
- }
- }
-
/** @inheritDoc */
public get info (): PoolInfo {
return {
version,
type: this.type,
worker: this.worker,
+ started: this.started,
ready: this.ready,
strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
minSize: this.minSize,
return {
...{
size: Math.pow(this.maxSize, 2),
- concurrency: 1
+ concurrency: 1,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
},
...tasksQueueOptions
}
): Promise<Response> {
return await new Promise<Response>((resolve, reject) => {
if (!this.started) {
- reject(new Error('Cannot execute a task on destroyed pool'))
+ reject(new Error('Cannot execute a task on not started pool'))
return
}
if (name != null && typeof name !== 'string') {
})
}
+  /** @inheritDoc */
+ public start (): void {
+ this.starting = true
+ while (
+ this.workerNodes.reduce(
+ (accumulator, workerNode) =>
+ !workerNode.info.dynamic ? accumulator + 1 : accumulator,
+ 0
+ ) < this.numberOfWorkers
+ ) {
+ this.createAndSetupWorkerNode()
+ }
+ this.starting = false
+ this.started = true
+ }
+
/** @inheritDoc */
public async destroy (): Promise<void> {
await Promise.all(
// Send the statistics message to worker.
this.sendStatisticsMessageToWorker(workerNodeKey)
if (this.opts.enableTasksQueue === true) {
- this.workerNodes[workerNodeKey].onEmptyQueue =
- this.taskStealingOnEmptyQueue.bind(this)
- this.workerNodes[workerNodeKey].onBackPressure =
- this.tasksStealingOnBackPressure.bind(this)
+ if (this.opts.tasksQueueOptions?.taskStealing === true) {
+ this.workerNodes[workerNodeKey].onEmptyQueue =
+ this.taskStealingOnEmptyQueue.bind(this)
+ }
+ if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
+ this.workerNodes[workerNodeKey].onBackPressure =
+ this.tasksStealingOnBackPressure.bind(this)
+ }
}
}
readonly version: string
readonly type: PoolType
readonly worker: WorkerType
+ readonly started: boolean
readonly ready: boolean
readonly strategy: WorkerChoiceStrategy
readonly minSize: number
* @defaultValue (pool maximum size)^2
*/
readonly size?: number
- /**
- * @deprecated Use `size` instead.
- */
- readonly queueMaxSize?: number
/**
* Maximum number of tasks that can be executed concurrently on a worker node.
*
* @defaultValue 1
*/
readonly concurrency?: number
+ /**
+ * Whether to enable task stealing.
+ *
+ * @defaultValue true
+ */
+ readonly taskStealing?: boolean
+ /**
+ * Whether to enable tasks stealing on back pressure.
+ *
+ * @defaultValue true
+ */
+ readonly tasksStealingOnBackPressure?: boolean
}
/**
* A function that will listen for exit event on each worker.
*/
exitHandler?: ExitHandler<Worker>
+ /**
+ * Whether to start the minimum number of workers at pool initialization.
+ *
+   * @defaultValue true
+ */
+ startWorkers?: boolean
/**
* The worker choice strategy to use in this pool.
*
name?: string,
transferList?: TransferListItem[]
) => Promise<Response>
+ /**
+ * Starts the minimum number of workers in this pool.
+ */
+ readonly start: () => void
/**
* Terminates all workers in this pool.
*/
/** @inheritdoc */
public onEmptyQueue?: WorkerNodeEventCallback
private readonly tasksQueue: Deque<Task<Data>>
+ private onBackPressureStarted: boolean
private onEmptyQueueCount: number
private readonly taskFunctionsUsage: Map<string, WorkerUsage>
}
this.tasksQueueBackPressureSize = tasksQueueBackPressureSize
this.tasksQueue = new Deque<Task<Data>>()
+ this.onBackPressureStarted = false
this.onEmptyQueueCount = 0
this.taskFunctionsUsage = new Map<string, WorkerUsage>()
}
/** @inheritdoc */
public enqueueTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.push(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public unshiftTask (task: Task<Data>): number {
const tasksQueueSize = this.tasksQueue.unshift(task)
- if (this.onBackPressure != null && this.hasBackPressure()) {
+ if (
+ this.onBackPressure != null &&
+ this.hasBackPressure() &&
+ !this.onBackPressureStarted
+ ) {
+ this.onBackPressureStarted = true
this.onBackPressure(this.info.id as number)
+ this.onBackPressureStarted = false
}
return tasksQueueSize
}
/** @inheritdoc */
public dequeueTask (): Task<Data> | undefined {
const task = this.tasksQueue.shift()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
/** @inheritdoc */
public popTask (): Task<Data> | undefined {
const task = this.tasksQueue.pop()
- if (this.onEmptyQueue != null && this.tasksQueue.size === 0) {
+ if (
+ this.onEmptyQueue != null &&
+ this.tasksQueue.size === 0 &&
+ this.onEmptyQueueCount === 0
+ ) {
this.startOnEmptyQueue().catch(EMPTY_FUNCTION)
}
return task
this.onEmptyQueueCount = 0
return
}
- (this.onEmptyQueue as WorkerNodeEventCallback)(this.info.id as number)
++this.onEmptyQueueCount
+ this.onEmptyQueue?.(this.info.id as number)
await sleep(exponentialDelay(this.onEmptyQueueCount))
await this.startOnEmptyQueue()
}
const { DEFAULT_TASK_NAME } = require('../../../lib/utils')
const { version } = require('../../../package.json')
const { waitPoolEvents } = require('../../test-utils')
+const { WorkerNode } = require('../../../lib/pools/worker-node')
describe('Abstract pool test suite', () => {
const numberOfWorkers = 2
'./tests/worker-files/thread/testWorker.js'
)
expect(pool.emitter).toBeInstanceOf(EventEmitter)
- expect(pool.opts.enableEvents).toBe(true)
- expect(pool.opts.restartWorkerOnError).toBe(true)
- expect(pool.opts.enableTasksQueue).toBe(false)
- expect(pool.opts.tasksQueueOptions).toBeUndefined()
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.ROUND_ROBIN
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: false },
- waitTime: { median: false },
- elu: { median: false }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: true,
+ restartWorkerOnError: true,
+ enableTasksQueue: false,
+ workerChoiceStrategy: WorkerChoiceStrategies.ROUND_ROBIN,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: false },
+ waitTime: { median: false },
+ elu: { median: false }
+ }
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
elu: { median: false }
})
}
- expect(pool.opts.messageHandler).toBeUndefined()
- expect(pool.opts.errorHandler).toBeUndefined()
- expect(pool.opts.onlineHandler).toBeUndefined()
- expect(pool.opts.exitHandler).toBeUndefined()
await pool.destroy()
const testHandler = () => console.info('test handler executed')
pool = new FixedThreadPool(
}
)
expect(pool.emitter).toBeUndefined()
- expect(pool.opts.enableEvents).toBe(false)
- expect(pool.opts.restartWorkerOnError).toBe(false)
- expect(pool.opts.enableTasksQueue).toBe(true)
- expect(pool.opts.tasksQueueOptions).toStrictEqual({
- concurrency: 2,
- size: 4
- })
- expect(pool.opts.workerChoiceStrategy).toBe(
- WorkerChoiceStrategies.LEAST_USED
- )
- expect(pool.opts.workerChoiceStrategyOptions).toStrictEqual({
- retries: 6,
- runTime: { median: true },
- waitTime: { median: false },
- elu: { median: false },
- weights: { 0: 300, 1: 200 }
+ expect(pool.opts).toStrictEqual({
+ startWorkers: true,
+ enableEvents: false,
+ restartWorkerOnError: false,
+ enableTasksQueue: true,
+ tasksQueueOptions: {
+ concurrency: 2,
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
+ },
+ workerChoiceStrategy: WorkerChoiceStrategies.LEAST_USED,
+ workerChoiceStrategyOptions: {
+ retries: 6,
+ runTime: { median: true },
+ waitTime: { median: false },
+ elu: { median: false },
+ weights: { 0: 300, 1: 200 }
+ },
+ onlineHandler: testHandler,
+ messageHandler: testHandler,
+ errorHandler: testHandler,
+ exitHandler: testHandler
})
expect(pool.workerChoiceStrategyContext.opts).toStrictEqual({
retries: 6,
weights: { 0: 300, 1: 200 }
})
}
- expect(pool.opts.messageHandler).toStrictEqual(testHandler)
- expect(pool.opts.errorHandler).toStrictEqual(testHandler)
- expect(pool.opts.onlineHandler).toStrictEqual(testHandler)
- expect(pool.opts.exitHandler).toStrictEqual(testHandler)
await pool.destroy()
})
).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(
- () =>
- new FixedThreadPool(
- numberOfWorkers,
- './tests/worker-files/thread/testWorker.js',
- {
- enableTasksQueue: true,
- tasksQueueOptions: { queueMaxSize: 2 }
- }
- )
- ).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(
() =>
new FixedThreadPool(
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(true, { concurrency: 2 })
expect(pool.opts.enableTasksQueue).toBe(true)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.enableTasksQueue(false)
expect(pool.opts.enableTasksQueue).toBe(false)
)
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 1,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
pool.setTasksQueueOptions({ concurrency: 2 })
expect(pool.opts.tasksQueueOptions).toStrictEqual({
concurrency: 2,
- size: 4
+ size: 4,
+ taskStealing: true,
+ tasksStealingOnBackPressure: true
})
expect(() =>
pool.setTasksQueueOptions('invalidTasksQueueOptions')
expect(() => pool.setTasksQueueOptions({ concurrency: 0.2 })).toThrowError(
new TypeError('Invalid worker node tasks concurrency: must be an integer')
)
- expect(() => pool.setTasksQueueOptions({ queueMaxSize: 2 })).toThrowError(
- new Error(
- 'Invalid tasks queue options: queueMaxSize is deprecated, please use size instead'
- )
- )
expect(() => pool.setTasksQueueOptions({ size: 0 })).toThrowError(
new RangeError(
'Invalid worker node tasks queue size: 0 is a negative integer or zero'
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: numberOfWorkers,
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: Math.floor(numberOfWorkers / 2),
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.usage).toStrictEqual({
tasks: {
executed: 0,
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
- expect(workerNode.tasksQueue).toBeDefined()
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.tasksQueue).toBeInstanceOf(Deque)
expect(workerNode.tasksQueue.size).toBe(0)
expect(workerNode.tasksQueue.maxSize).toBe(0)
'./tests/worker-files/cluster/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.cluster,
'./tests/worker-files/thread/testWorker.js'
)
for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
expect(workerNode.info).toStrictEqual({
id: expect.any(Number),
type: WorkerTypes.thread,
await pool.destroy()
})
+ it('Verify that pool can be started after initialization', async () => {
+ const pool = new FixedClusterPool(
+ numberOfWorkers,
+ './tests/worker-files/cluster/testWorker.js',
+ {
+ startWorkers: false
+ }
+ )
+ expect(pool.info.started).toBe(false)
+ expect(pool.info.ready).toBe(false)
+ expect(pool.workerNodes).toStrictEqual([])
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
+ )
+ pool.start()
+ expect(pool.info.started).toBe(true)
+ expect(pool.info.ready).toBe(true)
+ expect(pool.workerNodes.length).toBe(numberOfWorkers)
+ for (const workerNode of pool.workerNodes) {
+ expect(workerNode).toBeInstanceOf(WorkerNode)
+ }
+ await pool.destroy()
+ })
+
it('Verify that pool execute() arguments are checked', async () => {
const pool = new FixedClusterPool(
numberOfWorkers,
"Task function 'unknown' not found"
)
await pool.destroy()
- await expect(pool.execute(undefined, undefined, {})).rejects.toThrowError(
- new Error('Cannot execute a task on destroyed pool')
+ await expect(pool.execute()).rejects.toThrowError(
+ new Error('Cannot execute a task on not started pool')
)
})
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.cluster,
+ started: true,
ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.dynamic,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),
version,
type: PoolTypes.fixed,
worker: WorkerTypes.thread,
- ready: expect.any(Boolean),
+ started: true,
+ ready: true,
strategy: WorkerChoiceStrategies.ROUND_ROBIN,
minSize: expect.any(Number),
maxSize: expect.any(Number),