A dynamic thread pool and a new worker implementation. Next step is to write some...
authoraardizio <alessandroardizio94@gmail.com>
Sat, 18 Jan 2020 16:04:21 +0000 (17:04 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Sat, 18 Jan 2020 16:04:21 +0000 (17:04 +0100)
12 files changed:
fixed.js [deleted file]
index.js [new file with mode: 0644]
lib/dynamic.js [new file with mode: 0644]
lib/fixed.js [new file with mode: 0644]
lib/util.js [new file with mode: 0644]
lib/workers.js [new file with mode: 0644]
normal.js [new file with mode: 0644]
package-lock.json
package.json
proof.js
worker.js [deleted file]
yourWorker.js

diff --git a/fixed.js b/fixed.js
deleted file mode 100644 (file)
index dea5804..0000000
--- a/fixed.js
+++ /dev/null
@@ -1,92 +0,0 @@
-'use strict'
-const {
-  Worker, isMainThread, MessageChannel, SHARE_ENV
-} = require('worker_threads')
-const path = require('path')
-
-// FixedThreadPool , TrampolineThreadPool
-/**
- * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
- * This pool will select the worker thread in a round robin fashion. <br>
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class FixedThreadPool {
-  /**
-     *
-     * @param {Number} numThreads  Num of threads for this worker pool
-     * @param {Object} an object with possible options for example maxConcurrency
-     */
-  constructor (numThreads, filename, opts) {
-    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
-    if (!filename) throw new Error('Please specify a file with a worker implementation')
-    this.numThreads = numThreads || 10
-    this.workers = []
-    this.nextWorker = 0
-    process.env.proof = (data) => console.log(data)
-    for (let i = 1; i <= numThreads; i++) {
-      const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
-      worker.on('error', (e) => console.error(e))
-      worker.on('exit', () => console.log('EXITING'))
-      this.workers.push(worker)
-      const { port1, port2 } = new MessageChannel()
-      // send the port to communicate with the main thread to the worker
-      /* port2.on('message' , (message) => {
-        console.log('Worker is sending a message : ' + message)
-      }) */
-      worker.postMessage({ parent: port1 }, [port1])
-      worker.port1 = port1
-      worker.port2 = port2
-    }
-  }
-
-  /**
-   *
-   * @param {Function} task , a function to execute
-   * @param {Any} the input for the task specified
-   */
-  async execute (data) {
-    // configure worker to handle message with the specified task
-    const worker = this._chooseWorker()
-    const id = generateID()
-    data._id = id
-    const res = this._execute(worker, id)
-    worker.postMessage(data)
-    return res
-  }
-
-  _execute (worker, id) {
-    return new Promise((resolve, reject) => {
-      const listener =  (message) =>  {
-        if (message._id === id) {
-          console.log('Received a message from worker : ' + message.data)
-          worker.port2.removeListener('message' , listener)
-          resolve(message.data)
-        }
-      }
-      worker.port2.on('message', listener)
-    })
-  }
-  
-
-  _chooseWorker () {
-    if ((this.workers.length - 1) === this.nextWorker) {
-      this.nextWorker = 0
-      return this.workers[this.nextWorker]
-    } else {
-      this.nextWorker++
-      return this.workers[this.nextWorker]
-    }
-  }
-}
-
-/**
- * Return an id to be associated to a node.
- */
-const generateID = () => {
-  return Math.random()
-    .toString(36)
-    .substring(2)
-}
-
-module.exports = FixedThreadPool
diff --git a/index.js b/index.js
new file mode 100644 (file)
index 0000000..c2d4c91
--- /dev/null
+++ b/index.js
@@ -0,0 +1 @@
+module.exports.FixedThreadPool = require('./lib/fixed')
diff --git a/lib/dynamic.js b/lib/dynamic.js
new file mode 100644 (file)
index 0000000..53ff28f
--- /dev/null
@@ -0,0 +1,54 @@
+'use strict'
+const FixedThreadPool = require('./fixed')
+
+/**
+ * A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This thread pool will create new workers when the other ones are busy, until the max number of threads,
+ * when the max number of threads is reached, an exception will be thrown.
+ * This pool will select the worker thread in a round robin fashion. <br>
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class DynamicThreadPool extends FixedThreadPool {
+  /**
+    *
+    * @param {Number} min  Min number of threads that will be always active
+    * @param {Number} max  Max number of threads that will be active
+    * @param {Object} an object with possible options for example maxConcurrency
+  */
+  constructor (min, max, filename, opts) {
+    super(min, filename, opts)
+    this.max = max
+  }
+
+  _chooseWorker () {
+    let worker
+    for (const entry of this.tasks) {
+      if (entry[1] === 0) {
+        worker = entry[0]
+        break
+      }
+    }
+
+    if (worker) {
+      // a worker is free, use it
+      return worker
+    } else {
+      if (this.workers.length === this.max) {
+        throw new Error('Max number of threads reached !!!')
+      }
+      // console.log('new thread is coming')
+      // all workers are busy create a new worker
+      const worker = this._newWorker()
+      worker.port2.on('message', (message) => {
+        if (message.kill) {
+          worker.postMessage({ kill: 1 })
+          worker.terminate()
+        }
+      })
+      return worker
+    }
+  }
+}
+
+module.exports = DynamicThreadPool
diff --git a/lib/fixed.js b/lib/fixed.js
new file mode 100644 (file)
index 0000000..442d58e
--- /dev/null
@@ -0,0 +1,95 @@
+'use strict'
+const {
+  Worker, isMainThread, MessageChannel, SHARE_ENV
+} = require('worker_threads')
+const path = require('path')
+const { generateID } = require('./util')
+
+/**
+ * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This pool will select the worker thread in a round robin fashion. <br>
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class FixedThreadPool {
+  /**
+    *
+    * @param {Number} numThreads  Num of threads for this worker pool
+    * @param {Object} an object with possible options for example maxConcurrency
+  */
+  constructor (numThreads, filename, opts) {
+    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
+    if (!filename) throw new Error('Please specify a file with a worker implementation')
+    this.numThreads = numThreads
+    this.workers = []
+    this.nextWorker = 0
+    this.opts = opts || { maxTasks: 1000 }
+    this.filename = filename
+    // threadId as key and an integer value
+    this.tasks = new Map()
+    for (let i = 1; i <= numThreads; i++) {
+      this._newWorker()
+    }
+  }
+
+  /**
+   * Execute the task specified into the constructor with the data parameter.
+   * @param {Any} the input for the task specified
+   * @returns {Promise} that is resolved when the task is done
+   */
+  async execute (data) {
+    // configure worker to handle message with the specified task
+    const worker = this._chooseWorker()
+    this.tasks.set(worker, this.tasks.get(worker) + 1)
+    // console.log('Num of pending tasks ', this.tasks.get(worker))
+    const id = generateID()
+    data._id = id
+    // console.log('Worker choosed is ' + worker.threadId)
+    const res = this._execute(worker, id)
+    worker.postMessage(data)
+    return res
+  }
+
+  _execute (worker, id) {
+    return new Promise((resolve, reject) => {
+      const listener = (message) => {
+        if (message._id === id) {
+          worker.port2.removeListener('message', listener)
+          // console.log(worker.port2.listenerCount('message'))
+          this.tasks.set(worker, this.tasks.get(worker) - 1)
+          resolve(message.data)
+        }
+      }
+      worker.port2.on('message', listener)
+    })
+  }
+
+  _chooseWorker () {
+    if ((this.workers.length - 1) === this.nextWorker) {
+      this.nextWorker = 0
+      return this.workers[this.nextWorker]
+    } else {
+      this.nextWorker++
+      return this.workers[this.nextWorker]
+    }
+  }
+
+  _newWorker () {
+    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+    worker.on('error', (e) => console.error(e))
+    worker.on('exit', () => console.log('EXITING'))
+    this.workers.push(worker)
+    const { port1, port2 } = new MessageChannel()
+    worker.postMessage({ parent: port1 }, [port1])
+    worker.port1 = port1
+    worker.port2 = port2
+    // we will attach a listener for every task,
+    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
+    worker.port2.setMaxListeners(this.opts.maxTasks)
+    // init tasks map
+    this.tasks.set(worker, 0)
+    return worker
+  }
+}
+
+module.exports = FixedThreadPool
diff --git a/lib/util.js b/lib/util.js
new file mode 100644 (file)
index 0000000..70a5074
--- /dev/null
@@ -0,0 +1,13 @@
+/**
+ * Contains utility functions
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+
+const uuid = require('uuid/v4')
+/**
+ * Return an id to be associated to a node.
+ */
+module.exports.generateID = () => {
+  return uuid()
+}
diff --git a/lib/workers.js b/lib/workers.js
new file mode 100644 (file)
index 0000000..e6d43c6
--- /dev/null
@@ -0,0 +1,74 @@
+'use strict'
+const {
+  isMainThread, parentPort
+} = require('worker_threads')
+const maxInactiveTime = 1000 * 60
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class ThreadWorker {
+  constructor (fn) {
+    if (!fn) throw new Error('Fn parameter is mandatory')
+    // keep the worker active
+    if (!isMainThread) {
+      this.interval =
+      setInterval(() => {
+      }, 10000)
+    }
+    parentPort.on('message', (value) => {
+      if (value.parent) {
+        // save the port to communicate with the main thread
+        this.parent = value.parent
+      } else if (value && value._id) {
+        // console.log('This is the main thread ' + isMainThread)
+        this.parent.postMessage({ data: fn(value), _id: value._id })
+      }
+    })
+  }
+}
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
+ * When this worker is inactive for more than 1 minute, it will send this info to the main thread,<br>
+ * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class DynamicWorker {
+  constructor (fn) {
+    this.lastTask = Date.now()
+    if (!fn) throw new Error('Fn parameter is mandatory')
+    // keep the worker active
+    if (!isMainThread) {
+      this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+      this._checkAlive.bind(this)()
+    }
+    parentPort.on('message', (value) => {
+      if (value && value._id) {
+        // here you will receive messages
+        // console.log('This is the main thread ' + isMainThread)
+        this.parent.postMessage({ data: fn(value), _id: value._id })
+        this.lastTask = Date.now()
+      } else if (value.parent) {
+        // save the port to communicate with the main thread
+        // this will be received once
+        this.parent = value.parent
+      } else if (value.kill) {
+        // here is time to kill this thread, just clearing the interval
+        clearInterval(this.interval)
+      }
+    })
+  }
+
+  _checkAlive () {
+    if ((Date.now() - this.lastTask) > maxInactiveTime) {
+      this.parent.postMessage({ kill: 1 })
+    }
+  }
+}
+
+module.exports.ThreadWorker = ThreadWorker
+module.exports.DynamicWorker = DynamicWorker
diff --git a/normal.js b/normal.js
new file mode 100644 (file)
index 0000000..68ac82c
--- /dev/null
+++ b/normal.js
@@ -0,0 +1,16 @@
+const start = Date.now()
+const toBench = () => {
+  const iterations = 10000
+
+  for (let i = 0; i <= iterations; i++) {
+    const o = {
+      a: i
+    }
+    JSON.stringify(o)
+  }
+}
+
+for (let i = 0; i < 1000; i++) {
+  toBench()
+}
+console.log('Time take is ' + (Date.now() - start))
index e248e15d11796f329770c6391cbdf1029ee718c3..51c0ff295458057a373fdbcdde64299f0264d1bd 100644 (file)
         "punycode": "^2.1.0"
       }
     },
