Bump expect from 24.9.0 to 25.1.0
[poolifier.git] / lib / fixed.js
1 'use strict'
2 const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4 } = require('worker_threads')
5
6 function empty () {}
7 /**
8 * A thread pool with a static number of threads , is possible to execute tasks in sync or async mode as you prefer. <br>
9 * This pool will select the worker thread in a round robin fashion. <br>
10 * @author Alessandro Pio Ardizio
11 * @since 0.0.1
12 */
13 class FixedThreadPool {
14 /**
15 *
16 * @param {Number} numThreads Num of threads for this worker pool
17 * @param {string} a file path with implementation of @see ThreadWorker class, relative path is fine
18 * @param {Object} an object with possible options for example errorHandler, onlineHandler.
19 */
20 constructor (numThreads, filePath, opts) {
21 if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
22 if (!filePath) throw new Error('Please specify a file with a worker implementation')
23 this.numThreads = numThreads
24 this.workers = []
25 this.nextWorker = 0
26 this.opts = opts || { maxTasks: 1000 }
27 this.filePath = filePath
28 this._id = 0
29 // threadId as key and an integer value
30 this.tasks = new Map()
31 for (let i = 1; i <= numThreads; i++) {
32 this._newWorker()
33 }
34 }
35
36 destroy () {
37 for (const worker of this.workers) {
38 worker.terminate()
39 }
40 }
41
42 /**
43 * Execute the task specified into the constructor with the data parameter.
44 * @param {Any} the input for the task specified
45 * @returns {Promise} that is resolved when the task is done
46 */
47 async execute (data) {
48 // configure worker to handle message with the specified task
49 const worker = this._chooseWorker()
50 this.tasks.set(worker, this.tasks.get(worker) + 1)
51 const id = ++this._id
52 const res = this._execute(worker, id)
53 worker.postMessage({ data: data, _id: id })
54 return res
55 }
56
57 _execute (worker, id) {
58 return new Promise((resolve, reject) => {
59 const listener = (message) => {
60 if (message._id === id) {
61 worker.port2.removeListener('message', listener)
62 this.tasks.set(worker, this.tasks.get(worker) - 1)
63 resolve(message.data)
64 }
65 }
66 worker.port2.on('message', listener)
67 })
68 }
69
70 _chooseWorker () {
71 if ((this.workers.length - 1) === this.nextWorker) {
72 this.nextWorker = 0
73 return this.workers[this.nextWorker]
74 } else {
75 this.nextWorker++
76 return this.workers[this.nextWorker]
77 }
78 }
79
80 _newWorker () {
81 const worker = new Worker(this.filePath, { env: SHARE_ENV })
82 worker.on('error', this.opts.errorHandler || empty)
83 worker.on('online', this.opts.onlineHandler || empty)
84 // TODO handle properly when a thread exit
85 worker.on('exit', this.opts.exitHandler || empty)
86 this.workers.push(worker)
87 const { port1, port2 } = new MessageChannel()
88 worker.postMessage({ parent: port1 }, [port1])
89 worker.port1 = port1
90 worker.port2 = port2
91 // we will attach a listener for every task,
92 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
93 worker.port2.setMaxListeners(this.opts.maxTasks || 1000)
94 // init tasks map
95 this.tasks.set(worker, 0)
96 return worker
97 }
98 }
99
100 module.exports = FixedThreadPool