repositories
/
poolifier.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Update package.json
[poolifier.git]
/
lib
/
fixed.js
diff --git
a/lib/fixed.js
b/lib/fixed.js
index 5d5d40e1be1a53c8092360432f97f77a97d57e41..833ae7b6f1f048c8ff0a92e7eb42a1bdacd36b78 100644
(file)
--- a/
lib/fixed.js
+++ b/
lib/fixed.js
@@
-2,7
+2,6
@@
const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
-const path = require('path')
const { generateID } = require('./util')
function empty () {}
const { generateID } = require('./util')
function empty () {}
@@
-16,17
+15,17
@@
class FixedThreadPool {
/**
*
* @param {Number} numThreads Num of threads for this worker pool
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {string} a file path with implementation of @see ThreadWorker class
- * @param {Object} an object with possible options for example errorHandler, onlineHandler
, exitHandler
+ * @param {string} a file path with implementation of @see ThreadWorker class
, relative path is fine
+ * @param {Object} an object with possible options for example errorHandler, onlineHandler
.
*/
*/
- constructor (numThreads, file
name
, opts) {
+ constructor (numThreads, file
Path
, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!file
name
) throw new Error('Please specify a file with a worker implementation')
+ if (!file
Path
) throw new Error('Please specify a file with a worker implementation')
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
- this.file
name = filename
+ this.file
Path = filePath
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
@@
-38,7
+37,6
@@
class FixedThreadPool {
for (const worker of this.workers) {
worker.terminate()
}
for (const worker of this.workers) {
worker.terminate()
}
- this.emitDestroy()
}
/**
}
/**
@@
-51,9
+49,8
@@
class FixedThreadPool {
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
const id = generateID()
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
const id = generateID()
- data._id = id
const res = this._execute(worker, id)
const res = this._execute(worker, id)
- worker.postMessage(
data
)
+ worker.postMessage(
{ data: data, _id: id }
)
return res
}
return res
}
@@
-81,10
+78,11
@@
class FixedThreadPool {
}
_newWorker () {
}
_newWorker () {
- const worker = new Worker(
path.resolve(this.filename)
, { env: SHARE_ENV })
+ const worker = new Worker(
this.filePath
, { env: SHARE_ENV })
worker.on('error', this.opts.errorHandler || empty)
worker.on('error', this.opts.errorHandler || empty)
- worker.on('exit', this.opts.exitHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
+ // TODO handle properly when a thread exit
+ worker.on('exit', this.opts.exitHandler || empty)
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
this.workers.push(worker)
const { port1, port2 } = new MessageChannel()
worker.postMessage({ parent: port1 }, [port1])
@@
-92,7
+90,7
@@
class FixedThreadPool {
worker.port2 = port2
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
worker.port2 = port2
// we will attach a listener for every task,
// when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
- worker.port2.setMaxListeners(this.opts.maxTasks)
+ worker.port2.setMaxListeners(this.opts.maxTasks
|| 1000
)
// init tasks map
this.tasks.set(worker, 0)
return worker
// init tasks map
this.tasks.set(worker, 0)
return worker