Bump expect from 26.5.2 to 26.5.3
[poolifier.git] / lib / workers.js
index e6d43c698de5b85cd0ec61baa479c32f1f8b7cc9..abd60b4ff0ba814aac9d0af43cd4a5f7fe34d31b 100644 (file)
@@ -2,33 +2,7 @@
 const {
   isMainThread, parentPort
 } = require('worker_threads')
-const maxInactiveTime = 1000 * 60
-
-/**
- * An example worker that will be always alive, you just need to extend this class if you want a static pool.
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class ThreadWorker {
-  constructor (fn) {
-    if (!fn) throw new Error('Fn parameter is mandatory')
-    // keep the worker active
-    if (!isMainThread) {
-      this.interval =
-      setInterval(() => {
-      }, 10000)
-    }
-    parentPort.on('message', (value) => {
-      if (value.parent) {
-        // save the port to communicate with the main thread
-        this.parent = value.parent
-      } else if (value && value._id) {
-        // console.log('This is the main thread ' + isMainThread)
-        this.parent.postMessage({ data: fn(value), _id: value._id })
-      }
-    })
-  }
-}
+const { AsyncResource } = require('async_hooks')
 
 /**
  * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
@@ -37,21 +11,28 @@ class ThreadWorker {
  * @author Alessandro Pio Ardizio
  * @since 0.0.1
  */
-class DynamicWorker {
-  constructor (fn) {
+class ThreadWorker extends AsyncResource {
+  constructor (fn, opts) {
+    super('worker-thread-pool:pioardi')
+    this.opts = opts || {}
+    this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
+    this.async = !!this.opts.async
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
     if (!isMainThread) {
-      this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+      this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2)
       this._checkAlive.bind(this)()
     }
     parentPort.on('message', (value) => {
-      if (value && value._id) {
+      if (value && value.data && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        this.parent.postMessage({ data: fn(value), _id: value._id })
-        this.lastTask = Date.now()
+        if (this.async) {
+          this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
+        } else {
+          this.runInAsyncScope(this._run.bind(this), this, fn, value)
+        }
       } else if (value.parent) {
         // save the port to communicate with the main thread
         // this will be received once
@@ -59,16 +40,37 @@ class DynamicWorker {
       } else if (value.kill) {
         // here is time to kill this thread, just clearing the interval
         clearInterval(this.interval)
+        this.emitDestroy()
       }
     })
   }
 
   _checkAlive () {
-    if ((Date.now() - this.lastTask) > maxInactiveTime) {
+    if ((Date.now() - this.lastTask) > this.maxInactiveTime) {
       this.parent.postMessage({ kill: 1 })
     }
   }
+
+  _run (fn, value) {
+    try {
+      const res = fn(value.data)
+      this.parent.postMessage({ data: res, _id: value._id })
+      this.lastTask = Date.now()
+    } catch (e) {
+      this.parent.postMessage({ error: e, _id: value._id })
+      this.lastTask = Date.now()
+    }
+  }
+
+  _runAsync (fn, value) {
+    fn(value.data).then(res => {
+      this.parent.postMessage({ data: res, _id: value._id })
+      this.lastTask = Date.now()
+    }).catch(e => {
+      this.parent.postMessage({ error: e, _id: value._id })
+      this.lastTask = Date.now()
+    })
+  }
 }
 
 module.exports.ThreadWorker = ThreadWorker
-module.exports.DynamicWorker = DynamicWorker