Merge pull request #1 from pioardi/dependabot/npm_and_yarn/expect-25.1.0
[poolifier.git] / lib / fixed.js
index 5d5d40e1be1a53c8092360432f97f77a97d57e41..8d899d57bb4ed00a54e4d1a1f2093641d8bd3f77 100644 (file)
@@ -2,10 +2,9 @@
 const {
   Worker, isMainThread, MessageChannel, SHARE_ENV
 } = require('worker_threads')
-const path = require('path')
-const { generateID } = require('./util')
 
 function empty () {}
+const _void = {}
 /**
  * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
  * This pool will select the worker thread in a round robin fashion. <br>
@@ -16,17 +15,18 @@ class FixedThreadPool {
   /**
     *
     * @param {Number} numThreads  Num of threads for this worker pool
-    * @param {string} a file path with implementation of @see ThreadWorker class
-    * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
+    * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
+    * @param {Object} an object with possible options for example errorHandler, onlineHandler.
   */
-  constructor (numThreads, filename, opts) {
+  constructor (numThreads, filePath, opts) {
     if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
-    if (!filename) throw new Error('Please specify a file with a worker implementation')
+    if (!filePath) throw new Error('Please specify a file with a worker implementation')
     this.numThreads = numThreads
     this.workers = []
     this.nextWorker = 0
     this.opts = opts || { maxTasks: 1000 }
-    this.filename = filename
+    this.filePath = filePath
+    this._id = 0
     // threadId as key and an integer value
     this.tasks = new Map()
     for (let i = 1; i <= numThreads; i++) {
@@ -38,7 +38,6 @@ class FixedThreadPool {
     for (const worker of this.workers) {
       worker.terminate()
     }
-    this.emitDestroy()
   }
 
   /**
@@ -50,10 +49,9 @@ class FixedThreadPool {
     // configure worker to handle message with the specified task
     const worker = this._chooseWorker()
     this.tasks.set(worker, this.tasks.get(worker) + 1)
-    const id = generateID()
-    data._id = id
+    const id = ++this._id
     const res = this._execute(worker, id)
-    worker.postMessage(data)
+    worker.postMessage({ data: data || _void, _id: id })
     return res
   }
 
@@ -63,7 +61,8 @@ class FixedThreadPool {
         if (message._id === id) {
           worker.port2.removeListener('message', listener)
           this.tasks.set(worker, this.tasks.get(worker) - 1)
-          resolve(message.data)
+          if (message.error) reject(message.error)
+          else resolve(message.data)
         }
       }
       worker.port2.on('message', listener)
@@ -81,10 +80,11 @@ class FixedThreadPool {
   }
 
   _newWorker () {
-    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+    const worker = new Worker(this.filePath, { env: SHARE_ENV })
     worker.on('error', this.opts.errorHandler || empty)
-    worker.on('exit', this.opts.exitHandler || empty)
     worker.on('online', this.opts.onlineHandler || empty)
+    // TODO handle properly when a thread exit
+    worker.on('exit', this.opts.exitHandler || empty)
     this.workers.push(worker)
     const { port1, port2 } = new MessageChannel()
     worker.postMessage({ parent: port1 }, [port1])
@@ -92,7 +92,7 @@ class FixedThreadPool {
     worker.port2 = port2
     // we will attach a listener for every task,
     // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
-    worker.port2.setMaxListeners(this.opts.maxTasks)
+    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
     // init tasks map
     this.tasks.set(worker, 0)
     return worker