X-Git-Url: https://git.piment-noir.org/?a=blobdiff_plain;f=lib%2Ffixed.js;h=8d899d57bb4ed00a54e4d1a1f2093641d8bd3f77;hb=95fd8cf454e42bd20d42194487e7320c741ce2dd;hp=442d58e459c7d0ce75058fa4a0518db9a66d34ee;hpb=a32e02baa991ae01b5d677e3fd34821965daab1e;p=poolifier.git diff --git a/lib/fixed.js b/lib/fixed.js index 442d58e4..8d899d57 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -2,9 +2,9 @@ const { Worker, isMainThread, MessageChannel, SHARE_ENV } = require('worker_threads') -const path = require('path') -const { generateID } = require('./util') +function empty () {} +const _void = {} /** * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer.
* This pool will select the worker thread in a round robin fashion.
@@ -15,16 +15,18 @@ class FixedThreadPool { /** * * @param {Number} numThreads Num of threads for this worker pool - * @param {Object} an object with possible options for example maxConcurrency + * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine + * @param {Object} an object with possible options for example errorHandler, onlineHandler. */ - constructor (numThreads, filename, opts) { + constructor (numThreads, filePath, opts) { if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') - if (!filename) throw new Error('Please specify a file with a worker implementation') + if (!filePath) throw new Error('Please specify a file with a worker implementation') this.numThreads = numThreads this.workers = [] this.nextWorker = 0 this.opts = opts || { maxTasks: 1000 } - this.filename = filename + this.filePath = filePath + this._id = 0 // threadId as key and an integer value this.tasks = new Map() for (let i = 1; i <= numThreads; i++) { @@ -32,6 +34,12 @@ class FixedThreadPool { } } + destroy () { + for (const worker of this.workers) { + worker.terminate() + } + } + /** * Execute the task specified into the constructor with the data parameter. * @param {Any} the input for the task specified @@ -41,12 +49,9 @@ class FixedThreadPool { // configure worker to handle message with the specified task const worker = this._chooseWorker() this.tasks.set(worker, this.tasks.get(worker) + 1) - // console.log('Num of pending tasks ', this.tasks.get(worker)) - const id = generateID() - data._id = id - // console.log('Worker choosed is ' + worker.threadId) + const id = ++this._id const res = this._execute(worker, id) - worker.postMessage(data) + worker.postMessage({ data: data || _void, _id: id }) return res } @@ -55,9 +60,9 @@ class FixedThreadPool { const listener = (message) => { if (message._id === id) { worker.port2.removeListener('message', listener) - // console.log(worker.port2.listenerCount('message')) this.tasks.set(worker, this.tasks.get(worker) - 1) - resolve(message.data) + if (message.error) reject(message.error) + else resolve(message.data) } } worker.port2.on('message', listener) @@ -75,9 +80,11 @@ class FixedThreadPool { } _newWorker () { - const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV }) - worker.on('error', (e) => console.error(e)) - worker.on('exit', () => console.log('EXITING')) + const worker = new Worker(this.filePath, { env: SHARE_ENV }) + worker.on('error', this.opts.errorHandler || empty) + worker.on('online', this.opts.onlineHandler || empty) + // TODO handle properly when a thread exit + worker.on('exit', this.opts.exitHandler || empty) this.workers.push(worker) const { port1, port2 } = new MessageChannel() worker.postMessage({ parent: port1 }, [port1]) @@ -85,7 +92,7 @@ class FixedThreadPool { worker.port2 = port2 // we will attach a listener for every task, // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size - worker.port2.setMaxListeners(this.opts.maxTasks) + worker.port2.setMaxListeners(this.opts.maxTasks || 1000) // init tasks map this.tasks.set(worker, 0) return worker