Benchmarks and performance enhancements (#209)
authorAlessandro Pio Ardizio <alessandroardizio94@gmail.com>
Mon, 22 Feb 2021 09:38:23 +0000 (10:38 +0100)
committerGitHub <noreply@github.com>
Mon, 22 Feb 2021 09:38:23 +0000 (10:38 +0100)
* Benchmarks and performance enhancements

* Removed general Function type

* Remove unknownn work around

* Clean

* Clean up old benchs, exclude benchmark folder from eslint

* Clean up

* Revert "Clean up"

This reverts commit 31271d7f75392f04490a135a86b07e9480375505.

* Revert "Clean up old benchs, exclude benchmark folder from eslint"

This reverts commit 9467624c5713768cd44a545a48454a836d1af1b4.

* Clean up for real

* Clean up and dts for benchmarks

* Run npm lint

* Clean up benchmark eslint warnings and errors

* Exclude no missing require in some bench files

* Exclude no missing require in some bench files

* Remove sonar code smells

* Don't be overjoyed in readme

* Improve interface definition

* Use published poolifier

* Rename

* Ignore lint for benchmarks/versus-external-pools

* Remove eslint-disable from benchmark files

* Enhancement performance is now applied to both cluster and thread pools

* Improve documentation

* Unregister worker listener is no more needed

* Update README.md

* Improve readme

* Add content to readme

* Update benchmarks/README.md

Co-authored-by: Shinigami92 <chrissi92@hotmail.de>
32 files changed:
.eslintignore
benchmarks/README.md [new file with mode: 0644]
benchmarks/external/workerThreadsWorker.js [deleted file]
benchmarks/external/workerpoolWorker.js [deleted file]
benchmarks/internal/bench.js [moved from benchmarks/bench.js with 100% similarity]
benchmarks/internal/choose-worker.js [moved from benchmarks/choose-worker.js with 100% similarity]
benchmarks/internal/cluster/dynamic.js [moved from benchmarks/cluster/dynamic.js with 85% similarity]
benchmarks/internal/cluster/fixed.js [moved from benchmarks/cluster/fixed.js with 75% similarity]
benchmarks/internal/cluster/worker.js [moved from benchmarks/cluster/worker.js with 82% similarity]
benchmarks/internal/thread/dynamic.js [moved from benchmarks/thread/dynamic.js with 91% similarity]
benchmarks/internal/thread/fixed.js [moved from benchmarks/thread/fixed.js with 91% similarity]
benchmarks/internal/thread/worker.js [moved from benchmarks/thread/worker.js with 83% similarity]
benchmarks/myBench.js [deleted file]
benchmarks/versus-external-pools/BENCH-100000.MD [new file with mode: 0644]
benchmarks/versus-external-pools/README.md [new file with mode: 0644]
benchmarks/versus-external-pools/bench.sh [new file with mode: 0755]
benchmarks/versus-external-pools/dynamic-poolifier.js [new file with mode: 0644]
benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js [new file with mode: 0644]
benchmarks/versus-external-pools/fixed-poolifier.js [new file with mode: 0644]
benchmarks/versus-external-pools/functions/json-stringify.js [new file with mode: 0644]
benchmarks/versus-external-pools/package-lock.json [new file with mode: 0644]
benchmarks/versus-external-pools/package.json [new file with mode: 0644]
benchmarks/versus-external-pools/piscina.js [new file with mode: 0644]
benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js [new file with mode: 0644]
benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js [new file with mode: 0644]
benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js [new file with mode: 0644]
package-lock.json
package.json
src/pools/abstract-pool.ts
src/pools/cluster/fixed.ts
src/pools/thread/fixed.ts
src/utility-types.ts

index c3af857904ebfd987602da36b219a186a84e4e7a..ea0e7a293e7f73ed20c3bb798111596536316fb7 100644 (file)
@@ -1 +1,2 @@
+benchmarks/versus-external-pools/
 lib/
diff --git a/benchmarks/README.md b/benchmarks/README.md
new file mode 100644 (file)
index 0000000..907d864
--- /dev/null
@@ -0,0 +1,32 @@
+# Poolifier Benchmarks
+
+Welcome to poolifier benchmarks and thanks to look into this project.
+
+## Folder Structure
+
+The internal folder contains poolifier internal benchmarks.
+The versus-external-pools folder contains benchmarks versus other Node.js pools.
+
+## Poolifier vs other pools benchmark
+
+To compare poolifier pools performance vs other pools performance we chose to use [hyperfine](https://github.com/sharkdp/hyperfine).
+We chose to use this tool because it allows to run isolated Node.js processes so that each pool does not impact each other.
+
+We will add more details on each function that we benchmark.
+
+Those are our results:
+
+- CPU Intensive task with 100k operations submitted to each pool [BENCH-100000.MD](./versus-external-pools/BENCH-100000.MD)
+
+## How to run benchmarks
+
+### Internal
+
+To run the internal benchmark you just need to navigate to the root of poolifier project and run `npm run benchmark`
+
+## Versus other pools
+
+To run the benchmark versus other pools you will need to:
+
+- [Install hyperfine](https://github.com/sharkdp/hyperfine#installation)
+- Run the `./bench.sh` into the `versus-external-pools` folder
diff --git a/benchmarks/external/workerThreadsWorker.js b/benchmarks/external/workerThreadsWorker.js
deleted file mode 100644 (file)
index 717f333..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-const { isMainThread, parentPort } = require('worker_threads')
-
-if (!isMainThread) {
-  for (let i = 0; i <= 1000; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
-  // console.log('This is the main thread ' + isMainThread)
-  parentPort.postMessage({ ok: 1 })
-}
diff --git a/benchmarks/external/workerpoolWorker.js b/benchmarks/external/workerpoolWorker.js
deleted file mode 100644 (file)
index c530257..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-const workerpool = require('workerpool')
-
-function yourFunction (data) {
-  for (let i = 0; i <= 1000; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
-  // console.log('This is the main thread ' + isMainThread)
-  return { ok: 1 }
-}
-
-workerpool.worker({
-  yourFunction: yourFunction
-})
similarity index 85%
rename from benchmarks/cluster/dynamic.js
rename to benchmarks/internal/cluster/dynamic.js
index 9321054d1d2e79cb2b71e05c731682dfc90f4a55..468eee3b68db72e0d67671aeaf493d2e2b50e5ef 100644 (file)
@@ -1,11 +1,11 @@
-const { DynamicClusterPool } = require('../../lib/index')
+const { DynamicClusterPool } = require('../../../lib/index')
 
 const size = 30
 
 const dynamicPool = new DynamicClusterPool(
   size / 2,
   size * 3,
-  './benchmarks/cluster/worker.js',
+  './benchmarks/internal/cluster/worker.js',
   {
     maxTasks: 10000
   }
similarity index 75%
rename from benchmarks/cluster/fixed.js
rename to benchmarks/internal/cluster/fixed.js
index c8fe7132a60acd24d94e178517c9756cd0511955..f2b80c7198d71872e6ad858c7db661b1894ee46b 100644 (file)
@@ -1,10 +1,14 @@
-const { FixedClusterPool } = require('../../lib/index')
+const { FixedClusterPool } = require('../../../lib/index')
 
 const size = 30
 
-const fixedPool = new FixedClusterPool(size, './benchmarks/cluster/worker.js', {
-  maxTasks: 10000
-})
+const fixedPool = new FixedClusterPool(
+  size,
+  './benchmarks/internal/cluster/worker.js',
+  {
+    maxTasks: 10000
+  }
+)
 
 async function fixedClusterTest (
   { tasks, workerData } = { tasks: 1, workerData: { proof: 'ok' } }
similarity index 82%
rename from benchmarks/cluster/worker.js
rename to benchmarks/internal/cluster/worker.js
index 82408e69bfb8d6eb1090953412b5470f9e8496f0..c7211b7bfd2b9fe5883059ef9bba8771d614381d 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ClusterWorker } = require('../../lib/index')
+const { ClusterWorker } = require('../../../lib/index')
 
 function yourFunction (data) {
   for (let i = 0; i <= 1000; i++) {
similarity index 91%
rename from benchmarks/thread/dynamic.js
rename to benchmarks/internal/thread/dynamic.js
index ad9800094d531a0d51c1e19251c9fd577f667232..dde75474daccf296dfab673737e02c5082cc787c 100644 (file)
@@ -1,4 +1,4 @@
-const { DynamicThreadPool } = require('../../lib/index')
+const { DynamicThreadPool } = require('../../../lib/index')
 
 const size = 30
 
similarity index 91%
rename from benchmarks/thread/fixed.js
rename to benchmarks/internal/thread/fixed.js
index b59668282ea60c3d14fdd4649dafae3cef3f1d5e..8db41acbde4f40cb38104aaedc009d87c3b43111 100644 (file)
@@ -1,4 +1,4 @@
-const { FixedThreadPool } = require('../../lib/index')
+const { FixedThreadPool } = require('../../../lib/index')
 
 const size = 30
 
similarity index 83%
rename from benchmarks/thread/worker.js
rename to benchmarks/internal/thread/worker.js
index 7272a485897bee3699936c13e2e23950c470f53c..2ec5f4c2a54ff25550029e28e6ad491c61934bb3 100644 (file)
@@ -1,5 +1,5 @@
 'use strict'
-const { ThreadWorker } = require('../../lib/index')
+const { ThreadWorker } = require('../../../lib/index')
 
 function yourFunction (data) {
   for (let i = 0; i <= 1000; i++) {
diff --git a/benchmarks/myBench.js b/benchmarks/myBench.js
deleted file mode 100644 (file)
index a69de00..0000000
+++ /dev/null
@@ -1,133 +0,0 @@
-const { FixedThreadPool } = require('../lib/index')
-const { DynamicThreadPool } = require('../lib/index')
-const WorkerThreadsPool = require('worker-threads-pool')
-const workerpool = require('workerpool')
-const tasks = 1000
-const size = 16
-
-// pools
-const workerThreadsPool = new WorkerThreadsPool({ max: size })
-const workerPool = workerpool.pool('./external/workerpoolWorker.js', {
-  minWorkers: size / 2,
-  maxWorkers: size * 3,
-  workerType: 'thread'
-})
-const fixedPool = new FixedThreadPool(size, './thread/worker.js', {
-  maxTasks: 10000
-})
-const dynamicPool = new DynamicThreadPool(
-  size / 2,
-  size * 3,
-  './thread/worker.js',
-  { maxTasks: 10000 }
-)
-
-// data
-const workerData = { proof: 'ok' }
-
-// fixed pool proof
-async function fixedTest () {
-  let executions = 0
-  const time = Date.now()
-  for (let i = 0; i <= tasks; i++) {
-    fixedPool
-      .execute(workerData)
-      .then(res => {
-        executions++
-        if (executions === tasks) {
-          return console.log(
-            `Fixed pool take ${
-              Date.now() - time
-            }ms to work on ${executions} tasks`
-          )
-        }
-        return null
-      })
-      .catch(err => console.error(err))
-  }
-}
-
-async function dynamicTest () {
-  let executions = 0
-  const time = Date.now()
-  for (let i = 0; i <= tasks; i++) {
-    dynamicPool
-      .execute(workerData)
-      .then(res => {
-        executions++
-        if (executions === tasks) {
-          return console.log(
-            `Dynamic pool take ${
-              Date.now() - time
-            }ms to work on ${executions} tasks`
-          )
-        }
-        return null
-      })
-      .catch(err => console.error(err))
-  }
-}
-
-async function workerThreadsPoolTest () {
-  let executions = 0
-  const time = Date.now()
-  for (let i = 0; i <= tasks; i++) {
-    new Promise((resolve, reject) => {
-      workerThreadsPool.acquire(
-        './external/workerThreadsWorker.js',
-        { workerData: workerData },
-        (err, worker) => {
-          if (err) {
-            return reject(err)
-          }
-          worker.on('error', reject)
-          worker.on('message', res => {
-            executions++
-            resolve(res)
-          })
-        }
-      )
-    })
-      .then(res => {
-        if (tasks === executions) {
-          return console.log(
-            `worker threads pool take ${
-              Date.now() - time
-            }ms to work on ${executions} tasks`
-          )
-        }
-        return null
-      })
-      .catch(err => console.error(err))
-  }
-}
-
-async function workerpoolTest () {
-  let executions = 0
-  const time = Date.now()
-  for (let i = 0; i <= tasks; i++) {
-    workerPool
-      .exec('yourFunction', [workerData])
-      .then(res => {
-        executions++
-        if (tasks === executions) {
-          return console.log(
-            `workerpool take ${
-              Date.now() - time
-            }ms to work on ${executions} tasks`
-          )
-        }
-        return null
-      })
-      .catch(err => console.error(err))
-  }
-}
-
-async function test () {
-  await fixedTest()
-  await dynamicTest()
-  await workerThreadsPoolTest()
-  await workerpoolTest()
-}
-
-test()
diff --git a/benchmarks/versus-external-pools/BENCH-100000.MD b/benchmarks/versus-external-pools/BENCH-100000.MD
new file mode 100644 (file)
index 0000000..cd14d10
--- /dev/null
@@ -0,0 +1,5 @@
+| Command | Mean [s] | Min [s] | Max [s] | Relative |
+|:---|---:|---:|---:|---:|
+| `node static-suchmokuo-node-worker-threads-pool.js` | 47.473 Â± 1.241 | 44.031 | 48.331 | 1.05 Â± 0.03 |
+| `node dynamic-poolifier.js` | 45.368 Â± 0.173 | 45.028 | 45.595 | 1.00 |
+| `node piscina.js` | 48.502 Â± 0.390 | 47.896 | 49.362 | 1.07 Â± 0.01 |
diff --git a/benchmarks/versus-external-pools/README.md b/benchmarks/versus-external-pools/README.md
new file mode 100644 (file)
index 0000000..02ba1a6
--- /dev/null
@@ -0,0 +1,7 @@
+# Poolifier Benchmarks
+
+To run the benchmark versus other pools you will need to:
+
+- [Install hyperfine](https://github.com/sharkdp/hyperfine#installation)
+- Run `npm install`
+- Run the `./bench.sh` into the `versus-external-pools` folder
diff --git a/benchmarks/versus-external-pools/bench.sh b/benchmarks/versus-external-pools/bench.sh
new file mode 100755 (executable)
index 0000000..05c156c
--- /dev/null
@@ -0,0 +1,37 @@
+export NODE_ENV=production
+
+# Execute bench
+# export POOL_SIZE=8
+# export NUM_ITERATIONS=10000
+# hyperfine --export-markdown BENCH-10000.MD --min-runs 10 \
+#   'node dynamic-poolifier.js' \
+#   'node dynamic-suchmokuo-node-worker-threads-pool.js' \
+#   'node fixed-poolifier.js' \
+#   'node static-suchmokuo-node-worker-threads-pool.js' \
+#   'node piscina.js'
+
+# echo "Sleeping...."
+# sleep 60
+
+export POOL_SIZE=10
+export NUM_ITERATIONS=100000
+hyperfine --export-markdown BENCH-100000.MD --min-runs 10 \
+  'node static-suchmokuo-node-worker-threads-pool.js' \
+  'node dynamic-poolifier.js' \
+  'node piscina.js'
+
+# export POOL_SIZE=8
+# export NUM_ITERATIONS=50000
+# hyperfine --export-markdown BENCH-50000.MD --min-runs 10 \
+#   'node dynamic-poolifier.js' \
+#   'node dynamic-suchmokuo-node-worker-threads-pool.js' \
+#   'node fixed-poolifier.js' \
+#   'node static-suchmokuo-node-worker-threads-pool.js' \
+#   'node piscina.js'
+
+# export NUM_ITERATIONS=100000
+#   hyperfine --export-markdown BENCH-50000.MD --min-runs 20 \
+#     'node dynamic-poolifier.js' \
+#     'node static-suchmokuo-node-worker-threads-pool.js' \
+#     'node piscina.js'
+
diff --git a/benchmarks/versus-external-pools/dynamic-poolifier.js b/benchmarks/versus-external-pools/dynamic-poolifier.js
new file mode 100644 (file)
index 0000000..e4f5f13
--- /dev/null
@@ -0,0 +1,28 @@
+// IMPORT LIBRARIES
+const { FixedThreadPool, DynamicThreadPool } = require('poolifier')
+// FINISH IMPORT LIBRARIES
+const size = process.env.POOL_SIZE
+const iterations = process.env.NUM_ITERATIONS
+const data = {
+  test: 'MYBENCH'
+}
+
+const dynamicPool = new DynamicThreadPool(
+  size,
+  size * 3,
+  './workers/poolifier/json-stringify.worker.js',
+  {
+    maxTasks: 10000
+  }
+)
+
+async function run () {
+  const promises = []
+  for (let i = 0; i < iterations; i++) {
+    promises.push(dynamicPool.execute(data))
+  }
+  await Promise.all(promises)
+  process.exit()
+}
+
+run()
diff --git a/benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js b/benchmarks/versus-external-pools/dynamic-suchmokuo-node-worker-threads-pool.js
new file mode 100644 (file)
index 0000000..8bf964d
--- /dev/null
@@ -0,0 +1,29 @@
+// IMPORT LIBRARIES
+const { DynamicPool, StaticPool } = require('node-worker-threads-pool')
+// FINISH IMPORT LIBRARIES
+// IMPORT FUNCTION TO BENCH
+const functionToBench = require('./functions/json-stringify')
+// FINISH IMPORT FUNCTION TO BENCH
+const size = process.env.POOL_SIZE
+const iterations = process.env.NUM_ITERATIONS
+const data = {
+  test: 'MYBENCH'
+}
+
+const pool = new DynamicPool(Number(size))
+
+async function run () {
+  const promises = []
+  for (let i = 0; i < iterations; i++) {
+    promises.push(
+      pool.exec({
+        task: functionToBench,
+        param: data
+      })
+    )
+  }
+  await Promise.all(promises)
+  process.exit()
+}
+
+run()
diff --git a/benchmarks/versus-external-pools/fixed-poolifier.js b/benchmarks/versus-external-pools/fixed-poolifier.js
new file mode 100644 (file)
index 0000000..b62cd59
--- /dev/null
@@ -0,0 +1,27 @@
+// IMPORT LIBRARIES
+const { FixedThreadPool, DynamicThreadPool } = require('poolifier')
+// FINISH IMPORT LIBRARIES
+const size = process.env.POOL_SIZE
+const iterations = process.env.NUM_ITERATIONS
+const data = {
+  test: 'MYBENCH'
+}
+
+const fixedPool = new FixedThreadPool(
+  size,
+  './workers/poolifier/json-stringify.worker.js',
+  {
+    maxTasks: 100000
+  }
+)
+
+async function run () {
+  const promises = []
+  for (let i = 0; i < iterations; i++) {
+    promises.push(fixedPool.execute(data))
+  }
+  await Promise.all(promises)
+  process.exit()
+}
+
+run()
diff --git a/benchmarks/versus-external-pools/functions/json-stringify.js b/benchmarks/versus-external-pools/functions/json-stringify.js
new file mode 100644 (file)
index 0000000..4b00910
--- /dev/null
@@ -0,0 +1,10 @@
+module.exports = function (data) {
+  for (let i = 0; i <= 5000; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  // console.log('STRINGIFY FUNCTION FINISHED')
+  return { ok: 1 }
+}
diff --git a/benchmarks/versus-external-pools/package-lock.json b/benchmarks/versus-external-pools/package-lock.json
new file mode 100644 (file)
index 0000000..11c2250
--- /dev/null
@@ -0,0 +1,207 @@
+{
+  "name": "poolifier-benchmarks",
+  "version": "1.0.0-internal",
+  "lockfileVersion": 1,
+  "requires": true,
+  "dependencies": {
+    "@assemblyscript/loader": {
+      "version": "0.10.1",
+      "resolved": "https://registry.npmjs.org/@assemblyscript/loader/-/loader-0.10.1.tgz",
+      "integrity": "sha512-H71nDOOL8Y7kWRLqf6Sums+01Q5msqBW2KhDUTemh1tvY04eSkSXrK0uj/4mmY0Xr16/3zyZmsrxN7CKuRbNRg=="
+    },
+    "after-all": {
+      "version": "2.0.2",
+      "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz",
+      "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=",
+      "requires": {
+        "once": "^1.3.0"
+      }
+    },
+    "base64-js": {
+      "version": "1.5.1",
+      "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz",
+      "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA=="
+    },
+    "benchmark": {
+      "version": "2.1.4",
+      "resolved": "https://registry.npmjs.org/benchmark/-/benchmark-2.1.4.tgz",
+      "integrity": "sha1-CfPeMckWQl1JjMLuVloOvzwqVik=",
+      "requires": {
+        "lodash": "^4.17.4",
+        "platform": "^1.3.3"
+      }
+    },
+    "callsites": {
+      "version": "3.1.0",
+      "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz",
+      "integrity": "sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ=="
+    },
+    "debug": {
+      "version": "4.3.1",
+      "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
+      "integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
+      "requires": {
+        "ms": "2.1.2"
+      }
+    },
+    "esm": {
+      "version": "3.2.25",
+      "resolved": "https://registry.npmjs.org/esm/-/esm-3.2.25.tgz",
+      "integrity": "sha512-U1suiZ2oDVWv4zPO56S0NcR5QriEahGtdN2OR6FiOG4WJvcjBVFB0qI4+eKoWFH483PKGuLuu6V8Z4T5g63UVA==",
+      "optional": true
+    },
+    "eventemitter-asyncresource": {
+      "version": "1.0.0",
+      "resolved": "https://registry.npmjs.org/eventemitter-asyncresource/-/eventemitter-asyncresource-1.0.0.tgz",
+      "integrity": "sha512-39F7TBIV0G7gTelxwbEqnwhp90eqCPON1k0NwNfwhgKn4Co4ybUbj2pECcXT0B3ztRKZ7Pw1JujUUgmQJHcVAQ=="
+    },
+    "hdr-histogram-js": {
+      "version": "2.0.1",
+      "resolved": "https://registry.npmjs.org/hdr-histogram-js/-/hdr-histogram-js-2.0.1.tgz",
+      "integrity": "sha512-uPZxl1dAFnjUFHWLZmt93vUUvtHeaBay9nVNHu38SdOjMSF/4KqJUqa1Seuj08ptU1rEb6AHvB41X8n/zFZ74Q==",
+      "requires": {
+        "@assemblyscript/loader": "^0.10.1",
+        "base64-js": "^1.2.0",
+        "pako": "^1.0.3"
+      }
+    },
+    "hdr-histogram-percentiles-obj": {
+      "version": "3.0.0",
+      "resolved": "https://registry.npmjs.org/hdr-histogram-percentiles-obj/-/hdr-histogram-percentiles-obj-3.0.0.tgz",
+      "integrity": "sha512-7kIufnBqdsBGcSZLPJwqHT3yhk1QTsSlFsVD3kx5ixH/AlgBs9yM1q6DPhXZ8f8gtdqgh7N7/5btRLpQsS2gHw=="
+    },
+    "is-observable": {
+      "version": "1.1.0",
+      "resolved": "https://registry.npmjs.org/is-observable/-/is-observable-1.1.0.tgz",
+      "integrity": "sha512-NqCa4Sa2d+u7BWc6CukaObG3Fh+CU9bvixbpcXYhy2VvYS7vVGIdAgnIS5Ks3A/cqk4rebLJ9s8zBstT2aKnIA==",
+      "requires": {
+        "symbol-observable": "^1.1.0"
+      }
+    },
+    "lodash": {
+      "version": "4.17.21",
+      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
+      "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
+    },
+    "microjob": {
+      "version": "0.7.0",
+      "resolved": "https://registry.npmjs.org/microjob/-/microjob-0.7.0.tgz",
+      "integrity": "sha512-ofDpqaVEv67ymCYdCE8eIVMCJY0H/hAtGxYmi1pXjtNkLOM0UUuUDshA1YLdVFwPwwt5+SW9EbBtSfsNokf+PA=="
+    },
+    "ms": {
+      "version": "2.1.2",
+      "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
+      "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w=="
+    },
+    "nice-napi": {
+      "version": "1.0.2",
+      "resolved": "https://registry.npmjs.org/nice-napi/-/nice-napi-1.0.2.tgz",
+      "integrity": "sha512-px/KnJAJZf5RuBGcfD+Sp2pAKq0ytz8j+1NehvgIGFkvtvFrDM3T8E4x/JJODXK9WZow8RRGrbA9QQ3hs+pDhA==",
+      "optional": true,
+      "requires": {
+        "node-addon-api": "^3.0.0",
+        "node-gyp-build": "^4.2.2"
+      }
+    },
+    "node-addon-api": {
+      "version": "3.1.0",
+      "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-3.1.0.tgz",
+      "integrity": "sha512-flmrDNB06LIl5lywUz7YlNGZH/5p0M7W28k8hzd9Lshtdh1wshD2Y+U4h9LD6KObOy1f+fEVdgprPrEymjM5uw==",
+      "optional": true
+    },
+    "node-gyp-build": {
+      "version": "4.2.3",
+      "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.2.3.tgz",
+      "integrity": "sha512-MN6ZpzmfNCRM+3t57PTJHgHyw/h4OWnZ6mR8P5j/uZtqQr46RRuDE/P+g3n0YR/AiYXeWixZZzaip77gdICfRg==",
+      "optional": true
+    },
+    "node-worker-threads-pool": {
+      "version": "1.4.3",
+      "resolved": "https://registry.npmjs.org/node-worker-threads-pool/-/node-worker-threads-pool-1.4.3.tgz",
+      "integrity": "sha512-US55ZGzEDQY2oq8Bc33dFVNKGpx4KaCJqThMDomSsUeX8tMdp2eDjQ6OP0yFd1HTEuHuLqxXSTWC4eidEsbXlg=="
+    },
+    "observable-fns": {
+      "version": "0.5.1",
+      "resolved": "https://registry.npmjs.org/observable-fns/-/observable-fns-0.5.1.tgz",
+      "integrity": "sha512-wf7g4Jpo1Wt2KIqZKLGeiuLOEMqpaOZ5gJn7DmSdqXgTdxRwSdBhWegQQpPteQ2gZvzCKqNNpwb853wcpA0j7A=="
+    },
+    "once": {
+      "version": "1.4.0",
+      "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz",
+      "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=",
+      "requires": {
+        "wrappy": "1"
+      }
+    },
+    "pako": {
+      "version": "1.0.11",
+      "resolved": "https://registry.npmjs.org/pako/-/pako-1.0.11.tgz",
+      "integrity": "sha512-4hLB8Py4zZce5s4yd9XzopqwVv/yGNhV1Bl8NTmCq1763HeK2+EwVTv+leGeL13Dnh2wfbqowVPXCIO0z4taYw=="
+    },
+    "piscina": {
+      "version": "2.1.0",
+      "resolved": "https://registry.npmjs.org/piscina/-/piscina-2.1.0.tgz",
+      "integrity": "sha512-3FgX36QyZcU4prKuNKl7/lWlOF3HAv9n7JpCjw09Zbql2KkzXXQ7E5xUS+RV5wV24Rn0r6Lr8jLdtU/cNZHAnA==",
+      "requires": {
+        "eventemitter-asyncresource": "^1.0.0",
+        "hdr-histogram-js": "^2.0.1",
+        "hdr-histogram-percentiles-obj": "^3.0.0",
+        "nice-napi": "^1.0.2"
+      }
+    },
+    "platform": {
+      "version": "1.3.6",
+      "resolved": "https://registry.npmjs.org/platform/-/platform-1.3.6.tgz",
+      "integrity": "sha512-fnWVljUchTro6RiCFvCXBbNhJc2NijN7oIQxbwsyL0buWJPG85v81ehlHI9fXrJsMNgTofEoWIQeClKpgxFLrg=="
+    },
+    "poolifier": {
+      "version": "2.0.0-beta.6",
+      "resolved": "https://registry.npmjs.org/poolifier/-/poolifier-2.0.0-beta.6.tgz",
+      "integrity": "sha512-n+IumaVITBY1/UD4gC4e6uXHlBIgy2+AKeX4BmhEP4phIM6DCYBYBRuVk5lk8LlXKiO8HFrQ3WXHzCpubqWyqA=="
+    },
+    "symbol-observable": {
+      "version": "1.2.0",
+      "resolved": "https://registry.npmjs.org/symbol-observable/-/symbol-observable-1.2.0.tgz",
+      "integrity": "sha512-e900nM8RRtGhlV36KGEU9k65K3mPb1WV70OdjfxlG2EAuM1noi/E/BaW/uMhL7bPEssK8QV57vN3esixjUvcXQ=="
+    },
+    "threads": {
+      "version": "1.6.3",
+      "resolved": "https://registry.npmjs.org/threads/-/threads-1.6.3.tgz",
+      "integrity": "sha512-tKwFIWRgfAT85KGkrpDt2jWPO8IVH0sLNfB/pXad/VW9eUIY2Zlz+QyeizypXhPHv9IHfqRzvk2t3mPw+imhWw==",
+      "requires": {
+        "callsites": "^3.1.0",
+        "debug": "^4.1.1",
+        "is-observable": "^1.1.0",
+        "observable-fns": "^0.5.1",
+        "tiny-worker": ">= 2"
+      }
+    },
+    "tiny-worker": {
+      "version": "2.3.0",
+      "resolved": "https://registry.npmjs.org/tiny-worker/-/tiny-worker-2.3.0.tgz",
+      "integrity": "sha512-pJ70wq5EAqTAEl9IkGzA+fN0836rycEuz2Cn6yeZ6FRzlVS5IDOkFHpIoEsksPRQV34GDqXm65+OlnZqUSyK2g==",
+      "optional": true,
+      "requires": {
+        "esm": "^3.2.25"
+      }
+    },
+    "worker-threads-pool": {
+      "version": "2.0.0",
+      "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
+      "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==",
+      "requires": {
+        "after-all": "^2.0.2"
+      }
+    },
+    "workerpool": {
+      "version": "6.1.0",
+      "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.0.tgz",
+      "integrity": "sha512-toV7q9rWNYha963Pl/qyeZ6wG+3nnsyvolaNUS8+R5Wtw6qJPTxIlOP1ZSvcGhEJw+l3HMMmtiNo9Gl61G4GVg=="
+    },
+    "wrappy": {
+      "version": "1.0.2",
+      "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
+      "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8="
+    }
+  }
+}
diff --git a/benchmarks/versus-external-pools/package.json b/benchmarks/versus-external-pools/package.json
new file mode 100644 (file)
index 0000000..596bc84
--- /dev/null
@@ -0,0 +1,21 @@
+{
+  "name": "poolifier-benchmarks",
+  "version": "1.0.0-internal",
+  "description": "This project contains benchmarks for the most used and popular Node.js thread/cluster pools implementations",
+  "private": true,
+  "main": "index.js",
+  "author": "pioardi",
+  "scripts": {
+    "test": "echo \"Error: no test specified\" && exit 1"
+  },
+  "dependencies": {
+    "benchmark": "^2.1.4",
+    "microjob": "0.7.0",
+    "node-worker-threads-pool": "1.4.3",
+    "piscina": "2.1.0",
+    "poolifier": "2.0.0-beta.6",
+    "threads": "1.6.3",
+    "worker-threads-pool": "2.0.0",
+    "workerpool": "6.1.0"
+  }
+}
diff --git a/benchmarks/versus-external-pools/piscina.js b/benchmarks/versus-external-pools/piscina.js
new file mode 100644 (file)
index 0000000..297ea68
--- /dev/null
@@ -0,0 +1,26 @@
+// IMPORT LIBRARIES
+const Piscina = require('piscina')
+// FINISH IMPORT LIBRARIES
+const size = process.env.POOL_SIZE
+const iterations = process.env.NUM_ITERATIONS
+const data = {
+  test: 'MYBENCH'
+}
+
+const piscina = new Piscina({
+  filename: './workers/piscina/json-stringify.worker.js',
+  minThreads: Number(size),
+  maxThreads: size * 3,
+  idleTimeout: 1000 * 60 // this is the same as poolifier default
+})
+
+async function run () {
+  const promises = []
+  for (let i = 0; i < iterations; i++) {
+    promises.push(piscina.runTask(data))
+  }
+  await Promise.all(promises)
+  process.exit()
+}
+
+run()
diff --git a/benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js b/benchmarks/versus-external-pools/static-suchmokuo-node-worker-threads-pool.js
new file mode 100644 (file)
index 0000000..df7edd8
--- /dev/null
@@ -0,0 +1,27 @@
+// IMPORT LIBRARIES
+const { DynamicPool, StaticPool } = require('node-worker-threads-pool')
+// FINISH IMPORT LIBRARIES
+// IMPORT FUNCTION TO BENCH
+const functionToBench = require('./functions/json-stringify')
+// FINISH IMPORT FUNCTION TO BENCH
+const size = process.env.POOL_SIZE
+const iterations = process.env.NUM_ITERATIONS
+const data = {
+  test: 'MYBENCH'
+}
+
+const pool = new StaticPool({
+  size: Number(size),
+  task: functionToBench
+})
+
+async function run () {
+  const promises = []
+  for (let i = 0; i < iterations; i++) {
+    promises.push(pool.exec(data))
+  }
+  await Promise.all(promises)
+  process.exit()
+}
+
+run()
diff --git a/benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js b/benchmarks/versus-external-pools/workers/piscina/json-stringify.worker.js
new file mode 100644 (file)
index 0000000..154e04a
--- /dev/null
@@ -0,0 +1,3 @@
+'use strict'
+const jsonStringify = require('../../functions/json-stringify')
+module.exports = jsonStringify
diff --git a/benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js b/benchmarks/versus-external-pools/workers/poolifier/json-stringify.worker.js
new file mode 100644 (file)
index 0000000..73d0885
--- /dev/null
@@ -0,0 +1,4 @@
+'use strict'
+const { ThreadWorker } = require('poolifier')
+const jsonStringify = require('../../functions/json-stringify')
+module.exports = new ThreadWorker(jsonStringify)
index 24bd130963e23b09712bbd6fed9d61fb749277da..2729f7877903c03d7c317b1f0287d40b629c31bc 100644 (file)
       "integrity": "sha512-K0Ptm/47OKfQRpNQ2J/oIN/3QYiK6FwW+eJbILhsdxh2WTLdl+30o8aGdTbm5JbffpFFAg/g+zi1E+jvJha5ng==",
       "dev": true
     },
-    "after-all": {
-      "version": "2.0.2",
-      "resolved": "https://registry.npmjs.org/after-all/-/after-all-2.0.2.tgz",
-      "integrity": "sha1-IDACmO1glLTIXJjnyK1NymKPn3M=",
-      "dev": true,
-      "requires": {
-        "once": "^1.3.0"
-      }
-    },
     "aggregate-error": {
       "version": "3.1.0",
       "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz",
       "integrity": "sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=",
       "dev": true
     },
-    "worker-threads-pool": {
-      "version": "2.0.0",
-      "resolved": "https://registry.npmjs.org/worker-threads-pool/-/worker-threads-pool-2.0.0.tgz",
-      "integrity": "sha512-5dtGbEucee6o5/kQgpyKIUoHGWf8488DP3ihZDJzDIVvH4V+NA6HdBl/I5ckI4yN1NwM68pdZDbrwac1M95mEA==",
-      "dev": true,
-      "requires": {
-        "after-all": "^2.0.2"
-      }
-    },
     "workerpool": {
       "version": "6.1.0",
       "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.0.tgz",
index 1cb4e9d729a7db71da5b1e45514a46dbc73981f6..0cc9f89167ba1fc60d8c92ff7e2402b510f2c7d1 100644 (file)
@@ -6,9 +6,9 @@
   "scripts": {
     "build": "rollup --config --environment BUILD:development",
     "build:prod": "rollup --config",
-    "benchmark": "npm run build && node -r source-map-support/register benchmarks/bench.js",
-    "benchmark:debug": "npm run build && node -r source-map-support/register --inspect benchmarks/bench.js",
-    "benchmark:prod": "npm run build:prod && node -r source-map-support/register benchmarks/bench.js",
+    "benchmark": "npm run build && node -r source-map-support/register benchmarks/internal/bench.js",
+    "benchmark:debug": "npm run build && node -r source-map-support/register --inspect benchmarks/internal/bench.js",
+    "benchmark:prod": "npm run build:prod && node -r source-map-support/register benchmarks/internal/bench.js",
     "test": "npm run build && nyc mocha --parallel 'tests/**/*.test.js'",
     "test:debug": "npm run build && mocha --inspect 'tests/**/*.test.js'",
     "test:prod": "npm run build:prod && nyc mocha --parallel 'tests/**/*.test.js'",
@@ -87,9 +87,7 @@
     "rollup-plugin-typescript2": "^0.30.0",
     "sonar-scanner": "^3.1.0",
     "source-map-support": "^0.5.19",
-    "typescript": "^4.1.5",
-    "worker-threads-pool": "^2.0.0",
-    "workerpool": "^6.1.0"
+    "typescript": "^4.1.5"
   },
   "engines": {
     "node": ">=12.11.0",
index 7944606d9c84dbcc7dc32a9547c8a6e425b48e01..c123849e6bc2aff5844185be05303f9b15bd36d6 100644 (file)
@@ -1,4 +1,7 @@
-import type { MessageValue } from '../utility-types'
+import type {
+  MessageValue,
+  PromiseWorkerResponseWrapper
+} from '../utility-types'
 import type { IPoolInternal } from './pool-internal'
 import { PoolEmitter } from './pool-internal'
 import type { WorkerChoiceStrategy } from './selection-strategies'
@@ -106,6 +109,19 @@ export abstract class AbstractPool<
   Data = unknown,
   Response = unknown
 > implements IPoolInternal<Worker, Data, Response> {
+  /**
+   * The promise map.
+   *
+   * - `key`: This is the message ID of each submitted task.
+   * - `value`: An object that contains the worker, the resolve function and the reject function.
+   *
+   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
+   */
+  protected promiseMap: Map<
+    number,
+    PromiseWorkerResponseWrapper<Worker, Response>
+  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
+
   /** @inheritdoc */
   public readonly workers: Worker[] = []
 
@@ -284,24 +300,12 @@ export abstract class AbstractPool<
     Message extends Data | Response
   > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
 
-  protected abstract unregisterWorkerMessageListener<
-    Message extends Data | Response
-  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
-
   protected internalExecute (
     worker: Worker,
     messageId: number
   ): Promise<Response> {
-    return new Promise((resolve, reject) => {
-      const listener: (message: MessageValue<Response>) => void = message => {
-        if (message.id === messageId) {
-          this.unregisterWorkerMessageListener(worker, listener)
-          this.decreaseWorkersTasks(worker)
-          if (message.error) reject(message.error)
-          else resolve(message.data as Response)
-        }
-      }
-      this.registerWorkerMessageListener(worker, listener)
+    return new Promise<Response>((resolve, reject) => {
+      this.promiseMap.set(messageId, { resolve, reject, worker })
     })
   }
 
@@ -337,4 +341,24 @@ export abstract class AbstractPool<
 
     return worker
   }
+
+  /**
+   * This function is the listener registered for each worker.
+   *
+   * @returns The listener function to execute when a message is sent from a worker.
+   */
+  protected workerListener (): (message: MessageValue<Response>) => void {
+    const listener: (message: MessageValue<Response>) => void = message => {
+      if (message.id) {
+        const value = this.promiseMap.get(message.id)
+        if (value) {
+          this.decreaseWorkersTasks(value.worker)
+          if (message.error) value.reject(message.error)
+          else value.resolve(message.data as Response)
+          this.promiseMap.delete(message.id)
+        }
+      }
+    }
+    return listener
+  }
 }
index 6246320e3072d18371646d431692abda06b85ea1..4ec81c07bab66b0ebad1cb3eed371ea568989ef0 100644 (file)
@@ -76,13 +76,6 @@ export class FixedClusterPool<
     worker.on('message', listener)
   }
 
-  protected unregisterWorkerMessageListener<Message extends Data | Response> (
-    worker: Worker,
-    listener: (message: MessageValue<Message>) => void
-  ): void {
-    worker.removeListener('message', listener)
-  }
-
   protected createWorker (): Worker {
     return fork(this.opts.env)
   }
@@ -91,5 +84,6 @@ export class FixedClusterPool<
     // We will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
     worker.setMaxListeners(this.opts.maxTasks ?? 1000)
+    this.registerWorkerMessageListener(worker, super.workerListener())
   }
 }
index 021de492540f50884a8e554d570f5194061212d0..1c91caa752702fefcccecb88074875ea4a27031c 100644 (file)
@@ -67,13 +67,6 @@ export class FixedThreadPool<
     messageChannel.port2?.on('message', listener)
   }
 
-  protected unregisterWorkerMessageListener<Message extends Data | Response> (
-    messageChannel: ThreadWorkerWithMessageChannel,
-    listener: (message: MessageValue<Message>) => void
-  ): void {
-    messageChannel.port2?.removeListener('message', listener)
-  }
-
   protected createWorker (): ThreadWorkerWithMessageChannel {
     return new Worker(this.filePath, {
       env: SHARE_ENV
@@ -86,7 +79,8 @@ export class FixedThreadPool<
     worker.port1 = port1
     worker.port2 = port2
     // We will attach a listener for every task,
-    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
+    // when the task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size.
     worker.port2.setMaxListeners(this.opts.maxTasks ?? 1000)
+    this.registerWorkerMessageListener(worker, super.workerListener())
   }
 }
index 2403b8a257b0377e441a4dc2568b49571be4cdc2..9441fec4e420292905a23150d73580e73b3411cd 100644 (file)
@@ -1,5 +1,6 @@
 import type { Worker } from 'cluster'
 import type { MessagePort } from 'worker_threads'
+import type { IWorker } from './pools/abstract-pool'
 import type { KillBehavior } from './worker/worker-options'
 
 /**
@@ -37,3 +38,27 @@ export interface MessageValue<
    */
   readonly parent?: MainWorker
 }
+
+/**
+ * An object holding the worker that will be used to resolve/rejects the promise later on.
+ *
+ * @template Worker Type of worker.
+ * @template Response Type of response of execution. This can only be serializable data.
+ */
+export interface PromiseWorkerResponseWrapper<
+  Worker extends IWorker,
+  Response = unknown
+> {
+  /**
+   * Resolve callback to fulfill the promise.
+   */
+  readonly resolve: (value: Response) => void
+  /**
+   * Reject callback to reject the promise.
+   */
+  readonly reject: (reason?: string) => void
+  /**
+   * The worker that has the assigned task.
+   */
+  readonly worker: Worker
+}