Error handling and unit tests
authorpioardi <alessandroardizio94@gmail.com>
Wed, 22 Jan 2020 16:32:20 +0000 (17:32 +0100)
committerpioardi <alessandroardizio94@gmail.com>
Wed, 22 Jan 2020 16:32:20 +0000 (17:32 +0100)
19 files changed:
.github/CONTRIBUTING.MD
README.MD
benchmarks/bench.js
benchmarks/myBench.js
benchmarks/yourWorker.js
examples/multiFunctionExample.js [new file with mode: 0644]
examples/multifunctionWorker.js [new file with mode: 0644]
examples/normal.js [deleted file]
examples/yourWorker.js
lib/fixed.js
lib/workers.js
package.json
tests/dynamic.test.js
tests/fixed.test.js
tests/testWorker.js [deleted file]
tests/workers/echoWorker.js [new file with mode: 0644]
tests/workers/emptyWorker.js [new file with mode: 0644]
tests/workers/errorWorker.js [new file with mode: 0644]
tests/workers/testWorker.js [new file with mode: 0644]

index 0636dcaa2612e5b4c757da173ee77ace3a0b44b1..bf5bc794ee8b3259738ad6b152667180fdb098a3 100644 (file)
@@ -5,7 +5,7 @@ This repo use standard js style , please use it if you want to contribute <br>
 Take tasks from todo list, develop a new feature or fix a bug and do a pull request.<br>
 Another thing that you can do to contribute is to build something on top of ring-election and link ring-election to your project <br>
 
-Please ask your PR to be merged on <strong>develop</strong> branch . <br>
+Please ask your PR to be merged on <strong>master</strong> branch . <br>
 
 <strong>How to run tests</strong><br>
 
