From 106744f7518d0f64ce85c4507157092083c2c4d4 Mon Sep 17 00:00:00 2001 From: pioardi Date: Wed, 22 Jan 2020 17:32:20 +0100 Subject: [PATCH] Error handling and unit tests --- .github/CONTRIBUTING.MD | 2 +- README.MD | 33 +++++++++------ benchmarks/bench.js | 72 ++++++++++++++++++++++++-------- benchmarks/myBench.js | 4 +- benchmarks/yourWorker.js | 22 +++++----- examples/multiFunctionExample.js | 9 ++++ examples/multifunctionWorker.js | 19 +++++++++ examples/normal.js | 16 ------- examples/yourWorker.js | 23 +++++----- lib/fixed.js | 6 ++- lib/workers.js | 15 +++++-- package.json | 5 ++- tests/dynamic.test.js | 4 +- tests/fixed.test.js | 38 ++++++++++++++++- tests/testWorker.js | 19 --------- tests/workers/echoWorker.js | 14 +++++++ tests/workers/emptyWorker.js | 13 ++++++ tests/workers/errorWorker.js | 14 +++++++ tests/workers/testWorker.js | 21 ++++++++++ 19 files changed, 249 insertions(+), 100 deletions(-) create mode 100644 examples/multiFunctionExample.js create mode 100644 examples/multifunctionWorker.js delete mode 100644 examples/normal.js delete mode 100644 tests/testWorker.js create mode 100644 tests/workers/echoWorker.js create mode 100644 tests/workers/emptyWorker.js create mode 100644 tests/workers/errorWorker.js create mode 100644 tests/workers/testWorker.js diff --git a/.github/CONTRIBUTING.MD b/.github/CONTRIBUTING.MD index 0636dcaa..bf5bc794 100644 --- a/.github/CONTRIBUTING.MD +++ b/.github/CONTRIBUTING.MD @@ -5,7 +5,7 @@ This repo use standard js style , please use it if you want to contribute
Take tasks from todo list, develop a new feature or fix a bug and do a pull request.
Another thing that you can do to contribute is to build something on top of ring-election and link ring-election to your project
-Please ask your PR to be merged on develop branch .
+Please ask your PR to be merged on master branch .
How to run tests
diff --git a/README.MD b/README.MD index ed090736..35080cbf 100644 --- a/README.MD +++ b/README.MD @@ -3,7 +3,9 @@ [![Dependabot](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot) [![Actions Status](https://github.com/pioardi/node-pool/workflows/NodeCI/badge.svg)](https://github.com/pioardi/node-pool/actions) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com) - +[![NODEP](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen +)](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen +)

Contents

@@ -25,7 +27,7 @@

Overview

Node pool contains two worker-threads pool implementations , you don' t have to deal with worker-threads complexity.
The first implementation is a static thread pool , with a defined number of threads that are started at creation time and will be reused.
-The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit ), the new created threads will be stopped after a configurable period of inactivity.
+The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit, these threads will be reused when active ), the new created threads will be stopped after a configurable period of inactivity.
You have to implement your worker extending the ThreadWorker class

Installation

@@ -40,13 +42,15 @@ You can implement a worker in a simple way , extending the class ThreadWorker : 'use strict' const { ThreadWorker } = require('node-pool') +function yourFunction (data) { + // this will be executed in the worker thread, + // the data will be received by using the execute method + return { ok: 1 } +} + class MyWorker extends ThreadWorker { constructor () { - super((data) => { - // this will be executed in the worker thread, - // the data will be received by using the execute method - return { ok: 1 } - }, { maxInactiveTime: 1000 * 60}) + super(yourFunction, { maxInactiveTime: 1000 * 60}) } } module.exports = new MyWorker() @@ -60,11 +64,14 @@ const { FixedThreadPool, DynamicThreadPool } = require('node-pool') // a fixed thread pool const pool = new FixedThreadPool(15, - './yourWorker.js') + './yourWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) // or a dynamic thread pool const pool = new DynamicThreadPool(10, 100, - './yourWorker.js') + './yourWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + pool.emitter.on('FullPool', () => console.log('Pool is full')) // the execute method signature is the same for both implementations, @@ -75,7 +82,7 @@ pool.execute({}).then(res => { ``` - See examples folder for more details. + See examples folder for more details ( in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js) ).

Node versions

@@ -113,12 +120,12 @@ This method will call the terminate method on each worker. - `maxInactiveTime` - Max time to wait tasks to work on ( in ms) , after this period the new worker threads will die.

Choose your pool

-Performance is one of the main target of these thread pool implementation, we want to have a strong focus on this.
+Performance is one of the main target of these thread pool implementations, we want to have a strong focus on this.
We already have a bench folder where you can find some comparisons. To choose your pool consider that with a FixedThreadPool or a DynamicThreadPool ( in this case is important the min parameter passed to the constructor) your application memory footprint will increase .
-Increasing the memory footprint your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory.
+Increasing the memory footprint, your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory.
One good choose from my point of view is to profile your application using Fixed/Dynamic thread pool , and to see your application metrics when you increase/decrease the num of threads.
-For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when requests, this is the advantage to use the DynamicThreadPool.
+For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when needed, this is the advantage to use the DynamicThreadPool.
But in general , always profile your application

Contribute

diff --git a/benchmarks/bench.js b/benchmarks/bench.js index a9515bff..d7b2f1a3 100644 --- a/benchmarks/bench.js +++ b/benchmarks/bench.js @@ -3,52 +3,90 @@ const suite = new Benchmark.Suite() const FixedThreadPool = require('../lib/fixed') const DynamicThreadPool = require('../lib/dynamic') const Pool = require('worker-threads-pool') -const size = 40 -const externalPool = new Pool({ max: size }) +const size = 30 +const tasks = 1 +// pools +const externalPool = new Pool({ max: size }) const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 }) -const dynamicPool = new DynamicThreadPool(size, size * 2, './yourWorker.js', { maxTasks: 10000 }) +const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 }) const workerData = { proof: 'ok' } -let executions = 0 -let executions1 = 0 // wait some seconds before start, my pools need to load threads !!! setTimeout(async () => { test() }, 3000) -async function test () { - // add tests - suite.add('PioardiStaticPool', async function () { - executions++ - await fixedPool.execute(workerData) +// fixed pool proof +async function fixedTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + fixedPool.execute(workerData).then(res => { + executions++ + if (executions === tasks) { + resolve('FINISH') + } + }) + } }) +} - .add('ExternalPool', async function () { - await new Promise((resolve, reject) => { +async function dynamicTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + dynamicPool.execute(workerData).then(res => { + executions++ + if (executions === tasks) { + resolve('FINISH') + } + }) + } + }) +} + +async function externalPoolTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + new Promise((resolve, reject) => { externalPool.acquire('./externalWorker.js', { workerData: workerData }, (err, worker) => { if (err) { return reject(err) } - executions1++ worker.on('error', reject) worker.on('message', res => { + executions++ resolve(res) }) }) + }).then(res => { + if (tasks === executions) { + resolve('FINISH') + } }) - }) + } + }) +} + +async function test () { + // add tests + suite.add('PioardiStaticPool', async function () { + await fixedTest() + }) .add('PioardiDynamicPool', async function () { - await dynamicPool.execute(workerData) + await dynamicTest() + }) + .add('ExternalPool', async function () { + await externalPoolTest() }) // add listeners .on('cycle', function (event) { console.log(String(event.target)) }) .on('complete', function () { - console.log(executions) - console.log(executions1) this.filter('fastest').map('name') console.log('Fastest is ' + this.filter('fastest').map('name')) }) diff --git a/benchmarks/myBench.js b/benchmarks/myBench.js index 45f11b29..9c1d713f 100644 --- a/benchmarks/myBench.js +++ b/benchmarks/myBench.js @@ -2,12 +2,12 @@ const FixedThreadPool = require('../lib/fixed') const DynamicThreadPool = require('../lib/dynamic') const Pool = require('worker-threads-pool') const tasks = 1000 -const size = 10 +const size = 16 // pools const externalPool = new Pool({ max: size }) const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 }) -const dynamicPool = new DynamicThreadPool(size / 2, 50, './yourWorker.js', { maxTasks: 10000 }) +const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 }) // data const workerData = { proof: 'ok' } diff --git a/benchmarks/yourWorker.js b/benchmarks/yourWorker.js index 9c250220..701aa8cf 100644 --- a/benchmarks/yourWorker.js +++ b/benchmarks/yourWorker.js @@ -1,18 +1,20 @@ 'use strict' const { ThreadWorker } = require('../lib/workers') +function yourFunction (data) { + for (let i = 0; i <= 1000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('This is the main thread ' + isMainThread) + return { ok: 1 } +} + class MyWorker extends ThreadWorker { constructor () { - super((data) => { - for (let i = 0; i <= 1000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - return { ok: 1 } - }) + super(yourFunction) } } module.exports = new MyWorker() diff --git a/examples/multiFunctionExample.js b/examples/multiFunctionExample.js new file mode 100644 index 00000000..8b27f841 --- /dev/null +++ b/examples/multiFunctionExample.js @@ -0,0 +1,9 @@ +const FixedThreadPool = require('../lib/fixed') +const pool = new FixedThreadPool(15, + './multifunctionWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + +pool.execute({ fname: 'fn0', input: 'hello' }).then(res => console.log(res)) +pool.execute({ fname: 'fn1', input: 'multifunction' }).then(res => console.log(res)) + +setTimeout(pool.destroy.bind(pool), 3000) diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js new file mode 100644 index 00000000..87edd57d --- /dev/null +++ b/examples/multifunctionWorker.js @@ -0,0 +1,19 @@ +'use strict' +const { ThreadWorker } = require('../lib/workers') + +function yourFunction (data) { + if (data.fname === 'fn0') { + console.log('Executing function 0') + return { data: '0 your input was' + data.input } + } else if (data.fname === 'fn1') { + console.log('Executing function 1') + return { data: '1 your input was' + data.input } + } +} + +class MyWorker extends ThreadWorker { + constructor () { + super(yourFunction) + } +} +module.exports = new MyWorker() diff --git a/examples/normal.js b/examples/normal.js deleted file mode 100644 index 68ac82c4..00000000 --- a/examples/normal.js +++ /dev/null @@ -1,16 +0,0 @@ -const start = Date.now() -const toBench = () => { - const iterations = 10000 - - for (let i = 0; i <= iterations; i++) { - const o = { - a: i - } - JSON.stringify(o) - } -} - -for (let i = 0; i < 1000; i++) { - toBench() -} -console.log('Time take is ' + (Date.now() - start)) diff --git a/examples/yourWorker.js b/examples/yourWorker.js index 5036d574..701aa8cf 100644 --- a/examples/yourWorker.js +++ b/examples/yourWorker.js @@ -1,19 +1,20 @@ 'use strict' const { ThreadWorker } = require('../lib/workers') +function yourFunction (data) { + for (let i = 0; i <= 1000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('This is the main thread ' + isMainThread) + return { ok: 1 } +} + class MyWorker extends ThreadWorker { constructor () { - super((data) => { - for (let i = 0; i <= 10000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - return { ok: 1 } - }) + super(yourFunction) } } - module.exports = new MyWorker() diff --git a/lib/fixed.js b/lib/fixed.js index 64552828..8d899d57 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -4,6 +4,7 @@ const { } = require('worker_threads') function empty () {} +const _void = {} /** * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -50,7 +51,7 @@ class FixedThreadPool { this.tasks.set(worker, this.tasks.get(worker) + 1) const id = ++this._id const res = this._execute(worker, id) - worker.postMessage({ data: data, _id: id }) + worker.postMessage({ data: data || _void, _id: id }) return res } @@ -60,7 +61,8 @@ class FixedThreadPool { if (message._id === id) { worker.port2.removeListener('message', listener) this.tasks.set(worker, this.tasks.get(worker) - 1) - resolve(message.data) + if (message.error) reject(message.error) + else resolve(message.data) } } worker.port2.on('message', listener) diff --git a/lib/workers.js b/lib/workers.js index c192f181..0e8569e5 100644 --- a/lib/workers.js +++ b/lib/workers.js @@ -27,9 +27,7 @@ class ThreadWorker extends AsyncResource { if (value && value.data && value._id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) - const res = this.runInAsyncScope(fn, null, value.data) - this.parent.postMessage({ data: res, _id: value._id }) - this.lastTask = Date.now() + this._run(fn, value) } else if (value.parent) { // save the port to communicate with the main thread // this will be received once @@ -47,6 +45,17 @@ class ThreadWorker extends AsyncResource { this.parent.postMessage({ kill: 1 }) } } + + _run (fn, value) { + try { + const res = this.runInAsyncScope(fn, null, value.data) + this.parent.postMessage({ data: res, _id: value._id }) + this.lastTask = Date.now() + } catch (e) { + this.parent.postMessage({ error: e, _id: value._id }) + this.lastTask = Date.now() + } + } } module.exports.ThreadWorker = ThreadWorker diff --git a/package.json b/package.json index 20856725..849e4811 100644 --- a/package.json +++ b/package.json @@ -5,9 +5,10 @@ "main": "index.js", "scripts": { "build": "npm install", - "test": "standard && nyc mocha --experimental-worker --exit --timeout 10000 tests/*test.js ", + "test": "standard && nyc mocha --experimental-worker --exit --timeout 15000 tests/*test.js ", + "debug-test": "mocha --inspect-brk --experimental-worker --exit tests/*test.js ", "demontest": "nodemon --exec \"npm test\"", - "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls", + "coverage": "nyc report --reporter=text-lcov tests/*test.js | coveralls", "standard-verbose": "npx standard --verbose", "lint": "standard --fix" }, diff --git a/tests/dynamic.test.js b/tests/dynamic.test.js index 3784c651..e23fd9af 100644 --- a/tests/dynamic.test.js +++ b/tests/dynamic.test.js @@ -3,7 +3,7 @@ const DynamicThreadPool = require('../lib/dynamic') const min = 1 const max = 3 const pool = new DynamicThreadPool(min, max, - './tests/testWorker.js', + './tests/workers/testWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) describe('Dynamic thread pool test suite ', () => { @@ -57,7 +57,7 @@ describe('Dynamic thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js') + const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js') const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/fixed.test.js b/tests/fixed.test.js index 78e960bf..3b485fbf 100644 --- a/tests/fixed.test.js +++ b/tests/fixed.test.js @@ -2,8 +2,11 @@ const expect = require('expect') const FixedThreadPool = require('../lib/fixed') const numThreads = 10 const pool = new FixedThreadPool(numThreads, - './tests/testWorker.js', + './tests/workers/testWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) +const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js') +const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js') +const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) describe('Fixed thread pool test suite ', () => { it('Choose worker round robin test', async () => { @@ -20,6 +23,37 @@ describe('Fixed thread pool test suite ', () => { expect(result).toBeFalsy() }) + it('Verify that is possible to invoke the execute method without input', async () => { + const result = await pool.execute() + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that is possible to have a worker that return undefined', async () => { + const result = await emptyPool.execute() + expect(result).toBeFalsy() + }) + + it('Verify that data are sent to the worker correctly', async () => { + const data = { f: 10 } + const result = await echoPool.execute(data) + expect(result).toBeTruthy() + expect(result.f).toBe(data.f) + }) + + it('Verify that error handling is working properly', async () => { + const data = { f: 10 } + let inError + try { + await errorPool.execute(data) + } catch (e) { + inError = e + } + expect(inError).toBeTruthy() + expect(inError instanceof Error).toBeTruthy() + expect(inError.message).toBeTruthy() + }) + it('Shutdown test', async () => { let closedThreads = 0 pool.workers.forEach(w => { @@ -45,7 +79,7 @@ describe('Fixed thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new FixedThreadPool(1, './tests/testWorker.js') + const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js') const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/testWorker.js b/tests/testWorker.js deleted file mode 100644 index 60e41d79..00000000 --- a/tests/testWorker.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict' -const { ThreadWorker } = require('../lib/workers') -const { isMainThread } = require('worker_threads') - -class MyWorker extends ThreadWorker { - constructor () { - super((data) => { - for (let i = 0; i <= 50; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - return isMainThread - }, { maxInactiveTime: 500 }) - } -} - -module.exports = new MyWorker() diff --git a/tests/workers/echoWorker.js b/tests/workers/echoWorker.js new file mode 100644 index 00000000..2a5ef89d --- /dev/null +++ b/tests/workers/echoWorker.js @@ -0,0 +1,14 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function echo (data) { + return data +} + +class MyWorker extends ThreadWorker { + constructor () { + super(echo, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/emptyWorker.js b/tests/workers/emptyWorker.js new file mode 100644 index 00000000..21f93aeb --- /dev/null +++ b/tests/workers/emptyWorker.js @@ -0,0 +1,13 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function test (data) { +} + +class MyWorker extends ThreadWorker { + constructor () { + super(test, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/errorWorker.js b/tests/workers/errorWorker.js new file mode 100644 index 00000000..ee3c74e7 --- /dev/null +++ b/tests/workers/errorWorker.js @@ -0,0 +1,14 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function error (data) { + throw new Error(data) +} + +class MyWorker extends ThreadWorker { + constructor () { + super(error, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/testWorker.js b/tests/workers/testWorker.js new file mode 100644 index 00000000..5d66f62e --- /dev/null +++ b/tests/workers/testWorker.js @@ -0,0 +1,21 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') +const { isMainThread } = require('worker_threads') + +function test (data) { + for (let i = 0; i <= 50; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + return isMainThread +} + +class MyWorker extends ThreadWorker { + constructor () { + super(test, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() -- 2.34.1