* @param {Number} numThreads Num of threads for this worker pool
* @param {Object} an object with possible options for example maxConcurrency
*/
- constructor (numThreads, opts) {
+ constructor (numThreads, task, opts) {
if (!isMainThread) {
throw new Error('Cannot start a thread pool from a worker thread !!!')
}
this.numThreads = numThreads
this.workers = []
+ this.task = task
for (let i = 1; i <= numThreads; i++) {
const worker = new Worker(__filename)
this.workers.push(worker)
- const { port1, port2 } = new MessageChannel()
- worker.receiverPort = port1
- worker.sendPort = port2
+ const { port1 } = new MessageChannel()
+ worker.emitter = port1
}
}
execute (task, data) {
// TODO select a worker
// configure worker to handle message with the specified task
- const res = this._execute(this.workers[0], task)
- this.workers[0].sendPort.postMessage(data)
+ const idx = chooseWorker(0, this.numThreads - 1)
+ const worker = this.workers[idx]
+ const id = generateID()
+ const res = this._execute(worker, task, id)
+ worker.emitter.emit(id, data)
return res
}
- _execute (worker, task) {
+ _execute (worker, task, id) {
return new Promise((resolve, reject) => {
- worker.receiverPort.on('message', (data) => {
+ console.log('Executing a task on worker thread ' + worker.threadId)
+ worker.emitter.once(id, (data) => {
+ console.log('Receivd a message')
try {
resolve(task(data))
} catch (e) {
}
}
+function chooseWorker (min, max) {
+ min = Math.ceil(min)
+ max = Math.floor(max)
+ return Math.floor(Math.random() * (max - min + 1)) + min
+}
+
+/**
+ * Return an id to be associated to a node.
+ */
+const generateID = () => {
+ return Math.random()
+ .toString(36)
+ .substring(7)
+}
+
module.exports = FixedThreadPool
const FixedThreadPool = require('./fixed')
+let resolved = 0
-const o = {
- a: 'asdfsadfafdgmnsdfmnbgsdfgbsdfmnbgsdfmnbgsmd,fbgsmndfbg'
+const pool = new FixedThreadPool(100)
+let start = Date.now()
+const iterations = 5000
+for (let i = 0; i <= iterations; i++) {
+ const o = {
+ a: i
+ }
+ pool.execute(JSON.stringify, o).then(res => {
+ console.log(res)
+ resolved++
+ if(resolved === iterations) {
+ console.log('Time take is ' + (Date.now() - start))
+ }
+ } )
}
-const pool = new FixedThreadPool(3)
-pool.execute(JSON.stringify, o).then(res => console.log(res))