a32e02ba |
1 | 'use strict' |
2 | const { |
3 | Worker, isMainThread, MessageChannel, SHARE_ENV |
4 | } = require('worker_threads') |
5 | const path = require('path') |
6 | const { generateID } = require('./util') |
7 | |
50811da2 |
/**
 * No-op function: accepts no declared parameters, does nothing and returns undefined.
 * Used as a fallback wherever a callback is required but none was supplied.
 */
function empty () {
  // intentionally left blank
}
a32e02ba |
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer. <br>
 * This pool selects the worker thread in a round-robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Create the pool and eagerly start `numThreads` worker threads.
   *
   * @param {Number} numThreads Number of threads for this worker pool
   * @param {string} filename A file path with the implementation of the @see ThreadWorker class
   * @param {Object} [opts] Optional settings: `errorHandler`, `onlineHandler`, `exitHandler`
   * (listeners for the matching worker events) and `maxTasks` (max listeners allowed on each
   * worker port, defaults to 1000)
   * @throws {Error} when called from a worker thread or when `filename` is missing
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    // Copy the caller's options and default maxTasks on its own, so that passing
    // only handlers does not end up calling setMaxListeners(undefined), which throws.
    const options = Object.assign({}, opts)
    if (options.maxTasks == null) options.maxTasks = 1000
    this.opts = options
    this.filename = filename
    // worker object as key, number of tasks currently in flight on it as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker thread of this pool.
   * @returns {Promise} resolved with the array of results of each `worker.terminate()` call
   */
  destroy () {
    return Promise.all(this.workers.map((worker) => worker.terminate()))
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   * Note: an `_id` property is written onto `data` to correlate the response.
   * @param {Object} data The input for the task specified
   * @returns {Promise} that is resolved with the task result when the task is done
   */
  async execute (data) {
    const id = generateID()
    // Tag the payload first: if `data` cannot carry a property this throws before
    // the round-robin cursor or the per-worker task counter have been touched.
    data._id = id
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    // Register the response listener before posting so the answer cannot be missed.
    const res = this._execute(worker, id)
    worker.postMessage(data)
    return res
  }

  /**
   * Wait on `worker.port2` for the message whose `_id` equals `id`.
   * The returned promise is never rejected by this method.
   * @param {Worker} worker The worker the task was sent to
   * @param {*} id The identifier attached to the task input
   * @returns {Promise} resolved with the `data` field of the matching message
   */
  _execute (worker, id) {
    return new Promise((resolve) => {
      const listener = (message) => {
        // Messages of other in-flight tasks share this port; ignore them.
        if (message._id !== id) return
        worker.port2.removeListener('message', listener)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
        resolve(message.data)
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Advance the round-robin cursor and return the worker it now points to.
   * The cursor wraps to index 0 after the last worker.
   * @returns {Worker} the selected worker
   */
  _chooseWorker () {
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Start a new worker thread, wire its event handlers and its dedicated message channel,
   * and register it in `workers` and `tasks`.
   * @returns {Worker} the newly created worker
   */
  _newWorker () {
    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO remove the workers array, use only the map data structure
    // handle properly when a thread exits
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker thread, port2 stays on this side for the responses
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when a task is completed the listener is removed, but to avoid warnings we increase the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
103 | |
104 | module.exports = FixedThreadPool |