676947725bed0bb65fe72fe949be8652b08f39c5
[poolifier.git] / lib / fixed.js
1 'use strict'
2 const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4 } = require('worker_threads')
5
6 function empty () {}
7 const _void = {}
8 /**
9 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
10 * This pool will select the worker thread in a round robin fashion. <br>
11 * @author Alessandro Pio Ardizio
12 * @since 0.0.1
13 */
14 class FixedThreadPool {
15 /**
16 *
17 * @param {Number} numThreads Num of threads for this worker pool
18 * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
19 * @param {Object} an object with possible options for example errorHandler, onlineHandler.
20 */
21 constructor (numThreads, filePath, opts) {
22 if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
23 if (!filePath) throw new Error('Please specify a file with a worker implementation')
24 this.numThreads = numThreads
25 this.workers = []
26 this.nextWorker = 0
27 this.opts = opts || { maxTasks: 1000 }
28 this.filePath = filePath
29 this._id = 0
30 // threadId as key and an integer value
31 this.tasks = new Map()
32 for (let i = 1; i <= numThreads; i++) {
33 this._newWorker()
34 }
35 }
36
37 async destroy () {
38 for (const worker of this.workers) {
39 await worker.terminate()
40 }
41 }
42
43 /**
44 * Execute the task specified into the constructor with the data parameter.
45 * @param {Any} the input for the task specified
46 * @returns {Promise} that is resolved when the task is done
47 */
48 async execute (data) {
49 // configure worker to handle message with the specified task
50 const worker = this._chooseWorker()
51 this.tasks.set(worker, this.tasks.get(worker) + 1)
52 const id = ++this._id
53 const res = this._execute(worker, id)
54 worker.postMessage({ data: data || _void, _id: id })
55 return res
56 }
57
58 _execute (worker, id) {
59 return new Promise((resolve, reject) => {
60 const listener = (message) => {
61 if (message._id === id) {
62 worker.port2.removeListener('message', listener)
63 this.tasks.set(worker, this.tasks.get(worker) - 1)
64 if (message.error) reject(message.error)
65 else resolve(message.data)
66 }
67 }
68 worker.port2.on('message', listener)
69 })
70 }
71
72 _chooseWorker () {
73 if ((this.workers.length - 1) === this.nextWorker) {
74 this.nextWorker = 0
75 return this.workers[this.nextWorker]
76 } else {
77 this.nextWorker++
78 return this.workers[this.nextWorker]
79 }
80 }
81
82 _newWorker () {
83 const worker = new Worker(this.filePath, { env: SHARE_ENV })
84 worker.on('error', this.opts.errorHandler || empty)
85 worker.on('online', this.opts.onlineHandler || empty)
86 // TODO handle properly when a thread exit
87 worker.on('exit', this.opts.exitHandler || empty)
88 this.workers.push(worker)
89 const { port1, port2 } = new MessageChannel()
90 worker.postMessage({ parent: port1 }, [port1])
91 worker.port1 = port1
92 worker.port2 = port2
93 // we will attach a listener for every task,
94 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
95 worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
96 // init tasks map
97 this.tasks.set(worker, 0)
98 return worker
99 }
100 }
101
102 module.exports = FixedThreadPool