6dc67cda |
1 | 'use strict' |
2 | const { |
27006a3d |
3 | Worker, isMainThread, MessageChannel, SHARE_ENV |
6dc67cda |
4 | } = require('worker_threads') |
27006a3d |
5 | const path = require('path') |
6dc67cda |
6 | |
6dc67cda |
7 | // FixedThreadPool , TrampolineThreadPool |
/**
 * A thread pool with a static number of worker threads. <br>
 * Tasks are handed to the workers in a round robin fashion and every execution
 * is exposed as a promise. <br>
 * Protocol, as implemented below: each worker receives `{ parent: port }` on its
 * default channel right after start-up, tasks are posted on that same default
 * channel, and replies are read from the other end of the port as
 * `{ _id, data }` messages, where `_id` is the id the task was tagged with.
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Start the pool and spawn its workers.
   *
   * @param {Number} numThreads Number of worker threads to start, 10 when omitted (or otherwise falsy)
   * @param {String} filename Path of the file with the worker implementation, resolved with path.resolve
   * @param {Object} [opts] Accepted for options such as maxConcurrency, currently unused
   * @throws {Error} When called from a worker thread or when filename is missing
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads || 10
    this.workers = []
    this.nextWorker = 0
    // NOTE(review): looks like a debugging leftover - confirm no worker reads it.
    // process.env only stores strings, and the implicit conversion of other
    // values is deprecated, so the conversion is made explicit; the stored
    // value is the same function source text as before.
    process.env.proof = String((data) => console.log(data))
    const workerFile = path.resolve(filename)
    // Loop on this.numThreads (not the raw argument) so the default is honoured.
    for (let i = 1; i <= this.numThreads; i++) {
      this.workers.push(this._spawnWorker(workerFile))
    }
  }

  /**
   * Run a task on the next worker of the pool.
   *
   * @param {Object} data Input for the task; a shallow copy tagged with a generated `_id` is posted, the caller's object is left untouched
   * @returns {Promise<*>} Resolves with the `data` field of the reply carrying the same `_id`; rejects when `data` is not an object, when it cannot be posted, or when the worker emits 'error' or 'exit' first
   */
  async execute (data) {
    if (data === null || typeof data !== 'object') {
      throw new TypeError('Task data must be an object so that it can be tagged with an _id')
    }
    const worker = this._chooseWorker()
    const id = generateID()
    // Arrays keep their array shape so the worker sees the same kind of payload as before.
    const message = Array.isArray(data)
      ? Object.assign([...data], { _id: id })
      : { ...data, _id: id }
    // Post before registering the reply listener: if the payload cannot be
    // cloned nothing is left dangling, and a reply can never be delivered
    // before this synchronous code has finished.
    worker.postMessage(message)
    return this._execute(worker, id)
  }

  /**
   * Wait for the reply tagged with the given id on the worker's port.
   *
   * @param {Worker} worker Worker created by this pool (carries `port2` and `pendingFailures`)
   * @param {String} id Id of the task to wait for
   * @returns {Promise<*>} Resolves with `message.data`, rejects when the worker fails first
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      const detach = () => {
        worker.port2.removeListener('message', onMessage)
        worker.pendingFailures.delete(onFailure)
      }
      const onMessage = (message) => {
        // Replies for other tasks travel on the same port: ignore them.
        if (message == null || message._id !== id) return
        console.log('Received a message from worker : ' + message.data)
        detach()
        resolve(message.data)
      }
      const onFailure = (err) => {
        detach()
        reject(err)
      }
      worker.pendingFailures.add(onFailure)
      worker.port2.on('message', onMessage)
    })
  }

  /**
   * Advance the round robin cursor and return the worker it points to.
   * The cursor starts at 0 and is moved before use, so the first task goes to
   * the worker at index 1 when the pool has more than one worker.
   *
   * @returns {Worker} The selected worker
   * @throws {Error} When the pool has no workers
   */
  _chooseWorker () {
    if (this.workers.length === 0) throw new Error('The pool has no worker threads')
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Create one worker, wire its failure handlers and send it its message port.
   *
   * @param {String} workerFile Absolute path of the worker implementation
   * @returns {Worker} The worker, extended with `port1`, `port2` and `pendingFailures`
   */
  _spawnWorker (workerFile) {
    const worker = new Worker(workerFile, { env: SHARE_ENV })
    // Failure callbacks of the tasks still waiting for a reply from this worker.
    worker.pendingFailures = new Set()
    worker.on('error', (e) => {
      console.error(e)
      this._rejectPending(worker, e)
    })
    worker.on('exit', (code) => {
      console.log('EXITING')
      this._rejectPending(worker, new Error(`Worker exited with code ${code} before replying`))
    })
    const { port1, port2 } = new MessageChannel()
    // send the port to communicate with the main thread to the worker
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    return worker
  }

  /**
   * Reject every task still waiting on the given worker.
   *
   * @param {Worker} worker Worker that failed or exited
   * @param {Error} err Rejection reason
   */
  _rejectPending (worker, err) {
    // Iterate over a copy: every callback removes itself from the set.
    for (const fail of Array.from(worker.pendingFailures)) fail(err)
  }
}
82 | |
/**
 * Build a pseudo-random identifier: the base-36 rendering of Math.random()
 * with its first two characters (the leading "0.") dropped.
 * FixedThreadPool#execute uses it to tag the data it posts to a worker.
 */
const generateID = () => {
  const base36 = Math.random().toString(36)
  return base36.slice(2)
}
91 | |
6dc67cda |
92 | module.exports = FixedThreadPool |