"description": "Library on top of node js worker threads that implement various worker pools type",
"main": "lib/index.js",
"scripts": {
- "build": "rollup --config",
- "build:dev": "rollup --config --environment BUILD:development",
- "benchmark": "npm run build && node benchmarks/bench.js",
- "benchmark:debug": "npm run build:dev && node -r source-map-support/register --inspect-brk benchmarks/bench.js",
- "benchmark:debug:vscode": "node -r source-map-support/register benchmarks/bench.js",
- "test": "npm run build && nyc mocha --exit --timeout 20000 'tests/**/*.test.js'",
- "test:debug": "npm run build:dev && mocha -r source-map-support/register --inspect-brk --exit 'tests/**/*.test.js'",
- "test:debug:vscode": "mocha -r source-map-support/register --exit 'tests/**/*.test.js'",
+ "build": "rollup --config --environment BUILD:development",
+ "build:prod": "rollup --config",
+ "benchmark": "npm run build && node -r source-map-support/register benchmarks/bench.js",
+ "benchmark:debug": "npm run build && node -r source-map-support/register --inspect benchmarks/bench.js",
+ "benchmark:prod": "npm run build:prod && node -r source-map-support/register benchmarks/bench.js",
- "test": "npm run build && nyc mocha -r source-map-support/register --exit --timeout 20000 tests/**/*.test.js",
- "test:debug": "npm run build && mocha -r source-map-support/register --inspect --exit --timeout 20000 tests/**/*.test.js",
- "test:prod": "npm run build:prod && nyc mocha -r source-map-support/register --exit --timeout 20000 tests/**/*.test.js",
++ "test": "npm run build && nyc mocha -r source-map-support/register --exit --timeout 20000 'tests/**/*.test.js'",
++ "test:debug": "npm run build && mocha -r source-map-support/register --inspect --exit --timeout 20000 'tests/**/*.test.js'",
++ "test:prod": "npm run build:prod && nyc mocha -r source-map-support/register --exit --timeout 20000 'tests/**/*.test.js'",
"sonar": "sonar-scanner",
"coverage": "nyc report --reporter=lcov --check-coverage --lines 80",
"coverage:html": "nyc report --reporter=html --check-coverage --lines 80",
"lib"
],
"devDependencies": {
- "@types/node": "^14.14.27",
- "@typescript-eslint/eslint-plugin": "^4.15.0",
- "@typescript-eslint/parser": "^4.15.0",
+ "@types/node": "^14.14.28",
+ "@typescript-eslint/eslint-plugin": "^4.15.1",
+ "@typescript-eslint/parser": "^4.15.1",
"benchmark": "^2.1.4",
"eslint": "^7.20.0",
"eslint-config-standard": "^16.0.2",
"eslint-plugin-import": "^2.22.1",
- "eslint-plugin-jsdoc": "^31.6.1",
+ "eslint-plugin-jsdoc": "^32.0.1",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettierx": "^0.17.1",
"eslint-plugin-promise": "^4.3.1",
--- /dev/null
- './tests/worker/cluster/testWorker.js',
+ const expect = require('expect')
+ const { FixedThreadPool } = require('../../../lib/index')
+ const expectedError = new Error('Worker could not be found in tasks map')
+
+ class StubPoolWithTasksMapClear extends FixedThreadPool {
+ removeAllWorker () {
+ this.tasks.clear()
+ }
+ }
+
+ class StubPoolWithIsMainMethod extends FixedThreadPool {
+ isMain () {
+ return false
+ }
+ }
+
+ describe('Abstract pool test suite ', () => {
+ it('Simulate worker not found during increaseWorkersTask', () => {
+ const pool = new StubPoolWithTasksMapClear(
+ 1,
- './tests/worker/cluster/testWorker.js',
++ './tests/worker-files/cluster/testWorker.js',
+ {
+ errorHandler: e => console.error(e)
+ }
+ )
+ // simulate worker not found.
+ pool.removeAllWorker()
+ expect(() => pool.increaseWorkersTask()).toThrowError(expectedError)
+ })
+
+ it('Simulate worker not found during decreaseWorkersTasks', () => {
+ const pool = new StubPoolWithTasksMapClear(
+ 1,
- './tests/worker/cluster/testWorker.js',
++ './tests/worker-files/cluster/testWorker.js',
+ {
+ errorHandler: e => console.error(e)
+ }
+ )
+ // simulate worker not found.
+ pool.removeAllWorker()
+ expect(() => pool.decreaseWorkersTasks()).toThrowError(expectedError)
+ })
+
+ it('Simulate pool creation from a non main thread/process', () => {
+ expect(() => {
+ const pool = new StubPoolWithIsMainMethod(
+ 1,
++ './tests/worker-files/cluster/testWorker.js',
+ {
+ errorHandler: e => console.error(e)
+ }
+ )
+ }).toThrowError()
+ })
+ })
const pool = new DynamicClusterPool(
min,
max,
- './tests/worker/cluster/testWorker.js',
+ './tests/worker-files/cluster/testWorker.js',
{
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
+ errorHandler: e => console.error(e)
}
)
pool.execute({ test: 'test' })
}
expect(pool.workers.length).toBeGreaterThan(min)
- await new Promise(resolve => setTimeout(resolve, 2000))
+ await new Promise(resolve => setTimeout(resolve, 3000))
expect(pool.workers.length).toBe(min)
})
it('Shutdown test', async () => {
})
})
pool.destroy()
- await new Promise(resolve => setTimeout(resolve, 1000))
+ await new Promise(resolve => setTimeout(resolve, 2000))
expect(closedWorkers).toBe(min)
})
const pool1 = new DynamicClusterPool(
1,
1,
- './tests/worker/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.js'
)
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
+
+ it('Verify scale processes up and down is working when long running task is used:hard', async () => {
+ const longRunningPool = new DynamicClusterPool(
+ min,
+ max,
+ './tests/worker/cluster/longRunningWorkerHardBehavior.js'
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 3000))
+ // Here we expect the workers to be at the max size since that the task is still running
+ expect(longRunningPool.workers.length).toBe(min)
+ })
+
+ it('Verify scale processes up and down is working when long running task is used:soft', async () => {
+ const longRunningPool = new DynamicClusterPool(
+ min,
+ max,
+ './tests/worker/cluster/longRunningWorkerSoftBehavior.js'
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 3000))
+ // Here we expect the workers to be at the max size since that the task is still running
+ expect(longRunningPool.workers.length).toBe(max)
+ })
})
const maxTasks = 500
const pool = new FixedClusterPool(
numberOfWorkers,
- './tests/worker/cluster/testWorker.js',
+ './tests/worker-files/cluster/testWorker.js',
{
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
+ errorHandler: e => console.error(e)
}
)
const emptyPool = new FixedClusterPool(
1,
- './tests/worker/cluster/emptyWorker.js'
+ './tests/worker-files/cluster/emptyWorker.js'
+)
+const echoPool = new FixedClusterPool(
+ 1,
+ './tests/worker-files/cluster/echoWorker.js'
)
-const echoPool = new FixedClusterPool(1, './tests/worker/cluster/echoWorker.js')
const errorPool = new FixedClusterPool(
1,
- './tests/worker/cluster/errorWorker.js',
+ './tests/worker-files/cluster/errorWorker.js',
{
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
+ errorHandler: e => console.error(e)
}
)
const asyncErrorPool = new FixedClusterPool(
1,
- './tests/worker/cluster/asyncErrorWorker.js',
+ './tests/worker-files/cluster/asyncErrorWorker.js',
{
onlineHandler: () => console.log('worker is online')
}
)
const asyncPool = new FixedClusterPool(
1,
- './tests/worker/cluster/asyncWorker.js',
+ './tests/worker-files/cluster/asyncWorker.js',
{
maxTasks: maxTasks
}
})
})
await pool.destroy()
- await new Promise(resolve => setTimeout(resolve, 200))
+ await new Promise(resolve => setTimeout(resolve, 500))
expect(closedWorkers).toBe(numberOfWorkers)
})
it('Should work even without opts in input', async () => {
const pool1 = new FixedClusterPool(
1,
- './tests/worker/cluster/testWorker.js'
+ './tests/worker-files/cluster/testWorker.js'
)
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
const pool = new DynamicThreadPool(
min,
max,
- './tests/worker/thread/testWorker.js',
+ './tests/worker-files/thread/testWorker.js',
{
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
+ errorHandler: e => console.error(e)
}
)
pool.execute({ test: 'test' })
}
expect(pool.workers.length).toBe(max)
- await new Promise(resolve => setTimeout(resolve, 1000))
+ await new Promise(resolve => setTimeout(resolve, 1500))
expect(pool.workers.length).toBe(min)
})
+
it('Shutdown test', async () => {
let closedThreads = 0
pool.workers.forEach(w => {
const pool1 = new DynamicThreadPool(
1,
1,
- './tests/worker/thread/testWorker.js'
+ './tests/worker-files/thread/testWorker.js'
)
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
+
+ it('Verify scale thread up and down is working when long running task is used:hard', async () => {
+ const longRunningPool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker/thread/longRunningWorkerHardBehavior.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 1500))
+ expect(longRunningPool.workers.length).toBe(min)
+ })
+
+ it('Verify scale thread up and down is working when long running task is used:soft', async () => {
+ const longRunningPool = new DynamicThreadPool(
+ min,
+ max,
+ './tests/worker/thread/longRunningWorkerSoftBehavior.js',
+ {
+ errorHandler: e => console.error(e),
+ onlineHandler: () => console.log('worker is online')
+ }
+ )
+ expect(longRunningPool.workers.length).toBe(min)
+ for (let i = 0; i < max * 10; i++) {
+ longRunningPool.execute({ test: 'test' })
+ }
+ expect(longRunningPool.workers.length).toBe(max)
+ await new Promise(resolve => setTimeout(resolve, 1500))
+ // Here we expect the workers to be at the max size since that the task is still running
+ expect(longRunningPool.workers.length).toBe(max)
+ })
})
const maxTasks = 400
const pool = new FixedThreadPool(
numberOfThreads,
- './tests/worker/thread/testWorker.js',
+ './tests/worker-files/thread/testWorker.js',
{
- errorHandler: e => console.error(e),
- onlineHandler: () => console.log('worker is online')
+ errorHandler: e => console.error(e)
}
)
-const emptyPool = new FixedThreadPool(1, './tests/worker/thread/emptyWorker.js')
-const echoPool = new FixedThreadPool(1, './tests/worker/thread/echoWorker.js')
+const emptyPool = new FixedThreadPool(
+ 1,
+ './tests/worker-files/thread/emptyWorker.js'
+)
+const echoPool = new FixedThreadPool(
+ 1,
+ './tests/worker-files/thread/echoWorker.js'
+)
const errorPool = new FixedThreadPool(
1,
- './tests/worker/thread/errorWorker.js',
+ './tests/worker-files/thread/errorWorker.js',
{
errorHandler: e => console.error(e),
onlineHandler: () => console.log('worker is online')
)
const asyncPool = new FixedThreadPool(
1,
- './tests/worker/thread/asyncWorker.js',
+ './tests/worker-files/thread/asyncWorker.js',
{ maxTasks: maxTasks }
)
})
it('Should work even without opts in input', async () => {
- const pool1 = new FixedThreadPool(1, './tests/worker/thread/testWorker.js')
+ const pool1 = new FixedThreadPool(
+ 1,
+ './tests/worker-files/thread/testWorker.js'
+ )
const res = await pool1.execute({ test: 'test' })
expect(res).toBeFalsy()
})
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
async function error (data) {
return new Promise((resolve, reject) => {
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
- async: true
+ async: true,
+ killBehavior: KillBehaviors.HARD
})
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
async function sleep (data) {
return new Promise((resolve, reject) => {
})
}
- module.exports = new ClusterWorker(sleep, { maxInactiveTime: 500, async: true })
+ module.exports = new ClusterWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
function echo (data) {
return data
}
- module.exports = new ClusterWorker(echo, { maxInactiveTime: 500 })
+ module.exports = new ClusterWorker(echo, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
function test (data) {}
- module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+ module.exports = new ClusterWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
function error (data) {
throw new Error('Error Message from ClusterWorker')
module.exports = new ClusterWorker(error, {
maxInactiveTime: 500,
- async: false
+ async: false,
+ killBehavior: KillBehaviors.HARD
})
'use strict'
- const { ClusterWorker } = require('../../../lib/index')
+ const { ClusterWorker, KillBehaviors } = require('../../../lib/index')
const { isMaster } = require('cluster')
function test (data) {
return isMaster
}
- module.exports = new ClusterWorker(test, { maxInactiveTime: 500 })
+ module.exports = new ClusterWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ThreadWorker } = require('../../../lib/index')
+ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
async function sleep (data) {
return new Promise((resolve, reject) => {
})
}
- module.exports = new ThreadWorker(sleep, { maxInactiveTime: 500, async: true })
+ module.exports = new ThreadWorker(sleep, {
+ maxInactiveTime: 500,
+ async: true,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ThreadWorker } = require('../../../lib/index')
+ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
function echo (data) {
return data
}
- module.exports = new ThreadWorker(echo, { maxInactiveTime: 500 })
+ module.exports = new ThreadWorker(echo, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ThreadWorker } = require('../../../lib/index')
+ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
function test (data) {}
- module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+ module.exports = new ThreadWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ThreadWorker } = require('../../../lib/index')
+ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
function error (data) {
throw new Error(data)
}
- module.exports = new ThreadWorker(error, { maxInactiveTime: 500 })
+ module.exports = new ThreadWorker(error, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })
'use strict'
- const { ThreadWorker } = require('../../../lib/index')
+ const { ThreadWorker, KillBehaviors } = require('../../../lib/index')
const { isMainThread } = require('worker_threads')
function test (data) {
return isMainThread
}
- module.exports = new ThreadWorker(test, { maxInactiveTime: 500 })
+ module.exports = new ThreadWorker(test, {
+ maxInactiveTime: 500,
+ killBehavior: KillBehaviors.HARD
+ })