Improvements and unit tests
authoraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 20:40:18 +0000 (21:40 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 20:40:18 +0000 (21:40 +0100)
lib/dynamic.js
lib/fixed.js
lib/workers.js
package.json
tests/dynamic.test.js [new file with mode: 0644]
tests/fixed.test.js [new file with mode: 0644]
tests/testWorker.js [new file with mode: 0644]
tests/util.test.js
yourWorker.js

index cedb8c35b93c0c230f62024bbd9facd04c6e1b7a..e5a9c690b95a27bf161519cd7a6840ef01242bcf 100644 (file)
@@ -24,15 +24,6 @@ class DynamicThreadPool extends FixedThreadPool {
     this.emitter = new MyEmitter()
   }
 
-  /**
-   * Return an event emitter that will send some messages, for example
-   * a message will be sent when max number of threads is reached and all threads are busy
-   * in this case it will emit a message
-   */
-  emitter () {
-    return this.emitter
-  }
-
   _chooseWorker () {
     let worker
     for (const entry of this.tasks) {
index 63e62bade18bf12d7779186516e30445e5e1eaaa..833ae7b6f1f048c8ff0a92e7eb42a1bdacd36b78 100644 (file)
@@ -2,7 +2,6 @@
 const {
   Worker, isMainThread, MessageChannel, SHARE_ENV
 } = require('worker_threads')
-const path = require('path')
 const { generateID } = require('./util')
 
 function empty () {}
@@ -16,17 +15,17 @@ class FixedThreadPool {
   /**
     *
     * @param {Number} numThreads  Num of threads for this worker pool
-    * @param {string} a file path with implementation of @see ThreadWorker class
+    * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
     * @param {Object} an object with possible options for example errorHandler, onlineHandler.
   */
-  constructor (numThreads, filename, opts) {
+  constructor (numThreads, filePath, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
-    if (!filename) throw new Error('Please specify a file with a worker implementation')
+    if (!filePath) throw new Error('Please specify a file with a worker implementation')
     this.numThreads = numThreads
     this.workers = []
     this.nextWorker = 0
     this.opts = opts || { maxTasks: 1000 }
-    this.filename = filename
+    this.filePath = filePath
     // threadId as key and an integer value
     this.tasks = new Map()
     for (let i = 1; i <= numThreads; i++) {
@@ -38,7 +37,6 @@ class FixedThreadPool {
     for (const worker of this.workers) {
       worker.terminate()
     }
-    this.emitDestroy()
   }
 
   /**
@@ -51,9 +49,8 @@ class FixedThreadPool {
     const worker = this._chooseWorker()
     this.tasks.set(worker, this.tasks.get(worker) + 1)
     const id = generateID()
-    data._id = id
     const res = this._execute(worker, id)
-    worker.postMessage(data)
+    worker.postMessage({ data: data, _id: id })
     return res
   }
 
@@ -81,7 +78,7 @@ class FixedThreadPool {
   }
 
   _newWorker () {
-    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+    const worker = new Worker(this.filePath, { env: SHARE_ENV })
     worker.on('error', this.opts.errorHandler || empty)
     worker.on('online', this.opts.onlineHandler || empty)
     // TODO handle properly when a thread exit
index 8641ed9e871deafce80c0adafd5b9531443ec209..ee03778d47d603711ee16e6b994e5955a49855d7 100644 (file)
@@ -3,7 +3,6 @@ const {
   isMainThread, parentPort
 } = require('worker_threads')
 const { AsyncResource } = require('async_hooks')
-const maxInactiveTime = 1000 * 60
 
 /**
  * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
@@ -13,20 +12,22 @@ const maxInactiveTime = 1000 * 60
  * @since 0.0.1
  */
 class ThreadWorker extends AsyncResource {
-  constructor (fn) {
+  constructor (fn, opts) {
     super('worker-thread-pool:pioardi')
+    this.opts = opts || {}
+    this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
     if (!isMainThread) {
-      this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+      this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2)
       this._checkAlive.bind(this)()
     }
     parentPort.on('message', (value) => {
-      if (value && value._id) {
+      if (value && value.data && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        const res = this.runInAsyncScope(fn, null, value)
+        const res = this.runInAsyncScope(fn, null, value.data)
         this.parent.postMessage({ data: res, _id: value._id })
         this.lastTask = Date.now()
       } else if (value.parent) {
@@ -41,7 +42,7 @@ class ThreadWorker extends AsyncResource {
   }
 
   _checkAlive () {
-    if ((Date.now() - this.lastTask) > maxInactiveTime) {
+    if ((Date.now() - this.lastTask) > this.maxInactiveTime) {
       this.parent.postMessage({ kill: 1 })
     }
   }
index 0b3d306cdcda8efe2e8f34557a622598ee520a6e..4bbd2b8fb34f187d9f025a63fded9ee64e6bf849 100644 (file)
@@ -6,6 +6,7 @@
   "scripts": {
     "build": "npm install",
     "test": "standard && nyc mocha --exit --timeout 10000 **/*test.js ",
+    "demontest": "nodemon --exec \"npm test\"",
     "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls",
     "standard-verbose": "npx standard --verbose",
     "lint": "standard --fix"
diff --git a/tests/dynamic.test.js b/tests/dynamic.test.js
new file mode 100644 (file)
index 0000000..7b0865b
--- /dev/null
@@ -0,0 +1,61 @@
+const expect = require('expect')
+const DynamicThreadPool = require('../lib/dynamic')
+const min = 1
+const max = 10
+const pool = new DynamicThreadPool(min, max,
+  './tests/testWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+describe('Dynamic thread pool test suite ', () => {
+  it('Verify that the function is executed in a worker thread', async () => {
+    const result = await pool.execute({ test: 'test' })
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
+    const promises = []
+    let closedThreads = 0
+    for (let i = 0; i < (max * 3); i++) {
+      promises.push(pool.execute({ test: 'test' }))
+    }
+    expect(pool.workers.length).toBe(max)
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedThreads++
+      })
+    })
+    await new Promise(resolve => setTimeout(resolve, 1000 * 2))
+    expect(closedThreads).toBe(max - min)
+  })
+
+  it('Shutdown test', async () => {
+    let closedThreads = 0
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedThreads++
+      })
+    })
+    pool.destroy()
+    await new Promise(resolve => setTimeout(resolve, 1000))
+    expect(closedThreads).toBe(min)
+  })
+
+  it('Validations test', () => {
+    let error
+    try {
+      const pool1 = new DynamicThreadPool()
+      console.log(pool1)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeTruthy()
+    expect(error.message).toBeTruthy()
+  })
+
+  it('Should work even without opts in input', async () => {
+    const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js')
+    const res = await pool1.execute({ test: 'test' })
+    expect(res).toBeFalsy()
+  })
+})
diff --git a/tests/fixed.test.js b/tests/fixed.test.js
new file mode 100644 (file)
index 0000000..78e960b
--- /dev/null
@@ -0,0 +1,52 @@
+const expect = require('expect')
+const FixedThreadPool = require('../lib/fixed')
+const numThreads = 10
+const pool = new FixedThreadPool(numThreads,
+  './tests/testWorker.js',
+  { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+describe('Fixed thread pool test suite ', () => {
+  it('Choose worker round robin test', async () => {
+    const results = new Set()
+    for (let i = 0; i < numThreads; i++) {
+      results.add(pool._chooseWorker().threadId)
+    }
+    expect(results.size).toBe(numThreads)
+  })
+
+  it('Verify that the function is executed in a worker thread', async () => {
+    const result = await pool.execute({ test: 'test' })
+    expect(result).toBeDefined()
+    expect(result).toBeFalsy()
+  })
+
+  it('Shutdown test', async () => {
+    let closedThreads = 0
+    pool.workers.forEach(w => {
+      w.on('exit', () => {
+        closedThreads++
+      })
+    })
+    pool.destroy()
+    await new Promise(resolve => setTimeout(resolve, 1000))
+    expect(closedThreads).toBe(numThreads)
+  })
+
+  it('Validations test', () => {
+    let error
+    try {
+      const pool1 = new FixedThreadPool()
+      console.log(pool1)
+    } catch (e) {
+      error = e
+    }
+    expect(error).toBeTruthy()
+    expect(error.message).toBeTruthy()
+  })
+
+  it('Should work even without opts in input', async () => {
+    const pool1 = new FixedThreadPool(1, './tests/testWorker.js')
+    const res = await pool1.execute({ test: 'test' })
+    expect(res).toBeFalsy()
+  })
+})
diff --git a/tests/testWorker.js b/tests/testWorker.js
new file mode 100644 (file)
index 0000000..d692304
--- /dev/null
@@ -0,0 +1,19 @@
+'use strict'
+const { ThreadWorker } = require('../lib/workers')
+const { isMainThread } = require('worker_threads')
+
+class MyWorker extends ThreadWorker {
+  constructor () {
+    super((data) => {
+      for (let i = 0; i <= 100; i++) {
+        const o = {
+          a: i
+        }
+        JSON.stringify(o)
+      }
+      return isMainThread
+    }, { maxInactiveTime: 1000 })
+  }
+}
+
+module.exports = new MyWorker()
index 0d6c99f18d0ad95d576efe0170bdab415a467ab4..0cba02ce65fdcd1c6448d0015373a2a1e52067b6 100644 (file)
@@ -1,7 +1,20 @@
 const expect = require('expect')
+const { generateID, randomWorker } = require('../lib/util')
 
-describe('Generate uuid', () => {
-  it('Just a proof for CI', () => {
-    expect(true).toBeTruthy()
+describe('Utility Tests ', () => {
+  it('Generate an id', () => {
+    const res = generateID()
+    expect(res).toBeTruthy()
+    expect(typeof res).toBe('string')
+  })
+
+  it('Choose a random worker', () => {
+    const input = new Map()
+    input.set(1, 1)
+    input.set(2, 2)
+    input.set(3, 3)
+    const worker = randomWorker(input)
+    expect(worker).toBeTruthy()
+    expect(Array.from(input.keys()).includes(worker)).toBeTruthy()
   })
 })
index c3a7b0c4a9d606abee2c8b9780e0fa6df0af59e4..34a85dbbe8f64040c027c44b78b6987c3d25d5d3 100644 (file)
@@ -11,7 +11,7 @@ class MyWorker extends ThreadWorker {
         JSON.stringify(o)
       }
       // console.log('This is the main thread ' + isMainThread)
-      return data
+      return { ok: 1 }
     })
   }
 }