X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Ffixed.js;h=8d899d57bb4ed00a54e4d1a1f2093641d8bd3f77;hb=95fd8cf454e42bd20d42194487e7320c741ce2dd;hp=442d58e459c7d0ce75058fa4a0518db9a66d34ee;hpb=a32e02baa991ae01b5d677e3fd34821965daab1e;p=poolifier.git
diff --git a/lib/fixed.js b/lib/fixed.js
index 442d58e4..8d899d57 100644
--- a/lib/fixed.js
+++ b/lib/fixed.js
@@ -2,9 +2,9 @@
const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
-const path = require('path')
-const { generateID } = require('./util')
+function empty () {}
+const _void = {}
/**
* A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -15,16 +15,18 @@ class FixedThreadPool {
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {Object} an object with possible options for example maxConcurrency
+ * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
+ * @param {Object} an object with possible options for example errorHandler, onlineHandler.
*/
- constructor (numThreads, filename, opts) {
+ constructor (numThreads, filePath, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!filename) throw new Error('Please specify a file with a worker implementation')
+ if (!filePath) throw new Error('Please specify a file with a worker implementation')
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
- this.filename = filename
+ this.filePath = filePath
+ this._id = 0
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
@@ -32,6 +34,12 @@ class FixedThreadPool {
}
}
+ destroy () {
+ for (const worker of this.workers) {
+ worker.terminate()
+ }
+ }
+
/**
* Execute the task specified into the constructor with the data parameter.
* @param {Any} the input for the task specified
@@ -41,12 +49,9 @@ class FixedThreadPool {
// configure worker to handle message with the specified task
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
- // console.log('Num of pending tasks ', this.tasks.get(worker))
- const id = generateID()
- data._id = id
- // console.log('Worker choosed is ' + worker.threadId)
+ const id = ++this._id
const res = this._execute(worker, id)
- worker.postMessage(data)
+ worker.postMessage({ data: data || _void, _id: id })
return res
}
@@ -55,9 +60,9 @@ class FixedThreadPool {
const listener = (message) => {
if (message._id === id) {
worker.port2.removeListener('message', listener)
- // console.log(worker.port2.listenerCount('message'))
this.tasks.set(worker, this.tasks.get(worker) - 1)
- resolve(message.data)
+ if (message.error) reject(message.error)
+ else resolve(message.data)
}
}
worker.port2.on('message', listener)
@@ -75,9 +80,11 @@ class FixedThreadPool {
}
_newWorker () {
- const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
- worker.on('error', (e) => console.error(e))
- worker.on('exit', () => console.log('EXITING'))
+ const worker = new Worker(this.filePath, { env: SHARE_ENV })
+ worker.on('error', this.opts.errorHandler || empty)
+ worker.on('online', this.opts.onlineHandler || empty)
+ // TODO handle properly when a thread exit
+ worker.on('exit', this.opts.exitHandler || empty)
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
@@ -85,7 +92,7 @@ class FixedThreadPool {
worker.port2 = port2
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
- worker.port2.setMaxListeners(this.opts.maxTasks)
+ worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
// init tasks map
this.tasks.set(worker, 0)
return worker