Added prettier standard to support prettier and use it in combination with standard
[poolifier.git] / lib / fixed.js
1 'use strict'
2 const {
3 Worker,
4 isMainThread,
5 MessageChannel,
6 SHARE_ENV
7 } = require('worker_threads')
8
9 function empty () {}
10 const _void = {}
11 /**
12 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
13 * This pool will select the worker thread in a round robin fashion. <br>
14 * @author Alessandro Pio Ardizio
15 * @since 0.0.1
16 */
17 class FixedThreadPool {
18 /**
19 *
20 * @param {Number} numThreads Num of threads for this worker pool
21 * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
22 * @param {Object} an object with possible options for example errorHandler, onlineHandler.
23 */
24 constructor (numThreads, filePath, opts) {
25 if (!isMainThread) {
26 throw new Error('Cannot start a thread pool from a worker thread !!!')
27 }
28 if (!filePath) {
29 throw new Error('Please specify a file with a worker implementation')
30 }
31 this.numThreads = numThreads
32 this.workers = []
33 this.nextWorker = 0
34 this.opts = opts || { maxTasks: 1000 }
35 this.filePath = filePath
36 this._id = 0
37 // threadId as key and an integer value
38 this.tasks = new Map()
39 for (let i = 1; i <= numThreads; i++) {
40 this._newWorker()
41 }
42 }
43
44 async destroy () {
45 for (const worker of this.workers) {
46 await worker.terminate()
47 }
48 }
49
50 /**
51 * Execute the task specified into the constructor with the data parameter.
52 * @param {Any} the input for the task specified
53 * @returns {Promise} that is resolved when the task is done
54 */
55 async execute (data) {
56 // configure worker to handle message with the specified task
57 const worker = this._chooseWorker()
58 this.tasks.set(worker, this.tasks.get(worker) + 1)
59 const id = ++this._id
60 const res = this._execute(worker, id)
61 worker.postMessage({ data: data || _void, _id: id })
62 return res
63 }
64
65 _execute (worker, id) {
66 return new Promise((resolve, reject) => {
67 const listener = message => {
68 if (message._id === id) {
69 worker.port2.removeListener('message', listener)
70 this.tasks.set(worker, this.tasks.get(worker) - 1)
71 if (message.error) reject(message.error)
72 else resolve(message.data)
73 }
74 }
75 worker.port2.on('message', listener)
76 })
77 }
78
79 _chooseWorker () {
80 if (this.workers.length - 1 === this.nextWorker) {
81 this.nextWorker = 0
82 return this.workers[this.nextWorker]
83 } else {
84 this.nextWorker++
85 return this.workers[this.nextWorker]
86 }
87 }
88
89 _newWorker () {
90 const worker = new Worker(this.filePath, { env: SHARE_ENV })
91 worker.on('error', this.opts.errorHandler || empty)
92 worker.on('online', this.opts.onlineHandler || empty)
93 // TODO handle properly when a thread exit
94 worker.on('exit', this.opts.exitHandler || empty)
95 this.workers.push(worker)
96 const { port1, port2 } = new MessageChannel()
97 worker.postMessage({ parent: port1 }, [port1])
98 worker.port1 = port1
99 worker.port2 = port2
100 // we will attach a listener for every task,
101 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
102 worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
103 // init tasks map
104 this.tasks.set(worker, 0)
105 return worker
106 }
107 }
108
109 module.exports = FixedThreadPool