+    "uuid": {
+      "version": "3.4.0",
+      "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz",
+      "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A=="
+    },
     "v8-compile-cache": {
       "version": "2.1.0",
       "resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.1.0.tgz",
index 56f5401a4de76a5d23266c3e835fb4ce157e22ee..c22c9b754325781dd9cd9cafe5e62529b6069474 100644 (file)
@@ -33,5 +33,8 @@
   "homepage": "https://github.com/pioardi/node-pool#readme",
   "devDependencies": {
     "standard": "^14.3.1"
+  },
+  "dependencies": {
+    "uuid": "^3.4.0"
   }
 }
index afa3a28ddb2bccd391ba562a64e0284eae157c4f..4ac63d0544ec863b78c0c0af5b14c516def7747c 100644 (file)
--- a/proof.js
+++ b/proof.js
@@ -1,26 +1,14 @@
-const FixedThreadPool = require('./fixed')
+const FixedThreadPool = require('./lib/fixed')
+const DynamicThreadPool = require('./lib/dynamic')
 let resolved = 0
-const pool = new FixedThreadPool(3, './yourWorker.js')
-
-async function proof () {
-  const o = {
-    a: 123
-  }
-  const res = await pool.execute(o)
-  // console.log('Here we are')
-  console.log('I am logging the result ' + res)
-}
-
-// proof()
+// const pool = new FixedThreadPool(15, './yourWorker.js')
+const pool = new DynamicThreadPool(15, 1020, './yourWorker.js')
 
 const start = Date.now()
