From a32e02baa991ae01b5d677e3fd34821965daab1e Mon Sep 17 00:00:00 2001 From: aardizio Date: Sat, 18 Jan 2020 17:04:21 +0100 Subject: [PATCH] A dynamic thread pool and a new worker implementation. Next step is to write some unit tests for fixed thread pool --- fixed.js | 92 --------------------------------------------- index.js | 1 + lib/dynamic.js | 54 +++++++++++++++++++++++++++ lib/fixed.js | 95 +++++++++++++++++++++++++++++++++++++++++++++++ lib/util.js | 13 +++++++ lib/workers.js | 74 ++++++++++++++++++++++++++++++++++++ normal.js | 16 ++++++++ package-lock.json | 5 +++ package.json | 3 ++ proof.js | 26 ++++--------- worker.js | 32 ---------------- yourWorker.js | 14 ++++--- 12 files changed, 277 insertions(+), 148 deletions(-) delete mode 100644 fixed.js create mode 100644 index.js create mode 100644 lib/dynamic.js create mode 100644 lib/fixed.js create mode 100644 lib/util.js create mode 100644 lib/workers.js create mode 100644 normal.js delete mode 100644 worker.js diff --git a/fixed.js b/fixed.js deleted file mode 100644 index dea58045..00000000 --- a/fixed.js +++ /dev/null @@ -1,92 +0,0 @@ -'use strict' -const { - Worker, isMainThread, MessageChannel, SHARE_ENV -} = require('worker_threads') -const path = require('path') - -// FixedThreadPool , TrampolineThreadPool -/** - * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
- * This pool will select the worker thread in a round robin fashion.
- * @author Alessandro Pio Ardizio - * @since 0.0.1 - */ -class FixedThreadPool { - /** - * - * @param {Number} numThreads Num of threads for this worker pool - * @param {Object} an object with possible options for example maxConcurrency - */ - constructor (numThreads, filename, opts) { - if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') - if (!filename) throw new Error('Please specify a file with a worker implementation') - this.numThreads = numThreads || 10 - this.workers = [] - this.nextWorker = 0 - process.env.proof = (data) => console.log(data) - for (let i = 1; i <= numThreads; i++) { - const worker = new Worker(path.resolve(filename), { env: SHARE_ENV }) - worker.on('error', (e) => console.error(e)) - worker.on('exit', () => console.log('EXITING')) - this.workers.push(worker) - const { port1, port2 } = new MessageChannel() - // send the port to communicate with the main thread to the worker - /* port2.on('message' , (message) => { - console.log('Worker is sending a message : ' + message) - }) */ - worker.postMessage({ parent: port1 }, [port1]) - worker.port1 = port1 - worker.port2 = port2 - } - } - - /** - * - * @param {Function} task , a function to execute - * @param {Any} the input for the task specified - */ - async execute (data) { - // configure worker to handle message with the specified task - const worker = this._chooseWorker() - const id = generateID() - data._id = id - const res = this._execute(worker, id) - worker.postMessage(data) - return res - } - - _execute (worker, id) { - return new Promise((resolve, reject) => { - const listener = (message) => { - if (message._id === id) { - console.log('Received a message from worker : ' + message.data) - worker.port2.removeListener('message' , listener) - resolve(message.data) - } - } - worker.port2.on('message', listener) - }) - } - - - _chooseWorker () { - if ((this.workers.length - 1) === this.nextWorker) { - this.nextWorker = 0 - return this.workers[this.nextWorker] - } else { - this.nextWorker++ - return this.workers[this.nextWorker] - } - } -} - -/** - * Return an id to be associated to a node. - */ -const generateID = () => { - return Math.random() - .toString(36) - .substring(2) -} - -module.exports = FixedThreadPool diff --git a/index.js b/index.js new file mode 100644 index 00000000..c2d4c91e --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports.FixedThreadPool = require('./lib/fixed') diff --git a/lib/dynamic.js b/lib/dynamic.js new file mode 100644 index 00000000..53ff28fa --- /dev/null +++ b/lib/dynamic.js @@ -0,0 +1,54 @@ +'use strict' +const FixedThreadPool = require('./fixed') + +/** + * A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer.
+ * This thread pool will create new workers when the other ones are busy, until the max number of threads, + * when the max number of threads is reached, an exception will be thrown. + * This pool will select the worker thread in a round robin fashion.
+ * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ +class DynamicThreadPool extends FixedThreadPool { + /** + * + * @param {Number} min Min number of threads that will be always active + * @param {Number} max Max number of threads that will be active + * @param {Object} an object with possible options for example maxConcurrency + */ + constructor (min, max, filename, opts) { + super(min, filename, opts) + this.max = max + } + + _chooseWorker () { + let worker + for (const entry of this.tasks) { + if (entry[1] === 0) { + worker = entry[0] + break + } + } + + if (worker) { + // a worker is free, use it + return worker + } else { + if (this.workers.length === this.max) { + throw new Error('Max number of threads reached !!!') + } + // console.log('new thread is coming') + // all workers are busy create a new worker + const worker = this._newWorker() + worker.port2.on('message', (message) => { + if (message.kill) { + worker.postMessage({ kill: 1 }) + worker.terminate() + } + }) + return worker + } + } +} + +module.exports = DynamicThreadPool diff --git a/lib/fixed.js b/lib/fixed.js new file mode 100644 index 00000000..442d58e4 --- /dev/null +++ b/lib/fixed.js @@ -0,0 +1,95 @@ +'use strict' +const { + Worker, isMainThread, MessageChannel, SHARE_ENV +} = require('worker_threads') +const path = require('path') +const { generateID } = require('./util') + +/** + * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
+ * This pool will select the worker thread in a round robin fashion.
+ * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ +class FixedThreadPool { + /** + * + * @param {Number} numThreads Num of threads for this worker pool + * @param {Object} an object with possible options for example maxConcurrency + */ + constructor (numThreads, filename, opts) { + if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') + if (!filename) throw new Error('Please specify a file with a worker implementation') + this.numThreads = numThreads + this.workers = [] + this.nextWorker = 0 + this.opts = opts || { maxTasks: 1000 } + this.filename = filename + // threadId as key and an integer value + this.tasks = new Map() + for (let i = 1; i <= numThreads; i++) { + this._newWorker() + } + } + + /** + * Execute the task specified into the constructor with the data parameter. + * @param {Any} the input for the task specified + * @returns {Promise} that is resolved when the task is done + */ + async execute (data) { + // configure worker to handle message with the specified task + const worker = this._chooseWorker() + this.tasks.set(worker, this.tasks.get(worker) + 1) + // console.log('Num of pending tasks ', this.tasks.get(worker)) + const id = generateID() + data._id = id + // console.log('Worker choosed is ' + worker.threadId) + const res = this._execute(worker, id) + worker.postMessage(data) + return res + } + + _execute (worker, id) { + return new Promise((resolve, reject) => { + const listener = (message) => { + if (message._id === id) { + worker.port2.removeListener('message', listener) + // console.log(worker.port2.listenerCount('message')) + this.tasks.set(worker, this.tasks.get(worker) - 1) + resolve(message.data) + } + } + worker.port2.on('message', listener) + }) + } + + _chooseWorker () { + if ((this.workers.length - 1) === this.nextWorker) { + this.nextWorker = 0 + return this.workers[this.nextWorker] + } else { + this.nextWorker++ + return this.workers[this.nextWorker] + } + } + + _newWorker () { + const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV }) + worker.on('error', (e) => console.error(e)) + worker.on('exit', () => console.log('EXITING')) + this.workers.push(worker) + const { port1, port2 } = new MessageChannel() + worker.postMessage({ parent: port1 }, [port1]) + worker.port1 = port1 + worker.port2 = port2 + // we will attach a listener for every task, + // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size + worker.port2.setMaxListeners(this.opts.maxTasks) + // init tasks map + this.tasks.set(worker, 0) + return worker + } +} + +module.exports = FixedThreadPool diff --git a/lib/util.js b/lib/util.js new file mode 100644 index 00000000..70a50744 --- /dev/null +++ b/lib/util.js @@ -0,0 +1,13 @@ +/** + * Contains utility functions + * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ + +const uuid = require('uuid/v4') +/** + * Return an id to be associated to a node. + */ +module.exports.generateID = () => { + return uuid() +} diff --git a/lib/workers.js b/lib/workers.js new file mode 100644 index 00000000..e6d43c69 --- /dev/null +++ b/lib/workers.js @@ -0,0 +1,74 @@ +'use strict' +const { + isMainThread, parentPort +} = require('worker_threads') +const maxInactiveTime = 1000 * 60 + +/** + * An example worker that will be always alive, you just need to extend this class if you want a static pool. + * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ +class ThreadWorker { + constructor (fn) { + if (!fn) throw new Error('Fn parameter is mandatory') + // keep the worker active + if (!isMainThread) { + this.interval = + setInterval(() => { + }, 10000) + } + parentPort.on('message', (value) => { + if (value.parent) { + // save the port to communicate with the main thread + this.parent = value.parent + } else if (value && value._id) { + // console.log('This is the main thread ' + isMainThread) + this.parent.postMessage({ data: fn(value), _id: value._id }) + } + }) + } +} + +/** + * An example worker that will be always alive, you just need to extend this class if you want a static pool.
+ * When this worker is inactive for more than 1 minute, it will send this info to the main thread,
+ * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed + * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ +class DynamicWorker { + constructor (fn) { + this.lastTask = Date.now() + if (!fn) throw new Error('Fn parameter is mandatory') + // keep the worker active + if (!isMainThread) { + this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime) + this._checkAlive.bind(this)() + } + parentPort.on('message', (value) => { + if (value && value._id) { + // here you will receive messages + // console.log('This is the main thread ' + isMainThread) + this.parent.postMessage({ data: fn(value), _id: value._id }) + this.lastTask = Date.now() + } else if (value.parent) { + // save the port to communicate with the main thread + // this will be received once + this.parent = value.parent + } else if (value.kill) { + // here is time to kill this thread, just clearing the interval + clearInterval(this.interval) + } + }) + } + + _checkAlive () { + if ((Date.now() - this.lastTask) > maxInactiveTime) { + this.parent.postMessage({ kill: 1 }) + } + } +} + +module.exports.ThreadWorker = ThreadWorker +module.exports.DynamicWorker = DynamicWorker diff --git a/normal.js b/normal.js new file mode 100644 index 00000000..68ac82c4 --- /dev/null +++ b/normal.js @@ -0,0 +1,16 @@ +const start = Date.now() +const toBench = () => { + const iterations = 10000 + + for (let i = 0; i <= iterations; i++) { + const o = { + a: i + } + JSON.stringify(o) + } +} + +for (let i = 0; i < 1000; i++) { + toBench() +} +console.log('Time take is ' + (Date.now() - start)) diff --git a/package-lock.json b/package-lock.json index e248e15d..51c0ff29 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1786,6 +1786,11 @@ "punycode": "^2.1.0" } }, + "uuid": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz", + "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A==" + }, "v8-compile-cache": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.1.0.tgz", diff --git a/package.json b/package.json index 56f5401a..c22c9b75 100644 --- a/package.json +++ b/package.json @@ -33,5 +33,8 @@ "homepage": "https://github.com/pioardi/node-pool#readme", "devDependencies": { "standard": "^14.3.1" + }, + "dependencies": { + "uuid": "^3.4.0" } } diff --git a/proof.js b/proof.js index afa3a28d..4ac63d05 100644 --- a/proof.js +++ b/proof.js @@ -1,26 +1,14 @@ -const FixedThreadPool = require('./fixed') +const FixedThreadPool = require('./lib/fixed') +const DynamicThreadPool = require('./lib/dynamic') let resolved = 0 -const pool = new FixedThreadPool(3, './yourWorker.js') - -async function proof () { - const o = { - a: 123 - } - const res = await pool.execute(o) - // console.log('Here we are') - console.log('I am logging the result ' + res) -} - -// proof() +// const pool = new FixedThreadPool(15, './yourWorker.js') +const pool = new DynamicThreadPool(15, 1020, './yourWorker.js') const start = Date.now() -const iterations = 50000 +const iterations = 1000 for (let i = 0; i <= iterations; i++) { - const o = { - a: i - } - pool.execute(o).then(res => { - console.log(res) + pool.execute({}).then(res => { + // console.log(res) resolved++ if (resolved === iterations) { console.log('Time take is ' + (Date.now() - start)) diff --git a/worker.js b/worker.js deleted file mode 100644 index ac021b35..00000000 --- a/worker.js +++ /dev/null @@ -1,32 +0,0 @@ -'use strict' -const { - isMainThread, parentPort -} = require('worker_threads') - -/** - * An example worker that will be always alive, you just need to extend this class if you want a static pool. - * @author Alessandro Pio Ardizio - * @since 0.0.1 - */ -class ThreadWorker { - constructor (fn) { - if (!fn) throw new Error('Fn parameter is mandatory') - // keep the worker active - if (!isMainThread) { - this.interval = - setInterval(() => { - }, 10000) - } - parentPort.on('message', (value) => { - if (value.parent) { - // save the port to communicate with the main thread - this.parent = value.parent - } else if (value && value._id) { - // console.log('This is the main thread ' + isMainThread) - this.parent.postMessage({ data: fn(value), _id: value._id }) - } - }) - } -} - -module.exports = ThreadWorker diff --git a/yourWorker.js b/yourWorker.js index ffb72c29..a74c6341 100644 --- a/yourWorker.js +++ b/yourWorker.js @@ -1,13 +1,17 @@ 'use strict' -const ThreadWorker = require('./worker') -const { isMainThread } = require('worker_threads') +const { ThreadWorker, DynamicWorker } = require('./lib/workers') -class MyWorker extends ThreadWorker { +class MyWorker extends DynamicWorker { constructor () { super((data) => { + for (let i = 0; i <= 10000; i++) { + const o = { + a: i + } + JSON.stringify(o) + } // console.log('This is the main thread ' + isMainThread) - // this.parent.postMessage(JSON.stringify(data)) - return JSON.stringify(data) + return data }) } } -- 2.34.1