From bf962cbad61db1b86a0cc0e9ae9fff1fda052a8e Mon Sep 17 00:00:00 2001 From: aardizio Date: Sun, 19 Jan 2020 16:45:31 +0100 Subject: [PATCH] Added an emitter to the dynamic pool , example splitted and improved --- dynamicExample.js | 17 +++++++++++++++++ lib/dynamic.js | 19 ++++++++++++++++--- lib/fixed.js | 7 +++---- lib/util.js | 5 +++++ proof.js => staticExample.js | 8 ++++---- 5 files changed, 45 insertions(+), 11 deletions(-) create mode 100644 dynamicExample.js rename proof.js => staticExample.js (58%) diff --git a/dynamicExample.js b/dynamicExample.js new file mode 100644 index 00000000..11032a92 --- /dev/null +++ b/dynamicExample.js @@ -0,0 +1,17 @@ +const DynamicThreadPool = require('./lib/dynamic') +let resolved = 0 +let maxReached = 0 +const pool = new DynamicThreadPool(100, 200, './yourWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) +pool.emitter.on('FullPool', () => maxReached++) + +const start = Date.now() +const iterations = 1000 +for (let i = 0; i <= iterations; i++) { + pool.execute({}).then(res => { + resolved++ + if (resolved === iterations) { + console.log('Time take is ' + (Date.now() - start)) + console.log('The pool was full for ' + maxReached + ' times') + } + }) +} diff --git a/lib/dynamic.js b/lib/dynamic.js index fa8a3a52..cedb8c35 100644 --- a/lib/dynamic.js +++ b/lib/dynamic.js @@ -1,11 +1,13 @@ 'use strict' const FixedThreadPool = require('./fixed') +const { randomWorker } = require('./util') +const EventEmitter = require('events') +class MyEmitter extends EventEmitter {} /** * A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This thread pool will create new workers when the other ones are busy, until the max number of threads, - * when the max number of threads is reached, an exception will be thrown. - * This pool will select the worker thread in a round robin fashion.
+ * when the max number of threads is reached, an event will be emitted , if you want to listen this event use the emitter method. * @author Alessandro Pio Ardizio * @since 0.0.1 */ @@ -19,6 +21,16 @@ class DynamicThreadPool extends FixedThreadPool { constructor (min, max, filename, opts) { super(min, filename, opts) this.max = max + this.emitter = new MyEmitter() + } + + /** + * Return an event emitter that will send some messages, for example + * a message will be sent when max number of threads is reached and all threads are busy + * in this case it will emit a message + */ + emitter () { + return this.emitter } _chooseWorker () { @@ -35,7 +47,8 @@ class DynamicThreadPool extends FixedThreadPool { return worker } else { if (this.workers.length === this.max) { - throw new Error('Max number of threads reached !!!') + this.emitter.emit('FullPool') + return randomWorker(this.tasks) } // all workers are busy create a new worker const worker = this._newWorker() diff --git a/lib/fixed.js b/lib/fixed.js index 95fe6862..63e62bad 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -17,7 +17,7 @@ class FixedThreadPool { * * @param {Number} numThreads Num of threads for this worker pool * @param {string} a file path with implementation of @see ThreadWorker class - * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler + * @param {Object} an object with possible options for example errorHandler, onlineHandler. */ constructor (numThreads, filename, opts) { if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') @@ -84,8 +84,7 @@ class FixedThreadPool { const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV }) worker.on('error', this.opts.errorHandler || empty) worker.on('online', this.opts.onlineHandler || empty) - // TODO remove the workers array , use only the map data structure - // handle properly when a thread exit + // TODO handle properly when a thread exit worker.on('exit', this.opts.exitHandler || empty) this.workers.push(worker) const { port1, port2 } = new MessageChannel() @@ -94,7 +93,7 @@ class FixedThreadPool { worker.port2 = port2 // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size - worker.port2.setMaxListeners(this.opts.maxTasks) + worker.port2.setMaxListeners(this.opts.maxTasks || 1000) // init tasks map this.tasks.set(worker, 0) return worker diff --git a/lib/util.js b/lib/util.js index 70a50744..bac53ec4 100644 --- a/lib/util.js +++ b/lib/util.js @@ -11,3 +11,8 @@ const uuid = require('uuid/v4') module.exports.generateID = () => { return uuid() } + +module.exports.randomWorker = (collection) => { + const keys = Array.from(collection.keys()) + return keys[Math.floor(Math.random() * keys.length)] +} diff --git a/proof.js b/staticExample.js similarity index 58% rename from proof.js rename to staticExample.js index d6af4033..074f37ca 100644 --- a/proof.js +++ b/staticExample.js @@ -1,11 +1,11 @@ const FixedThreadPool = require('./lib/fixed') -const DynamicThreadPool = require('./lib/dynamic') let resolved = 0 -// const pool = new FixedThreadPool(15, './yourWorker.js') -const pool = new DynamicThreadPool(300, 1020, './yourWorker.js') +const pool = new FixedThreadPool(15, + './yourWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) const start = Date.now() -const iterations = 300 +const iterations = 1000 for (let i = 0; i <= iterations; i++) { pool.execute({}).then(res => { resolved++ -- 2.34.1