Working implementation with a very good benchmark based on num of threads
authoraardizio <alessandroardizio94@gmail.com>
Sat, 18 Jan 2020 03:15:09 +0000 (04:15 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Sat, 18 Jan 2020 03:15:09 +0000 (04:15 +0100)
fixed.js
proof.js
worker.js [new file with mode: 0644]
yourWorker.js [new file with mode: 0644]

index 9b47484e7137b6752450e407b0863b2d34e92152..dea580454de1b747836ba47537d3a2cdb9351f2b 100644 (file)
--- a/fixed.js
+++ b/fixed.js
@@ -1,11 +1,13 @@
 'use strict'
 const {
-  Worker, isMainThread, MessageChannel
+  Worker, isMainThread, MessageChannel, SHARE_ENV
 } = require('worker_threads')
+const path = require('path')
 
 // FixedThreadPool , TrampolineThreadPool
 /**
- * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer
+ * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This pool will select the worker thread in a round robin fashion. <br>
  * @author Alessandro Pio Ardizio
  * @since 0.0.1
  */
@@ -15,19 +17,26 @@ class FixedThreadPool {
      * @param {Number} numThreads  Num of threads for this worker pool
      * @param {Object} an object with possible options for example maxConcurrency
      */
-  constructor (numThreads, task, opts) {
-    if (!isMainThread) {
-      throw new Error('Cannot start a thread pool from a worker thread !!!')
-    }
-    this.numThreads = numThreads
+  constructor (numThreads, filename, opts) {
+    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
+    if (!filename) throw new Error('Please specify a file with a worker implementation')
+    this.numThreads = numThreads || 10
     this.workers = []
-    this.task = task
     this.nextWorker = 0
+    process.env.proof = (data) => console.log(data)
     for (let i = 1; i <= numThreads; i++) {
-      const worker = new Worker(__filename)
+      const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
+      worker.on('error', (e) => console.error(e))
+      worker.on('exit', () => console.log('EXITING'))
       this.workers.push(worker)
-      const { port1 } = new MessageChannel()
-      worker.emitter = port1
+      const { port1, port2 } = new MessageChannel()
+      // send the port to communicate with the main thread to the worker
+      /* port2.on('message' , (message) => {
+        console.log('Worker is sending a message : ' + message)
+      }) */
+      worker.postMessage({ parent: port1 }, [port1])
+      worker.port1 = port1
+      worker.port2 = port2
     }
   }
 
@@ -36,35 +45,32 @@ class FixedThreadPool {
    * @param {Function} task , a function to execute
    * @param {Any} the input for the task specified
    */
-  execute (task, data) {
-    // TODO select a worker
+  async execute (data) {
     // configure worker to handle message with the specified task
     const worker = this._chooseWorker()
     const id = generateID()
-    const res = this._execute(worker, task, id)
-    worker.emitter.emit(id, data)
+    data._id = id
+    const res = this._execute(worker, id)
+    worker.postMessage(data)
     return res
   }
 
-  _execute (worker, task, id) {
+  _execute (worker, id) {
     return new Promise((resolve, reject) => {
-      console.log('Executing a task on worker thread ' + worker.threadId)
-      worker.emitter.once(id, (data) => {
-        console.log('Receivd a message')
-        try {
-          resolve(task(data))
-        } catch (e) {
-          reject(e)
+      const listener =  (message) =>  {
+        if (message._id === id) {
+          console.log('Received a message from worker : ' + message.data)
+          worker.port2.removeListener('message' , listener)
+          resolve(message.data)
         }
-      })
+      }
+      worker.port2.on('message', listener)
     })
   }
+  
 
   _chooseWorker () {
-    console.log(this.workers.length -1)
-    console.log(this.nextWorker)
     if ((this.workers.length - 1) === this.nextWorker) {
-      console.log('we are here')
       this.nextWorker = 0
       return this.workers[this.nextWorker]
     } else {
@@ -80,7 +86,7 @@ class FixedThreadPool {
 const generateID = () => {
   return Math.random()
     .toString(36)
-    .substring(7)
+    .substring(2)
 }
 
 module.exports = FixedThreadPool
index b62e5ba5b9cc15af8cbf05dd1bc1f7a0697b26f9..afa3a28ddb2bccd391ba562a64e0284eae157c4f 100644 (file)
--- a/proof.js
+++ b/proof.js
@@ -1,16 +1,25 @@
 const FixedThreadPool = require('./fixed')
 let resolved = 0
+const pool = new FixedThreadPool(3, './yourWorker.js')
 
-const pool = new FixedThreadPool(10)
+async function proof () {
+  const o = {
+    a: 123
+  }
+  const res = await pool.execute(o)
+  // console.log('Here we are')
+  console.log('I am logging the result ' + res)
+}
 
-const start = Date.now()
-const iterations = 100000
+// proof()
 
+const start = Date.now()
+const iterations = 50000
 for (let i = 0; i <= iterations; i++) {
   const o = {
     a: i
   }
-  pool.execute(JSON.stringify, o).then(res => {
+  pool.execute(o).then(res => {
     console.log(res)
     resolved++
     if (resolved === iterations) {
diff --git a/worker.js b/worker.js
new file mode 100644 (file)
index 0000000..ac021b3
--- /dev/null
+++ b/worker.js
@@ -0,0 +1,32 @@
+'use strict'
+const {
+  isMainThread, parentPort
+} = require('worker_threads')
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class ThreadWorker {
+  constructor (fn) {
+    if (!fn) throw new Error('Fn parameter is mandatory')
+    // keep the worker active
+    if (!isMainThread) {
+      this.interval =
+      setInterval(() => {
+      }, 10000)
+    }
+    parentPort.on('message', (value) => {
+      if (value.parent) {
+        // save the port to communicate with the main thread
+        this.parent = value.parent
+      } else if (value && value._id) {
+        // console.log('This is the main thread ' + isMainThread)
+        this.parent.postMessage({ data: fn(value), _id: value._id })
+      }
+    })
+  }
+}
+
+module.exports = ThreadWorker
diff --git a/yourWorker.js b/yourWorker.js
new file mode 100644 (file)
index 0000000..ffb72c2
--- /dev/null
@@ -0,0 +1,15 @@
+'use strict'
+const ThreadWorker = require('./worker')
+const { isMainThread } = require('worker_threads')
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super((data) => {
+      // console.log('This is the main thread ' + isMainThread)
+      // this.parent.postMessage(JSON.stringify(data))
+      return JSON.stringify(data)
+    })
+  }
+}
+
+module.exports = new MyWorker()