-const iterations = 50000
+const iterations = 1000
 for (let i = 0; i <= iterations; i++) {
-  const o = {
-    a: i
-  }
-  pool.execute(o).then(res => {
-    console.log(res)
+  pool.execute({}).then(res => {
+    // console.log(res)
     resolved++
     if (resolved === iterations) {
       console.log('Time take is ' + (Date.now() - start))
diff --git a/worker.js b/worker.js
deleted file mode 100644 (file)
index ac021b3..0000000
--- a/worker.js
+++ /dev/null
@@ -1,32 +0,0 @@
-'use strict'
-const {
-  isMainThread, parentPort
-} = require('worker_threads')
-
-/**
- * An example worker that will be always alive, you just need to extend this class if you want a static pool.
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class ThreadWorker {
-  constructor (fn) {
-    if (!fn) throw new Error('Fn parameter is mandatory')
-    // keep the worker active
-    if (!isMainThread) {
-      this.interval =
-      setInterval(() => {
-      }, 10000)
-    }
-    parentPort.on('message', (value) => {
-      if (value.parent) {
-        // save the port to communicate with the main thread
-        this.parent = value.parent
-      } else if (value && value._id) {
-        // console.log('This is the main thread ' + isMainThread)
-        this.parent.postMessage({ data: fn(value), _id: value._id })
-      }
-    })
-  }
-}
-
-module.exports = ThreadWorker
index ffb72c2969a0e6845801efa003b3e8c2dfc4fbce..a74c6341470c2f1c4c47a2432bf2dc83c2503360 100644 (file)
@@ -1,13 +1,17 @@
 'use strict'
-const ThreadWorker = require('./worker')
-const { isMainThread } = require('worker_threads')
+const { ThreadWorker, DynamicWorker } = require('./lib/workers')
 
-class MyWorker extends ThreadWorker {
+class MyWorker extends DynamicWorker {
   constructor () {
     super((data) => {
+      for (let i = 0; i <= 10000; i++) {
+        const o = {
+          a: i
+        }
+        JSON.stringify(o)
+      }
       // console.log('This is the main thread ' + isMainThread)
-      // this.parent.postMessage(JSON.stringify(data))
-      return JSON.stringify(data)
+      return data
     })
   }
 }