Improvements based on https://github.com/pioardi/poolifier/issues/6
[poolifier.git] / lib / fixed.js
index 442d58e459c7d0ce75058fa4a0518db9a66d34ee..676947725bed0bb65fe72fe949be8652b08f39c5 100644 (file)
@@ -2,9 +2,9 @@
 const {
   Worker, isMainThread, MessageChannel, SHARE_ENV
 } = require('worker_threads')
-const path = require('path')
-const { generateID } = require('./util')
 
+function empty () {}
+const _void = {}
 /**
  * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This pool will select the worker thread in a round robin fashion. <br>
@@ -15,16 +15,18 @@ class FixedThreadPool {
   /**
     *
     * @param {Number} numThreads  Num of threads for this worker pool
-    * @param {Object} an object with possible options for example maxConcurrency
+    * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
+    * @param {Object} an object with possible options for example errorHandler, onlineHandler.
   */
-  constructor (numThreads, filename, opts) {
+  constructor (numThreads, filePath, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
-    if (!filename) throw new Error('Please specify a file with a worker implementation')
+    if (!filePath) throw new Error('Please specify a file with a worker implementation')
     this.numThreads = numThreads
     this.workers = []
     this.nextWorker = 0
     this.opts = opts || { maxTasks: 1000 }
-    this.filename = filename
+    this.filePath = filePath
+    this._id = 0
     // threadId as key and an integer value
     this.tasks = new Map()
     for (let i = 1; i <= numThreads; i++) {
@@ -32,6 +34,12 @@ class FixedThreadPool {
     }
   }
 
+  async destroy () {
+    for (const worker of this.workers) {
+      await worker.terminate()
+    }
+  }
+
   /**
    * Execute the task specified into the constructor with the data parameter.
    * @param {Any} the input for the task specified
@@ -41,12 +49,9 @@ class FixedThreadPool {
     // configure worker to handle message with the specified task
     const worker = this._chooseWorker()
     this.tasks.set(worker, this.tasks.get(worker) + 1)
-    // console.log('Num of pending tasks ', this.tasks.get(worker))
-    const id = generateID()
-    data._id = id
-    // console.log('Worker choosed is ' + worker.threadId)
+    const id = ++this._id
     const res = this._execute(worker, id)
-    worker.postMessage(data)
+    worker.postMessage({ data: data || _void, _id: id })
     return res
   }
 
@@ -55,9 +60,9 @@ class FixedThreadPool {
       const listener = (message) => {
         if (message._id === id) {
           worker.port2.removeListener('message', listener)
-          // console.log(worker.port2.listenerCount('message'))
           this.tasks.set(worker, this.tasks.get(worker) - 1)
-          resolve(message.data)
+          if (message.error) reject(message.error)
+          else resolve(message.data)
         }
       }
       worker.port2.on('message', listener)
@@ -75,9 +80,11 @@ class FixedThreadPool {
   }
 
   _newWorker () {
-    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
-    worker.on('error', (e) => console.error(e))
-    worker.on('exit', () => console.log('EXITING'))
+    const worker = new Worker(this.filePath, { env: SHARE_ENV })
+    worker.on('error', this.opts.errorHandler || empty)
+    worker.on('online', this.opts.onlineHandler || empty)
+    // TODO handle properly when a thread exit
+    worker.on('exit', this.opts.exitHandler || empty)
     this.workers.push(worker)
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])
@@ -85,7 +92,7 @@ class FixedThreadPool {
     worker.port2 = port2
     // we will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
-    worker.port2.setMaxListeners(this.opts.maxTasks)
+    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
     // init tasks map
     this.tasks.set(worker, 0)
     return worker