A dynamic thread pool and a new worker implementation. Next step is to write some...
[poolifier.git] / lib / fixed.js
1 'use strict'
2 const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4 } = require('worker_threads')
5 const path = require('path')
6 const { generateID } = require('./util')
7
/**
 * A thread pool with a static number of worker threads. <br>
 * Every task is handed to a worker chosen in a round robin fashion and its
 * outcome is delivered through a promise. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Starts `numThreads` workers, each one running the file `filename`.
   *
   * @param {Number} numThreads Number of worker threads to start for this pool
   * @param {String} filename Path of the file with the worker implementation (resolved with path.resolve)
   * @param {Object} [opts] Pool options. `opts.maxTasks` (default 1000) is used as the
   * max listeners limit of each worker port, since one listener is attached per pending task.
   * @throws {Error} When called from a worker thread or when `filename` is missing
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    // Work on a copy so the caller's object is never modified, and apply the default
    // per option: an options object without maxTasks must not disable the default.
    const options = Object.assign({}, opts)
    if (options.maxTasks === undefined || options.maxTasks === null) {
      options.maxTasks = 1000
    }
    this.opts = options
    this.filename = filename
    // worker as key and the number of its pending tasks as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Execute the task implemented by the worker file with the data parameter.
   * The data object is tagged in place with an `_id` property, which is used to
   * match the answer of the worker with this call.
   *
   * @param {Object} data The input for the task, it must be a (non null) object
   * @returns {Promise} Resolved with the `data` field of the worker answer, rejected
   * if the input is not an object, if the pool has no workers or if the chosen
   * worker emits an 'error' event while the task is pending
   */
  async execute (data) {
    // The task id is stored on the input itself, so primitives cannot be accepted.
    // Checked before any bookkeeping, so a bad input does not leak a pending task.
    if (data === null || typeof data !== 'object') {
      throw new TypeError('The task input must be an object')
    }
    const worker = this._chooseWorker()
    if (!worker) throw new Error('No worker available in this pool')
    const id = generateID()
    // May throw on frozen objects, hence done before the pending counter is touched
    data._id = id
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    // The listener must be in place before the message is sent to the worker
    const res = this._execute(worker, id)
    worker.postMessage(data)
    return res
  }

  /**
   * Waits for the answer to the task `id` on the given worker.
   *
   * @param {Worker} worker The worker that received the task
   * @param {*} id The id that the answer must carry in its `_id` field
   * @returns {Promise} Resolved with `message.data` of the matching answer,
   * rejected with the error emitted by the worker
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      // Releases everything registered for this task, whatever its outcome is
      const release = () => {
        worker.port2.removeListener('message', onMessage)
        worker.removeListener('error', onError)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
      }
      const onMessage = (message) => {
        // Every pending task listens on the same port, ignore answers of other tasks
        if (message._id !== id) return
        release()
        resolve(message.data)
      }
      const onError = (error) => {
        // No answer is expected after an 'error' event: fail the task instead of
        // leaving its promise pending forever
        release()
        reject(error)
      }
      worker.port2.on('message', onMessage)
      worker.on('error', onError)
    })
  }

  /**
   * Selects the next worker in a round robin fashion.
   *
   * @returns {Worker|undefined} The selected worker, undefined if the pool is empty
   */
  _chooseWorker () {
    const size = this.workers.length
    if (size === 0) return undefined
    // The modulo wraps around and also recovers from an index that is no longer
    // inside the workers array
    this.nextWorker = (this.nextWorker + 1) % size
    return this.workers[this.nextWorker]
  }

  /**
   * Creates a worker running `this.filename`, sends it one end of a dedicated
   * message channel and registers it in the pool.
   *
   * @returns {Worker} The new worker
   */
  _newWorker () {
    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
    worker.on('error', (e) => console.error(e))
    worker.on('exit', () => console.log('EXITING'))
    this.workers.push(worker)
    const { port1, port2 } = new MessageChannel()
    // port1 is transferred to the worker, the answers are received on port2
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    const maxTasks = this.opts.maxTasks
    worker.port2.setMaxListeners(maxTasks)
    // Every pending task also listens for 'error' on the worker, next to the logging
    // listener registered above (0 means no limit and must stay 0)
    worker.setMaxListeners(maxTasks === 0 ? 0 : maxTasks + 1)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
94
95 module.exports = FixedThreadPool