3 Worker
, isMainThread
, MessageChannel
, SHARE_ENV
4 } = require('worker_threads')
5 const { generateID
} = require('./util')
/**
 * A thread pool with a static number of threads; tasks can be executed in
 * sync or async mode as you prefer. <br>
 * This pool selects the worker thread in a round robin fashion. <br>
 *
 * @author Alessandro Pio Ardizio
 */
class FixedThreadPool {
  /**
   * @param {Number} numThreads Number of threads for this worker pool.
   * @param {string} filePath A file path with an implementation of the
   *   ThreadWorker class (@see ThreadWorker); a relative path is fine.
   * @param {Object} [opts] An object with possible options, for example
   *   errorHandler, onlineHandler, exitHandler and maxTasks. When omitted,
   *   `{ maxTasks: 1000 }` is used.
   * @throws {Error} When called from a worker thread, or when no file path
   *   is given.
   */
  constructor (numThreads, filePath, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filePath) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    // Every worker created for this pool, in creation order.
    this.workers = []
    // Index (into `workers`) of the most recently selected worker.
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filePath = filePath
    // Worker object as key, number of its in-flight tasks as value.
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminates the workers of this pool one after the other.
   *
   * NOTE(review): only the `for (const worker of this.workers)` loop header
   * was recoverable for this method; the name `destroy` and the
   * `worker.terminate()` body are a reconstruction - confirm against callers.
   *
   * @returns {Promise<void>} Settles once every `terminate()` call has been awaited.
   */
  async destroy () {
    for (const worker of this.workers) {
      await worker.terminate()
    }
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param {Any} data The input for the task specified.
   * @returns {Promise} A promise that is resolved when the task is done.
   */
  async execute (data) {
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    const id = generateID()
    // Attach the reply listener before posting the message, so the listener
    // is already in place when the worker answers.
    const res = this._execute(worker, id)
    worker.postMessage({ data, _id: id })
    return res
  }

  /**
   * Waits for the reply carrying the given task id on the worker's port.
   *
   * @param {Worker} worker The worker the task was sent to.
   * @param {*} id The task id, as produced by `generateID()`.
   * @returns {Promise} Resolved with the `data` field of the matching reply
   *   (NOTE(review): reply shape assumed to be `{ _id, data }` - confirm
   *   against the worker implementation). It never rejects: a worker that
   *   fails without replying leaves the promise pending.
   */
  _execute (worker, id) {
    return new Promise((resolve) => {
      const listener = (message) => {
        // Replies for other tasks share the same port; ignore them.
        if (message._id !== id) return
        worker.port2.removeListener('message', listener)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
        resolve(message.data)
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Selects the next worker in round robin order: advances `nextWorker` by
   * one, wrapping to 0 after the last worker, and returns the worker at the
   * new index. On a fresh pool with several workers the first pick is
   * therefore the worker at index 1.
   *
   * @returns {Worker} The selected worker.
   */
  _chooseWorker () {
    const lastIndex = this.workers.length - 1
    this.nextWorker = this.nextWorker === lastIndex ? 0 : this.nextWorker + 1
    return this.workers[this.nextWorker]
  }

  /**
   * Creates one worker, wires the optional handlers from `opts`, hands it one
   * end of a dedicated MessageChannel and registers it in `workers`/`tasks`.
   *
   * `empty` is the fallback handler; it is defined outside this class.
   */
  _newWorker () {
    const worker = new Worker(this.filePath, { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    const { port1, port2 } = new MessageChannel()
    // port1 is transferred to the worker; port2 stays on this side and is
    // the port task replies are listened for on.
    worker.postMessage({ parent: port1 }, [port1])
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    this.tasks.set(worker, 0)
  }
}
// The pool class is the sole CommonJS export of this module.
module.exports = FixedThreadPool