Added an emitter to the dynamic pool; example split and improved
[poolifier.git] / lib / fixed.js
1 'use strict'
2 const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4 } = require('worker_threads')
5 const path = require('path')
6 const { generateID } = require('./util')
7
/**
 * No-op function: accepts any arguments, does nothing and returns undefined.
 * Serves as the fallback when a caller does not supply an event handler.
 */
function empty () {
  // intentionally left blank
}
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode, as you prefer. <br>
 * This pool selects the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Create the pool and immediately spawn `numThreads` workers.
   *
   * @param {Number} numThreads Number of threads for this worker pool
   * @param {string} filename Path of the file with the worker implementation (@see ThreadWorker class)
   * @param {Object} [opts] Optional settings: `errorHandler`, `onlineHandler` and `exitHandler`
   *   (listeners attached to every worker) and `maxTasks` (max listeners allowed on a worker's
   *   result port). Defaults to `{ maxTasks: 1000 }` when omitted.
   * @throws {Error} when called from a worker thread, or when `filename` is missing
   */
  constructor (numThreads, filename, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filename) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filename = filename
    // worker as key, number of in-flight tasks on that worker as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker owned by this pool.
   */
  destroy () {
    for (const worker of this.workers) {
      worker.terminate()
    }
    // This class does not define emitDestroy itself, so calling it unconditionally
    // throws a TypeError. Forward the call only when something provides it.
    if (typeof this.emitDestroy === 'function') {
      this.emitDestroy()
    }
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * The task id is stored on `data` itself as `_id`, so `data` must be a mutable
   * object; anything else makes the returned promise reject with a TypeError.
   *
   * @param {Any} data The input for the task specified
   * @returns {Promise} that is resolved with the worker's result when the task is done
   */
  async execute (data) {
    // Tag the payload before touching any bookkeeping: if `data` cannot carry an
    // `_id` this throws here, and no worker's in-flight counter is left inflated.
    const id = generateID()
    data._id = id
    const worker = this._chooseWorker()
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    // Subscribe to the result before posting so the reply cannot be missed.
    const res = this._execute(worker, id)
    worker.postMessage(data)
    return res
  }

  /**
   * Wait for the reply that carries the given task id on the worker's result port.
   *
   * NOTE: the returned promise never rejects; if no message with a matching `_id`
   * arrives, it stays pending.
   *
   * @param {Worker} worker The worker the task was assigned to
   * @param {Any} id The task id to wait for
   * @returns {Promise} resolved with the `data` field of the matching message
   */
  _execute (worker, id) {
    return new Promise((resolve) => {
      const listener = (message) => {
        // Several tasks may share the same port; react only to our own reply.
        if (message._id === id) {
          worker.port2.removeListener('message', listener)
          this.tasks.set(worker, this.tasks.get(worker) - 1)
          resolve(message.data)
        }
      }
      worker.port2.on('message', listener)
    })
  }

  /**
   * Pick the next worker in round robin order.
   *
   * The cursor is advanced before it is read, so the very first pick on a pool
   * with more than one worker is the worker at index 1.
   *
   * @returns {Worker} the selected worker (undefined when the pool has no workers)
   */
  _chooseWorker () {
    // `>=` rather than `===`: a cursor that ended up past the last index wraps
    // back to the first worker instead of running off the end of the array.
    if (this.nextWorker >= this.workers.length - 1) {
      this.nextWorker = 0
    } else {
      this.nextWorker++
    }
    return this.workers[this.nextWorker]
  }

  /**
   * Spawn a worker, wire its event handlers and message channel, and register it in the pool.
   *
   * @returns {Worker} the newly created worker
   */
  _newWorker () {
    const worker = new Worker(path.resolve(this.filename), { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO handle properly when a thread exit
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker; the pool listens for results on port2.
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
102
103 module.exports = FixedThreadPool