General clean up
authoraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 13:56:05 +0000 (14:56 +0100)
committeraardizio <alessandroardizio94@gmail.com>
Sun, 19 Jan 2020 13:56:05 +0000 (14:56 +0100)
lib/dynamic.js
lib/fixed.js
lib/workers.js
proof.js
yourWorker.js

index 53ff28fadf06c0fde0536edceb9358fef839510e..fa8a3a521c96b0b01954dcf303a7264308e541b0 100644 (file)
@@ -37,7 +37,6 @@ class DynamicThreadPool extends FixedThreadPool {
       if (this.workers.length === this.max) {
         throw new Error('Max number of threads reached !!!')
       }
-      // console.log('new thread is coming')
       // all workers are busy create a new worker
       const worker = this._newWorker()
       worker.port2.on('message', (message) => {
index 442d58e459c7d0ce75058fa4a0518db9a66d34ee..5d5d40e1be1a53c8092360432f97f77a97d57e41 100644 (file)
@@ -5,6 +5,7 @@ const {
 const path = require('path')
 const { generateID } = require('./util')
 
+function empty () {}
 /**
  * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This pool will select the worker thread in a round robin fashion. <br>
@@ -15,7 +16,8 @@ class FixedThreadPool {
   /**
     *
     * @param {Number} numThreads  Num of threads for this worker pool
-    * @param {Object} an object with possible options for example maxConcurrency
+    * @param {string} a file path with implementation of @see ThreadWorker class
+    * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
   */
   constructor (numThreads, filename, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
@@ -32,6 +34,13 @@ class FixedThreadPool {
     }
   }
 
+  destroy () {
+    for (const worker of this.workers) {
+      worker.terminate()
+    }
+    this.emitDestroy()
+  }
+
   /**
    * Execute the task specified into the constructor with the data parameter.
    * @param {Any} the input for the task specified
@@ -41,10 +50,8 @@ class FixedThreadPool {
     // configure worker to handle message with the specified task
     const worker = this._chooseWorker()
     this.tasks.set(worker, this.tasks.get(worker) + 1)
-    // console.log('Num of pending tasks ', this.tasks.get(worker))
     const id = generateID()
     data._id = id
-    // console.log('Worker choosed is ' + worker.threadId)
     const res = this._execute(worker, id)
     worker.postMessage(data)
     return res
@@ -55,7 +62,6 @@ class FixedThreadPool {
       const listener = (message) => {
         if (message._id === id) {
           worker.port2.removeListener('message', listener)
-          // console.log(worker.port2.listenerCount('message'))
           this.tasks.set(worker, this.tasks.get(worker) - 1)
           resolve(message.data)
         }
@@ -76,8 +82,9 @@ class FixedThreadPool {
 
   _newWorker () {
     const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
-    worker.on('error', (e) => console.error(e))
-    worker.on('exit', () => console.log('EXITING'))
+    worker.on('error', this.opts.errorHandler || empty)
+    worker.on('exit', this.opts.exitHandler || empty)
+    worker.on('online', this.opts.onlineHandler || empty)
     this.workers.push(worker)
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])
index e6d43c698de5b85cd0ec61baa479c32f1f8b7cc9..8641ed9e871deafce80c0adafd5b9531443ec209 100644 (file)
@@ -2,34 +2,9 @@
 const {
   isMainThread, parentPort
 } = require('worker_threads')
+const { AsyncResource } = require('async_hooks')
 const maxInactiveTime = 1000 * 60
 
-/**
- * An example worker that will be always alive, you just need to extend this class if you want a static pool.
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class ThreadWorker {
-  constructor (fn) {
-    if (!fn) throw new Error('Fn parameter is mandatory')
-    // keep the worker active
-    if (!isMainThread) {
-      this.interval =
-      setInterval(() => {
-      }, 10000)
-    }
-    parentPort.on('message', (value) => {
-      if (value.parent) {
-        // save the port to communicate with the main thread
-        this.parent = value.parent
-      } else if (value && value._id) {
-        // console.log('This is the main thread ' + isMainThread)
-        this.parent.postMessage({ data: fn(value), _id: value._id })
-      }
-    })
-  }
-}
-
 /**
  * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
  * When this worker is inactive for more than 1 minute, it will send this info to the main thread,<br>
@@ -37,8 +12,9 @@ class ThreadWorker {
  * @author Alessandro Pio Ardizio
  * @since 0.0.1
  */
-class DynamicWorker {
+class ThreadWorker extends AsyncResource {
   constructor (fn) {
+    super('worker-thread-pool:pioardi')
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
@@ -50,7 +26,8 @@ class DynamicWorker {
       if (value && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        this.parent.postMessage({ data: fn(value), _id: value._id })
+        const res = this.runInAsyncScope(fn, null, value)
+        this.parent.postMessage({ data: res, _id: value._id })
         this.lastTask = Date.now()
       } else if (value.parent) {
         // save the port to communicate with the main thread
@@ -71,4 +48,3 @@ class DynamicWorker {
 }
 
 module.exports.ThreadWorker = ThreadWorker
-module.exports.DynamicWorker = DynamicWorker
index 4ac63d0544ec863b78c0c0af5b14c516def7747c..d6af4033796352695e3da3bdcc74c7033c64d1d7 100644 (file)
--- a/proof.js
+++ b/proof.js
@@ -2,13 +2,12 @@ const FixedThreadPool = require('./lib/fixed')
 const DynamicThreadPool = require('./lib/dynamic')
 let resolved = 0
 // const pool = new FixedThreadPool(15, './yourWorker.js')
-const pool = new DynamicThreadPool(15, 1020, './yourWorker.js')
+const pool = new DynamicThreadPool(300, 1020, './yourWorker.js')
 
 const start = Date.now()
-const iterations = 1000
+const iterations = 300
 for (let i = 0; i <= iterations; i++) {
   pool.execute({}).then(res => {
-    // console.log(res)
     resolved++
     if (resolved === iterations) {
       console.log('Time take is ' + (Date.now() - start))
index a74c6341470c2f1c4c47a2432bf2dc83c2503360..c3a7b0c4a9d606abee2c8b9780e0fa6df0af59e4 100644 (file)
@@ -1,7 +1,7 @@
 'use strict'
-const { ThreadWorker, DynamicWorker } = require('./lib/workers')
+const { ThreadWorker } = require('./lib/workers')
 
-class MyWorker extends DynamicWorker {
+class MyWorker extends ThreadWorker {
   constructor () {
     super((data) => {
       for (let i = 0; i <= 10000; i++) {