Take tasks from todo list, develop a new feature or fix a bug and do a pull request.<br>
Another thing that you can do to contribute is to build something on top of ring-election and link ring-election to your project <br>
-Please ask your PR to be merged on <strong>develop</strong> branch . <br>
+Please ask your PR to be merged on <strong>master</strong> branch . <br>
<strong>How to run tests</strong><br>
[![Dependabot](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)](https://badgen.net/dependabot/dependabot/dependabot-core/?icon=dependabot)
[![Actions Status](https://github.com/pioardi/node-pool/workflows/NodeCI/badge.svg)](https://github.com/pioardi/node-pool/actions)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg?style=flat-square)](http://makeapullrequest.com)
-
+[![NODEP](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen
+)](https://img.shields.io/static/v1?label=dependencies&message=no%20dependencies&color=brightgreen
+)
<h2>Contents </h2>
<h3 align="center">
<h2> Overview </h2>
Node pool contains two <a href="https://nodejs.org/api/worker_threads.html#worker_threads_worker_threads">worker-threads </a> pool implementations , you don' t have to deal with worker-threads complexity. <br>
The first implementation is a static thread pool , with a defined number of threads that are started at creation time and will be reused.<br>
-The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit ), the new created threads will be stopped after a configurable period of inactivity. <br>
+The second implementation is a dynamic thread pool with a number of threads started at creation time ( these threads will be always active and reused) and other threads created when the load will increase ( with an upper limit, these threads will be reused when active ), the new created threads will be stopped after a configurable period of inactivity. <br>
You have to implement your worker extending the ThreadWorker class<br>
<h2 id="installation">Installation</h2>
'use strict'
const { ThreadWorker } = require('node-pool')
+function yourFunction (data) {
+ // this will be executed in the worker thread,
+ // the data will be received by using the execute method
+ return { ok: 1 }
+}
+
class MyWorker extends ThreadWorker {
constructor () {
- super((data) => {
- // this will be executed in the worker thread,
- // the data will be received by using the execute method
- return { ok: 1 }
- }, { maxInactiveTime: 1000 * 60})
+ super(yourFunction, { maxInactiveTime: 1000 * 60})
}
}
module.exports = new MyWorker()
// a fixed thread pool
const pool = new FixedThreadPool(15,
- './yourWorker.js')
+ './yourWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
// or a dynamic thread pool
const pool = new DynamicThreadPool(10, 100,
- './yourWorker.js')
+ './yourWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
pool.emitter.on('FullPool', () => console.log('Pool is full'))
// the execute method signature is the same for both implementations,
```
-<strong> See examples folder for more details.</strong>
+<strong> See examples folder for more details ( in particular if you want to use a pool for [multiple functions](./examples/multiFunctionExample.js) ).</strong>
<h2 id="nv">Node versions</h2>
- `maxInactiveTime` - Max time to wait tasks to work on ( in ms) , after this period the new worker threads will die.
<h2 id="cyp">Choose your pool</h2>
-Performance is one of the main target of these thread pool implementation, we want to have a strong focus on this.<br>
+Performance is one of the main target of these thread pool implementations, we want to have a strong focus on this.<br>
We already have a bench folder where you can find some comparisons.
To choose your pool consider that with a FixedThreadPool or a DynamicThreadPool ( in this case is important the min parameter passed to the constructor) your application memory footprint will increase . <br>
-Increasing the memory footprint your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory. <br>
+Increasing the memory footprint, your application will be ready to accept more CPU bound tasks, but during idle time your application will consume more memory. <br>
One good choose from my point of view is to profile your application using Fixed/Dynamic thread pool , and to see your application metrics when you increase/decrease the num of threads. <br>
-For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when requests, this is the advantage to use the DynamicThreadPool. <br>
+For example you could keep the memory footprint low choosing a DynamicThreadPool with 5 threads, and allow to create new threads until 50/100 when needed, this is the advantage to use the DynamicThreadPool. <br>
But in general , <strong>always profile your application </strong>
<h2 id="contribute">Contribute</h2>
const FixedThreadPool = require('../lib/fixed')
const DynamicThreadPool = require('../lib/dynamic')
const Pool = require('worker-threads-pool')
-const size = 40
-const externalPool = new Pool({ max: size })
+const size = 30
+const tasks = 1
+// pools
+const externalPool = new Pool({ max: size })
const fixedPool = new FixedThreadPool(size,
'./yourWorker.js', { maxTasks: 10000 })
-const dynamicPool = new DynamicThreadPool(size, size * 2, './yourWorker.js', { maxTasks: 10000 })
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 })
const workerData = { proof: 'ok' }
-let executions = 0
-let executions1 = 0
// wait some seconds before start, my pools need to load threads !!!
setTimeout(async () => {
test()
}, 3000)
-async function test () {
- // add tests
- suite.add('PioardiStaticPool', async function () {
- executions++
- await fixedPool.execute(workerData)
+// fixed pool proof
+async function fixedTest () {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ fixedPool.execute(workerData).then(res => {
+ executions++
+ if (executions === tasks) {
+ resolve('FINISH')
+ }
+ })
+ }
})
+}
- .add('ExternalPool', async function () {
- await new Promise((resolve, reject) => {
+async function dynamicTest () {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ dynamicPool.execute(workerData).then(res => {
+ executions++
+ if (executions === tasks) {
+ resolve('FINISH')
+ }
+ })
+ }
+ })
+}
+
+async function externalPoolTest () {
+ return new Promise((resolve, reject) => {
+ let executions = 0
+ for (let i = 0; i <= tasks; i++) {
+ new Promise((resolve, reject) => {
externalPool.acquire('./externalWorker.js', { workerData: workerData }, (err, worker) => {
if (err) {
return reject(err)
}
- executions1++
worker.on('error', reject)
worker.on('message', res => {
+ executions++
resolve(res)
})
})
+ }).then(res => {
+ if (tasks === executions) {
+ resolve('FINISH')
+ }
})
- })
+ }
+ })
+}
+
+async function test () {
+ // add tests
+ suite.add('PioardiStaticPool', async function () {
+ await fixedTest()
+ })
.add('PioardiDynamicPool', async function () {
- await dynamicPool.execute(workerData)
+ await dynamicTest()
+ })
+ .add('ExternalPool', async function () {
+ await externalPoolTest()
})
// add listeners
.on('cycle', function (event) {
console.log(String(event.target))
})
.on('complete', function () {
- console.log(executions)
- console.log(executions1)
this.filter('fastest').map('name')
console.log('Fastest is ' + this.filter('fastest').map('name'))
})
const DynamicThreadPool = require('../lib/dynamic')
const Pool = require('worker-threads-pool')
const tasks = 1000
-const size = 10
+const size = 16
// pools
const externalPool = new Pool({ max: size })
const fixedPool = new FixedThreadPool(size, './yourWorker.js', { maxTasks: 10000 })
-const dynamicPool = new DynamicThreadPool(size / 2, 50, './yourWorker.js', { maxTasks: 10000 })
+const dynamicPool = new DynamicThreadPool(size / 2, size * 3, './yourWorker.js', { maxTasks: 10000 })
// data
const workerData = { proof: 'ok' }
'use strict'
const { ThreadWorker } = require('../lib/workers')
+function yourFunction (data) {
+ for (let i = 0; i <= 1000; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ // console.log('This is the main thread ' + isMainThread)
+ return { ok: 1 }
+}
+
class MyWorker extends ThreadWorker {
constructor () {
- super((data) => {
- for (let i = 0; i <= 1000; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
- // console.log('This is the main thread ' + isMainThread)
- return { ok: 1 }
- })
+ super(yourFunction)
}
}
module.exports = new MyWorker()
--- /dev/null
+const FixedThreadPool = require('../lib/fixed')
+const pool = new FixedThreadPool(15,
+ './multifunctionWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+pool.execute({ fname: 'fn0', input: 'hello' }).then(res => console.log(res))
+pool.execute({ fname: 'fn1', input: 'multifunction' }).then(res => console.log(res))
+
+setTimeout(pool.destroy.bind(pool), 3000)
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../lib/workers')
+
+function yourFunction (data) {
+ if (data.fname === 'fn0') {
+ console.log('Executing function 0')
+ return { data: '0 your input was' + data.input }
+ } else if (data.fname === 'fn1') {
+ console.log('Executing function 1')
+ return { data: '1 your input was' + data.input }
+ }
+}
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super(yourFunction)
+ }
+}
+module.exports = new MyWorker()
+++ /dev/null
-const start = Date.now()
-const toBench = () => {
- const iterations = 10000
-
- for (let i = 0; i <= iterations; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
-}
-
-for (let i = 0; i < 1000; i++) {
- toBench()
-}
-console.log('Time take is ' + (Date.now() - start))
'use strict'
const { ThreadWorker } = require('../lib/workers')
+function yourFunction (data) {
+ for (let i = 0; i <= 1000; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ // console.log('This is the main thread ' + isMainThread)
+ return { ok: 1 }
+}
+
class MyWorker extends ThreadWorker {
constructor () {
- super((data) => {
- for (let i = 0; i <= 10000; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
- // console.log('This is the main thread ' + isMainThread)
- return { ok: 1 }
- })
+ super(yourFunction)
}
}
-
module.exports = new MyWorker()
} = require('worker_threads')
function empty () {}
+const _void = {}
/**
* A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
* This pool will select the worker thread in a round robin fashion. <br>
this.tasks.set(worker, this.tasks.get(worker) + 1)
const id = ++this._id
const res = this._execute(worker, id)
- worker.postMessage({ data: data, _id: id })
+ worker.postMessage({ data: data || _void, _id: id })
return res
}
if (message._id === id) {
worker.port2.removeListener('message', listener)
this.tasks.set(worker, this.tasks.get(worker) - 1)
- resolve(message.data)
+ if (message.error) reject(message.error)
+ else resolve(message.data)
}
}
worker.port2.on('message', listener)
if (value && value.data && value._id) {
// here you will receive messages
// console.log('This is the main thread ' + isMainThread)
- const res = this.runInAsyncScope(fn, null, value.data)
- this.parent.postMessage({ data: res, _id: value._id })
- this.lastTask = Date.now()
+ this._run(fn, value)
} else if (value.parent) {
// save the port to communicate with the main thread
// this will be received once
this.parent.postMessage({ kill: 1 })
}
}
+
+ _run (fn, value) {
+ try {
+ const res = this.runInAsyncScope(fn, null, value.data)
+ this.parent.postMessage({ data: res, _id: value._id })
+ this.lastTask = Date.now()
+ } catch (e) {
+ this.parent.postMessage({ error: e, _id: value._id })
+ this.lastTask = Date.now()
+ }
+ }
}
module.exports.ThreadWorker = ThreadWorker
"main": "index.js",
"scripts": {
"build": "npm install",
- "test": "standard && nyc mocha --experimental-worker --exit --timeout 10000 tests/*test.js ",
+ "test": "standard && nyc mocha --experimental-worker --exit --timeout 15000 tests/*test.js ",
+ "debug-test": "mocha --inspect-brk --experimental-worker --exit tests/*test.js ",
"demontest": "nodemon --exec \"npm test\"",
- "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls",
+ "coverage": "nyc report --reporter=text-lcov tests/*test.js | coveralls",
"standard-verbose": "npx standard --verbose",
"lint": "standard --fix"
},
const min = 1
const max = 3
const pool = new DynamicThreadPool(min, max,
- './tests/testWorker.js',
+ './tests/workers/testWorker.js',
{ errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
describe('Dynamic thread pool test suite ', () => {
})
it('Should work even without opts in input', async () => {
- const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js')
+ const pool1 = new DynamicThreadPool(1, 1, './tests/workers/testWorker.js')
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
const FixedThreadPool = require('../lib/fixed')
const numThreads = 10
const pool = new FixedThreadPool(numThreads,
- './tests/testWorker.js',
+ './tests/workers/testWorker.js',
{ errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js')
+const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js')
+const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
describe('Fixed thread pool test suite ', () => {
it('Choose worker round robin test', async () => {
expect(result).toBeFalsy()
})
+ it('Verify that is possible to invoke the execute method without input', async () => {
+ const result = await pool.execute()
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that is possible to have a worker that return undefined', async () => {
+ const result = await emptyPool.execute()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that data are sent to the worker correctly', async () => {
+ const data = { f: 10 }
+ const result = await echoPool.execute(data)
+ expect(result).toBeTruthy()
+ expect(result.f).toBe(data.f)
+ })
+
+ it('Verify that error handling is working properly', async () => {
+ const data = { f: 10 }
+ let inError
+ try {
+ await errorPool.execute(data)
+ } catch (e) {
+ inError = e
+ }
+ expect(inError).toBeTruthy()
+ expect(inError instanceof Error).toBeTruthy()
+ expect(inError.message).toBeTruthy()
+ })
+
it('Shutdown test', async () => {
let closedThreads = 0
pool.workers.forEach(w => {
})
it('Should work even without opts in input', async () => {
- const pool1 = new FixedThreadPool(1, './tests/testWorker.js')
+ const pool1 = new FixedThreadPool(1, './tests/workers/testWorker.js')
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
+++ /dev/null
-'use strict'
-const { ThreadWorker } = require('../lib/workers')
-const { isMainThread } = require('worker_threads')
-
-class MyWorker extends ThreadWorker {
- constructor () {
- super((data) => {
- for (let i = 0; i <= 50; i++) {
- const o = {
- a: i
- }
- JSON.stringify(o)
- }
- return isMainThread
- }, { maxInactiveTime: 500 })
- }
-}
-
-module.exports = new MyWorker()
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function echo (data) {
+ return data
+}
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super(echo, { maxInactiveTime: 500 })
+ }
+}
+
+module.exports = new MyWorker()
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function test (data) {
+}
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super(test, { maxInactiveTime: 500 })
+ }
+}
+
+module.exports = new MyWorker()
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+function error (data) {
+ throw new Error(data)
+}
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super(error, { maxInactiveTime: 500 })
+ }
+}
+
+module.exports = new MyWorker()
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+const { isMainThread } = require('worker_threads')
+
+function test (data) {
+ for (let i = 0; i <= 50; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ return isMainThread
+}
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super(test, { maxInactiveTime: 500 })
+ }
+}
+
+module.exports = new MyWorker()