a32e02ba |
1 | 'use strict' |
2 | const { |
3 | Worker, isMainThread, MessageChannel, SHARE_ENV |
4 | } = require('worker_threads') |
a32e02ba |
5 | |
50811da2 |
/**
 * No-op callback, used as the fallback for optional worker event handlers.
 */
function empty () {
  return undefined
}

/**
 * Placeholder payload posted to a worker when a task is executed without input data.
 */
const _void = {}
a32e02ba |
/**
 * A thread pool with a static number of threads, is possible to execute tasks in sync or async mode as you prefer. <br>
 * This pool will select the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Create the pool and immediately spawn `numThreads` workers.
   *
   * @param {Number} numThreads Num of threads for this worker pool
   * @param {string} filePath a file path with implementation of @see ThreadWorker class, relative path is fine
   * @param {Object} [opts] an object with possible options: errorHandler, onlineHandler, exitHandler, maxTasks.
   * @throws {Error} when invoked from a worker thread or when filePath is missing
   */
  constructor (numThreads, filePath, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filePath) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    // index of the most recently selected worker (round robin cursor)
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filePath = filePath
    // monotonically increasing counter used to match a response to its request
    this._id = 0
    // worker as key and the number of its pending tasks as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker of this pool.
   * All terminations are started together, so a failure on one worker
   * does not prevent the others from being asked to stop.
   * @returns {Promise} resolved when every worker has terminated, rejected if any termination fails
   */
  async destroy () {
    await Promise.all(this.workers.map(worker => worker.terminate()))
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   * @param {any} data the input for the task specified; when it is undefined or null an empty object is sent instead
   * @returns {Promise} that is resolved when the task is done
   */
  async execute (data) {
    // configure worker to handle message with the specified task
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    const id = ++this._id
    // the listener must be registered before the message is posted
    const res = this._execute(worker, id)
    // only a missing input is replaced by the placeholder: falsy values
    // such as 0, '' or false are legitimate task inputs and must be kept
    const payload = data === undefined || data === null ? _void : data
    worker.postMessage({ data: payload, _id: id })
    return res
  }

  /**
   * Wait on the worker's main-thread port for the response carrying the given id.
   * @param {Worker} worker the worker the task was assigned to
   * @param {Number} id the id sent along with the task
   * @returns {Promise} resolved with message.data, or rejected with message.error when it is set
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      const listener = (message) => {
        // every pending task has its own listener on the same port,
        // so responses addressed to other tasks are ignored here
        if (message._id === id) {
          worker.port2.removeListener('message', listener)
          this.tasks.set(worker, this.tasks.get(worker) - 1)
          if (message.error) reject(message.error)
          else resolve(message.data)
        }
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Select the next worker in round robin order.
   * The cursor is advanced before the worker is read and wraps to the
   * first worker after the last one.
   * @returns {Worker} the selected worker
   */
  _chooseWorker () {
    const isLast = this.nextWorker === this.workers.length - 1
    this.nextWorker = isLast ? 0 : this.nextWorker + 1
    return this.workers[this.nextWorker]
  }

  /**
   * Spawn a worker, attach the configured event handlers, open a dedicated
   * message channel towards it and register it in the pool.
   * @returns {Worker} the newly created worker
   */
  _newWorker () {
    const worker = new Worker(this.filePath, { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker, port2 stays here to receive the task responses
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
101 | |
102 | module.exports = FixedThreadPool |