f5ad269580a5d5475ff0a36a0a115f5ff30c6eb8
7 } = require('worker_threads')
12 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer. <br>
13 * This pool will select the worker thread in a round robin fashion. <br>
14 * @author Alessandro Pio Ardizio
17 class FixedThreadPool
{
20 * @param {Number} numThreads Num of threads for this worker pool
21 * @param {string} filePath A file path with the implementation of @see ThreadWorker class; a relative path is fine
22 * @param {Object} [opts] An object with optional settings, for example errorHandler, onlineHandler, exitHandler, maxTasks.
// Records the pool configuration on the instance, initialises the task-counter map
// and then loops once per requested thread.
24 constructor (numThreads
, filePath
, opts
) {
// Raised when the pool is created from inside a worker thread
// (presumably guarded by a main-thread check; confirm against the full file).
26 throw new Error('Cannot start a thread pool from a worker thread !!!')
// Raised when no worker implementation file is supplied
// (presumably guarded by a check on `filePath`; confirm against the full file).
29 throw new Error('Please specify a file with a worker implementation')
31 this.numThreads
= numThreads
// A falsy `opts` is replaced by a default options object with maxTasks = 1000.
34 this.opts
= opts
|| { maxTasks
: 1000 }
35 this.filePath
= filePath
37 // worker instance as key (see the this.tasks.set(worker, ...) calls) and an integer task counter as value
38 this.tasks
= new Map()
// One iteration per requested thread, counting from 1 up to numThreads inclusive.
39 for (let i
= 1; i
<= numThreads
; i
++) {
45 for (const worker
of this.workers
) {
46 await worker
.terminate()
51 * Execute the task specified into the constructor with the data parameter.
52 * @param {Any} data The input for the task specified
53 * @returns {Promise} that is resolved when the task is done
// Sends one task to a worker: picks the worker through this._chooseWorker(),
// increments that worker's counter in this.tasks, subscribes to the reply through
// this._execute(worker, id) and only afterwards posts the payload, so the reply
// listener is already attached when the message is sent.
55 async
execute (data
) {
56 // configure worker to handle message with the specified task
57 const worker
= this._chooseWorker()
// One more task is now pending on this worker.
58 this.tasks
.set(worker
, this.tasks
.get(worker
) + 1)
// `res` holds the promise returned by _execute; it is not awaited at this point.
60 const res
= this._execute(worker
, id
)
// `_id` tags the message so the reply can be matched to this task.
// NOTE(review): `data || _void` also replaces valid falsy inputs such as 0, '' or false.
61 worker
.postMessage({ data
: data
|| _void
, _id
: id
})
65 _execute (worker
, id
) {
66 return new Promise((resolve
, reject
) => {
67 const listener
= message
=> {
68 if (message
._id
=== id
) {
69 worker
.port2
.removeListener('message', listener
)
70 this.tasks
.set(worker
, this.tasks
.get(worker
) - 1)
71 if (message
.error
) reject(message
.error
)
72 else resolve(message
.data
)
75 worker
.port2
.on('message', listener
)
80 if (this.workers
.length
- 1 === this.nextWorker
) {
82 return this.workers
[this.nextWorker
]
85 return this.workers
[this.nextWorker
]
90 const worker
= new Worker(this.filePath
, { env
: SHARE_ENV
})
91 worker
.on('error', this.opts
.errorHandler
|| empty
)
92 worker
.on('online', this.opts
.onlineHandler
|| empty
)
93 // TODO handle properly when a thread exit
94 worker
.on('exit', this.opts
.exitHandler
|| empty
)
95 this.workers
.push(worker
)
96 const { port1
, port2
} = new MessageChannel()
97 worker
.postMessage({ parent
: port1
}, [port1
])
100 // A listener is attached for every in-flight task and removed once that task completes,
101 // so the max listeners limit is raised to avoid listener-count warnings while many tasks are pending
102 worker
.port2
.setMaxListeners(this.opts
.maxTasks
|| 1000)
104 this.tasks
.set(worker
, 0)
// CommonJS export: the FixedThreadPool class itself is the module's export value.
109 module
.exports
= FixedThreadPool