const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
-const path = require('path')
-const { generateID } = require('./util')
function empty () {}
+const _void = {}
/**
* A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
* This pool will select the worker thread in a round robin fashion. <br>
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {string} a file path with implementation of @see ThreadWorker class
- * @param {Object} an object with possible options for example errorHandler, onlineHandler, exitHandler
+ * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
+ * @param {Object} an object with possible options for example errorHandler, onlineHandler.
*/
- constructor (numThreads, filename, opts) {
+ constructor (numThreads, filePath, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!filename) throw new Error('Please specify a file with a worker implementation')
+ if (!filePath) throw new Error('Please specify a file with a worker implementation')
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
- this.filename = filename
+ this.filePath = filePath
+ this._id = 0
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
}
}
- destroy () {
+ async destroy () {
for (const worker of this.workers) {
- worker.terminate()
+ await worker.terminate()
}
- this.emitDestroy()
}
/**
// configure worker to handle message with the specified task
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
- const id = generateID()
- data._id = id
+ const id = ++this._id
const res = this._execute(worker, id)
- worker.postMessage(data)
+ worker.postMessage({ data: data || _void, _id: id })
return res
}
if (message._id === id) {
worker.port2.removeListener('message', listener)
this.tasks.set(worker, this.tasks.get(worker) - 1)
- resolve(message.data)
+ if (message.error) reject(message.error)
+ else resolve(message.data)
}
}
worker.port2.on('message', listener)
}
_newWorker () {
- const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+ const worker = new Worker(this.filePath, { env: SHARE_ENV })
worker.on('error', this.opts.errorHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
- // TODO remove the workers array , use only the map data structure
- // handle properly when a thread exit
+ // TODO handle properly when a thread exit
worker.on('exit', this.opts.exitHandler || empty)
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.port2 = port2
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
- worker.port2.setMaxListeners(this.opts.maxTasks)
+ worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
// init tasks map
this.tasks.set(worker, 0)
return worker