index ed090736128b05e73cb47a8a898f1b469ff2cf14..35080cbf7fa7032cbf20bb9d9e68e076b99e8b67 100644 (file)
--- a/README.MD
+++ b/README.MD
@@ -3,7 +3,9 @@
 [![Dependabot](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)
 [![Actions Status](https://github.com/pioardi/node-pool/workflows/NodeCI/badge.svg)](https://github.com/pioardi/node-pool/actions)
 [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com)
-
+[![NODEP](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen
+)](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen
+)
 
 <h2>Contents </h2>
 <h3 align="center">
@@ -25,7 +27,7 @@
 <h2> Overview </h2>
 Node pool contains two <a href="https://nodejs.org/api/worker_threads.html#worker_threads_worker_threads">worker-threads </a> pool implementations , you don' t have to deal with worker-threads complexity. <br>
 The first implementation is a static thread pool , with a defined number of threads that are started at creation time and will be reused.<br>
-The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit ), the new created threads will be stopped after a configurable period of inactivity. <br>
+The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit, these threads will be reused when active ), the new created threads will be stopped after a configurable period of inactivity. <br>
 You have to implement your worker extending the ThreadWorker class<br>
 <h2 id="installation">Installation</h2>
 
@@ -40,13 +42,15 @@ You can implement a worker in a simple way , extending the class ThreadWorker :
 'use strict'
 const { ThreadWorker } = require('node-pool')
 
+function yourFunction (data) {
+  // this will be executed in the worker thread,
+  // the data will be received by using the execute method
+  return { ok: 1 }
+}
+
 class MyWorker extends ThreadWorker {
   constructor () {
-    super((data) => {
-      // this will be executed in the worker thread,
-      // the data will be received by using the execute method
-      return { ok: 1 }
-    }, { maxInactiveTime: 1000 * 60})
+    super(yourFunction, { maxInactiveTime: 1000 * 60})
   }
 }
 module.exports = new MyWorker()
@@ -60,11 +64,14 @@ const { FixedThreadPool, DynamicThreadPool } = require('node-pool')
 
 // a fixed thread pool
 const pool = new FixedThreadPool(15,
-  './yourWorker.js')
+  './yourWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
 // or a dynamic thread pool
 const pool = new DynamicThreadPool(10, 100,
-  './yourWorker.js')
+  './yourWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
 pool.emitter.on('FullPool', () => console.log('Pool is full'))
 
 // the execute method signature is the same for both implementations,
@@ -75,7 +82,7 @@ pool.execute({}).then(res => {
 
 ```
 
-<strong> See examples folder for more details.</strong>
+<strong> See examples folder for more details ( in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js) ).</strong>
 
 <h2 id="nv">Node versions</h2>
 
@@ -113,12 +120,12 @@ This method will call the terminate method on each worker.
 - `maxInactiveTime` - Max time to wait tasks to work on ( in ms) , after this period the new worker threads will die.
 
 <h2 id="cyp">Choose your pool</h2>
-Performance is one of the main target of these thread pool implementation, we want to have a strong focus on this.<br>
+Performance is one of the main target of these thread pool implementations, we want to have a strong focus on this.<br>
 We already have a bench folder where you can find some comparisons.
 To choose your pool consider that with a FixedThreadPool or a DynamicThreadPool ( in this case is important the min parameter passed to the constructor) your application memory footprint will increase . <br>
-Increasing the memory footprint your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory. <br>
+Increasing the memory footprint, your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory. <br>
 One good choose from my point of view is to profile your application using Fixed/Dynamic thread pool , and to see your application metrics when you increase/decrease the num of threads. <br>
-For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when requests, this is the advantage to use the DynamicThreadPool. <br>
+For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when needed, this is the advantage to use the DynamicThreadPool. <br>
 But in general , <strong>always profile your application </strong>
 
 <h2 id="contribute">Contribute</h2>
index a9515bff30fdae1501ddce269c35cbb0683ae5b5..d7b2f1a36c322f29b5745bcd527ccd159113a903 100644 (file)
@@ -3,52 +3,90 @@ const suite = new Benchmark.Suite()
 const FixedThreadPool = require('../lib/fixed')
 const DynamicThreadPool = require('../lib/dynamic')
 const Pool = require('worker-threads-pool')
-const size = 40
-const externalPool = new Pool({ max: size })
+const size = 30
+const tasks = 1
 
+// pools
+const externalPool = new Pool({ max: size })
 const fixedPool = new FixedThreadPool(size,
   './yourWorker.js', { maxTasks: 10000 })
-const dynamicPool = new DynamicThreadPool(size, size * 2, './yourWorker.js', { maxTasks: 10000 })
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 })
 const workerData = { proof: 'ok' }
-let executions = 0
-let executions1 = 0
 
 // wait some seconds before start, my pools need to load threads !!!
 setTimeout(async () => {
   test()
 }, 3000)
 
-async function test () {
-  // add tests
-  suite.add('PioardiStaticPool', async function () {
-    executions++
-    await fixedPool.execute(workerData)
+// fixed pool proof
+async function fixedTest () {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      fixedPool.execute(workerData).then(res => {
+        executions++
+        if (executions === tasks) {
+          resolve('FINISH')
+        }
+      })
+    }
   })
+}
 
-    .add('ExternalPool', async function () {
-      await new Promise((resolve, reject) => {
+async function dynamicTest () {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      dynamicPool.execute(workerData).then(res => {
+        executions++
+        if (executions === tasks) {
+          resolve('FINISH')
+        }
+      })
+    }
+  })
+}
+
+async function externalPoolTest () {
+  return new Promise((resolve, reject) => {
+    let executions = 0
+    for (let i = 0; i <= tasks; i++) {
+      new Promise((resolve, reject) => {
         externalPool.acquire('./externalWorker.js', { workerData: workerData }, (err, worker) => {
           if (err) {
             return reject(err)
           }
-          executions1++
           worker.on('error', reject)
           worker.on('message', res => {
+            executions++
             resolve(res)
           })
         })
+      }).then(res => {
+        if (tasks === executions) {
+          resolve('FINISH')
+        }
       })
-    })
+    }
+  })
+}
+
+async function test () {
+  // add tests
+  suite.add('PioardiStaticPool', async function () {
+    await fixedTest()
+  })
     .add('PioardiDynamicPool', async function () {
-      await dynamicPool.execute(workerData)
+      await dynamicTest()
+    })
+    .add('ExternalPool', async function () {
+      await externalPoolTest()
     })
   // add listeners
     .on('cycle', function (event) {
       console.log(String(event.target))
     })
     .on('complete', function () {
-      console.log(executions)
-      console.log(executions1)
       this.filter('fastest').map('name')
       console.log('Fastest is ' + this.filter('fastest').map('name'))
     })
index 45f11b296fbaa96408bdfd6caa923f257226eda7..9c1d713fc6820f8e3cefaa64a5785b13d50ae8ec 100644 (file)
@@ -2,12 +2,12 @@ const FixedThreadPool = require('../lib/fixed')
 const DynamicThreadPool = require('../lib/dynamic')
 const Pool = require('worker-threads-pool')
 const tasks = 1000
-const size = 10
+const size = 16
 
 // pools
 const externalPool = new Pool({ max: size })
 const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 })
-const dynamicPool = new DynamicThreadPool(size / 2, 50, './yourWorker.js', { maxTasks: 10000 })
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 })
 
 // data
 const workerData = { proof: 'ok' }
index 9c250220e8a510d1a9beeb589bb9300298add6ee..701aa8cf4d59cf15cd7623b0b09f8cbb0cef8928 100644 (file)
@@ -1,18 +1,20 @@
 'use strict'
 const { ThreadWorker } = require('../lib/workers')
 
+function yourFunction (data) {
+  for (let i = 0; i <= 1000; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  // console.log('This is the main thread ' + isMainThread)
+  return { ok: 1 }
+}
+
 class MyWorker extends ThreadWorker {
   constructor () {
-    super((data) => {
-      for (let i = 0; i <= 1000; i++) {
-        const o = {
-          a: i
-        }
-        JSON.stringify(o)
-      }
-      // console.log('This is the main thread ' + isMainThread)
-      return { ok: 1 }
-    })
+    super(yourFunction)
   }
 }
 module.exports = new MyWorker()
diff --git a/examples/multiFunctionExample.js b/examples/multiFunctionExample.js
new file mode 100644 (file)
index 0000000..8b27f84
--- /dev/null
@@ -0,0 +1,9 @@
+const FixedThreadPool = require('../lib/fixed')
+const pool = new FixedThreadPool(15,
+  './multifunctionWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+pool.execute({ fname: 'fn0', input: 'hello' }).then(res => console.log(res))
+pool.execute({ fname: 'fn1', input: 'multifunction' }).then(res => console.log(res))
+
+setTimeout(pool.destroy.bind(pool), 3000)
diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js
new file mode 100644 (file)
index 0000000..87edd57
--- /dev/null
@@ -0,0 +1,19 @@
+'use strict'
+const { ThreadWorker } = require('../lib/workers')
+
+function yourFunction (data) {
+  if (data.fname === 'fn0') {
+    console.log('Executing function 0')
+    return { data: '0 your input was' + data.input }
+  } else if (data.fname === 'fn1') {
+    console.log('Executing function 1')
+    return { data: '1 your input was' + data.input }
+  }
+}
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super(yourFunction)
+  }
+}
+module.exports = new MyWorker()
diff --git a/examples/normal.js b/examples/normal.js
deleted file mode 100644 (file)
index 68ac82c..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-const start = Date.now()
-const toBench = () => {
-  const iterations = 10000
-
-  for (let i = 0; i <= iterations; i++) {
-    const o = {
-      a: i
-    }
-    JSON.stringify(o)
-  }
-}
-
-for (let i = 0; i < 1000; i++) {
-  toBench()
-}
-console.log('Time take is ' + (Date.now() - start))
index 5036d5746be21fe1b48310b7909f0f6f0fb057b3..701aa8cf4d59cf15cd7623b0b09f8cbb0cef8928 100644 (file)
@@ -1,19 +1,20 @@
 'use strict'
 const { ThreadWorker } = require('../lib/workers')
 
+function yourFunction (data) {
+  for (let i = 0; i <= 1000; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  // console.log('This is the main thread ' + isMainThread)
+  return { ok: 1 }
+}
+
 class MyWorker extends ThreadWorker {
   constructor () {
-    super((data) => {
-      for (let i = 0; i <= 10000; i++) {
-        const o = {
-          a: i
-        }
-        JSON.stringify(o)
-      }
-      // console.log('This is the main thread ' + isMainThread)
-      return { ok: 1 }
-    })
+    super(yourFunction)
   }
 }
-
 module.exports = new MyWorker()
index 645528283d80807cbc5374b5184c64b0f4c83c9e..8d899d57bb4ed00a54e4d1a1f2093641d8bd3f77 100644 (file)
@@ -4,6 +4,7 @@ const {
 } = require('worker_threads')
 
 function empty () {}
+const _void = {}
 /**
  * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This pool will select the worker thread in a round robin fashion. <br>
@@ -50,7 +51,7 @@ class FixedThreadPool {
     this.tasks.set(worker, this.tasks.get(worker) + 1)
     const id = ++this._id
     const res = this._execute(worker, id)
-    worker.postMessage({ data: data, _id: id })
+    worker.postMessage({ data: data || _void, _id: id })
     return res
   }
 
@@ -60,7 +61,8 @@ class FixedThreadPool {
         if (message._id === id) {
           worker.port2.removeListener('message', listener)
           this.tasks.set(worker, this.tasks.get(worker) - 1)
-          resolve(message.data)
+          if (message.error) reject(message.error)
+          else resolve(message.data)
         }
       }
       worker.port2.on('message', listener)
index c192f1812c4a08cf8aa9cdc305a397c0740abdb5..0e8569e5423e3b91617d564e2a80cfde4c3aa45c 100644 (file)
@@ -27,9 +27,7 @@ class ThreadWorker extends AsyncResource {
       if (value && value.data && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        const res = this.runInAsyncScope(fn, null, value.data)
-        this.parent.postMessage({ data: res, _id: value._id })
-        this.lastTask = Date.now()
+        this._run(fn, value)
       } else if (value.parent) {
         // save the port to communicate with the main thread
         // this will be received once
@@ -47,6 +45,17 @@ class ThreadWorker extends AsyncResource {
       this.parent.postMessage({ kill: 1 })
     }
   }
+
+  _run (fn, value) {
+    try {
+      const res = this.runInAsyncScope(fn, null, value.data)
+      this.parent.postMessage({ data: res, _id: value._id })
+      this.lastTask = Date.now()
+    } catch (e) {
+      this.parent.postMessage({ error: e, _id: value._id })
+      this.lastTask = Date.now()
+    }
+  }
 }
 
 module.exports.ThreadWorker = ThreadWorker
index 20856725cfb21799ad9bcc5cc122fd02aa038e62..849e4811f2a0ab5cf66010aa270f6540ce5f97a8 100644 (file)
@@ -5,9 +5,10 @@
   "main": "index.js",
   "scripts": {
     "build": "npm install",
-    "test": "standard && nyc mocha --experimental-worker --exit --timeout 10000 tests/*test.js ",
+    "test": "standard && nyc mocha --experimental-worker --exit --timeout 15000 tests/*test.js ",
+    "debug-test": "mocha --inspect-brk --experimental-worker --exit tests/*test.js ",
     "demontest": "nodemon --exec \"npm test\"",
-    "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls",
+    "coverage": "nyc report --reporter=text-lcov tests/*test.js | coveralls",
     "standard-verbose": "npx standard --verbose",
     "lint": "standard --fix"
   },
index 3784c65110b5161cbd9bb9a25b8eff0c0047d9ec..e23fd9af9fbd32ff210ef178f4cf298f7863af7c 100644 (file)
@@ -3,7 +3,7 @@ const DynamicThreadPool = require('../lib/dynamic')
 const min = 1
 const max = 3
 const pool = new DynamicThreadPool(min, max,
-  './tests/testWorker.js',
+  './tests/workers/testWorker.js',
   { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
 describe('Dynamic thread pool test suite ', () => {
@@ -57,7 +57,7 @@ describe('Dynamic thread pool test suite ', () => {
   })
 
   it('Should work even without opts in input', async () => {
-    const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js')
+    const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js')
     const res = await pool1.execute({ test: 'test' })
     expect(res).toBeFalsy()
   })
index 78e960bf547b8a65f91bdc6577765439f0021181..3b485fbf0ffd9db7fc35539f8a52db2164377183 100644 (file)
@@ -2,8 +2,11 @@ const expect = require('expect')
 const FixedThreadPool = require('../lib/fixed')
 const numThreads = 10
 const pool = new FixedThreadPool(numThreads,
-  './tests/testWorker.js',
+  './tests/workers/testWorker.js',
   { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js')
+const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js')
+const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
 describe('Fixed thread pool test suite ', () => {
   it('Choose worker round robin test', async () => {
@@ -20,6 +23,37 @@ describe('Fixed thread pool test suite ', () => {
     expect(result).toBeFalsy()
   })
 
+  it('Verify that is possible to invoke the execute method without input', async () => {
+    const result = await pool.execute()
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that is possible to have a worker that return undefined', async () => {
+    const result = await emptyPool.execute()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that data are sent to the worker correctly', async () => {
+    const data = { f: 10 }
+    const result = await echoPool.execute(data)
+    expect(result).toBeTruthy()
+    expect(result.f).toBe(data.f)
+  })
+
+  it('Verify that error handling is working properly', async () => {
+    const data = { f: 10 }
+    let inError
+    try {
+      await errorPool.execute(data)
+    } catch (e) {
+      inError = e
+    }
+    expect(inError).toBeTruthy()
+    expect(inError instanceof Error).toBeTruthy()
+    expect(inError.message).toBeTruthy()
+  })
+
   it('Shutdown test', async () => {
     let closedThreads = 0
     pool.workers.forEach(w => {
@@ -45,7 +79,7 @@ describe('Fixed thread pool test suite ', () => {
   })
 
   it('Should work even without opts in input', async () => {
-    const pool1 = new FixedThreadPool(1, './tests/testWorker.js')
+    const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js')
     const res = await pool1.execute({ test: 'test' })
     expect(res).toBeFalsy()
   })
diff --git a/tests/testWorker.js b/tests/testWorker.js
deleted file mode 100644 (file)
index 60e41d7..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-'use strict'
-const { ThreadWorker } = require('../lib/workers')
-const { isMainThread } = require('worker_threads')
-
-class MyWorker extends ThreadWorker {
-  constructor () {
-    super((data) => {
-      for (let i = 0; i <= 50; i++) {
-        const o = {
-          a: i
-        }
-        JSON.stringify(o)
-      }
-      return isMainThread
-    }, { maxInactiveTime: 500 })
-  }
-}
-
-module.exports = new MyWorker()
diff --git a/tests/workers/echoWorker.js b/tests/workers/echoWorker.js
new file mode 100644 (file)
index 0000000..2a5ef89
--- /dev/null
@@ -0,0 +1,14 @@
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function echo (data) {
+  return data
+}
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super(echo, { maxInactiveTime: 500 })
+  }
+}
+
+module.exports = new MyWorker()
diff --git a/tests/workers/emptyWorker.js b/tests/workers/emptyWorker.js
new file mode 100644 (file)
index 0000000..21f93ae
--- /dev/null
@@ -0,0 +1,13 @@
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function test (data) {
+}
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super(test, { maxInactiveTime: 500 })
+  }
+}
+
+module.exports = new MyWorker()
diff --git a/tests/workers/errorWorker.js b/tests/workers/errorWorker.js
new file mode 100644 (file)
index 0000000..ee3c74e
--- /dev/null
@@ -0,0 +1,14 @@
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function error (data) {
+  throw new Error(data)
+}
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super(error, { maxInactiveTime: 500 })
+  }
+}
+
+module.exports = new MyWorker()
diff --git a/tests/workers/testWorker.js b/tests/workers/testWorker.js
new file mode 100644 (file)
index 0000000..5d66f62
--- /dev/null
@@ -0,0 +1,21 @@
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+const { isMainThread } = require('worker_threads')
+
+function test (data) {
+  for (let i = 0; i <= 50; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+  return isMainThread
+}
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super(test, { maxInactiveTime: 500 })
+  }
+}
+
+module.exports = new MyWorker()