Working implementation with a very good benchmark based on num of threads
[poolifier.git] / fixed.js
CommitLineData
6dc67cda 1'use strict'
2const {
27006a3d 3 Worker, isMainThread, MessageChannel, SHARE_ENV
6dc67cda 4} = require('worker_threads')
27006a3d 5const path = require('path')
6dc67cda 6
6dc67cda 7// FixedThreadPool , TrampolineThreadPool
8/**
27006a3d 9 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
10 * This pool will select the worker thread in a round robin fashion. <br>
6dc67cda 11 * @author Alessandro Pio Ardizio
12 * @since 0.0.1
13 */
14class FixedThreadPool {
15 /**
16 *
17 * @param {Number} numThreads Num of threads for this worker pool
18 * @param {Object} an object with possible options for example maxConcurrency
19 */
27006a3d 20 constructor (numThreads, filename, opts) {
21 if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
22 if (!filename) throw new Error('Please specify a file with a worker implementation')
23 this.numThreads = numThreads || 10
6dc67cda 24 this.workers = []
3e460d6d 25 this.nextWorker = 0
27006a3d 26 process.env.proof = (data) => console.log(data)
6dc67cda 27 for (let i = 1; i <= numThreads; i++) {
27006a3d 28 const worker = new Worker(path.resolve(filename), { env: SHARE_ENV })
29 worker.on('error', (e) => console.error(e))
30 worker.on('exit', () => console.log('EXITING'))
6dc67cda 31 this.workers.push(worker)
27006a3d 32 const { port1, port2 } = new MessageChannel()
33 // send the port to communicate with the main thread to the worker
34 /* port2.on('message' , (message) => {
35 console.log('Worker is sending a message : ' + message)
36 }) */
37 worker.postMessage({ parent: port1 }, [port1])
38 worker.port1 = port1
39 worker.port2 = port2
6dc67cda 40 }
41 }
42
43 /**
44 *
45 * @param {Function} task , a function to execute
46 * @param {Any} the input for the task specified
47 */
27006a3d 48 async execute (data) {
6dc67cda 49 // configure worker to handle message with the specified task
3e460d6d 50 const worker = this._chooseWorker()
c2dbbe2b 51 const id = generateID()
27006a3d 52 data._id = id
53 const res = this._execute(worker, id)
54 worker.postMessage(data)
6dc67cda 55 return res
56 }
57
27006a3d 58 _execute (worker, id) {
6dc67cda 59 return new Promise((resolve, reject) => {
27006a3d 60 const listener = (message) => {
61 if (message._id === id) {
62 console.log('Received a message from worker : ' + message.data)
63 worker.port2.removeListener('message' , listener)
64 resolve(message.data)
6dc67cda 65 }
27006a3d 66 }
67 worker.port2.on('message', listener)
6dc67cda 68 })
69 }
27006a3d 70
6dc67cda 71
3e460d6d 72 _chooseWorker () {
3e460d6d 73 if ((this.workers.length - 1) === this.nextWorker) {
3e460d6d 74 this.nextWorker = 0
75 return this.workers[this.nextWorker]
76 } else {
77 this.nextWorker++
78 return this.workers[this.nextWorker]
79 }
80 }
c2dbbe2b 81}
82
83/**
84 * Return an id to be associated to a node.
85 */
86const generateID = () => {
87 return Math.random()
88 .toString(36)
27006a3d 89 .substring(2)
c2dbbe2b 90}
91
6dc67cda 92module.exports = FixedThreadPool