From: Alessandro Pio Ardizio Date: Thu, 23 Jan 2020 18:42:10 +0000 (+0100) Subject: Merge pull request #1 from pioardi/dependabot/npm_and_yarn/expect-25.1.0 X-Git-Tag: v1.0.0~1 X-Git-Url: https://git.piment-noir.org/?a=commitdiff_plain;h=a3c8691eb5bd772a43746fd5860d54a786463039;hp=ee01008a726c54be4bb46a5c56774e50a2f89981;p=poolifier.git Merge pull request #1 from pioardi/dependabot/npm_and_yarn/expect-25.1.0 Bump expect from 24.9.0 to 25.1.0 --- diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b351307a..bbe2d690 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,6 +1,6 @@ name: NodeCI -on: [push] +on: [push, pull_request] jobs: build: @@ -9,7 +9,7 @@ jobs: strategy: matrix: - node-version: [10.x, 12.x] + node-version: [12.x, 13.x] steps: - uses: actions/checkout@v1 @@ -22,5 +22,7 @@ jobs: npm ci npm run build --if-present npm run test + export COVERALLS_REPO_TOKEN=${{ secrets.COVERALLS_REPO_TOKEN }} + npm run coverage env: CI: true diff --git a/.github/CONTRIBUTING.MD b/CONTRIBUTING.md similarity index 90% rename from .github/CONTRIBUTING.MD rename to CONTRIBUTING.md index 0636dcaa..bf5bc794 100644 --- a/.github/CONTRIBUTING.MD +++ b/CONTRIBUTING.md @@ -5,7 +5,7 @@ This repo use standard js style , please use it if you want to contribute
Take tasks from todo list, develop a new feature or fix a bug and do a pull request.
Another thing that you can do to contribute is to build something on top of ring-election and link ring-election to your project
-Please ask your PR to be merged on develop branch .
+Please ask your PR to be merged on master branch .
How to run tests
diff --git a/README.MD b/README.MD index ed090736..5ab24077 100644 --- a/README.MD +++ b/README.MD @@ -2,8 +2,11 @@ [![JavaScript Style Guide](https://img.shields.io/badge/code_style-standard-brightgreen.svg)](https://standardjs.com) [![Dependabot](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot) [![Actions Status](https://github.com/pioardi/node-pool/workflows/NodeCI/badge.svg)](https://github.com/pioardi/node-pool/actions) +[![Coverage Status](https://coveralls.io/repos/github/pioardi/node-thread-pool/badge.svg?branch=master)](https://coveralls.io/github/pioardi/node-thread-pool?branch=master) [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com) - +[![NODEP](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen +)](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen +)

Contents

@@ -25,12 +28,12 @@

Overview

Node pool contains two worker-threads pool implementations , you don' t have to deal with worker-threads complexity.
The first implementation is a static thread pool , with a defined number of threads that are started at creation time and will be reused.
-The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit ), the new created threads will be stopped after a configurable period of inactivity.
+The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit, these threads will be reused when active ), the new created threads will be stopped after a configurable period of inactivity.
You have to implement your worker extending the ThreadWorker class

Installation

``` -npm install node-thread-pool --save +npm install poolifier --save ```

Usage

@@ -38,15 +41,17 @@ You can implement a worker in a simple way , extending the class ThreadWorker : ```js 'use strict' -const { ThreadWorker } = require('node-pool') +const { ThreadWorker } = require('poolifier') + +function yourFunction (data) { + // this will be executed in the worker thread, + // the data will be received by using the execute method + return { ok: 1 } +} class MyWorker extends ThreadWorker { constructor () { - super((data) => { - // this will be executed in the worker thread, - // the data will be received by using the execute method - return { ok: 1 } - }, { maxInactiveTime: 1000 * 60}) + super(yourFunction, { maxInactiveTime: 1000 * 60}) } } module.exports = new MyWorker() @@ -56,15 +61,18 @@ Instantiate your pool based on your needed : ```js 'use strict' -const { FixedThreadPool, DynamicThreadPool } = require('node-pool') +const { FixedThreadPool, DynamicThreadPool } = require('poolifier') // a fixed thread pool const pool = new FixedThreadPool(15, - './yourWorker.js') + './yourWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) // or a dynamic thread pool const pool = new DynamicThreadPool(10, 100, - './yourWorker.js') + './yourWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + pool.emitter.on('FullPool', () => console.log('Pool is full')) // the execute method signature is the same for both implementations, @@ -75,11 +83,11 @@ pool.execute({}).then(res => { ``` - See examples folder for more details. + See examples folder for more details ( in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js) ).

