X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Fworkers.js;h=abd60b4ff0ba814aac9d0af43cd4a5f7fe34d31b;hb=086bf7bb050ac2305359a4e9d907fee5c787e5db;hp=e6d43c698de5b85cd0ec61baa479c32f1f8b7cc9;hpb=a32e02baa991ae01b5d677e3fd34821965daab1e;p=poolifier.git diff --git a/lib/workers.js b/lib/workers.js index e6d43c69..abd60b4f 100644 --- a/lib/workers.js +++ b/lib/workers.js @@ -2,33 +2,7 @@ const { isMainThread, parentPort } = require('worker_threads') -const maxInactiveTime = 1000 * 60 - -/** - * An example worker that will be always alive, you just need to extend this class if you want a static pool. - * @author Alessandro Pio Ardizio - * @since 0.0.1 - */ -class ThreadWorker { - constructor (fn) { - if (!fn) throw new Error('Fn parameter is mandatory') - // keep the worker active - if (!isMainThread) { - this.interval = - setInterval(() => { - }, 10000) - } - parentPort.on('message', (value) => { - if (value.parent) { - // save the port to communicate with the main thread - this.parent = value.parent - } else if (value && value._id) { - // console.log('This is the main thread ' + isMainThread) - this.parent.postMessage({ data: fn(value), _id: value._id }) - } - }) - } -} +const { AsyncResource } = require('async_hooks') /** * An example worker that will be always alive, you just need to extend this class if you want a static pool.
@@ -37,21 +11,28 @@ class ThreadWorker { * @author Alessandro Pio Ardizio * @since 0.0.1 */ -class DynamicWorker { - constructor (fn) { +class ThreadWorker extends AsyncResource { + constructor (fn, opts) { + super('worker-thread-pool:pioardi') + this.opts = opts || {} + this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60) + this.async = !!this.opts.async this.lastTask = Date.now() if (!fn) throw new Error('Fn parameter is mandatory') // keep the worker active if (!isMainThread) { - this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime) + this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2) this._checkAlive.bind(this)() } parentPort.on('message', (value) => { - if (value && value._id) { + if (value && value.data && value._id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) - this.parent.postMessage({ data: fn(value), _id: value._id }) - this.lastTask = Date.now() + if (this.async) { + this.runInAsyncScope(this._runAsync.bind(this), this, fn, value) + } else { + this.runInAsyncScope(this._run.bind(this), this, fn, value) + } } else if (value.parent) { // save the port to communicate with the main thread // this will be received once @@ -59,16 +40,37 @@ class DynamicWorker { } else if (value.kill) { // here is time to kill this thread, just clearing the interval clearInterval(this.interval) + this.emitDestroy() } }) } _checkAlive () { - if ((Date.now() - this.lastTask) > maxInactiveTime) { + if ((Date.now() - this.lastTask) > this.maxInactiveTime) { this.parent.postMessage({ kill: 1 }) } } + + _run (fn, value) { + try { + const res = fn(value.data) + this.parent.postMessage({ data: res, _id: value._id }) + this.lastTask = Date.now() + } catch (e) { + this.parent.postMessage({ error: e, _id: value._id }) + this.lastTask = Date.now() + } + } + + _runAsync (fn, value) { + fn(value.data).then(res => { + this.parent.postMessage({ data: res, _id: value._id }) + this.lastTask = Date.now() + }).catch(e => { + this.parent.postMessage({ error: e, _id: value._id }) + this.lastTask = Date.now() + }) + } } module.exports.ThreadWorker = ThreadWorker -module.exports.DynamicWorker = DynamicWorker