From 27006a3d7cb579b1cddd70e32ebaaab1efff9555 Mon Sep 17 00:00:00 2001 From: aardizio Date: Sat, 18 Jan 2020 04:15:09 +0100 Subject: [PATCH] Working implementation with a very good benchmark based on num of threads --- fixed.js | 62 ++++++++++++++++++++++++++++----------------------- proof.js | 17 ++++++++++---- worker.js | 32 ++++++++++++++++++++++++++ yourWorker.js | 15 +++++++++++++ 4 files changed, 94 insertions(+), 32 deletions(-) create mode 100644 worker.js create mode 100644 yourWorker.js diff --git a/fixed.js b/fixed.js index 9b47484e..dea58045 100644 --- a/fixed.js +++ b/fixed.js @@ -1,11 +1,13 @@ 'use strict' const { - Worker, isMainThread, MessageChannel + Worker, isMainThread, MessageChannel, SHARE_ENV } = require('worker_threads') +const path = require('path') // FixedThreadPool , TrampolineThreadPool /** - * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer + * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
+ * This pool will select the worker thread in a round robin fashion.
* @author Alessandro Pio Ardizio * @since 0.0.1 */ @@ -15,19 +17,26 @@ class FixedThreadPool { * @param {Number} numThreads Num of threads for this worker pool * @param {Object} an object with possible options for example maxConcurrency */ - constructor (numThreads, task, opts) { - if (!isMainThread) { - throw new Error('Cannot start a thread pool from a worker thread !!!') - } - this.numThreads = numThreads + constructor (numThreads, filename, opts) { + if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') + if (!filename) throw new Error('Please specify a file with a worker implementation') + this.numThreads = numThreads || 10 this.workers = [] - this.task = task this.nextWorker = 0 + process.env.proof = (data) => console.log(data) for (let i = 1; i <= numThreads; i++) { - const worker = new Worker(__filename) + const worker = new Worker(path.resolve(filename), { env: SHARE_ENV }) + worker.on('error', (e) => console.error(e)) + worker.on('exit', () => console.log('EXITING')) this.workers.push(worker) - const { port1 } = new MessageChannel() - worker.emitter = port1 + const { port1, port2 } = new MessageChannel() + // send the port to communicate with the main thread to the worker + /* port2.on('message' , (message) => { + console.log('Worker is sending a message : ' + message) + }) */ + worker.postMessage({ parent: port1 }, [port1]) + worker.port1 = port1 + worker.port2 = port2 } } @@ -36,35 +45,32 @@ class FixedThreadPool { * @param {Function} task , a function to execute * @param {Any} the input for the task specified */ - execute (task, data) { - // TODO select a worker + async execute (data) { // configure worker to handle message with the specified task const worker = this._chooseWorker() const id = generateID() - const res = this._execute(worker, task, id) - worker.emitter.emit(id, data) + data._id = id + const res = this._execute(worker, id) + worker.postMessage(data) return res } - _execute (worker, task, id) { + _execute (worker, id) { return new Promise((resolve, reject) => { - console.log('Executing a task on worker thread ' + worker.threadId) - worker.emitter.once(id, (data) => { - console.log('Receivd a message') - try { - resolve(task(data)) - } catch (e) { - reject(e) + const listener = (message) => { + if (message._id === id) { + console.log('Received a message from worker : ' + message.data) + worker.port2.removeListener('message' , listener) + resolve(message.data) } - }) + } + worker.port2.on('message', listener) }) } + _chooseWorker () { - console.log(this.workers.length -1) - console.log(this.nextWorker) if ((this.workers.length - 1) === this.nextWorker) { - console.log('we are here') this.nextWorker = 0 return this.workers[this.nextWorker] } else { @@ -80,7 +86,7 @@ class FixedThreadPool { const generateID = () => { return Math.random() .toString(36) - .substring(7) + .substring(2) } module.exports = FixedThreadPool diff --git a/proof.js b/proof.js index b62e5ba5..afa3a28d 100644 --- a/proof.js +++ b/proof.js @@ -1,16 +1,25 @@ const FixedThreadPool = require('./fixed') let resolved = 0 +const pool = new FixedThreadPool(3, './yourWorker.js') -const pool = new FixedThreadPool(10) +async function proof () { + const o = { + a: 123 + } + const res = await pool.execute(o) + // console.log('Here we are') + console.log('I am logging the result ' + res) +} -const start = Date.now() -const iterations = 100000 +// proof() +const start = Date.now() +const iterations = 50000 for (let i = 0; i <= iterations; i++) { const o = { a: i } - pool.execute(JSON.stringify, o).then(res => { + pool.execute(o).then(res => { console.log(res) resolved++ if (resolved === iterations) { diff --git a/worker.js b/worker.js new file mode 100644 index 00000000..ac021b35 --- /dev/null +++ b/worker.js @@ -0,0 +1,32 @@ +'use strict' +const { + isMainThread, parentPort +} = require('worker_threads') + +/** + * An example worker that will be always alive, you just need to extend this class if you want a static pool. + * @author Alessandro Pio Ardizio + * @since 0.0.1 + */ +class ThreadWorker { + constructor (fn) { + if (!fn) throw new Error('Fn parameter is mandatory') + // keep the worker active + if (!isMainThread) { + this.interval = + setInterval(() => { + }, 10000) + } + parentPort.on('message', (value) => { + if (value.parent) { + // save the port to communicate with the main thread + this.parent = value.parent + } else if (value && value._id) { + // console.log('This is the main thread ' + isMainThread) + this.parent.postMessage({ data: fn(value), _id: value._id }) + } + }) + } +} + +module.exports = ThreadWorker diff --git a/yourWorker.js b/yourWorker.js new file mode 100644 index 00000000..ffb72c29 --- /dev/null +++ b/yourWorker.js @@ -0,0 +1,15 @@ +'use strict' +const ThreadWorker = require('./worker') +const { isMainThread } = require('worker_threads') + +class MyWorker extends ThreadWorker { + constructor () { + super((data) => { + // console.log('This is the main thread ' + isMainThread) + // this.parent.postMessage(JSON.stringify(data)) + return JSON.stringify(data) + }) + } +} + +module.exports = new MyWorker() -- 2.34.1