Added an emitter to the dynamic pool , example splitted and improved
authoraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 15:45:31 +0000 (16:45 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 15:45:31 +0000 (16:45 +0100)
dynamicExample.js [new file with mode: 0644]
lib/dynamic.js
lib/fixed.js
lib/util.js
staticExample.js [moved from proof.js with 58% similarity]

diff --git a/dynamicExample.js b/dynamicExample.js
new file mode 100644 (file)
index 0000000..11032a9
--- /dev/null
@@ -0,0 +1,17 @@
+const DynamicThreadPool = require('./lib/dynamic')
+let resolved = 0
+let maxReached = 0
+const pool = new DynamicThreadPool(100, 200, './yourWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+pool.emitter.on('FullPool', () => maxReached++)
+
+const start = Date.now()
+const iterations = 1000
+for (let i = 0; i <= iterations; i++) {
+  pool.execute({}).then(res => {
+    resolved++
+    if (resolved === iterations) {
+      console.log('Time take is ' + (Date.now() - start))
+      console.log('The pool was full for ' + maxReached + ' times')
+    }
+  })
+}
index fa8a3a521c96b0b01954dcf303a7264308e541b0..cedb8c35b93c0c230f62024bbd9facd04c6e1b7a 100644 (file)
@@ -1,11 +1,13 @@
 'use strict'
 const FixedThreadPool = require('./fixed')
+const { randomWorker } = require('./util')
+const EventEmitter = require('events')
+class MyEmitter extends EventEmitter {}
 
 /**
  * A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This thread pool will create new workers when the other ones are busy, until the max number of threads,
- * when the max number of threads is reached, an exception will be thrown.
- * This pool will select the worker thread in a round robin fashion. <br>
+ * when the max number of threads is reached, an event will be emitted , if you want to listen this event use the emitter method.
  * @author Alessandro Pio Ardizio
  * @since 0.0.1
  */
@@ -19,6 +21,16 @@ class DynamicThreadPool extends FixedThreadPool {
   constructor (min, max, filename, opts) {
     super(min, filename, opts)
     this.max = max
+    this.emitter = new MyEmitter()
+  }
+
+  /**
+   * Return an event emitter that will send some messages, for example
+   * a message will be sent when max number of threads is reached and all threads are busy
+   * in this case it will emit a message
+   */
+  emitter () {
+    return this.emitter
   }
 
   _chooseWorker () {
@@ -35,7 +47,8 @@ class DynamicThreadPool extends FixedThreadPool {
       return worker
     } else {
       if (this.workers.length === this.max) {
-        throw new Error('Max number of threads reached !!!')
+        this.emitter.emit('FullPool')
+        return randomWorker(this.tasks)
       }
       // all workers are busy create a new worker
       const worker = this._newWorker()
index 95fe686218c5758997cf1111e2f8148d08e32f6d..63e62bade18bf12d7779186516e30445e5e1eaaa 100644 (file)
@@ -17,7 +17,7 @@ class FixedThreadPool {
     *
     * @param {Number} numThreads  Num of threads for this worker pool
     * @param {string} a file path with implementation of @see ThreadWorker class
-    * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
+    * @param {Object} an object with possible options for example errorHandler, onlineHandler.
   */
   constructor (numThreads, filename, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
@@ -84,8 +84,7 @@ class FixedThreadPool {
     const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
     worker.on('error', this.opts.errorHandler || empty)
     worker.on('online', this.opts.onlineHandler || empty)
-    // TODO remove the workers array , use only the map data structure
-    // handle properly when a thread exit
+    // TODO handle properly when a thread exit
     worker.on('exit', this.opts.exitHandler || empty)
     this.workers.push(worker)
     const { port1, port2 } = new MessageChannel()
@@ -94,7 +93,7 @@ class FixedThreadPool {
     worker.port2 = port2
     // we will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
-    worker.port2.setMaxListeners(this.opts.maxTasks)
+    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
     // init tasks map
     this.tasks.set(worker, 0)
     return worker
index 70a50744d684ef562eeae9ca9a5902feb5c27f72..bac53ec46c75c78cbe9d7b8b31a60fd6d8d2bd64 100644 (file)
@@ -11,3 +11,8 @@ const uuid = require('uuid/v4')
 module.exports.generateID = () => {
   return uuid()
 }
+
+module.exports.randomWorker = (collection) => {
+  const keys = Array.from(collection.keys())
+  return keys[Math.floor(Math.random() * keys.length)]
+}
similarity index 58%
rename from proof.js
rename to staticExample.js
index d6af4033796352695e3da3bdcc74c7033c64d1d7..074f37ca84fcc34c09b33581cbec7c91e4feeeaf 100644 (file)
--- a/proof.js
@@ -1,11 +1,11 @@
 const FixedThreadPool = require('./lib/fixed')
-const DynamicThreadPool = require('./lib/dynamic')
 let resolved = 0
-// const pool = new FixedThreadPool(15, './yourWorker.js')
-const pool = new DynamicThreadPool(300, 1020, './yourWorker.js')
+const pool = new FixedThreadPool(15,
+  './yourWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
 
 const start = Date.now()
-const iterations = 300
+const iterations = 1000
 for (let i = 0; i <= iterations; i++) {
   pool.execute({}).then(res => {
     resolved++