Node versions

-You can use node version 10.x with --experimental-worker flag, or you can use an higher version (i.e 12.x)
+You can use node versions 12.x , 13.x

API

@@ -113,20 +121,19 @@ This method will call the terminate method on each worker. - `maxInactiveTime` - Max time to wait tasks to work on ( in ms) , after this period the new worker threads will die.

Choose your pool

-Performance is one of the main target of these thread pool implementation, we want to have a strong focus on this.
+Performance is one of the main target of these thread pool implementations, we want to have a strong focus on this.
We already have a bench folder where you can find some comparisons. To choose your pool consider that with a FixedThreadPool or a DynamicThreadPool ( in this case is important the min parameter passed to the constructor) your application memory footprint will increase .
-Increasing the memory footprint your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory.
+Increasing the memory footprint, your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory.
One good choose from my point of view is to profile your application using Fixed/Dynamic thread pool , and to see your application metrics when you increase/decrease the num of threads.
-For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when requests, this is the advantage to use the DynamicThreadPool.
+For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when needed, this is the advantage to use the DynamicThreadPool.
But in general , always profile your application

Contribute

-See guidelines [CONTRIBUTING](./.github/CONTRIBUTING.md) +See guidelines [CONTRIBUTING](CONTRIBUTING.md)

License

