General clean up
[poolifier.git] / lib / fixed.js
index 442d58e459c7d0ce75058fa4a0518db9a66d34ee..5d5d40e1be1a53c8092360432f97f77a97d57e41 100644 (file)
@@ -5,6 +5,7 @@ const {
 const path = require('path')
 const { generateID } = require('./util')
 
+function empty () {}
 /**
  * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This pool will select the worker thread in a round robin fashion. <br>
@@ -15,7 +16,8 @@ class FixedThreadPool {
   /**
     *
     * @param {Number} numThreads  Num of threads for this worker pool
-    * @param {Object} an object with possible options for example maxConcurrency
+    * @param {string} a file path with implementation of @see ThreadWorker class
+    * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
   */
   constructor (numThreads, filename, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
@@ -32,6 +34,13 @@ class FixedThreadPool {
     }
   }
 
+  destroy () {
+    for (const worker of this.workers) {
+      worker.terminate()
+    }
+    this.emitDestroy()
+  }
+
   /**
    * Execute the task specified into the constructor with the data parameter.
    * @param {Any} the input for the task specified
@@ -41,10 +50,8 @@ class FixedThreadPool {
     // configure worker to handle message with the specified task
     const worker = this._chooseWorker()
     this.tasks.set(worker, this.tasks.get(worker) + 1)
-    // console.log('Num of pending tasks ', this.tasks.get(worker))
     const id = generateID()
     data._id = id
-    // console.log('Worker choosed is ' + worker.threadId)
     const res = this._execute(worker, id)
     worker.postMessage(data)
     return res
@@ -55,7 +62,6 @@ class FixedThreadPool {
       const listener = (message) => {
         if (message._id === id) {
           worker.port2.removeListener('message', listener)
-          // console.log(worker.port2.listenerCount('message'))
           this.tasks.set(worker, this.tasks.get(worker) - 1)
           resolve(message.data)
         }
@@ -76,8 +82,9 @@ class FixedThreadPool {
 
   _newWorker () {
     const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
-    worker.on('error', (e) => console.error(e))
-    worker.on('exit', () => console.log('EXITING'))
+    worker.on('error', this.opts.errorHandler || empty)
+    worker.on('exit', this.opts.exitHandler || empty)
+    worker.on('online', this.opts.onlineHandler || empty)
     this.workers.push(worker)
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])