From: Alessandro Pio Ardizio Date: Mon, 22 Feb 2021 09:38:23 +0000 (+0100) Subject: Benchmarks and performance enhancements (#209) X-Git-Tag: v2.0.0-beta.7~2 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=be0676b3936d75f22ce55b0f71a1fb03d008a01c;p=poolifier.git Benchmarks and performance enhancements (#209) * Benchmarks and performance enhancements * Removed general Function type * Remove unknownn work around * Clean * Clean up old benchs, exclude benchmark folder from eslint * Clean up * Revert "Clean up" This reverts commit 31271d7f75392f04490a135a86b07e9480375505. * Revert "Clean up old benchs, exclude benchmark folder from eslint" This reverts commit 9467624c5713768cd44a545a48454a836d1af1b4. * Clean up for real * Clean up and dts for benchmarks * Run npm lint * Clean up benchmark eslint warnings and errors * Exclude no missing require in some bench files * Exclude no missing require in some bench files * Remove sonar code smells * Don't be overjoyed in readme * Improve interface definition * Use published poolifier * Rename * Ignore lint for benchmarks/versus-external-pools * Remove eslint-disable from benchmark files * Enhancement performance is now applied to both cluster and thread pools * Improve documentation * Unregister worker listener is no more needed * Update README.md * Improve readme * Add content to readme * Update benchmarks/README.md Co-authored-by: Shinigami92 --- diff --git a/.eslintignore b/.eslintignore index c3af8579..ea0e7a29 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1 +1,2 @@ +benchmarks/versus-external-pools/ lib/ diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 00000000..907d8649 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,32 @@ +# Poolifier Benchmarks + +Welcome to poolifier benchmarks and thanks to look into this project. + +## Folder Structure + +The internal folder contains poolifier internal benchmarks. +The versus-external-pools folder contains benchmarks versus other Node.js pools. + +## Poolifier vs other pools benchmark + +To compare poolifier pools performance vs other pools performance we chose to use [hyperfine](https://github.com/sharkdp/hyperfine). +We chose to use this tool because it allows to run isolated Node.js processes so that each pool does not impact each other. + +We will add more details on each function that we benchmark. + +Those are our results: + +- CPU Intensive task with 100k operations submitted to each pool [BENCH-100000.MD](./versus-external-pools/BENCH-100000.MD) + +## How to run benchmarks + +### Internal + +To run the internal benchmark you just need to navigate to the root of poolifier project and run `npm run benchmark` + +## Versus other pools + +To run the benchmark versus other pools you will need to: + +- [Install hyperfine](https://github.com/sharkdp/hyperfine#installation) +- Run the `./bench.sh` into the `versus-external-pools` folder diff --git a/benchmarks/external/workerThreadsWorker.js b/benchmarks/external/workerThreadsWorker.js deleted file mode 100644 index 717f3338..00000000 --- a/benchmarks/external/workerThreadsWorker.js +++ /dev/null @@ -1,12 +0,0 @@ -const { isMainThread, parentPort } = require('worker_threads') - -if (!isMainThread) { - for (let i = 0; i <= 1000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - parentPort.postMessage({ ok: 1 }) -} diff --git a/benchmarks/external/workerpoolWorker.js b/benchmarks/external/workerpoolWorker.js deleted file mode 100644 index c5302573..00000000 --- a/benchmarks/external/workerpoolWorker.js +++ /dev/null @@ -1,16 +0,0 @@ -const workerpool = require('workerpool') - -function yourFunction (data) { - for (let i = 0; i <= 1000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - return { ok: 1 } -} - -workerpool.worker({ - yourFunction: yourFunction -}) diff --git a/benchmarks/bench.js b/benchmarks/internal/bench.js similarity index 100% rename from benchmarks/bench.js rename to benchmarks/internal/bench.js diff --git a/benchmarks/choose-worker.js b/benchmarks/internal/choose-worker.js similarity index 100% rename from benchmarks/choose-worker.js rename to benchmarks/internal/choose-worker.js diff --git a/benchmarks/cluster/dynamic.js b/benchmarks/internal/cluster/dynamic.js similarity index 85% rename from benchmarks/cluster/dynamic.js rename to benchmarks/internal/cluster/dynamic.js index 9321054d..468eee3b 100644 --- a/benchmarks/cluster/dynamic.js +++ b/benchmarks/internal/cluster/dynamic.js @@ -1,11 +1,11 @@ -const { DynamicClusterPool } = require('../../lib/index') +const { DynamicClusterPool } = require('../../../lib/index') const size = 30 const dynamicPool = new DynamicClusterPool( size / 2, size * 3, - './benchmarks/cluster/worker.js', + './benchmarks/internal/cluster/worker.js', { maxTasks: 10000 } diff --git a/benchmarks/cluster/fixed.js b/benchmarks/internal/cluster/fixed.js similarity index 75% rename from benchmarks/cluster/fixed.js rename to benchmarks/internal/cluster/fixed.js index c8fe7132..f2b80c71 100644 --- a/benchmarks/cluster/fixed.js +++ b/benchmarks/internal/cluster/fixed.js @@ -1,10 +1,14 @@ -const { FixedClusterPool } = require('../../lib/index') +const { FixedClusterPool } = require('../../../lib/index') const size = 30 -const fixedPool = new FixedClusterPool(size, './benchmarks/cluster/worker.js', { - maxTasks: 10000 -}) +const fixedPool = new FixedClusterPool( + size, + './benchmarks/internal/cluster/worker.js', + { + maxTasks: 10000 + } +) async function fixedClusterTest ( { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } } diff --git a/benchmarks/cluster/worker.js b/benchmarks/internal/cluster/worker.js similarity index 82% rename from benchmarks/cluster/worker.js rename to benchmarks/internal/cluster/worker.js index 82408e69..c7211b7b 100644 --- a/benchmarks/cluster/worker.js +++ b/benchmarks/internal/cluster/worker.js @@ -1,5 +1,5 @@ 'use strict' -const { ClusterWorker } = require('../../lib/index') +const { ClusterWorker } = require('../../../lib/index') function yourFunction (data) { for (let i = 0; i <= 1000; i++) { diff --git a/benchmarks/thread/dynamic.js b/benchmarks/internal/thread/dynamic.js similarity index 91% rename from benchmarks/thread/dynamic.js rename to benchmarks/internal/thread/dynamic.js index ad980009..dde75474 100644 --- a/benchmarks/thread/dynamic.js +++ b/benchmarks/internal/thread/dynamic.js @@ -1,4 +1,4 @@ -const { DynamicThreadPool } = require('../../lib/index') +const { DynamicThreadPool } = require('../../../lib/index') const size = 30 diff --git a/benchmarks/thread/fixed.js b/benchmarks/internal/thread/fixed.js similarity index 91% rename from benchmarks/thread/fixed.js rename to benchmarks/internal/thread/fixed.js index b5966828..8db41acb 100644 --- a/benchmarks/thread/fixed.js +++ b/benchmarks/internal/thread/fixed.js @@ -1,4 +1,4 @@ -const { FixedThreadPool } = require('../../lib/index') +const { FixedThreadPool } = require('../../../lib/index') const size = 30 diff --git a/benchmarks/thread/worker.js b/benchmarks/internal/thread/worker.js similarity index 83% rename from benchmarks/thread/worker.js rename to benchmarks/internal/thread/worker.js index 7272a485..2ec5f4c2 100644 --- a/benchmarks/thread/worker.js +++ b/benchmarks/internal/thread/worker.js @@ -1,5 +1,5 @@ 'use strict' -const { ThreadWorker } = require('../../lib/index') +const { ThreadWorker } = require('../../../lib/index') function yourFunction (data) { for (let i = 0; i <= 1000; i++) { diff --git a/benchmarks/myBench.js b/benchmarks/myBench.js deleted file mode 100644 index a69de002..00000000 --- a/benchmarks/myBench.js +++ /dev/null @@ -1,133 +0,0 @@ -const { FixedThreadPool } = require('../lib/index') -const { DynamicThreadPool } = require('../lib/index') -const WorkerThreadsPool = require('worker-threads-pool') -const workerpool = require('workerpool') -const tasks = 1000 -const size = 16 - -// pools -const workerThreadsPool = new WorkerThreadsPool({ max: size }) -const workerPool = workerpool.pool('./external/workerpoolWorker.js', { - minWorkers: size / 2, - maxWorkers: size * 3, - workerType: 'thread' -}) -const fixedPool = new FixedThreadPool(size, './thread/worker.js', { - maxTasks: 10000 -}) -const dynamicPool = new DynamicThreadPool( - size / 2, - size * 3, - './thread/worker.js', - { maxTasks: 10000 } -) - -// data -const workerData = { proof: 'ok' } - -// fixed pool proof -async function fixedTest () { - let executions = 0 - const time = Date.now() - for (let i = 0; i <= tasks; i++) { - fixedPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return console.log( - `Fixed pool take ${ - Date.now() - time - }ms to work on ${executions} tasks` - ) - } - return null - }) - .catch(err => console.error(err)) - } -} - -async function dynamicTest () { - let executions = 0 - const time = Date.now() - for (let i = 0; i <= tasks; i++) { - dynamicPool - .execute(workerData) - .then(res => { - executions++ - if (executions === tasks) { - return console.log( - `Dynamic pool take ${ - Date.now() - time - }ms to work on ${executions} tasks` - ) - } - return null - }) - .catch(err => console.error(err)) - } -} - -async function workerThreadsPoolTest () { - let executions = 0 - const time = Date.now() - for (let i = 0; i <= tasks; i++) { - new Promise((resolve, reject) => { - workerThreadsPool.acquire( - './external/workerThreadsWorker.js', - { workerData: workerData }, - (err, worker) => { - if (err) { - return reject(err) - } - worker.on('error', reject) - worker.on('message', res => { - executions++ - resolve(res) - }) - } - ) - }) - .then(res => { - if (tasks === executions) { - return console.log( - `worker threads pool take ${ - Date.now() - time - }ms to work on ${executions} tasks` - ) - } - return null - }) - .catch(err => console.error(err)) - } -} - -async function workerpoolTest () { - let executions = 0 - const time = Date.now() - for (let i = 0; i <= tasks; i++) { - workerPool - .exec('yourFunction', [workerData]) - .then(res => { - executions++ - if (tasks === executions) { - return console.log( - `workerpool take ${ - Date.now() - time - }ms to work on ${executions} tasks` - ) - } - return null - }) - .catch(err => console.error(err)) - } -} - -async function test () { - await fixedTest() - await dynamicTest() - await workerThreadsPoolTest() - await workerpoolTest() -} - -test() diff --git a/benchmarks/versus-external-pools/BENCH-100000.MD b/benchmarks/versus-external-pools/BENCH-100000.MD new file mode 100644 index 00000000..cd14d108 --- /dev/null +++ b/benchmarks/versus-external-pools/BENCH-100000.MD @@ -0,0 +1,5 @@ +| Command | Mean [s] | Min [s] | Max [s] | Relative | +|:---|---:|---:|---:|---:| +| `node static-suchmokuo-node-worker-threads-pool.js` | 47.473 ± 1.241 | 44.031 | 48.331 | 1.05 ± 0.03 | +| `node dynamic-poolifier.js` | 45.368 ± 0.173 | 45.028 | 45.595 | 1.00 | +| `node piscina.js` | 48.502 ± 0.390 | 47.896 | 49.362 | 1.07 ± 0.01 | diff --git a/benchmarks/versus-external-pools/README.md b/benchmarks/versus-external-pools/README.md new file mode 100644 index 00000000..02ba1a67 --- /dev/null +++ b/benchmarks/versus-external-pools/README.md @@ -0,0 +1,7 @@ +# Poolifier Benchmarks + +To run the benchmark versus other pools you will need to: + +- [Install hyperfine](https://github.com/sharkdp/hyperfine#installation) +- Run `npm install` +- Run the `./bench.sh` into the `versus-external-pools` folder diff --git a/benchmarks/versus-external-pools/bench.sh b/benchmarks/versus-external-pools/bench.sh new file mode 100755 index 00000000..05c156c9 --- /dev/null +++ b/benchmarks/versus-external-pools/bench.sh @@ -0,0 +1,37 @@ +export NODE_ENV=production + +# Execute bench +# export POOL_SIZE=8 +# export NUM_ITERATIONS=10000 +# hyperfine --export-markdown BENCH-10000.MD --min-runs 10 \ +# 'node dynamic-poolifier.js' \ +# 'node dynamic-suchmokuo-node-worker-threads-pool.js' \ +# 'node fixed-poolifier.js' \ +# 'node static-suchmokuo-node-worker-threads-pool.js' \ +# 'node piscina.js' + +# echo "Sleeping...." +# sleep 60 + +export POOL_SIZE=10 +export NUM_ITERATIONS=100000 +hyperfine --export-markdown BENCH-100000.MD --min-runs 10 \ + 'node static-suchmokuo-node-worker-threads-pool.js' \ + 'node dynamic-poolifier.js' \ + 'node piscina.js' + +# export POOL_SIZE=8 +# export NUM_ITERATIONS=50000 +# hyperfine --export-markdown BENCH-50000.MD --min-runs 10 \ +# 'node dynamic-poolifier.js' \ +# 'node dynamic-suchmokuo-node-worker-threads-pool.js' \ +# 'node fixed-poolifier.js' \ +# 'node static-suchmokuo-node-worker-threads-pool.js' \ +# 'node piscina.js' + +# export NUM_ITERATIONS=100000 +# hyperfine --export-markdown BENCH-50000.MD --min-runs 20 \ +# 'node dynamic-poolifier.js' \ +# 'node static-suchmokuo-node-worker-threads-pool.js' \ +# 'node piscina.js' + diff --git a/benchmarks/versus-external-pools/dynamic-poolifier.js b/benchmarks/versus-external-pools/dynamic-poolifier.js new file mode 100644 index 00000000..e4f5f132 --- /dev/null +++ b/benchmarks/versus-external-pools/dynamic-poolifier.js @@ -0,0 +1,28 @@ +// IMPORT LIBRARIES +const { FixedThreadPool, DynamicThreadPool } = require('poolifier') +// FINISH IMPORT LIBRARIES +const size = process.env.POOL_SIZE +const iterations = process.env.NUM_ITERATIONS +const data = { + test: 'MYBENCH' +} + +const dynamicPool = new DynamicThreadPool( + size, + size * 3, + './workers/poolifier/json-stringify.worker.js', + { + maxTasks: 10000 + } +) + +async function run () { + const promises = [] + for (let i = 0; i < iterations; i++) { + promises.push(dynamicPool.execute(data)) + } + await Promise.all(promises) + process.exit() +} + +run() diff --git a/benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js b/benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js new file mode 100644 index 00000000..8bf964d2 --- /dev/null +++ b/benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js @@ -0,0 +1,29 @@ +// IMPORT LIBRARIES +const { DynamicPool, StaticPool } = require('node-worker-threads-pool') +// FINISH IMPORT LIBRARIES +// IMPORT FUNCTION TO BENCH +const functionToBench = require('./functions/json-stringify') +// FINISH IMPORT FUNCTION TO BENCH +const size = process.env.POOL_SIZE +const iterations = process.env.NUM_ITERATIONS +const data = { + test: 'MYBENCH' +} + +const pool = new DynamicPool(Number(size)) + +async function run () { + const promises = [] + for (let i = 0; i < iterations; i++) { + promises.push( + pool.exec({ + task: functionToBench, + param: data + }) + ) + } + await Promise.all(promises) + process.exit() +} + +run() diff --git a/benchmarks/versus-external-pools/fixed-poolifier.js b/benchmarks/versus-external-pools/fixed-poolifier.js new file mode 100644 index 00000000..b62cd593 --- /dev/null +++ b/benchmarks/versus-external-pools/fixed-poolifier.js @@ -0,0 +1,27 @@ +// IMPORT LIBRARIES +const { FixedThreadPool, DynamicThreadPool } = require('poolifier') +// FINISH IMPORT LIBRARIES +const size = process.env.POOL_SIZE +const iterations = process.env.NUM_ITERATIONS +const data = { + test: 'MYBENCH' +} + +const fixedPool = new FixedThreadPool( + size, + './workers/poolifier/json-stringify.worker.js', + { + maxTasks: 100000 + } +) + +async function run () { + const promises = [] + for (let i = 0; i < iterations; i++) { + promises.push(fixedPool.execute(data)) + } + await Promise.all(promises) + process.exit() +} + +run() diff --git a/benchmarks/versus-external-pools/functions/json-stringify.js b/benchmarks/versus-external-pools/functions/json-stringify.js new file mode 100644 index 00000000..4b009103 --- /dev/null +++ b/benchmarks/versus-external-pools/functions/json-stringify.js @@ -0,0 +1,10 @@ +module.exports = function (data) { + for (let i = 0; i <= 5000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('STRINGIFY FUNCTION FINISHED') + return { ok: 1 } +} diff --git a/benchmarks/versus-external-pools/package-lock.json b/benchmarks/versus-external-pools/package-lock.json new file mode 100644 index 00000000..11c2250f --- /dev/null +++ b/benchmarks/versus-external-pools/package-lock.json @@ -0,0 +1,207 @@ +{ + "name": "poolifier-benchmarks", + "version": "1.0.0-internal", + "lockfileVersion": 1, + "requires": true, + "dependencies": { + "@assemblyscript/loader": { + "version": "0.10.1", + "resolved": "https://registry.npmjs.org/@assemblyscript/loader/-/loader-0.10.1.tgz", + "integrity": "sha512-H71nDOOL8Y7kWRLqf6Sums+01Q5msqBW2KhDUTemh1tvY04eSkSXrK0uj/4mmY0Xr16/3zyZmsrxN7CKuRbNRg==" + }, + "after-all": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz", + "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=", + "requires": { + "once": "^1.3.0" + } + }, + "base64-js": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", + "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==" + }, + "benchmark": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/benchmark/-/benchmark-2.1.4.tgz", + "integrity": "sha1-CfPeMckWQl1JjMLuVloOvzwqVik=", + "requires": { + "lodash": "^4.17.4", + "platform": "^1.3.3" + } + }, + "callsites": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", + "integrity": "sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ==" + }, + "debug": { + "version": "4.3.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz", + "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==", + "requires": { + "ms": "2.1.2" + } + }, + "esm": { + "version": "3.2.25", + "resolved": "https://registry.npmjs.org/esm/-/esm-3.2.25.tgz", + "integrity": "sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==", + "optional": true + }, + "eventemitter-asyncresource": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/eventemitter-asyncresource/-/eventemitter-asyncresource-1.0.0.tgz", + "integrity": "sha512-39F7TBIV0G7gTelxwbEqnwhp90eqCPON1k0NwNfwhgKn4Co4ybUbj2pECcXT0B3ztRKZ7Pw1JujUUgmQJHcVAQ==" + }, + "hdr-histogram-js": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/hdr-histogram-js/-/hdr-histogram-js-2.0.1.tgz", + "integrity": "sha512-uPZxl1dAFnjUFHWLZmt93vUUvtHeaBay9nVNHu38SdOjMSF/4KqJUqa1Seuj08ptU1rEb6AHvB41X8n/zFZ74Q==", + "requires": { + "@assemblyscript/loader": "^0.10.1", + "base64-js": "^1.2.0", + "pako": "^1.0.3" + } + }, + "hdr-histogram-percentiles-obj": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/hdr-histogram-percentiles-obj/-/hdr-histogram-percentiles-obj-3.0.0.tgz", + "integrity": "sha512-7kIufnBqdsBGcSZLPJwqHT3yhk1QTsSlFsVD3kx5ixH/AlgBs9yM1q6DPhXZ8f8gtdqgh7N7/5btRLpQsS2gHw==" + }, + "is-observable": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/is-observable/-/is-observable-1.1.0.tgz", + "integrity": "sha512-NqCa4Sa2d+u7BWc6CukaObG3Fh+CU9bvixbpcXYhy2VvYS7vVGIdAgnIS5Ks3A/cqk4rebLJ9s8zBstT2aKnIA==", + "requires": { + "symbol-observable": "^1.1.0" + } + }, + "lodash": { + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" + }, + "microjob": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/microjob/-/microjob-0.7.0.tgz", + "integrity": "sha512-ofDpqaVEv67ymCYdCE8eIVMCJY0H/hAtGxYmi1pXjtNkLOM0UUuUDshA1YLdVFwPwwt5+SW9EbBtSfsNokf+PA==" + }, + "ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" + }, + "nice-napi": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/nice-napi/-/nice-napi-1.0.2.tgz", + "integrity": "sha512-px/KnJAJZf5RuBGcfD+Sp2pAKq0ytz8j+1NehvgIGFkvtvFrDM3T8E4x/JJODXK9WZow8RRGrbA9QQ3hs+pDhA==", + "optional": true, + "requires": { + "node-addon-api": "^3.0.0", + "node-gyp-build": "^4.2.2" + } + }, + "node-addon-api": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.1.0.tgz", + "integrity": "sha512-flmrDNB06LIl5lywUz7YlNGZH/5p0M7W28k8hzd9Lshtdh1wshD2Y+U4h9LD6KObOy1f+fEVdgprPrEymjM5uw==", + "optional": true + }, + "node-gyp-build": { + "version": "4.2.3", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz", + "integrity": "sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg==", + "optional": true + }, + "node-worker-threads-pool": { + "version": "1.4.3", + "resolved": "https://registry.npmjs.org/node-worker-threads-pool/-/node-worker-threads-pool-1.4.3.tgz", + "integrity": "sha512-US55ZGzEDQY2oq8Bc33dFVNKGpx4KaCJqThMDomSsUeX8tMdp2eDjQ6OP0yFd1HTEuHuLqxXSTWC4eidEsbXlg==" + }, + "observable-fns": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/observable-fns/-/observable-fns-0.5.1.tgz", + "integrity": "sha512-wf7g4Jpo1Wt2KIqZKLGeiuLOEMqpaOZ5gJn7DmSdqXgTdxRwSdBhWegQQpPteQ2gZvzCKqNNpwb853wcpA0j7A==" + }, + "once": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", + "requires": { + "wrappy": "1" + } + }, + "pako": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz", + "integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw==" + }, + "piscina": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/piscina/-/piscina-2.1.0.tgz", + "integrity": "sha512-3FgX36QyZcU4prKuNKl7/lWlOF3HAv9n7JpCjw09Zbql2KkzXXQ7E5xUS+RV5wV24Rn0r6Lr8jLdtU/cNZHAnA==", + "requires": { + "eventemitter-asyncresource": "^1.0.0", + "hdr-histogram-js": "^2.0.1", + "hdr-histogram-percentiles-obj": "^3.0.0", + "nice-napi": "^1.0.2" + } + }, + "platform": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/platform/-/platform-1.3.6.tgz", + "integrity": "sha512-fnWVljUchTro6RiCFvCXBbNhJc2NijN7oIQxbwsyL0buWJPG85v81ehlHI9fXrJsMNgTofEoWIQeClKpgxFLrg==" + }, + "poolifier": { + "version": "2.0.0-beta.6", + "resolved": "https://registry.npmjs.org/poolifier/-/poolifier-2.0.0-beta.6.tgz", + "integrity": "sha512-n+IumaVITBY1/UD4gC4e6uXHlBIgy2+AKeX4BmhEP4phIM6DCYBYBRuVk5lk8LlXKiO8HFrQ3WXHzCpubqWyqA==" + }, + "symbol-observable": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/symbol-observable/-/symbol-observable-1.2.0.tgz", + "integrity": "sha512-e900nM8RRtGhlV36KGEU9k65K3mPb1WV70OdjfxlG2EAuM1noi/E/BaW/uMhL7bPEssK8QV57vN3esixjUvcXQ==" + }, + "threads": { + "version": "1.6.3", + "resolved": "https://registry.npmjs.org/threads/-/threads-1.6.3.tgz", + "integrity": "sha512-tKwFIWRgfAT85KGkrpDt2jWPO8IVH0sLNfB/pXad/VW9eUIY2Zlz+QyeizypXhPHv9IHfqRzvk2t3mPw+imhWw==", + "requires": { + "callsites": "^3.1.0", + "debug": "^4.1.1", + "is-observable": "^1.1.0", + "observable-fns": "^0.5.1", + "tiny-worker": ">= 2" + } + }, + "tiny-worker": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/tiny-worker/-/tiny-worker-2.3.0.tgz", + "integrity": "sha512-pJ70wq5EAqTAEl9IkGzA+fN0836rycEuz2Cn6yeZ6FRzlVS5IDOkFHpIoEsksPRQV34GDqXm65+OlnZqUSyK2g==", + "optional": true, + "requires": { + "esm": "^3.2.25" + } + }, + "worker-threads-pool": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz", + "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==", + "requires": { + "after-all": "^2.0.2" + } + }, + "workerpool": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.0.tgz", + "integrity": "sha512-toV7q9rWNYha963Pl/qyeZ6wG+3nnsyvolaNUS8+R5Wtw6qJPTxIlOP1ZSvcGhEJw+l3HMMmtiNo9Gl61G4GVg==" + }, + "wrappy": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=" + } + } +} diff --git a/benchmarks/versus-external-pools/package.json b/benchmarks/versus-external-pools/package.json new file mode 100644 index 00000000..596bc840 --- /dev/null +++ b/benchmarks/versus-external-pools/package.json @@ -0,0 +1,21 @@ +{ + "name": "poolifier-benchmarks", + "version": "1.0.0-internal", + "description": "This project contains benchmarks for the most used and popular Node.js thread/cluster pools implementations", + "private": true, + "main": "index.js", + "author": "pioardi", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "dependencies": { + "benchmark": "^2.1.4", + "microjob": "0.7.0", + "node-worker-threads-pool": "1.4.3", + "piscina": "2.1.0", + "poolifier": "2.0.0-beta.6", + "threads": "1.6.3", + "worker-threads-pool": "2.0.0", + "workerpool": "6.1.0" + } +} diff --git a/benchmarks/versus-external-pools/piscina.js b/benchmarks/versus-external-pools/piscina.js new file mode 100644 index 00000000..297ea681 --- /dev/null +++ b/benchmarks/versus-external-pools/piscina.js @@ -0,0 +1,26 @@ +// IMPORT LIBRARIES +const Piscina = require('piscina') +// FINISH IMPORT LIBRARIES +const size = process.env.POOL_SIZE +const iterations = process.env.NUM_ITERATIONS +const data = { + test: 'MYBENCH' +} + +const piscina = new Piscina({ + filename: './workers/piscina/json-stringify.worker.js', + minThreads: Number(size), + maxThreads: size * 3, + idleTimeout: 1000 * 60 // this is the same as poolifier default +}) + +async function run () { + const promises = [] + for (let i = 0; i < iterations; i++) { + promises.push(piscina.runTask(data)) + } + await Promise.all(promises) + process.exit() +} + +run() diff --git a/benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js b/benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js new file mode 100644 index 00000000..df7edd81 --- /dev/null +++ b/benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js @@ -0,0 +1,27 @@ +// IMPORT LIBRARIES +const { DynamicPool, StaticPool } = require('node-worker-threads-pool') +// FINISH IMPORT LIBRARIES +// IMPORT FUNCTION TO BENCH +const functionToBench = require('./functions/json-stringify') +// FINISH IMPORT FUNCTION TO BENCH +const size = process.env.POOL_SIZE +const iterations = process.env.NUM_ITERATIONS +const data = { + test: 'MYBENCH' +} + +const pool = new StaticPool({ + size: Number(size), + task: functionToBench +}) + +async function run () { + const promises = [] + for (let i = 0; i < iterations; i++) { + promises.push(pool.exec(data)) + } + await Promise.all(promises) + process.exit() +} + +run() diff --git a/benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js b/benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js new file mode 100644 index 00000000..154e04a7 --- /dev/null +++ b/benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js @@ -0,0 +1,3 @@ +'use strict' +const jsonStringify = require('../../functions/json-stringify') +module.exports = jsonStringify diff --git a/benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js b/benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js new file mode 100644 index 00000000..73d08855 --- /dev/null +++ b/benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js @@ -0,0 +1,4 @@ +'use strict' +const { ThreadWorker } = require('poolifier') +const jsonStringify = require('../../functions/json-stringify') +module.exports = new ThreadWorker(jsonStringify) diff --git a/package-lock.json b/package-lock.json index 24bd1309..2729f787 100644 --- a/package-lock.json +++ b/package-lock.json @@ -718,15 +718,6 @@ "integrity": "sha512-K0Ptm/47OKfQRpNQ2J/oIN/3QYiK6FwW+eJbILhsdxh2WTLdl+30o8aGdTbm5JbffpFFAg/g+zi1E+jvJha5ng==", "dev": true }, - "after-all": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz", - "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=", - "dev": true, - "requires": { - "once": "^1.3.0" - } - }, "aggregate-error": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz", @@ -4779,15 +4770,6 @@ "integrity": "sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=", "dev": true }, - "worker-threads-pool": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz", - "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==", - "dev": true, - "requires": { - "after-all": "^2.0.2" - } - }, "workerpool": { "version": "6.1.0", "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.0.tgz", diff --git a/package.json b/package.json index 1cb4e9d7..0cc9f891 100644 --- a/package.json +++ b/package.json @@ -6,9 +6,9 @@ "scripts": { "build": "rollup --config --environment BUILD:development", "build:prod": "rollup --config", - "benchmark": "npm run build && node -r source-map-support/register benchmarks/bench.js", - "benchmark:debug": "npm run build && node -r source-map-support/register --inspect benchmarks/bench.js", - "benchmark:prod": "npm run build:prod && node -r source-map-support/register benchmarks/bench.js", + "benchmark": "npm run build && node -r source-map-support/register benchmarks/internal/bench.js", + "benchmark:debug": "npm run build && node -r source-map-support/register --inspect benchmarks/internal/bench.js", + "benchmark:prod": "npm run build:prod && node -r source-map-support/register benchmarks/internal/bench.js", "test": "npm run build && nyc mocha --parallel 'tests/**/*.test.js'", "test:debug": "npm run build && mocha --inspect 'tests/**/*.test.js'", "test:prod": "npm run build:prod && nyc mocha --parallel 'tests/**/*.test.js'", @@ -87,9 +87,7 @@ "rollup-plugin-typescript2": "^0.30.0", "sonar-scanner": "^3.1.0", "source-map-support": "^0.5.19", - "typescript": "^4.1.5", - "worker-threads-pool": "^2.0.0", - "workerpool": "^6.1.0" + "typescript": "^4.1.5" }, "engines": { "node": ">=12.11.0", diff --git a/src/pools/abstract-pool.ts b/src/pools/abstract-pool.ts index 7944606d..c123849e 100644 --- a/src/pools/abstract-pool.ts +++ b/src/pools/abstract-pool.ts @@ -1,4 +1,7 @@ -import type { MessageValue } from '../utility-types' +import type { + MessageValue, + PromiseWorkerResponseWrapper +} from '../utility-types' import type { IPoolInternal } from './pool-internal' import { PoolEmitter } from './pool-internal' import type { WorkerChoiceStrategy } from './selection-strategies' @@ -106,6 +109,19 @@ export abstract class AbstractPool< Data = unknown, Response = unknown > implements IPoolInternal { + /** + * The promise map. + * + * - `key`: This is the message ID of each submitted task. + * - `value`: An object that contains the worker, the resolve function and the reject function. + * + * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message. + */ + protected promiseMap: Map< + number, + PromiseWorkerResponseWrapper + > = new Map>() + /** @inheritdoc */ public readonly workers: Worker[] = [] @@ -284,24 +300,12 @@ export abstract class AbstractPool< Message extends Data | Response > (worker: Worker, listener: (message: MessageValue) => void): void - protected abstract unregisterWorkerMessageListener< - Message extends Data | Response - > (worker: Worker, listener: (message: MessageValue) => void): void - protected internalExecute ( worker: Worker, messageId: number ): Promise { - return new Promise((resolve, reject) => { - const listener: (message: MessageValue) => void = message => { - if (message.id === messageId) { - this.unregisterWorkerMessageListener(worker, listener) - this.decreaseWorkersTasks(worker) - if (message.error) reject(message.error) - else resolve(message.data as Response) - } - } - this.registerWorkerMessageListener(worker, listener) + return new Promise((resolve, reject) => { + this.promiseMap.set(messageId, { resolve, reject, worker }) }) } @@ -337,4 +341,24 @@ export abstract class AbstractPool< return worker } + + /** + * This function is the listener registered for each worker. + * + * @returns The listener function to execute when a message is sent from a worker. + */ + protected workerListener (): (message: MessageValue) => void { + const listener: (message: MessageValue) => void = message => { + if (message.id) { + const value = this.promiseMap.get(message.id) + if (value) { + this.decreaseWorkersTasks(value.worker) + if (message.error) value.reject(message.error) + else value.resolve(message.data as Response) + this.promiseMap.delete(message.id) + } + } + } + return listener + } } diff --git a/src/pools/cluster/fixed.ts b/src/pools/cluster/fixed.ts index 6246320e..4ec81c07 100644 --- a/src/pools/cluster/fixed.ts +++ b/src/pools/cluster/fixed.ts @@ -76,13 +76,6 @@ export class FixedClusterPool< worker.on('message', listener) } - protected unregisterWorkerMessageListener ( - worker: Worker, - listener: (message: MessageValue) => void - ): void { - worker.removeListener('message', listener) - } - protected createWorker (): Worker { return fork(this.opts.env) } @@ -91,5 +84,6 @@ export class FixedClusterPool< // We will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size worker.setMaxListeners(this.opts.maxTasks ?? 1000) + this.registerWorkerMessageListener(worker, super.workerListener()) } } diff --git a/src/pools/thread/fixed.ts b/src/pools/thread/fixed.ts index 021de492..1c91caa7 100644 --- a/src/pools/thread/fixed.ts +++ b/src/pools/thread/fixed.ts @@ -67,13 +67,6 @@ export class FixedThreadPool< messageChannel.port2?.on('message', listener) } - protected unregisterWorkerMessageListener ( - messageChannel: ThreadWorkerWithMessageChannel, - listener: (message: MessageValue) => void - ): void { - messageChannel.port2?.removeListener('message', listener) - } - protected createWorker (): ThreadWorkerWithMessageChannel { return new Worker(this.filePath, { env: SHARE_ENV @@ -86,7 +79,8 @@ export class FixedThreadPool< worker.port1 = port1 worker.port2 = port2 // We will attach a listener for every task, - // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size + // when the task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size. worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000) + this.registerWorkerMessageListener(worker, super.workerListener()) } } diff --git a/src/utility-types.ts b/src/utility-types.ts index 2403b8a2..9441fec4 100644 --- a/src/utility-types.ts +++ b/src/utility-types.ts @@ -1,5 +1,6 @@ import type { Worker } from 'cluster' import type { MessagePort } from 'worker_threads' +import type { IWorker } from './pools/abstract-pool' import type { KillBehavior } from './worker/worker-options' /** @@ -37,3 +38,27 @@ export interface MessageValue< */ readonly parent?: MainWorker } + +/** + * An object holding the worker that will be used to resolve/rejects the promise later on. + * + * @template Worker Type of worker. + * @template Response Type of response of execution. This can only be serializable data. + */ +export interface PromiseWorkerResponseWrapper< + Worker extends IWorker, + Response = unknown +> { + /** + * Resolve callback to fulfill the promise. + */ + readonly resolve: (value: Response) => void + /** + * Reject callback to reject the promise. + */ + readonly reject: (reason?: string) => void + /** + * The worker that has the assigned task. + */ + readonly worker: Worker +}