--- /dev/null
+const DynamicThreadPool = require('./lib/dynamic')
+let resolved = 0
+let maxReached = 0
+const pool = new DynamicThreadPool(100, 200, './yourWorker.js', { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+pool.emitter.on('FullPool', () => maxReached++)
+
+const start = Date.now()
+const iterations = 1000
+for (let i = 0; i <= iterations; i++) {
+ pool.execute({}).then(res => {
+ resolved++
+ if (resolved === iterations) {
+ console.log('Time take is ' + (Date.now() - start))
+ console.log('The pool was full for ' + maxReached + ' times')
+ }
+ })
+}
'use strict'
const FixedThreadPool = require('./fixed')
+const { randomWorker } = require('./util')
+const EventEmitter = require('events')
+class MyEmitter extends EventEmitter {}
/**
* A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
* This thread pool will create new workers when the other ones are busy, until the max number of threads,
- * when the max number of threads is reached, an exception will be thrown.
- * This pool will select the worker thread in a round robin fashion. <br>
+ * when the max number of threads is reached, an event will be emitted , if you want to listen this event use the emitter method.
* @author Alessandro Pio Ardizio
* @since 0.0.1
*/
constructor (min, max, filename, opts) {
super(min, filename, opts)
this.max = max
+ this.emitter = new MyEmitter()
+ }
+
+ /**
+ * Return an event emitter that will send some messages, for example
+ * a message will be sent when max number of threads is reached and all threads are busy
+ * in this case it will emit a message
+ */
+ emitter () {
+ return this.emitter
}
_chooseWorker () {
return worker
} else {
if (this.workers.length === this.max) {
- throw new Error('Max number of threads reached !!!')
+ this.emitter.emit('FullPool')
+ return randomWorker(this.tasks)
}
// all workers are busy create a new worker
const worker = this._newWorker()
*
* @param {Number} numThreads Num of threads for this worker pool
* @param {string} a file path with implementation of @see ThreadWorker class
- * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
+ * @param {Object} an object with possible options for example errorHandler, onlineHandler.
*/
constructor (numThreads, filename, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
worker.on('error', this.opts.errorHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
- // TODO remove the workers array , use only the map data structure
- // handle properly when a thread exit
+ // TODO handle properly when a thread exit
worker.on('exit', this.opts.exitHandler || empty)
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.port2 = port2
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
- worker.port2.setMaxListeners(this.opts.maxTasks)
+ worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
// init tasks map
this.tasks.set(worker, 0)
return worker
module.exports.generateID = () => {
return uuid()
}
+
+module.exports.randomWorker = (collection) => {
+ const keys = Array.from(collection.keys())
+ return keys[Math.floor(Math.random() * keys.length)]
+}
const FixedThreadPool = require('./lib/fixed')
-const DynamicThreadPool = require('./lib/dynamic')
let resolved = 0
-// const pool = new FixedThreadPool(15, './yourWorker.js')
-const pool = new DynamicThreadPool(300, 1020, './yourWorker.js')
+const pool = new FixedThreadPool(15,
+ './yourWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
const start = Date.now()
-const iterations = 300
+const iterations = 1000
for (let i = 0; i <= iterations; i++) {
pool.execute({}).then(res => {
resolved++