this.emitter = new MyEmitter()
}
- /**
- * Return an event emitter that will send some messages, for example
- * a message will be sent when max number of threads is reached and all threads are busy
- * in this case it will emit a message
- */
- emitter () {
- return this.emitter
- }
-
_chooseWorker () {
let worker
for (const entry of this.tasks) {
const {
Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')
-const path = require('path')
const { generateID } = require('./util')
function empty () {}
/**
*
* @param {Number} numThreads Num of threads for this worker pool
- * @param {string} a file path with implementation of @see ThreadWorker class
+ * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
* @param {Object} an object with possible options for example errorHandler, onlineHandler.
*/
- constructor (numThreads, filename, opts) {
+ constructor (numThreads, filePath, opts) {
if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!filename) throw new Error('Please specify a file with a worker implementation')
+ if (!filePath) throw new Error('Please specify a file with a worker implementation')
this.numThreads = numThreads
this.workers = []
this.nextWorker = 0
this.opts = opts || { maxTasks: 1000 }
- this.filename = filename
+ this.filePath = filePath
// threadId as key and an integer value
this.tasks = new Map()
for (let i = 1; i <= numThreads; i++) {
for (const worker of this.workers) {
worker.terminate()
}
- this.emitDestroy()
}
/**
const worker = this._chooseWorker()
this.tasks.set(worker, this.tasks.get(worker) + 1)
const id = generateID()
- data._id = id
const res = this._execute(worker, id)
- worker.postMessage(data)
+ worker.postMessage({ data: data, _id: id })
return res
}
}
_newWorker () {
- const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+ const worker = new Worker(this.filePath, { env: SHARE_ENV })
worker.on('error', this.opts.errorHandler || empty)
worker.on('online', this.opts.onlineHandler || empty)
// TODO handle properly when a thread exit
isMainThread, parentPort
} = require('worker_threads')
const { AsyncResource } = require('async_hooks')
-const maxInactiveTime = 1000 * 60
/**
* An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
* @since 0.0.1
*/
class ThreadWorker extends AsyncResource {
- constructor (fn) {
+ constructor (fn, opts) {
super('worker-thread-pool:pioardi')
+ this.opts = opts || {}
+ this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60)
this.lastTask = Date.now()
if (!fn) throw new Error('Fn parameter is mandatory')
// keep the worker active
if (!isMainThread) {
- this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+ this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2)
this._checkAlive.bind(this)()
}
parentPort.on('message', (value) => {
- if (value && value._id) {
+ if (value && value.data && value._id) {
// here you will receive messages
// console.log('This is the main thread ' + isMainThread)
- const res = this.runInAsyncScope(fn, null, value)
+ const res = this.runInAsyncScope(fn, null, value.data)
this.parent.postMessage({ data: res, _id: value._id })
this.lastTask = Date.now()
} else if (value.parent) {
}
_checkAlive () {
- if ((Date.now() - this.lastTask) > maxInactiveTime) {
+ if ((Date.now() - this.lastTask) > this.maxInactiveTime) {
this.parent.postMessage({ kill: 1 })
}
}
"scripts": {
"build": "npm install",
"test": "standard && nyc mocha --exit --timeout 10000 **/*test.js ",
+ "demontest": "nodemon --exec \"npm test\"",
"coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls",
"standard-verbose": "npx standard --verbose",
"lint": "standard --fix"
--- /dev/null
+const expect = require('expect')
+const DynamicThreadPool = require('../lib/dynamic')
+const min = 1
+const max = 10
+const pool = new DynamicThreadPool(min, max,
+ './tests/testWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+describe('Dynamic thread pool test suite ', () => {
+ it('Verify that the function is executed in a worker thread', async () => {
+ const result = await pool.execute({ test: 'test' })
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => {
+ const promises = []
+ let closedThreads = 0
+ for (let i = 0; i < (max * 3); i++) {
+ promises.push(pool.execute({ test: 'test' }))
+ }
+ expect(pool.workers.length).toBe(max)
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedThreads++
+ })
+ })
+ await new Promise(resolve => setTimeout(resolve, 1000 * 2))
+ expect(closedThreads).toBe(max - min)
+ })
+
+ it('Shutdown test', async () => {
+ let closedThreads = 0
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedThreads++
+ })
+ })
+ pool.destroy()
+ await new Promise(resolve => setTimeout(resolve, 1000))
+ expect(closedThreads).toBe(min)
+ })
+
+ it('Validations test', () => {
+ let error
+ try {
+ const pool1 = new DynamicThreadPool()
+ console.log(pool1)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeTruthy()
+ expect(error.message).toBeTruthy()
+ })
+
+ it('Should work even without opts in input', async () => {
+ const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js')
+ const res = await pool1.execute({ test: 'test' })
+ expect(res).toBeFalsy()
+ })
+})
--- /dev/null
+const expect = require('expect')
+const FixedThreadPool = require('../lib/fixed')
+const numThreads = 10
+const pool = new FixedThreadPool(numThreads,
+ './tests/testWorker.js',
+ { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') })
+
+describe('Fixed thread pool test suite ', () => {
+ it('Choose worker round robin test', async () => {
+ const results = new Set()
+ for (let i = 0; i < numThreads; i++) {
+ results.add(pool._chooseWorker().threadId)
+ }
+ expect(results.size).toBe(numThreads)
+ })
+
+ it('Verify that the function is executed in a worker thread', async () => {
+ const result = await pool.execute({ test: 'test' })
+ expect(result).toBeDefined()
+ expect(result).toBeFalsy()
+ })
+
+ it('Shutdown test', async () => {
+ let closedThreads = 0
+ pool.workers.forEach(w => {
+ w.on('exit', () => {
+ closedThreads++
+ })
+ })
+ pool.destroy()
+ await new Promise(resolve => setTimeout(resolve, 1000))
+ expect(closedThreads).toBe(numThreads)
+ })
+
+ it('Validations test', () => {
+ let error
+ try {
+ const pool1 = new FixedThreadPool()
+ console.log(pool1)
+ } catch (e) {
+ error = e
+ }
+ expect(error).toBeTruthy()
+ expect(error.message).toBeTruthy()
+ })
+
+ it('Should work even without opts in input', async () => {
+ const pool1 = new FixedThreadPool(1, './tests/testWorker.js')
+ const res = await pool1.execute({ test: 'test' })
+ expect(res).toBeFalsy()
+ })
+})
--- /dev/null
+'use strict'
+const { ThreadWorker } = require('../lib/workers')
+const { isMainThread } = require('worker_threads')
+
+class MyWorker extends ThreadWorker {
+ constructor () {
+ super((data) => {
+ for (let i = 0; i <= 100; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+ return isMainThread
+ }, { maxInactiveTime: 1000 })
+ }
+}
+
+module.exports = new MyWorker()
const expect = require('expect')
+const { generateID, randomWorker } = require('../lib/util')
-describe('Generate uuid', () => {
- it('Just a proof for CI', () => {
- expect(true).toBeTruthy()
+describe('Utility Tests ', () => {
+ it('Generate an id', () => {
+ const res = generateID()
+ expect(res).toBeTruthy()
+ expect(typeof res).toBe('string')
+ })
+
+ it('Choose a random worker', () => {
+ const input = new Map()
+ input.set(1, 1)
+ input.set(2, 2)
+ input.set(3, 3)
+ const worker = randomWorker(input)
+ expect(worker).toBeTruthy()
+ expect(Array.from(input.keys()).includes(worker)).toBeTruthy()
})
})
JSON.stringify(o)
}
// console.log('This is the main thread ' + isMainThread)
- return data
+ return { ok: 1 }
})
}
}