+++ /dev/null
-'use strict'
-const {
- Worker, isMainThread, MessageChannel, SHARE_ENV
-} = require('worker_threads')
-const path = require('path')
-
-// FixedThreadPool , TrampolineThreadPool
-/**
- * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
- * This pool will select the worker thread in a round robin fashion. <br>
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class FixedThreadPool {
- /**
- *
- * @param {Number} numThreads Num of threads for this worker pool
- * @param {Object} an object with possible options for example maxConcurrency
- */
- constructor (numThreads, filename, opts) {
- if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
- if (!filename) throw new Error('Please specify a file with a worker implementation')
- this.numThreads = numThreads || 10
- this.workers = []
- this.nextWorker = 0
- process.env.proof = (data) => console.log(data)
- for (let i = 1; i <= numThreads; i++) {
- const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
- worker.on('error', (e) => console.error(e))
- worker.on('exit', () => console.log('EXITING'))
- this.workers.push(worker)
- const { port1, port2 } = new MessageChannel()
- // send the port to communicate with the main thread to the worker
- /* port2.on('message' , (message) => {
- console.log('Worker is sending a message : ' + message)
- }) */
- worker.postMessage({ parent: port1 }, [port1])
- worker.port1 = port1
- worker.port2 = port2
- }
- }
-
- /**
- *
- * @param {Function} task , a function to execute
- * @param {Any} the input for the task specified
- */
- async execute (data) {
- // configure worker to handle message with the specified task
- const worker = this._chooseWorker()
- const id = generateID()
- data._id = id
- const res = this._execute(worker, id)
- worker.postMessage(data)
- return res
- }
-
- _execute (worker, id) {
- return new Promise((resolve, reject) => {
- const listener = (message) => {
- if (message._id === id) {
- console.log('Received a message from worker : ' + message.data)
- worker.port2.removeListener('message' , listener)
- resolve(message.data)
- }
- }
- worker.port2.on('message', listener)
- })
- }
-
-
- _chooseWorker () {
- if ((this.workers.length - 1) === this.nextWorker) {
- this.nextWorker = 0
- return this.workers[this.nextWorker]
- } else {
- this.nextWorker++
- return this.workers[this.nextWorker]
- }
- }
-}
-
-/**
- * Return an id to be associated to a node.
- */
-const generateID = () => {
- return Math.random()
- .toString(36)
- .substring(2)
-}
-
-module.exports = FixedThreadPool
--- /dev/null
+module.exports.FixedThreadPool = require('./lib/fixed')
--- /dev/null
+'use strict'
+const FixedThreadPool = require('./fixed')
+
+/**
+ * A thread pool with a min/max number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This thread pool will create new workers when the other ones are busy, until the max number of threads,
+ * when the max number of threads is reached, an exception will be thrown.
+ * This pool will select the worker thread in a round robin fashion. <br>
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class DynamicThreadPool extends FixedThreadPool {
+ /**
+ *
+ * @param {Number} min Min number of threads that will be always active
+ * @param {Number} max Max number of threads that will be active
+ * @param {Object} an object with possible options for example maxConcurrency
+ */
+ constructor (min, max, filename, opts) {
+ super(min, filename, opts)
+ this.max = max
+ }
+
+ _chooseWorker () {
+ let worker
+ for (const entry of this.tasks) {
+ if (entry[1] === 0) {
+ worker = entry[0]
+ break
+ }
+ }
+
+ if (worker) {
+ // a worker is free, use it
+ return worker
+ } else {
+ if (this.workers.length === this.max) {
+ throw new Error('Max number of threads reached !!!')
+ }
+ // console.log('new thread is coming')
+ // all workers are busy create a new worker
+ const worker = this._newWorker()
+ worker.port2.on('message', (message) => {
+ if (message.kill) {
+ worker.postMessage({ kill: 1 })
+ worker.terminate()
+ }
+ })
+ return worker
+ }
+ }
+}
+
+module.exports = DynamicThreadPool
--- /dev/null
+'use strict'
+const {
+ Worker, isMainThread, MessageChannel, SHARE_ENV
+} = require('worker_threads')
+const path = require('path')
+const { generateID } = require('./util')
+
+/**
+ * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
+ * This pool will select the worker thread in a round robin fashion. <br>
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class FixedThreadPool {
+ /**
+ *
+ * @param {Number} numThreads Num of threads for this worker pool
+ * @param {Object} an object with possible options for example maxConcurrency
+ */
+ constructor (numThreads, filename, opts) {
+ if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
+ if (!filename) throw new Error('Please specify a file with a worker implementation')
+ this.numThreads = numThreads
+ this.workers = []
+ this.nextWorker = 0
+ this.opts = opts || { maxTasks: 1000 }
+ this.filename = filename
+ // threadId as key and an integer value
+ this.tasks = new Map()
+ for (let i = 1; i <= numThreads; i++) {
+ this._newWorker()
+ }
+ }
+
+ /**
+ * Execute the task specified into the constructor with the data parameter.
+ * @param {Any} the input for the task specified
+ * @returns {Promise} that is resolved when the task is done
+ */
+ async execute (data) {
+ // configure worker to handle message with the specified task
+ const worker = this._chooseWorker()
+ this.tasks.set(worker, this.tasks.get(worker) + 1)
+ // console.log('Num of pending tasks ', this.tasks.get(worker))
+ const id = generateID()
+ data._id = id
+ // console.log('Worker choosed is ' + worker.threadId)
+ const res = this._execute(worker, id)
+ worker.postMessage(data)
+ return res
+ }
+
+ _execute (worker, id) {
+ return new Promise((resolve, reject) => {
+ const listener = (message) => {
+ if (message._id === id) {
+ worker.port2.removeListener('message', listener)
+ // console.log(worker.port2.listenerCount('message'))
+ this.tasks.set(worker, this.tasks.get(worker) - 1)
+ resolve(message.data)
+ }
+ }
+ worker.port2.on('message', listener)
+ })
+ }
+
+ _chooseWorker () {
+ if ((this.workers.length - 1) === this.nextWorker) {
+ this.nextWorker = 0
+ return this.workers[this.nextWorker]
+ } else {
+ this.nextWorker++
+ return this.workers[this.nextWorker]
+ }
+ }
+
+ _newWorker () {
+ const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
+ worker.on('error', (e) => console.error(e))
+ worker.on('exit', () => console.log('EXITING'))
+ this.workers.push(worker)
+ const { port1, port2 } = new MessageChannel()
+ worker.postMessage({ parent: port1 }, [port1])
+ worker.port1 = port1
+ worker.port2 = port2
+ // we will attach a listener for every task,
+ // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
+ worker.port2.setMaxListeners(this.opts.maxTasks)
+ // init tasks map
+ this.tasks.set(worker, 0)
+ return worker
+ }
+}
+
+module.exports = FixedThreadPool
--- /dev/null
+/**
+ * Contains utility functions
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+
+const uuid = require('uuid/v4')
+/**
+ * Return an id to be associated to a node.
+ */
+module.exports.generateID = () => {
+ return uuid()
+}
--- /dev/null
+'use strict'
+const {
+ isMainThread, parentPort
+} = require('worker_threads')
+const maxInactiveTime = 1000 * 60
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class ThreadWorker {
+ constructor (fn) {
+ if (!fn) throw new Error('Fn parameter is mandatory')
+ // keep the worker active
+ if (!isMainThread) {
+ this.interval =
+ setInterval(() => {
+ }, 10000)
+ }
+ parentPort.on('message', (value) => {
+ if (value.parent) {
+ // save the port to communicate with the main thread
+ this.parent = value.parent
+ } else if (value && value._id) {
+ // console.log('This is the main thread ' + isMainThread)
+ this.parent.postMessage({ data: fn(value), _id: value._id })
+ }
+ })
+ }
+}
+
+/**
+ * An example worker that will be always alive, you just need to extend this class if you want a static pool.<br>
+ * When this worker is inactive for more than 1 minute, it will send this info to the main thread,<br>
+ * if you are using DynamicThreadPool, the workers created after will be killed, the min num of thread will be guaranteed
+ * @author Alessandro Pio Ardizio
+ * @since 0.0.1
+ */
+class DynamicWorker {
+ constructor (fn) {
+ this.lastTask = Date.now()
+ if (!fn) throw new Error('Fn parameter is mandatory')
+ // keep the worker active
+ if (!isMainThread) {
+ this.interval = setInterval(this._checkAlive.bind(this), maxInactiveTime)
+ this._checkAlive.bind(this)()
+ }
+ parentPort.on('message', (value) => {
+ if (value && value._id) {
+ // here you will receive messages
+ // console.log('This is the main thread ' + isMainThread)
+ this.parent.postMessage({ data: fn(value), _id: value._id })
+ this.lastTask = Date.now()
+ } else if (value.parent) {
+ // save the port to communicate with the main thread
+ // this will be received once
+ this.parent = value.parent
+ } else if (value.kill) {
+ // here is time to kill this thread, just clearing the interval
+ clearInterval(this.interval)
+ }
+ })
+ }
+
+ _checkAlive () {
+ if ((Date.now() - this.lastTask) > maxInactiveTime) {
+ this.parent.postMessage({ kill: 1 })
+ }
+ }
+}
+
+module.exports.ThreadWorker = ThreadWorker
+module.exports.DynamicWorker = DynamicWorker
--- /dev/null
+const start = Date.now()
+const toBench = () => {
+ const iterations = 10000
+
+ for (let i = 0; i <= iterations; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
+}
+
+for (let i = 0; i < 1000; i++) {
+ toBench()
+}
+console.log('Time take is ' + (Date.now() - start))
"punycode": "^2.1.0"
}
},
+ "uuid": {
+ "version": "3.4.0",
+ "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.4.0.tgz",
+ "integrity": "sha512-HjSDRw6gZE5JMggctHBcjVak08+KEVhSIiDzFnT9S9aegmp85S/bReBVTb4QTFaRNptJ9kuYaNhnbNEOkbKb/A=="
+ },
"v8-compile-cache": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/v8-compile-cache/-/v8-compile-cache-2.1.0.tgz",
"homepage": "https://github.com/pioardi/node-pool#readme",
"devDependencies": {
"standard": "^14.3.1"
+ },
+ "dependencies": {
+ "uuid": "^3.4.0"
}
}
-const FixedThreadPool = require('./fixed')
+const FixedThreadPool = require('./lib/fixed')
+const DynamicThreadPool = require('./lib/dynamic')
let resolved = 0
-const pool = new FixedThreadPool(3, './yourWorker.js')
-
-async function proof () {
- const o = {
- a: 123
- }
- const res = await pool.execute(o)
- // console.log('Here we are')
- console.log('I am logging the result ' + res)
-}
-
-// proof()
+// const pool = new FixedThreadPool(15, './yourWorker.js')
+const pool = new DynamicThreadPool(15, 1020, './yourWorker.js')
const start = Date.now()
-const iterations = 50000
+const iterations = 1000
for (let i = 0; i <= iterations; i++) {
- const o = {
- a: i
- }
- pool.execute(o).then(res => {
- console.log(res)
+ pool.execute({}).then(res => {
+ // console.log(res)
resolved++
if (resolved === iterations) {
console.log('Time take is ' + (Date.now() - start))
+++ /dev/null
-'use strict'
-const {
- isMainThread, parentPort
-} = require('worker_threads')
-
-/**
- * An example worker that will be always alive, you just need to extend this class if you want a static pool.
- * @author Alessandro Pio Ardizio
- * @since 0.0.1
- */
-class ThreadWorker {
- constructor (fn) {
- if (!fn) throw new Error('Fn parameter is mandatory')
- // keep the worker active
- if (!isMainThread) {
- this.interval =
- setInterval(() => {
- }, 10000)
- }
- parentPort.on('message', (value) => {
- if (value.parent) {
- // save the port to communicate with the main thread
- this.parent = value.parent
- } else if (value && value._id) {
- // console.log('This is the main thread ' + isMainThread)
- this.parent.postMessage({ data: fn(value), _id: value._id })
- }
- })
- }
-}
-
-module.exports = ThreadWorker
'use strict'
-const ThreadWorker = require('./worker')
-const { isMainThread } = require('worker_threads')
+const { ThreadWorker, DynamicWorker } = require('./lib/workers')
-class MyWorker extends ThreadWorker {
+class MyWorker extends DynamicWorker {
constructor () {
super((data) => {
+ for (let i = 0; i <= 10000; i++) {
+ const o = {
+ a: i
+ }
+ JSON.stringify(o)
+ }
// console.log('This is the main thread ' + isMainThread)
- // this.parent.postMessage(JSON.stringify(data))
- return JSON.stringify(data)
+ return data
})
}
}