[feat] support exec async func in worker threads
[poolifier.git] / lib / workers.js
CommitLineData
a32e02ba 1'use strict'
2const {
3 isMainThread, parentPort
4} = require('worker_threads')
50811da2 5const { AsyncResource } = require('async_hooks')
a32e02ba 6
a32e02ba 7/**
8 * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
9 * When this worker is inactive for more than 1 minute, it will send this info to the main thread,<br>
10 * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed
11 * @author Alessandro Pio Ardizio
12 * @since 0.0.1
13 */
50811da2 14class ThreadWorker extends AsyncResource {
506c2a14 15 constructor (fn, opts) {
50811da2 16 super('worker-thread-pool:pioardi')
506c2a14 17 this.opts = opts || {}
18 this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
7784f548 19 this.async = !!this.opts.async
a32e02ba 20 this.lastTask = Date.now()
21 if (!fn) throw new Error('Fn parameter is mandatory')
22 // keep the worker active
23 if (!isMainThread) {
506c2a14 24 this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2)
a32e02ba 25 this._checkAlive.bind(this)()
26 }
27 parentPort.on('message', (value) => {
506c2a14 28 if (value && value.data && value._id) {
a32e02ba 29 // here you will receive messages
30 // console.log('This is the main thread ' + isMainThread)
7784f548 31 if (this.async) {
32 this.runInAsyncScope(this._runAsync.bind(this), this, fn, value)
33 } else {
34 this.runInAsyncScope(this._run.bind(this), this, fn, value)
35 }
a32e02ba 36 } else if (value.parent) {
37 // save the port to communicate with the main thread
38 // this will be received once
39 this.parent = value.parent
40 } else if (value.kill) {
41 // here is time to kill this thread, just clearing the interval
42 clearInterval(this.interval)
34a572eb 43 this.emitDestroy()
a32e02ba 44 }
45 })
46 }
47
48 _checkAlive () {
506c2a14 49 if ((Date.now() - this.lastTask) > this.maxInactiveTime) {
a32e02ba 50 this.parent.postMessage({ kill: 1 })
51 }
52 }
106744f7 53
54 _run (fn, value) {
55 try {
8950a941 56 const res = fn(value.data)
106744f7 57 this.parent.postMessage({ data: res, _id: value._id })
58 this.lastTask = Date.now()
59 } catch (e) {
60 this.parent.postMessage({ error: e, _id: value._id })
61 this.lastTask = Date.now()
62 }
63 }
7784f548 64
65 _runAsync(fn, value) {
66 fn(value.data).then(res => {
67 this.parent.postMessage({ data: res, _id: value._id })
68 this.lastTask = Date.now()
69 }).catch(e => {
70 this.parent.postMessage({ error: e, _id: value._id })
71 this.lastTask = Date.now()
72 })
73 }
a32e02ba 74}
75
76module.exports.ThreadWorker = ThreadWorker