[feat] support exec async func in worker threads
authorliuwenzhe <liuwenzhe_neoye@163.com>
Thu, 21 May 2020 07:02:45 +0000 (15:02 +0800)
committerliuwenzhe <liuwenzhe_neoye@163.com>
Thu, 21 May 2020 07:02:45 +0000 (15:02 +0800)
lib/workers.js
package-lock.json
tests/fixed.test.js
tests/workers/asyncWorker.js [new file with mode: 0644]

index 103061aa78b55e55625eb877f81b300239116d14..001c932ae7a92994b46f2a3e260a2affd337f87f 100644 (file)
@@ -16,6 +16,7 @@ class ThreadWorker extends AsyncResource {
     super('worker-thread-pool:pioardi')
     this.opts = opts || {}
     this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
+    this.async = !!this.opts.async
     this.lastTask = Date.now()
     if (!fn) throw new Error('Fn parameter is mandatory')
     // keep the worker active
@@ -27,7 +28,11 @@ class ThreadWorker extends AsyncResource {
       if (value && value.data && value._id) {
         // here you will receive messages
         // console.log('This is the main thread ' + isMainThread)
-        this.runInAsyncScope(this._run.bind(this), this, fn, value)
+        if (this.async) {
+          this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
+        } else {
+          this.runInAsyncScope(this._run.bind(this), this, fn, value)
+        }
       } else if (value.parent) {
         // save the port to communicate with the main thread
         // this will be received once
@@ -56,6 +61,16 @@ class ThreadWorker extends AsyncResource {
       this.lastTask = Date.now()
     }
   }
+
+  _runAsync(fn, value) {
+    fn(value.data).then(res => {
+      this.parent.postMessage({ data: res, _id: value._id })
+      this.lastTask = Date.now()
+    }).catch(e => {
+      this.parent.postMessage({ error: e, _id: value._id })
+      this.lastTask = Date.now()
+    })
+  }
 }
 
 module.exports.ThreadWorker = ThreadWorker
index 2ed66bfac5870a4a140f531f4abb9c04b6529293..e345b17fa8095a3f6498ba985434fcdfade574d0 100644 (file)
@@ -1,5 +1,5 @@
 {
-  "name": "node-thread-pool",
+  "name": "poolifier",
   "version": "1.0.0",
   "lockfileVersion": 1,
   "requires": true,
index e56e15ff301ea1e6fedcb9081e119bd208f310be..b591c54c64f0a3e7e3664a16898569e66a1328ea 100644 (file)
@@ -7,6 +7,7 @@ const pool = new FixedThreadPool(numThreads,
 const emptyPool = new FixedThreadPool(1, './tests/workers/emptyWorker.js')
 const echoPool = new FixedThreadPool(1, './tests/workers/echoWorker.js')
 const errorPool = new FixedThreadPool(1, './tests/workers/errorWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+const asyncPool = new FixedThreadPool(1, './tests/workers/asyncWorker.js')
 
 describe('Fixed thread pool test suite ', () => {
   it('Choose worker round robin test', async () => {
@@ -54,6 +55,16 @@ describe('Fixed thread pool test suite ', () => {
     expect(inError.message).toBeTruthy()
   })
 
+  it('Verify that async function is working properly', async () => {
+    const data = { f: 10 }
+    const startTime = new Date().getTime()
+    const result = await asyncPool.execute(data)
+    const usedTime = new Date().getTime() - startTime
+    expect(result).toBeTruthy()
+    expect(result.f).toBe(data.f)
+    expect(usedTime).toBeGreaterThan(2000)
+  })
+
   it('Shutdown test', async () => {
     let closedThreads = 0
     pool.workers.forEach(w => {
diff --git a/tests/workers/asyncWorker.js b/tests/workers/asyncWorker.js
new file mode 100644 (file)
index 0000000..b25b318
--- /dev/null
@@ -0,0 +1,10 @@
+'use strict'
+const { ThreadWorker } = require('../../lib/workers')
+
+async function sleep (data) {
+  return new Promise((resolve, reject) => {
+    setTimeout(() => resolve(data), 2000)
+  })
+}
+
+module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
\ No newline at end of file