[MIT](./LICENSE) - diff --git a/benchmarks/bench.js b/benchmarks/bench.js index a9515bff..d7b2f1a3 100644 --- a/benchmarks/bench.js +++ b/benchmarks/bench.js @@ -3,52 +3,90 @@ const suite = new Benchmark.Suite() const FixedThreadPool = require('../lib/fixed') const DynamicThreadPool = require('../lib/dynamic') const Pool = require('worker-threads-pool') -const size = 40 -const externalPool = new Pool({ max: size }) +const size = 30 +const tasks = 1 +// pools +const externalPool = new Pool({ max: size }) const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 }) -const dynamicPool = new DynamicThreadPool(size, size * 2, './yourWorker.js', { maxTasks: 10000 }) +const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 }) const workerData = { proof: 'ok' } -let executions = 0 -let executions1 = 0 // wait some seconds before start, my pools need to load threads !!! setTimeout(async () => { test() }, 3000) -async function test () { - // add tests - suite.add('PioardiStaticPool', async function () { - executions++ - await fixedPool.execute(workerData) +// fixed pool proof +async function fixedTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + fixedPool.execute(workerData).then(res => { + executions++ + if (executions === tasks) { + resolve('FINISH') + } + }) + } }) +} - .add('ExternalPool', async function () { - await new Promise((resolve, reject) => { +async function dynamicTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + dynamicPool.execute(workerData).then(res => { + executions++ + if (executions === tasks) { + resolve('FINISH') + } + }) + } + }) +} + +async function externalPoolTest () { + return new Promise((resolve, reject) => { + let executions = 0 + for (let i = 0; i <= tasks; i++) { + new Promise((resolve, reject) => { externalPool.acquire('./externalWorker.js', { workerData: workerData }, (err, worker) => { if (err) { return reject(err) } - executions1++ worker.on('error', reject) worker.on('message', res => { + executions++ resolve(res) }) }) + }).then(res => { + if (tasks === executions) { + resolve('FINISH') + } }) - }) + } + }) +} + +async function test () { + // add tests + suite.add('PioardiStaticPool', async function () { + await fixedTest() + }) .add('PioardiDynamicPool', async function () { - await dynamicPool.execute(workerData) + await dynamicTest() + }) + .add('ExternalPool', async function () { + await externalPoolTest() }) // add listeners .on('cycle', function (event) { console.log(String(event.target)) }) .on('complete', function () { - console.log(executions) - console.log(executions1) this.filter('fastest').map('name') console.log('Fastest is ' + this.filter('fastest').map('name')) }) diff --git a/benchmarks/myBench.js b/benchmarks/myBench.js index 45f11b29..9c1d713f 100644 --- a/benchmarks/myBench.js +++ b/benchmarks/myBench.js @@ -2,12 +2,12 @@ const FixedThreadPool = require('../lib/fixed') const DynamicThreadPool = require('../lib/dynamic') const Pool = require('worker-threads-pool') const tasks = 1000 -const size = 10 +const size = 16 // pools const externalPool = new Pool({ max: size }) const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 }) -const dynamicPool = new DynamicThreadPool(size / 2, 50, './yourWorker.js', { maxTasks: 10000 }) +const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 }) // data const workerData = { proof: 'ok' } diff --git a/benchmarks/yourWorker.js b/benchmarks/yourWorker.js index 9c250220..701aa8cf 100644 --- a/benchmarks/yourWorker.js +++ b/benchmarks/yourWorker.js @@ -1,18 +1,20 @@ 'use strict' const { ThreadWorker } = require('../lib/workers') +function yourFunction (data) { + for (let i = 0; i <= 1000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('This is the main thread ' + isMainThread) + return { ok: 1 } +} + class MyWorker extends ThreadWorker { constructor () { - super((data) => { - for (let i = 0; i <= 1000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - return { ok: 1 } - }) + super(yourFunction) } } module.exports = new MyWorker() diff --git a/examples/multiFunctionExample.js b/examples/multiFunctionExample.js new file mode 100644 index 00000000..8b27f841 --- /dev/null +++ b/examples/multiFunctionExample.js @@ -0,0 +1,9 @@ +const FixedThreadPool = require('../lib/fixed') +const pool = new FixedThreadPool(15, + './multifunctionWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + +pool.execute({ fname: 'fn0', input: 'hello' }).then(res => console.log(res)) +pool.execute({ fname: 'fn1', input: 'multifunction' }).then(res => console.log(res)) + +setTimeout(pool.destroy.bind(pool), 3000) diff --git a/examples/multifunctionWorker.js b/examples/multifunctionWorker.js new file mode 100644 index 00000000..87edd57d --- /dev/null +++ b/examples/multifunctionWorker.js @@ -0,0 +1,19 @@ +'use strict' +const { ThreadWorker } = require('../lib/workers') + +function yourFunction (data) { + if (data.fname === 'fn0') { + console.log('Executing function 0') + return { data: '0 your input was' + data.input } + } else if (data.fname === 'fn1') { + console.log('Executing function 1') + return { data: '1 your input was' + data.input } + } +} + +class MyWorker extends ThreadWorker { + constructor () { + super(yourFunction) + } +} +module.exports = new MyWorker() diff --git a/examples/normal.js b/examples/normal.js deleted file mode 100644 index 68ac82c4..00000000 --- a/examples/normal.js +++ /dev/null @@ -1,16 +0,0 @@ -const start = Date.now() -const toBench = () => { - const iterations = 10000 - - for (let i = 0; i <= iterations; i++) { - const o = { - a: i - } - JSON.stringify(o) - } -} - -for (let i = 0; i < 1000; i++) { - toBench() -} -console.log('Time take is ' + (Date.now() - start)) diff --git a/examples/yourWorker.js b/examples/yourWorker.js index 5036d574..701aa8cf 100644 --- a/examples/yourWorker.js +++ b/examples/yourWorker.js @@ -1,19 +1,20 @@ 'use strict' const { ThreadWorker } = require('../lib/workers') +function yourFunction (data) { + for (let i = 0; i <= 1000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + // console.log('This is the main thread ' + isMainThread) + return { ok: 1 } +} + class MyWorker extends ThreadWorker { constructor () { - super((data) => { - for (let i = 0; i <= 10000; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - // console.log('This is the main thread ' + isMainThread) - return { ok: 1 } - }) + super(yourFunction) } } - module.exports = new MyWorker() diff --git a/lib/fixed.js b/lib/fixed.js index 64552828..8d899d57 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -4,6 +4,7 @@ const { } = require('worker_threads') function empty () {} +const _void = {} /** * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -50,7 +51,7 @@ class FixedThreadPool { this.tasks.set(worker, this.tasks.get(worker) + 1) const id = ++this._id const res = this._execute(worker, id) - worker.postMessage({ data: data, _id: id }) + worker.postMessage({ data: data || _void, _id: id }) return res } @@ -60,7 +61,8 @@ class FixedThreadPool { if (message._id === id) { worker.port2.removeListener('message', listener) this.tasks.set(worker, this.tasks.get(worker) - 1) - resolve(message.data) + if (message.error) reject(message.error) + else resolve(message.data) } } worker.port2.on('message', listener) diff --git a/lib/workers.js b/lib/workers.js index c192f181..103061aa 100644 --- a/lib/workers.js +++ b/lib/workers.js @@ -27,9 +27,7 @@ class ThreadWorker extends AsyncResource { if (value && value.data && value._id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) - const res = this.runInAsyncScope(fn, null, value.data) - this.parent.postMessage({ data: res, _id: value._id }) - this.lastTask = Date.now() + this.runInAsyncScope(this._run.bind(this), this, fn, value) } else if (value.parent) { // save the port to communicate with the main thread // this will be received once @@ -47,6 +45,17 @@ class ThreadWorker extends AsyncResource { this.parent.postMessage({ kill: 1 }) } } + + _run (fn, value) { + try { + const res = fn(value.data) + this.parent.postMessage({ data: res, _id: value._id }) + this.lastTask = Date.now() + } catch (e) { + this.parent.postMessage({ error: e, _id: value._id }) + this.lastTask = Date.now() + } + } } module.exports.ThreadWorker = ThreadWorker diff --git a/package.json b/package.json index 53d9e0d2..88d328a2 100644 --- a/package.json +++ b/package.json @@ -1,19 +1,20 @@ { - "name": "node-thread-pool", - "version": "0.0.1", + "name": "poolifier", + "version": "0.0.2", "description": "Library on top of node js worker threads that implement various worker pools type", "main": "index.js", "scripts": { "build": "npm install", - "test": "standard && nyc mocha --experimental-worker --exit --timeout 10000 tests/*test.js ", + "test": "standard && nyc mocha --exit --timeout 20000 tests/*test.js ", + "debug-test": "mocha --inspect-brk --exit tests/*test.js ", "demontest": "nodemon --exec \"npm test\"", - "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls", + "coverage": "nyc report --reporter=text-lcov | coveralls", "standard-verbose": "npx standard --verbose", "lint": "standard --fix" }, "repository": { "type": "git", - "url": "git+https://github.com/pioardi/node-thread-pool.git" + "url": "git+https://github.com/pioardi/poolifier.git" }, "keywords": [ "node", @@ -33,9 +34,9 @@ "author": "pioardi", "license": "MIT", "bugs": { - "url": "https://github.com/pioardi/node-thread-pool/issues" + "url": "https://github.com/pioardi/poolifier/issues" }, - "homepage": "https://github.com/pioardi/node-thread-pool#readme", + "homepage": "https://github.com/pioardi/poolifier#readme", "devDependencies": { "benchmark": "^2.1.4", "coveralls": "^3.0.9", @@ -45,5 +46,8 @@ "nyc": "^15.0.0", "standard": "^14.3.1", "worker-threads-pool": "^2.0.0" + }, + "engines": { + "node": ">=12.0.0" } } diff --git a/tests/dynamic.test.js b/tests/dynamic.test.js index 3784c651..e23fd9af 100644 --- a/tests/dynamic.test.js +++ b/tests/dynamic.test.js @@ -3,7 +3,7 @@ const DynamicThreadPool = require('../lib/dynamic') const min = 1 const max = 3 const pool = new DynamicThreadPool(min, max, - './tests/testWorker.js', + './tests/workers/testWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) describe('Dynamic thread pool test suite ', () => { @@ -57,7 +57,7 @@ describe('Dynamic thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js') + const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js') const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/fixed.test.js b/tests/fixed.test.js index 78e960bf..3b485fbf 100644 --- a/tests/fixed.test.js +++ b/tests/fixed.test.js @@ -2,8 +2,11 @@ const expect = require('expect') const FixedThreadPool = require('../lib/fixed') const numThreads = 10 const pool = new FixedThreadPool(numThreads, - './tests/testWorker.js', + './tests/workers/testWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) +const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js') +const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js') +const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) describe('Fixed thread pool test suite ', () => { it('Choose worker round robin test', async () => { @@ -20,6 +23,37 @@ describe('Fixed thread pool test suite ', () => { expect(result).toBeFalsy() }) + it('Verify that is possible to invoke the execute method without input', async () => { + const result = await pool.execute() + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that is possible to have a worker that return undefined', async () => { + const result = await emptyPool.execute() + expect(result).toBeFalsy() + }) + + it('Verify that data are sent to the worker correctly', async () => { + const data = { f: 10 } + const result = await echoPool.execute(data) + expect(result).toBeTruthy() + expect(result.f).toBe(data.f) + }) + + it('Verify that error handling is working properly', async () => { + const data = { f: 10 } + let inError + try { + await errorPool.execute(data) + } catch (e) { + inError = e + } + expect(inError).toBeTruthy() + expect(inError instanceof Error).toBeTruthy() + expect(inError.message).toBeTruthy() + }) + it('Shutdown test', async () => { let closedThreads = 0 pool.workers.forEach(w => { @@ -45,7 +79,7 @@ describe('Fixed thread pool test suite ', () => { }) it('Should work even without opts in input', async () => { - const pool1 = new FixedThreadPool(1, './tests/testWorker.js') + const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js') const res = await pool1.execute({ test: 'test' }) expect(res).toBeFalsy() }) diff --git a/tests/testWorker.js b/tests/testWorker.js deleted file mode 100644 index 60e41d79..00000000 --- a/tests/testWorker.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict' -const { ThreadWorker } = require('../lib/workers') -const { isMainThread } = require('worker_threads') - -class MyWorker extends ThreadWorker { - constructor () { - super((data) => { - for (let i = 0; i <= 50; i++) { - const o = { - a: i - } - JSON.stringify(o) - } - return isMainThread - }, { maxInactiveTime: 500 }) - } -} - -module.exports = new MyWorker() diff --git a/tests/workers/echoWorker.js b/tests/workers/echoWorker.js new file mode 100644 index 00000000..2a5ef89d --- /dev/null +++ b/tests/workers/echoWorker.js @@ -0,0 +1,14 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function echo (data) { + return data +} + +class MyWorker extends ThreadWorker { + constructor () { + super(echo, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/emptyWorker.js b/tests/workers/emptyWorker.js new file mode 100644 index 00000000..21f93aeb --- /dev/null +++ b/tests/workers/emptyWorker.js @@ -0,0 +1,13 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function test (data) { +} + +class MyWorker extends ThreadWorker { + constructor () { + super(test, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/errorWorker.js b/tests/workers/errorWorker.js new file mode 100644 index 00000000..ee3c74e7 --- /dev/null +++ b/tests/workers/errorWorker.js @@ -0,0 +1,14 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') + +function error (data) { + throw new Error(data) +} + +class MyWorker extends ThreadWorker { + constructor () { + super(error, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/workers/testWorker.js b/tests/workers/testWorker.js new file mode 100644 index 00000000..5d66f62e --- /dev/null +++ b/tests/workers/testWorker.js @@ -0,0 +1,21 @@ +'use strict' +const { ThreadWorker } = require('../../lib/workers') +const { isMainThread } = require('worker_threads') + +function test (data) { + for (let i = 0; i <= 50; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + return isMainThread +} + +class MyWorker extends ThreadWorker { + constructor () { + super(test, { maxInactiveTime: 500 }) + } +} + +module.exports = new MyWorker()