'use strict'
const {
- Worker, isMainThread, MessageChannel
+ Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
+const path = require('path')
// FixedThreadPool , TrampolineThreadPool
/**
- * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer
+ * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This pool will select the worker thread in a round robin fashion. <br>
* @author Alessandro Pio Ardizio
* @since 0.0.1
*/
* @param {Number} numThreads Num of threads for this worker pool
* @param {Object} an object with possible options for example maxConcurrency
*/
- constructor (numThreads, task, opts) {
- if (!isMainThread) {
- throw new Error('Cannot start a thread pool from a worker thread !!!')
- }
- this.numThreads = numThreads
+ constructor (numThreads, filename, opts) {
+ if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
+ if (!filename) throw new Error('Please specify a file with a worker implementation')
+ this.numThreads = numThreads || 10
this.workers = []
- this.task = task
this.nextWorker = 0
+ process.env.proof = (data) => console.log(data)
for (let i = 1; i <= numThreads; i++) {
- const worker = new Worker(__filename)
+ const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
+ worker.on('error', (e) => console.error(e))
+ worker.on('exit', () => console.log('EXITING'))
this.workers.push(worker)
- const { port1 } = new MessageChannel()
- worker.emitter = port1
+ const { port1, port2 } = new MessageChannel()
+ // send the port to communicate with the main thread to the worker
+ /* port2.on('message' , (message) => {
+ console.log('Worker is sending a message : ' + message)
+ }) */
+ worker.postMessage({ parent: port1 }, [port1])
+ worker.port1 = port1
+ worker.port2 = port2
}
}
* @param {Function} task , a function to execute
* @param {Any} the input for the task specified
*/
- execute (task, data) {
- // TODO select a worker
+ async execute (data) {
// configure worker to handle message with the specified task
const worker = this._chooseWorker()
const id = generateID()
- const res = this._execute(worker, task, id)
- worker.emitter.emit(id, data)
+ data._id = id
+ const res = this._execute(worker, id)
+ worker.postMessage(data)
return res
}
- _execute (worker, task, id) {
+ _execute (worker, id) {
return new Promise((resolve, reject) => {
- console.log('Executing a task on worker thread ' + worker.threadId)
- worker.emitter.once(id, (data) => {
- console.log('Receivd a message')
- try {
- resolve(task(data))
- } catch (e) {
- reject(e)
+ const listener = (message) => {
+ if (message._id === id) {
+ console.log('Received a message from worker : ' + message.data)
+ worker.port2.removeListener('message' , listener)
+ resolve(message.data)
}
- })
+ }
+ worker.port2.on('message', listener)
})
}
+
_chooseWorker () {
- console.log(this.workers.length -1)
- console.log(this.nextWorker)
if ((this.workers.length - 1) === this.nextWorker) {
- console.log('we are here')
this.nextWorker = 0
return this.workers[this.nextWorker]
} else {
const generateID = () => {
return Math.random()
.toString(36)
- .substring(7)
+ .substring(2)
}
module.exports = FixedThreadPool
const FixedThreadPool = require('./fixed')
let resolved = 0
+const pool = new FixedThreadPool(3, './yourWorker.js')
-const pool = new FixedThreadPool(10)
+async function proof () {
+ const o = {
+ a: 123
+ }
+ const res = await pool.execute(o)
+ // console.log('Here we are')
+ console.log('I am logging the result ' + res)
+}
-const start = Date.now()
-const iterations = 100000
+// proof()
+const start = Date.now()
+const iterations = 50000
for (let i = 0; i <= iterations; i++) {
const o = {
a: i
}
- pool.execute(JSON.stringify, o).then(res => {
+ pool.execute(o).then(res => {
console.log(res)
resolved++
if (resolved === iterations) {
--- /dev/null
+'use strict'
+const {
+ isMainThread, parentPort
+} = require('worker_threads')
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class ThreadWorker {
+ constructor (fn) {
+ if (!fn) throw new Error('Fn parameter is mandatory')
+ // keep the worker active
+ if (!isMainThread) {
+ this.interval =
+ setInterval(() => {
+ }, 10000)
+ }
+ parentPort.on('message', (value) => {
+ if (value.parent) {
+ // save the port to communicate with the main thread
+ this.parent = value.parent
+ } else if (value && value._id) {
+ // console.log('This is the main thread ' + isMainThread)
+ this.parent.postMessage({ data: fn(value), _id: value._id })
+ }
+ })
+ }
+}
+
+module.exports = ThreadWorker