if (this.workers.length === this.max) {
throw new Error('Max number of threads reached !!!')
}
- // console.log('new thread is coming')
// all workers are busy create a new worker
const worker = this._newWorker()
worker.port2.on('message', (message) => {
const path = require('path')
const { generateID } = require('./util')
+function empty () {}
/**
* A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
* This pool will select the worker thread in a round robin fashion. <br>
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {Object} an object with possible options for example maxConcurrency
+ * @param {string} a file path with implementation of @see ThreadWorker class
+ * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
*/
constructor (numThreads, filename, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
}
}
+ destroy () {
+ for (const worker of this.workers) {
+ worker.terminate()
+ }
+ this.emitDestroy()
+ }
+
/**
* Execute the task specified into the constructor with the data parameter.
* @param {Any} the input for the task specified
// configure worker to handle message with the specified task
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
- // console.log('Num of pending tasks ', this.tasks.get(worker))
const id = generateID()
data._id = id
- // console.log('Worker choosed is ' + worker.threadId)
const res = this._execute(worker, id)
worker.postMessage(data)
return res
const listener = (message) => {
if (message._id === id) {
worker.port2.removeListener('message', listener)
- // console.log(worker.port2.listenerCount('message'))
this.tasks.set(worker, this.tasks.get(worker) - 1)
resolve(message.data)
}
_newWorker () {
const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
- worker.on('error', (e) => console.error(e))
- worker.on('exit', () => console.log('EXITING'))
+ worker.on('error', this.opts.errorHandler || empty)
+ worker.on('exit', this.opts.exitHandler || empty)
+ worker.on('online', this.opts.onlineHandler || empty)
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
const {
isMainThread, parentPort
} = require('worker_threads')
+const { AsyncResource } = require('async_hooks')
const maxInactiveTime = 1000 * 60
-/**
- * An example worker that will be always alive, you just need to extend this class if you want a static pool.
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class ThreadWorker {
- constructor (fn) {
- if (!fn) throw new Error('Fn parameter is mandatory')
- // keep the worker active
- if (!isMainThread) {
- this.interval =
- setInterval(() => {
- }, 10000)
- }
- parentPort.on('message', (value) => {
- if (value.parent) {
- // save the port to communicate with the main thread
- this.parent = value.parent
- } else if (value && value._id) {
- // console.log('This is the main thread ' + isMainThread)
- this.parent.postMessage({ data: fn(value), _id: value._id })
- }
- })
- }
-}
-
/**
* An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
* When this worker is inactive for more than 1 minute, it will send this info to the main thread,<br>
* @author Alessandro Pio Ardizio
* @since 0.0.1
*/
-class DynamicWorker {
+class ThreadWorker extends AsyncResource {
constructor (fn) {
+ super('worker-thread-pool:pioardi')
this.lastTask = Date.now()
if (!fn) throw new Error('Fn parameter is mandatory')
// keep the worker active
if (value && value._id) {
// here you will receive messages
// console.log('This is the main thread ' + isMainThread)
- this.parent.postMessage({ data: fn(value), _id: value._id })
+ const res = this.runInAsyncScope(fn, null, value)
+ this.parent.postMessage({ data: res, _id: value._id })
this.lastTask = Date.now()
} else if (value.parent) {
// save the port to communicate with the main thread
}
module.exports.ThreadWorker = ThreadWorker
-module.exports.DynamicWorker = DynamicWorker
const DynamicThreadPool = require('./lib/dynamic')
let resolved = 0
// const pool = new FixedThreadPool(15, './yourWorker.js')
-const pool = new DynamicThreadPool(15, 1020, './yourWorker.js')
+const pool = new DynamicThreadPool(300, 1020, './yourWorker.js')
const start = Date.now()
-const iterations = 1000
+const iterations = 300
for (let i = 0; i <= iterations; i++) {
pool.execute({}).then(res => {
- // console.log(res)
resolved++
if (resolved === iterations) {
console.log('Time take is ' + (Date.now() - start))
'use strict'
-const { ThreadWorker, DynamicWorker } = require('./lib/workers')
+const { ThreadWorker } = require('./lib/workers')
-class MyWorker extends DynamicWorker {
+class MyWorker extends ThreadWorker {
constructor () {
super((data) => {
for (let i = 0; i <= 10000; i++) {