// Error handling and unit tests
// [poolifier.git] / lib / fixed.js
// Commit | Line | Data
'use strict'
const {
  Worker, isMainThread, MessageChannel, SHARE_ENV
} = require('worker_threads')

/**
 * No-op function, used as the fallback handler for worker events
 * when the pool options do not provide one.
 */
function empty () {}

// Placeholder payload posted to a worker when a task is executed with a falsy input.
const _void = {}
/**
 * A thread pool with a static number of threads, it is possible to execute tasks in sync or async mode as you prefer. <br>
 * This pool will select the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Create the pool and start `numThreads` workers right away.
   *
   * @param {Number} numThreads Number of threads for this worker pool
   * @param {string} filePath A file path with implementation of @see ThreadWorker class, relative path is fine
   * @param {Object} [opts] An object with possible options: `errorHandler`, `onlineHandler` and
   * `exitHandler` are registered on every worker for the matching event; `maxTasks` is used as the
   * max listeners limit of each worker reply port (1000 when missing). Defaults to `{ maxTasks: 1000 }`.
   * @throws {Error} when called from a worker thread or when `filePath` is missing
   */
  constructor (numThreads, filePath, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filePath) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filePath = filePath
    // monotonically increasing id used to match a reply with its task
    this._id = 0
    // worker instance as key, number of tasks currently in flight on that worker as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker of the pool.
   * The result of each `terminate()` call is not awaited.
   */
  destroy () {
    for (const worker of this.workers) {
      worker.terminate()
    }
  }

  /**
   * Execute the task implemented by the worker file with the data parameter.
   * @param {Any} data The input for the task; a falsy value is replaced by an empty object before being posted
   * @returns {Promise} resolved with the `data` of the worker reply when the task is done,
   * rejected with the reply `error` when the worker reports one, or with the error thrown while
   * choosing a worker / posting the message
   */
  async execute (data) {
    const worker = this._chooseWorker()
    const id = ++this._id
    // Post before touching any bookkeeping: postMessage can throw synchronously
    // (e.g. when the payload cannot be cloned). Doing it first guarantees that a
    // failed post leaves no dangling reply listener and no inflated task counter.
    // Replies are delivered asynchronously, so registering the listener right
    // after posting cannot miss one.
    worker.postMessage({ data: data || _void, _id: id })
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    return this._execute(worker, id)
  }

  /**
   * Wait for the reply to the task `id` on the reply port of `worker`.
   * @param {Worker} worker The worker the task was posted to
   * @param {Number} id The task id sent along with the task
   * @returns {Promise} settled by the first reply whose `_id` equals `id`
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      const listener = (message) => {
        // every in-flight task listens on the same port, skip replies of other tasks
        if (message._id !== id) return
        worker.port2.removeListener('message', listener)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
        if (message.error) reject(message.error)
        else resolve(message.data)
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Pick the next worker in round robin order.
   * The index is advanced before the worker is read, so on a pool with more
   * than one worker the very first task goes to the worker at index 1.
   * @returns {Worker} the chosen worker
   * @throws {Error} when the pool has no worker
   */
  _chooseWorker () {
    if (this.workers.length === 0) {
      throw new Error('Cannot choose a worker: the pool has no worker threads')
    }
    if ((this.workers.length - 1) === this.nextWorker) {
      this.nextWorker = 0
    } else {
      this.nextWorker++
    }
    return this.workers[this.nextWorker]
  }

  /**
   * Start a new worker, wire its event handlers and its dedicated message
   * channel, and add it to the pool.
   * @returns {Worker} the new worker
   */
  _newWorker () {
    const worker = new Worker(this.filePath, { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker, port2 stays here to receive the task replies
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
// The pool class is the only export of this module.
module.exports = FixedThreadPool