From 50811da2e375e78179b56fe78fd7674ad2f360fd Mon Sep 17 00:00:00 2001 From: aardizio Date: Sun, 19 Jan 2020 14:56:05 +0100 Subject: [PATCH] General clean up --- lib/dynamic.js | 1 - lib/fixed.js | 19 +++++++++++++------ lib/workers.js | 34 +++++----------------------------- proof.js | 5 ++--- yourWorker.js | 4 ++-- 5 files changed, 22 insertions(+), 41 deletions(-) diff --git a/lib/dynamic.js b/lib/dynamic.js index 53ff28fa..fa8a3a52 100644 --- a/lib/dynamic.js +++ b/lib/dynamic.js @@ -37,7 +37,6 @@ class DynamicThreadPool extends FixedThreadPool { if (this.workers.length === this.max) { throw new Error('Max number of threads reached !!!') } - // console.log('new thread is coming') // all workers are busy create a new worker const worker = this._newWorker() worker.port2.on('message', (message) => { diff --git a/lib/fixed.js b/lib/fixed.js index 442d58e4..5d5d40e1 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -5,6 +5,7 @@ const { const path = require('path') const { generateID } = require('./util') +function empty () {} /** * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -15,7 +16,8 @@ class FixedThreadPool { /** * * @param {Number} numThreads Num of threads for this worker pool - * @param {Object} an object with possible options for example maxConcurrency + * @param {string} a file path with implementation of @see ThreadWorker class + * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler */ constructor (numThreads, filename, opts) { if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') @@ -32,6 +34,13 @@ class FixedThreadPool { } } + destroy () { + for (const worker of this.workers) { + worker.terminate() + } + this.emitDestroy() + } + /** * Execute the task specified into the constructor with the data parameter. * @param {Any} the input for the task specified @@ -41,10 +50,8 @@ class FixedThreadPool { // configure worker to handle message with the specified task const worker = this._chooseWorker() this.tasks.set(worker, this.tasks.get(worker) + 1) - // console.log('Num of pending tasks ', this.tasks.get(worker)) const id = generateID() data._id = id - // console.log('Worker choosed is ' + worker.threadId) const res = this._execute(worker, id) worker.postMessage(data) return res @@ -55,7 +62,6 @@ class FixedThreadPool { const listener = (message) => { if (message._id === id) { worker.port2.removeListener('message', listener) - // console.log(worker.port2.listenerCount('message')) this.tasks.set(worker, this.tasks.get(worker) - 1) resolve(message.data) } @@ -76,8 +82,9 @@ class FixedThreadPool { _newWorker () { const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV }) - worker.on('error', (e) => console.error(e)) - worker.on('exit', () => console.log('EXITING')) + worker.on('error', this.opts.errorHandler || empty) + worker.on('exit', this.opts.exitHandler || empty) + worker.on('online', this.opts.onlineHandler || empty) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) diff --git a/lib/workers.js b/lib/workers.js index e6d43c69..8641ed9e 100644 --- a/lib/workers.js +++ b/lib/workers.js @@ -2,34 +2,9 @@ const { isMainThread, parentPort } = require('worker_threads') +const { AsyncResource } = require('async_hooks') const maxInactiveTime = 1000 * 60 -/** - * An example worker that will be always alive, you just need to extend this class if you want a static pool. - * @author Alessandro Pio Ardizio - * @since 0.0.1 - */ -class ThreadWorker { - constructor (fn) { - if (!fn) throw new Error('Fn parameter is mandatory') - // keep the worker active - if (!isMainThread) { - this.interval = - setInterval(() => { - }, 10000) - } - parentPort.on('message', (value) => { - if (value.parent) { - // save the port to communicate with the main thread - this.parent = value.parent - } else if (value && value._id) { - // console.log('This is the main thread ' + isMainThread) - this.parent.postMessage({ data: fn(value), _id: value._id }) - } - }) - } -} - /** * An example worker that will be always alive, you just need to extend this class if you want a static pool.
* When this worker is inactive for more than 1 minute, it will send this info to the main thread,
@@ -37,8 +12,9 @@ class ThreadWorker { * @author Alessandro Pio Ardizio * @since 0.0.1 */ -class DynamicWorker { +class ThreadWorker extends AsyncResource { constructor (fn) { + super('worker-thread-pool:pioardi') this.lastTask = Date.now() if (!fn) throw new Error('Fn parameter is mandatory') // keep the worker active @@ -50,7 +26,8 @@ class DynamicWorker { if (value && value._id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) - this.parent.postMessage({ data: fn(value), _id: value._id }) + const res = this.runInAsyncScope(fn, null, value) + this.parent.postMessage({ data: res, _id: value._id }) this.lastTask = Date.now() } else if (value.parent) { // save the port to communicate with the main thread @@ -71,4 +48,3 @@ class DynamicWorker { } module.exports.ThreadWorker = ThreadWorker -module.exports.DynamicWorker = DynamicWorker diff --git a/proof.js b/proof.js index 4ac63d05..d6af4033 100644 --- a/proof.js +++ b/proof.js @@ -2,13 +2,12 @@ const FixedThreadPool = require('./lib/fixed') const DynamicThreadPool = require('./lib/dynamic') let resolved = 0 // const pool = new FixedThreadPool(15, './yourWorker.js') -const pool = new DynamicThreadPool(15, 1020, './yourWorker.js') +const pool = new DynamicThreadPool(300, 1020, './yourWorker.js') const start = Date.now() -const iterations = 1000 +const iterations = 300 for (let i = 0; i <= iterations; i++) { pool.execute({}).then(res => { - // console.log(res) resolved++ if (resolved === iterations) { console.log('Time take is ' + (Date.now() - start)) diff --git a/yourWorker.js b/yourWorker.js index a74c6341..c3a7b0c4 100644 --- a/yourWorker.js +++ b/yourWorker.js @@ -1,7 +1,7 @@ 'use strict' -const { ThreadWorker, DynamicWorker } = require('./lib/workers') +const { ThreadWorker } = require('./lib/workers') -class MyWorker extends DynamicWorker { +class MyWorker extends ThreadWorker { constructor () { super((data) => { for (let i = 0; i <= 10000; i++) { -- 2.34.1