Merge pull request #1 from pioardi/dependabot/npm_and_yarn/expect-25.1.0
[poolifier.git] / lib / workers.js
index 8641ed9e871deafce80c0adafd5b9531443ec209..103061aa78b55e55625eb877f81b300239116d14 100644 (file)
@@ -3,7 +3,6 @@ const {
   isMainThread, parentPort
 } = require('worker_threads')
 const { AsyncResource } = require('async_hooks')
-const maxInactiveTime = 1000 * 60
 
 /**
  * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
@@ -13,22 +12,22 @@ const maxInactiveTime = 1000 * 60
  * @since 0.0.1
  */
 class ThreadWorker extends AsyncResource {
-  constructor (fn) {
+  constructor (fn, opts) {
     super('worker-thread-pool:pioardi')
+    this.opts = opts || {}
+    this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
     if (!isMainThread) {
-      this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+      this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2)
       this._checkAlive.bind(this)()
     }
     parentPort.on('message', (value) => {
-      if (value && value._id) {
+      if (value && value.data && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        const res = this.runInAsyncScope(fn, null, value)
-        this.parent.postMessage({ data: res, _id: value._id })
-        this.lastTask = Date.now()
+        this.runInAsyncScope(this._run.bind(this), this, fn, value)
       } else if (value.parent) {
         // save the port to communicate with the main thread
         // this will be received once
@@ -36,15 +35,27 @@ class ThreadWorker extends AsyncResource {
       } else if (value.kill) {
         // here is time to kill this thread, just clearing the interval
         clearInterval(this.interval)
+        this.emitDestroy()
       }
     })
   }
 
   _checkAlive () {
-    if ((Date.now() - this.lastTask) > maxInactiveTime) {
+    if ((Date.now() - this.lastTask) > this.maxInactiveTime) {
       this.parent.postMessage({ kill: 1 })
     }
   }
+
+  _run (fn, value) {
+    try {
+      const res = fn(value.data)
+      this.parent.postMessage({ data: res, _id: value._id })
+      this.lastTask = Date.now()
+    } catch (e) {
+      this.parent.postMessage({ error: e, _id: value._id })
+      this.lastTask = Date.now()
+    }
+  }
 }
 
 module.exports.ThreadWorker = ThreadWorker