// Worker-thread primitives used by the pool, plus path resolution and the task-id helper.
const {
  Worker,
  isMainThread,
  MessageChannel,
  SHARE_ENV
} = require('worker_threads')
const path = require('path')
const { generateID } = require('./util')
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer. <br>
 * This pool selects the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 */
class FixedThreadPool {
  /**
   * @param {Number} numThreads Number of threads for this worker pool
   * @param {string} filename A file path with an implementation of @see ThreadWorker class
   * @param {Object} [opts] An object with possible options: errorHandler, onlineHandler, exitHandler, maxTasks.
   * @throws {Error} When called from a worker thread, or when no filename is given
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    // every spawned worker, in creation order; _chooseWorker cycles over this list
    this.workers = []
    // round robin cursor: index of the worker handed out last. It starts at 0, so the
    // first task goes to workers[1] whenever the pool holds more than one worker.
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filename = filename
    // worker as key and the number of its tasks still in flight as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminates the workers of this pool one after the other.
   * NOTE(review): only the loop over this.workers is certain here; the method name
   * `destroy` and the `terminate()` call are presumed - confirm against the callers.
   * @returns {Promise<void>} resolved once every worker has been asked to terminate
   */
  async destroy () {
    for (const worker of this.workers) {
      await worker.terminate()
    }
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   * @param {Any} data the input for the task specified
   * @returns {Promise} that is resolved when the task is done
   * @throws {Error} (as a rejection) when the pool has no worker threads
   */
  async execute (data) {
    // fail with a clear message instead of a TypeError on an undefined worker
    if (this.workers.length === 0) {
      throw new Error('Cannot execute a task: the pool has no worker threads')
    }
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    const id = generateID()
    // NOTE(review): the reply is matched on `message._id`, so the id is presumably sent
    // by tagging the payload itself; this mutates the caller's object and requires
    // `data` to be an object - confirm against the worker implementation.
    data._id = id
    // the reply listener is registered before the task is posted
    const res = this._execute(worker, id)
    worker.postMessage(data)
    return res
  }

  /**
   * Waits for the reply carrying the given task id on the worker's port.
   * The returned promise never rejects: a worker that fails without replying leaves it pending.
   * @param {Worker} worker the worker running the task
   * @param {Any} id the task id to wait for
   * @returns {Promise} resolved when the matching reply arrives
   */
  _execute (worker, id) {
    return new Promise((resolve) => {
      const listener = (message) => {
        // replies for other tasks share the same port; ignore them
        if (message._id !== id) return
        worker.port2.removeListener('message', listener)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
        // NOTE(review): presumably the worker puts its result under `data` - confirm
        resolve(message.data)
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Advances the round robin cursor and returns the worker it now points to.
   * @returns {Worker} the worker that should receive the next task
   */
  _chooseWorker () {
    if ((this.workers.length - 1) === this.nextWorker) {
      // the cursor sits on the last worker: wrap around to the first one
      this.nextWorker = 0
      return this.workers[this.nextWorker]
    } else {
      this.nextWorker++
      return this.workers[this.nextWorker]
    }
  }

  /**
   * Spawns one worker, wires its event handlers and its reply port, and registers it in the pool.
   * NOTE(review): the method name `_newWorker` is presumed - confirm against subclasses/callers.
   * @returns {Worker} the worker just created
   */
  _newWorker () {
    // fallback handler for every event the caller did not provide a handler for
    const noop = () => {}
    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || noop)
    worker.on('online', this.opts.onlineHandler || noop)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || noop)
    this.workers.push(worker)
    const { port1, port2 } = new MessageChannel()
    // port1 is transferred to the worker; the pool keeps port2 to read the replies
    worker.postMessage({ parent: port1 }, [port1])
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    // a new worker starts with no task in flight
    this.tasks.set(worker, 0)
    return worker
  }
}
// The pool class is the module's only export.
module.exports = FixedThreadPool