X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Ffixed.js;h=676947725bed0bb65fe72fe949be8652b08f39c5;hb=0dcd2a0a01a00673fc709fe7b9b89718398fbcd9;hp=63e62bade18bf12d7779186516e30445e5e1eaaa;hpb=bf962cbad61db1b86a0cc0e9ae9fff1fda052a8e;p=poolifier.git
diff --git a/lib/fixed.js b/lib/fixed.js
index 63e62bad..67694772 100644
--- a/lib/fixed.js
+++ b/lib/fixed.js
@@ -2,10 +2,9 @@
const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
-const path = require('path')
-const { generateID } = require('./util')
function empty () {}
+const _void = {}
/**
* A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -16,17 +15,18 @@ class FixedThreadPool {
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {string} a file path with implementation of @see ThreadWorker class
+ * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
* @param {Object} an object with possible options for example errorHandler, onlineHandler.
*/
- constructor (numThreads, filename, opts) {
+ constructor (numThreads, filePath, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!filename) throw new Error('Please specify a file with a worker implementation')
+ if (!filePath) throw new Error('Please specify a file with a worker implementation')
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
- this.filename = filename
+ this.filePath = filePath
+ this._id = 0
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
@@ -34,11 +34,10 @@ class FixedThreadPool {
}
}
- destroy () {
+ async destroy () {
for (const worker of this.workers) {
- worker.terminate()
+ await worker.terminate()
}
- this.emitDestroy()
}
/**
@@ -50,10 +49,9 @@ class FixedThreadPool {
// configure worker to handle message with the specified task
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
- const id = generateID()
- data._id = id
+ const id = ++this._id
const res = this._execute(worker, id)
- worker.postMessage(data)
+ worker.postMessage({ data: data || _void, _id: id })
return res
}
@@ -63,7 +61,8 @@ class FixedThreadPool {
if (message._id === id) {
worker.port2.removeListener('message', listener)
this.tasks.set(worker, this.tasks.get(worker) - 1)
- resolve(message.data)
+ if (message.error) reject(message.error)
+ else resolve(message.data)
}
}
worker.port2.on('message', listener)
@@ -81,7 +80,7 @@ class FixedThreadPool {
}
_newWorker () {
- const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+ const worker = new Worker(this.filePath, { env: SHARE_ENV })
worker.on('error', this.opts.errorHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
// TODO handle properly when a thread exit