Working implementation with a very good benchmark based on the number of threads
[poolifier.git] / fixed.js
1 'use strict'
2 const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4 } = require('worker_threads')
5 const path = require('path')
6
7 // FixedThreadPool , TrampolineThreadPool
/**
 * A thread pool with a static number of threads; it is possible to execute tasks in sync or async mode as you prefer. <br>
 * This pool will select the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Spawns the worker threads and opens one message channel per worker.
   *
   * @param {Number} numThreads Num of threads for this worker pool, 10 when omitted (or 0)
   * @param {String} filename Path of the file with the worker implementation, resolved with `path.resolve`
   * @param {Object} opts An object with possible options; currently not read by the pool
   * @throws {Error} When invoked from a worker thread, or when `filename` is missing
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads || 10
    this.workers = []
    this.nextWorker = 0
    // Loop on the resolved size: looping on the raw argument would create
    // no worker at all when numThreads is omitted.
    for (let i = 1; i <= this.numThreads; i++) {
      const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
      worker.on('error', (e) => console.error(e))
      worker.on('exit', () => console.log('EXITING'))
      this.workers.push(worker)
      const { port1, port2 } = new MessageChannel()
      // port1 is transferred to the worker so that it can answer the main
      // thread; port2 stays on this side and is where the answers are read.
      worker.postMessage({ parent: port1 }, [port1])
      worker.port1 = port1
      worker.port2 = port2
    }
  }

  /**
   * Sends a task to the next worker and waits for its answer.
   *
   * @param {Object} data The input for the task; it is posted to the worker together with a generated `_id` and is not modified
   * @returns {Promise<Any>} Resolved with the `data` field of the answer carrying the same `_id`,
   * rejected when the chosen worker emits 'error' or 'exit' before answering
   */
  async execute (data) {
    const worker = this._chooseWorker()
    const id = generateID()
    // Start listening before posting, so the answer can never be missed.
    const res = this._execute(worker, id)
    // Post a copy: the caller's object must not be polluted with the internal id.
    worker.postMessage({ ...data, _id: id })
    return res
  }

  /**
   * Builds the promise bound to the answer of a single task.
   *
   * @param {Worker} worker The worker the task is sent to
   * @param {String} id The id the answer must carry in its `_id` field
   * @returns {Promise<Any>} Settled as described in `execute`
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      // Every listener registered for this task is removed as soon as the
      // promise settles, whatever the outcome.
      const cleanup = () => {
        worker.port2.removeListener('message', onMessage)
        worker.removeListener('error', onError)
        worker.removeListener('exit', onExit)
      }
      const onMessage = (message) => {
        // The port is shared by all the tasks of this worker: skip the answers of the others.
        if (!message || message._id !== id) return
        console.log('Received a message from worker : ' + message.data)
        cleanup()
        resolve(message.data)
      }
      const onError = (error) => {
        cleanup()
        reject(error)
      }
      const onExit = (code) => {
        cleanup()
        reject(new Error(`Worker exited with code ${code} before answering task ${id}`))
      }
      worker.port2.on('message', onMessage)
      // Without these two listeners the promise would stay pending forever
      // when the worker dies before sending its answer.
      worker.on('error', onError)
      worker.on('exit', onExit)
    })
  }

  /**
   * Picks the next worker in round robin order.
   * The cursor is advanced before reading, so the very first task goes to the
   * worker at index 1 (or index 0 when the pool has a single worker).
   *
   * @returns {Worker} The worker that will receive the next task
   */
  _chooseWorker () {
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }
}
82
/**
 * Return an id to be associated to a task message.
 * The id is a per-process sequence number followed by a random suffix: the
 * sequence number alone guarantees that two ids generated by this module
 * instance never collide and that the id is never an empty string.
 *
 * @returns {String} A non-empty id, unique within the current process
 */
const generateID = (() => {
  let sequence = 0
  return () => {
    sequence++
    const randomPart = Math.random()
      .toString(36)
      .substring(2)
    return `${sequence.toString(36)}-${randomPart}`
  }
})()
91
// The pool class is the only value exported by this module (CommonJS export).
92 module.exports = FixedThreadPool