a32e02ba |
1 | 'use strict' |
2 | const { |
3 | Worker, isMainThread, MessageChannel, SHARE_ENV |
4 | } = require('worker_threads') |
a32e02ba |
5 | const { generateID } = require('./util') |
6 | |
50811da2 |
7 | function empty () {} |
a32e02ba |
8 | /** |
9 | * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br> |
10 | * This pool will select the worker thread in a round robin fashion. <br> |
11 | * @author Alessandro Pio Ardizio |
12 | * @since 0.0.1 |
13 | */ |
14 | class FixedThreadPool { |
15 | /** |
16 | * |
17 | * @param {Number} numThreads Num of threads for this worker pool |
506c2a14 |
18 | * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine |
bf962cba |
19 | * @param {Object} an object with possible options for example errorHandler, onlineHandler. |
a32e02ba |
20 | */ |
506c2a14 |
21 | constructor (numThreads, filePath, opts) { |
a32e02ba |
22 | if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!') |
506c2a14 |
23 | if (!filePath) throw new Error('Please specify a file with a worker implementation') |
a32e02ba |
24 | this.numThreads = numThreads |
25 | this.workers = [] |
26 | this.nextWorker = 0 |
27 | this.opts = opts || { maxTasks: 1000 } |
506c2a14 |
28 | this.filePath = filePath |
a32e02ba |
29 | // threadId as key and an integer value |
30 | this.tasks = new Map() |
31 | for (let i = 1; i <= numThreads; i++) { |
32 | this._newWorker() |
33 | } |
34 | } |
35 | |
50811da2 |
36 | destroy () { |
37 | for (const worker of this.workers) { |
38 | worker.terminate() |
39 | } |
50811da2 |
40 | } |
41 | |
a32e02ba |
42 | /** |
43 | * Execute the task specified into the constructor with the data parameter. |
44 | * @param {Any} the input for the task specified |
45 | * @returns {Promise} that is resolved when the task is done |
46 | */ |
47 | async execute (data) { |
48 | // configure worker to handle message with the specified task |
49 | const worker = this._chooseWorker() |
50 | this.tasks.set(worker, this.tasks.get(worker) + 1) |
a32e02ba |
51 | const id = generateID() |
a32e02ba |
52 | const res = this._execute(worker, id) |
506c2a14 |
53 | worker.postMessage({ data: data, _id: id }) |
a32e02ba |
54 | return res |
55 | } |
56 | |
57 | _execute (worker, id) { |
58 | return new Promise((resolve, reject) => { |
59 | const listener = (message) => { |
60 | if (message._id === id) { |
61 | worker.port2.removeListener('message', listener) |
a32e02ba |
62 | this.tasks.set(worker, this.tasks.get(worker) - 1) |
63 | resolve(message.data) |
64 | } |
65 | } |
66 | worker.port2.on('message', listener) |
67 | }) |
68 | } |
69 | |
70 | _chooseWorker () { |
71 | if ((this.workers.length - 1) === this.nextWorker) { |
72 | this.nextWorker = 0 |
73 | return this.workers[this.nextWorker] |
74 | } else { |
75 | this.nextWorker++ |
76 | return this.workers[this.nextWorker] |
77 | } |
78 | } |
79 | |
80 | _newWorker () { |
506c2a14 |
81 | const worker = new Worker(this.filePath, { env: SHARE_ENV }) |
50811da2 |
82 | worker.on('error', this.opts.errorHandler || empty) |
50811da2 |
83 | worker.on('online', this.opts.onlineHandler || empty) |
bf962cba |
84 | // TODO handle properly when a thread exit |
8a306b85 |
85 | worker.on('exit', this.opts.exitHandler || empty) |
a32e02ba |
86 | this.workers.push(worker) |
87 | const { port1, port2 } = new MessageChannel() |
88 | worker.postMessage({ parent: port1 }, [port1]) |
89 | worker.port1 = port1 |
90 | worker.port2 = port2 |
91 | // we will attach a listener for every task, |
92 | // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size |
bf962cba |
93 | worker.port2.setMaxListeners(this.opts.maxTasks || 1000) |
a32e02ba |
94 | // init tasks map |
95 | this.tasks.set(worker, 0) |
96 | return worker |
97 | } |
98 | } |
99 | |
100 | module.exports = FixedThreadPool |