3 Worker
, isMainThread
, MessageChannel
, SHARE_ENV
4 } = require('worker_threads')
5 const path
= require('path')
7 // FixedThreadPool , TrampolineThreadPool
9 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
10 * This pool will select the worker thread in a round robin fashion. <br>
11 * @author Alessandro Pio Ardizio
14 class FixedThreadPool
{
17 * @param {Number} numThreads Num of threads for this worker pool
18 * @param {Object} an object with possible options for example maxConcurrency
20 constructor (numThreads
, filename
, opts
) {
21 if (!isMainThread
) throw new Error('Cannot start a thread pool from a worker thread !!!')
22 if (!filename
) throw new Error('Please specify a file with a worker implementation')
23 this.numThreads
= numThreads
|| 10
26 process
.env
.proof
= (data
) => console
.log(data
)
27 for (let i
= 1; i
<= numThreads
; i
++) {
28 const worker
= new Worker(path
.resolve(filename
), { env
: SHARE_ENV
})
29 worker
.on('error', (e
) => console
.error(e
))
30 worker
.on('exit', () => console
.log('EXITING'))
31 this.workers
.push(worker
)
32 const { port1
, port2
} = new MessageChannel()
33 // send the port to communicate with the main thread to the worker
34 /* port2.on('message' , (message) => {
35 console.log('Worker is sending a message : ' + message)
37 worker
.postMessage({ parent
: port1
}, [port1
])
45 * @param {Function} task , a function to execute
46 * @param {Any} the input for the task specified
48 async
execute (data
) {
49 // configure worker to handle message with the specified task
50 const worker
= this._chooseWorker()
51 const id
= generateID()
53 const res
= this._execute(worker
, id
)
54 worker
.postMessage(data
)
58 _execute (worker
, id
) {
59 return new Promise((resolve
, reject
) => {
60 const listener
= (message
) => {
61 if (message
._id
=== id
) {
62 console
.log('Received a message from worker : ' + message
.data
)
63 worker
.port2
.removeListener('message' , listener
)
67 worker
.port2
.on('message', listener
)
73 if ((this.workers
.length
- 1) === this.nextWorker
) {
75 return this.workers
[this.nextWorker
]
78 return this.workers
[this.nextWorker
]
84 * Return an id to be associated to a node.
86 const generateID
= () => {
92 module
.exports
= FixedThreadPool