a32e02ba |
1 | 'use strict' |
2 | const { |
cf9aa6c3 |
3 | Worker, |
4 | isMainThread, |
5 | MessageChannel, |
6 | SHARE_ENV |
a32e02ba |
7 | } = require('worker_threads') |
a32e02ba |
8 | |
50811da2 |
/**
 * No-op callback, used as the fallback listener when a handler option is not supplied.
 */
function empty () {
  return undefined
}

// Placeholder payload posted to a worker when the caller supplies no data for a task.
const _void = {}
a32e02ba |
/**
 * A thread pool with a static number of threads, all started by the constructor. <br>
 * Tasks are dispatched to the workers in a round robin fashion and every call to
 * `execute` returns a promise for the result of that single task. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * @param {Number} numThreads Number of worker threads started for this pool.
   * @param {string} filePath Path of the file each worker runs, expected to contain
   * an implementation of the @see ThreadWorker class. It is handed unchanged to `new Worker`.
   * @param {Object} [opts] Optional settings. The pool reads `errorHandler`, `onlineHandler`
   * and `exitHandler` (listeners for the matching worker events) and `maxTasks`
   * (max listeners allowed on each worker port, 1000 when unset).
   * @throws {Error} When invoked from a worker thread or when `filePath` is missing.
   */
  constructor (numThreads, filePath, opts) {
    if (!isMainThread) {
      throw new Error('Cannot start a thread pool from a worker thread !!!')
    }
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filePath = filePath
    // monotonically increasing id used to match a reply with the task that caused it
    this._id = 0
    // worker object as key, number of tasks sent to it and not yet completed as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker of this pool.
   * All terminations are started together rather than one after the other.
   * @returns {Promise<void>} resolved once every worker has been terminated
   */
  async destroy () {
    await Promise.all(this.workers.map(worker => worker.terminate()))
  }

  /**
   * Execute the task implemented by the worker file with the data parameter.
   * @param {Any} data the input for the task; only `undefined` and `null` are replaced
   * by an empty object, other falsy values (0, '', false) are sent unchanged
   * @returns {Promise} resolved with the `data` field of the worker reply,
   * rejected with its `error` field when the reply carries one,
   * or rejected with an Error when the pool has no worker threads
   */
  async execute (data) {
    if (this.workers.length === 0) {
      throw new Error('Cannot execute a task: the pool has no worker threads')
    }
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    const id = ++this._id
    // the reply listener is registered before the message is posted
    const res = this._execute(worker, id)
    const payload = data === undefined || data === null ? _void : data
    worker.postMessage({ data: payload, _id: id })
    return res
  }

  /**
   * Wait for the reply to one task on the pool side port of the given worker.
   * @param {Worker} worker the worker the task is sent to
   * @param {Number} id the id that the reply message must carry in `_id`
   * @returns {Promise} settled by the first message whose `_id` equals `id`
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      const listener = message => {
        // all pending tasks of a worker listen on the same port: skip replies of other tasks
        if (message._id !== id) return
        worker.port2.removeListener('message', listener)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
        if (message.error) reject(message.error)
        else resolve(message.data)
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Pick the worker for the next task, round robin: the index is advanced first and
   * wraps to 0 after the last worker, so the first task goes to index 1
   * when the pool has more than one worker.
   * @returns {Worker} the selected worker
   */
  _chooseWorker () {
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Start a worker, register the configured event handlers, hand it one end of a
   * dedicated MessageChannel and add it to the pool.
   * @returns {Worker} the new worker, with `port1` and `port2` attached as properties
   */
  _newWorker () {
    const worker = new Worker(this.filePath, { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    const { port1, port2 } = new MessageChannel()
    // port1 is transferred to the worker, task replies are read from port2
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
108 | |
109 | module.exports = FixedThreadPool |