3 Worker
, isMainThread
, MessageChannel
, SHARE_ENV
4 } = require('worker_threads')
9 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
10 * This pool will select the worker thread in a round robin fashion. <br>
11 * @author Alessandro Pio Ardizio
14 class FixedThreadPool
{
17 * @param {Number} numThreads Num of threads for this worker pool
18 * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
19 * @param {Object} an object with possible options for example errorHandler, onlineHandler.
21 constructor (numThreads
, filePath
, opts
) {
22 if (!isMainThread
) throw new Error('Cannot start a thread pool from a worker thread !!!')
23 if (!filePath
) throw new Error('Please specify a file with a worker implementation')
24 this.numThreads
= numThreads
27 this.opts
= opts
|| { maxTasks
: 1000 }
28 this.filePath
= filePath
30 // threadId as key and an integer value
31 this.tasks
= new Map()
32 for (let i
= 1; i
<= numThreads
; i
++) {
38 for (const worker
of this.workers
) {
39 await worker
.terminate()
44 * Execute the task specified into the constructor with the data parameter.
45 * @param {Any} the input for the task specified
46 * @returns {Promise} that is resolved when the task is done
48 async
execute (data
) {
49 // configure worker to handle message with the specified task
50 const worker
= this._chooseWorker()
51 this.tasks
.set(worker
, this.tasks
.get(worker
) + 1)
53 const res
= this._execute(worker
, id
)
54 worker
.postMessage({ data
: data
|| _void
, _id
: id
})
58 _execute (worker
, id
) {
59 return new Promise((resolve
, reject
) => {
60 const listener
= (message
) => {
61 if (message
._id
=== id
) {
62 worker
.port2
.removeListener('message', listener
)
63 this.tasks
.set(worker
, this.tasks
.get(worker
) - 1)
64 if (message
.error
) reject(message
.error
)
65 else resolve(message
.data
)
68 worker
.port2
.on('message', listener
)
73 if ((this.workers
.length
- 1) === this.nextWorker
) {
75 return this.workers
[this.nextWorker
]
78 return this.workers
[this.nextWorker
]
83 const worker
= new Worker(this.filePath
, { env
: SHARE_ENV
})
84 worker
.on('error', this.opts
.errorHandler
|| empty
)
85 worker
.on('online', this.opts
.onlineHandler
|| empty
)
86 // TODO handle properly when a thread exit
87 worker
.on('exit', this.opts
.exitHandler
|| empty
)
88 this.workers
.push(worker
)
89 const { port1
, port2
} = new MessageChannel()
90 worker
.postMessage({ parent
: port1
}, [port1
])
93 // we will attach a listener for every task,
94 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
95 worker
.port2
.setMaxListeners(this.opts
.maxTasks
|| 1000)
97 this.tasks
.set(worker
, 0)
102 module
.exports
= FixedThreadPool