From 506c2a140687bf8b1a1313a8d7aeb9e3f0f142a7 Mon Sep 17 00:00:00 2001 From: aardizio Date: Sun, 19 Jan 2020 21:40:18 +0100 Subject: [PATCH] Improvements and unit tests --- lib/dynamic.js | 9 ------- lib/fixed.js | 15 +++++------ lib/workers.js | 13 ++++----- package.json | 1 + tests/dynamic.test.js | 61 +++++++++++++++++++++++++++++++++++++++++++ tests/fixed.test.js | 52 ++++++++++++++++++++++++++++++++++++ tests/testWorker.js | 19 ++++++++++++++ tests/util.test.js | 19 +++++++++++--- yourWorker.js | 2 +- 9 files changed, 163 insertions(+), 28 deletions(-) create mode 100644 tests/dynamic.test.js create mode 100644 tests/fixed.test.js create mode 100644 tests/testWorker.js diff --git a/lib/dynamic.js b/lib/dynamic.js index cedb8c35..e5a9c690 100644 --- a/lib/dynamic.js +++ b/lib/dynamic.js @@ -24,15 +24,6 @@ class DynamicThreadPool extends FixedThreadPool { this.emitter = new MyEmitter() } - /** - * Return an event emitter that will send some messages, for example - * a message will be sent when max number of threads is reached and all threads are busy - * in this case it will emit a message - */ - emitter () { - return this.emitter - } - _chooseWorker () { let worker for (const entry of this.tasks) { diff --git a/lib/fixed.js b/lib/fixed.js index 63e62bad..833ae7b6 100644 --- a/lib/fixed.js +++ b/lib/fixed.js @@ -2,7 +2,6 @@ const { Worker, isMainThread, MessageChannel, SHARE_ENV } = require('worker_threads') -const path = require('path') const { generateID } = require('./util') function empty () {} @@ -16,17 +15,17 @@ class FixedThreadPool { /** * * @param {Number} numThreads Num of threads for this worker pool - * @param {string} a file path with implementation of @see ThreadWorker class + * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine * @param {Object} an object with possible options for example errorHandler, onlineHandler. */ - constructor (numThreads, filename, opts) { + constructor (numThreads, filePath, opts) { if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') - if (!filename) throw new Error('Please specify a file with a worker implementation') + if (!filePath) throw new Error('Please specify a file with a worker implementation') this.numThreads = numThreads this.workers = [] this.nextWorker = 0 this.opts = opts || { maxTasks: 1000 } - this.filename = filename + this.filePath = filePath // threadId as key and an integer value this.tasks = new Map() for (let i = 1; i <= numThreads; i++) { @@ -38,7 +37,6 @@ class FixedThreadPool { for (const worker of this.workers) { worker.terminate() } - this.emitDestroy() } /** @@ -51,9 +49,8 @@ class FixedThreadPool { const worker = this._chooseWorker() this.tasks.set(worker, this.tasks.get(worker) + 1) const id = generateID() - data._id = id const res = this._execute(worker, id) - worker.postMessage(data) + worker.postMessage({ data: data, _id: id }) return res } @@ -81,7 +78,7 @@ class FixedThreadPool { } _newWorker () { - const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV }) + const worker = new Worker(this.filePath, { env: SHARE_ENV }) worker.on('error', this.opts.errorHandler || empty) worker.on('online', this.opts.onlineHandler || empty) // TODO handle properly when a thread exit diff --git a/lib/workers.js b/lib/workers.js index 8641ed9e..ee03778d 100644 --- a/lib/workers.js +++ b/lib/workers.js @@ -3,7 +3,6 @@ const { isMainThread, parentPort } = require('worker_threads') const { AsyncResource } = require('async_hooks') -const maxInactiveTime = 1000 * 60 /** * An example worker that will be always alive, you just need to extend this class if you want a static pool.
@@ -13,20 +12,22 @@ const maxInactiveTime = 1000 * 60 * @since 0.0.1 */ class ThreadWorker extends AsyncResource { - constructor (fn) { + constructor (fn, opts) { super('worker-thread-pool:pioardi') + this.opts = opts || {} + this.maxInactiveTime = this.opts.maxInactiveTime || (1000 * 60) this.lastTask = Date.now() if (!fn) throw new Error('Fn parameter is mandatory') // keep the worker active if (!isMainThread) { - this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime) + this.interval = setInterval(this._checkAlive.bind(this), this.maxInactiveTime / 2) this._checkAlive.bind(this)() } parentPort.on('message', (value) => { - if (value && value._id) { + if (value && value.data && value._id) { // here you will receive messages // console.log('This is the main thread ' + isMainThread) - const res = this.runInAsyncScope(fn, null, value) + const res = this.runInAsyncScope(fn, null, value.data) this.parent.postMessage({ data: res, _id: value._id }) this.lastTask = Date.now() } else if (value.parent) { @@ -41,7 +42,7 @@ class ThreadWorker extends AsyncResource { } _checkAlive () { - if ((Date.now() - this.lastTask) > maxInactiveTime) { + if ((Date.now() - this.lastTask) > this.maxInactiveTime) { this.parent.postMessage({ kill: 1 }) } } diff --git a/package.json b/package.json index 0b3d306c..4bbd2b8f 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "scripts": { "build": "npm install", "test": "standard && nyc mocha --exit --timeout 10000 **/*test.js ", + "demontest": "nodemon --exec \"npm test\"", "coverage": "nyc report --reporter=text-lcov --timeout 5000 **/*test.js | coveralls", "standard-verbose": "npx standard --verbose", "lint": "standard --fix" diff --git a/tests/dynamic.test.js b/tests/dynamic.test.js new file mode 100644 index 00000000..7b0865bb --- /dev/null +++ b/tests/dynamic.test.js @@ -0,0 +1,61 @@ +const expect = require('expect') +const DynamicThreadPool = require('../lib/dynamic') +const min = 1 +const max = 10 +const pool = new DynamicThreadPool(min, max, + './tests/testWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + +describe('Dynamic thread pool test suite ', () => { + it('Verify that the function is executed in a worker thread', async () => { + const result = await pool.execute({ test: 'test' }) + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Verify that new workers are created when required, max size is not exceeded and that after a while new workers will die', async () => { + const promises = [] + let closedThreads = 0 + for (let i = 0; i < (max * 3); i++) { + promises.push(pool.execute({ test: 'test' })) + } + expect(pool.workers.length).toBe(max) + pool.workers.forEach(w => { + w.on('exit', () => { + closedThreads++ + }) + }) + await new Promise(resolve => setTimeout(resolve, 1000 * 2)) + expect(closedThreads).toBe(max - min) + }) + + it('Shutdown test', async () => { + let closedThreads = 0 + pool.workers.forEach(w => { + w.on('exit', () => { + closedThreads++ + }) + }) + pool.destroy() + await new Promise(resolve => setTimeout(resolve, 1000)) + expect(closedThreads).toBe(min) + }) + + it('Validations test', () => { + let error + try { + const pool1 = new DynamicThreadPool() + console.log(pool1) + } catch (e) { + error = e + } + expect(error).toBeTruthy() + expect(error.message).toBeTruthy() + }) + + it('Should work even without opts in input', async () => { + const pool1 = new DynamicThreadPool(1, 1, './tests/testWorker.js') + const res = await pool1.execute({ test: 'test' }) + expect(res).toBeFalsy() + }) +}) diff --git a/tests/fixed.test.js b/tests/fixed.test.js new file mode 100644 index 00000000..78e960bf --- /dev/null +++ b/tests/fixed.test.js @@ -0,0 +1,52 @@ +const expect = require('expect') +const FixedThreadPool = require('../lib/fixed') +const numThreads = 10 +const pool = new FixedThreadPool(numThreads, + './tests/testWorker.js', + { errorHandler: (e) => console.error(e), onlineHandler: () => console.log('worker is online') }) + +describe('Fixed thread pool test suite ', () => { + it('Choose worker round robin test', async () => { + const results = new Set() + for (let i = 0; i < numThreads; i++) { + results.add(pool._chooseWorker().threadId) + } + expect(results.size).toBe(numThreads) + }) + + it('Verify that the function is executed in a worker thread', async () => { + const result = await pool.execute({ test: 'test' }) + expect(result).toBeDefined() + expect(result).toBeFalsy() + }) + + it('Shutdown test', async () => { + let closedThreads = 0 + pool.workers.forEach(w => { + w.on('exit', () => { + closedThreads++ + }) + }) + pool.destroy() + await new Promise(resolve => setTimeout(resolve, 1000)) + expect(closedThreads).toBe(numThreads) + }) + + it('Validations test', () => { + let error + try { + const pool1 = new FixedThreadPool() + console.log(pool1) + } catch (e) { + error = e + } + expect(error).toBeTruthy() + expect(error.message).toBeTruthy() + }) + + it('Should work even without opts in input', async () => { + const pool1 = new FixedThreadPool(1, './tests/testWorker.js') + const res = await pool1.execute({ test: 'test' }) + expect(res).toBeFalsy() + }) +}) diff --git a/tests/testWorker.js b/tests/testWorker.js new file mode 100644 index 00000000..d6923045 --- /dev/null +++ b/tests/testWorker.js @@ -0,0 +1,19 @@ +'use strict' +const { ThreadWorker } = require('../lib/workers') +const { isMainThread } = require('worker_threads') + +class MyWorker extends ThreadWorker { + constructor () { + super((data) => { + for (let i = 0; i <= 100; i++) { + const o = { + a: i + } + JSON.stringify(o) + } + return isMainThread + }, { maxInactiveTime: 1000 }) + } +} + +module.exports = new MyWorker() diff --git a/tests/util.test.js b/tests/util.test.js index 0d6c99f1..0cba02ce 100644 --- a/tests/util.test.js +++ b/tests/util.test.js @@ -1,7 +1,20 @@ const expect = require('expect') +const { generateID, randomWorker } = require('../lib/util') -describe('Generate uuid', () => { - it('Just a proof for CI', () => { - expect(true).toBeTruthy() +describe('Utility Tests ', () => { + it('Generate an id', () => { + const res = generateID() + expect(res).toBeTruthy() + expect(typeof res).toBe('string') + }) + + it('Choose a random worker', () => { + const input = new Map() + input.set(1, 1) + input.set(2, 2) + input.set(3, 3) + const worker = randomWorker(input) + expect(worker).toBeTruthy() + expect(Array.from(input.keys()).includes(worker)).toBeTruthy() }) }) diff --git a/yourWorker.js b/yourWorker.js index c3a7b0c4..34a85dbb 100644 --- a/yourWorker.js +++ b/yourWorker.js @@ -11,7 +11,7 @@ class MyWorker extends ThreadWorker { JSON.stringify(o) } // console.log('This is the main thread ' + isMainThread) - return data + return { ok: 1 } }) } } -- 2.34.1