442d58e459c7d0ce75058fa4a0518db9a66d34ee
3 Worker
, isMainThread
, MessageChannel
, SHARE_ENV
4 } = require('worker_threads')
5 const path
= require('path')
6 const { generateID
} = require('./util')
9 * A thread pool with a static number of threads; it is possible to execute tasks in sync or async mode, as you prefer. <br>
10 * This pool will select the worker thread in a round robin fashion. <br>
11 * @author Alessandro Pio Ardizio
14 class FixedThreadPool
{
17 * @param {Number} numThreads Num of threads for this worker pool
18 * @param {Object} opts An object with possible options, for example maxTasks
20 constructor (numThreads
, filename
, opts
) {
21 if (!isMainThread
) throw new Error('Cannot start a thread pool from a worker thread !!!')
22 if (!filename
) throw new Error('Please specify a file with a worker implementation')
23 this.numThreads
= numThreads
26 this.opts
= opts
|| { maxTasks
: 1000 }
27 this.filename
= filename
28 // worker as key and its number of pending tasks (an integer) as value
29 this.tasks
= new Map()
30 for (let i
= 1; i
<= numThreads
; i
++) {
36 * Execute the task specified in the constructor with the data parameter.
37 * @param {Any} data The input for the task specified
38 * @returns {Promise} that is resolved when the task is done
40 async
execute (data
) {
41 // configure worker to handle message with the specified task
42 const worker
= this._chooseWorker()
43 this.tasks
.set(worker
, this.tasks
.get(worker
) + 1)
44 // console.log('Num of pending tasks ', this.tasks.get(worker))
45 const id
= generateID()
47 // console.log('Worker choosed is ' + worker.threadId)
48 const res
= this._execute(worker
, id
)
49 worker
.postMessage(data
)
53 _execute (worker
, id
) {
54 return new Promise((resolve
, reject
) => {
55 const listener
= (message
) => {
56 if (message
._id
=== id
) {
57 worker
.port2
.removeListener('message', listener
)
58 // console.log(worker.port2.listenerCount('message'))
59 this.tasks
.set(worker
, this.tasks
.get(worker
) - 1)
63 worker
.port2
.on('message', listener
)
68 if ((this.workers
.length
- 1) === this.nextWorker
) {
70 return this.workers
[this.nextWorker
]
73 return this.workers
[this.nextWorker
]
78 const worker
= new Worker(path
.resolve(this.filename
), { env
: SHARE_ENV
})
79 worker
.on('error', (e
) => console
.error(e
))
80 worker
.on('exit', () => console
.log('EXITING'))
81 this.workers
.push(worker
)
82 const { port1
, port2
} = new MessageChannel()
83 worker
.postMessage({ parent
: port1
}, [port1
])
86 // We attach a listener for every task.
87 // The listener is removed when the task is completed, but to avoid warnings we increase the max listeners limit.
88 worker
.port2
.setMaxListeners(this.opts
.maxTasks
)
90 this.tasks
.set(worker
, 0)
95 module
.exports
= FixedThreadPool