Fix tests run on node 10.x and add test assertion
[poolifier.git] / lib / fixed.js
CommitLineData
a32e02ba 1'use strict'
2const {
3 Worker, isMainThread, MessageChannel, SHARE_ENV
4} = require('worker_threads')
a32e02ba 5const { generateID } = require('./util')
6
/**
 * No-op function, used as the fallback callback when no handler is supplied.
 */
function empty () {}
/**
 * A thread pool with a static number of threads; tasks can be executed in sync or async mode as you prefer. <br>
 * This pool selects the worker thread in a round robin fashion. <br>
 * @author Alessandro Pio Ardizio
 * @since 0.0.1
 */
class FixedThreadPool {
  /**
   * Create the pool and start `numThreads` workers immediately.
   *
   * @param {Number} numThreads Number of threads for this worker pool
   * @param {string} filePath Path of a file with the worker implementation (see the ThreadWorker class); it is handed as-is to the Worker constructor
   * @param {Object} [opts] Optional settings read by the pool: errorHandler, onlineHandler, exitHandler, maxTasks. Defaults to { maxTasks: 1000 }
   * @throws {Error} when called from a worker thread or when filePath is missing
   */
  constructor (numThreads, filePath, opts) {
    if (!isMainThread) throw new Error('Cannot start a thread pool from a worker thread !!!')
    if (!filePath) throw new Error('Please specify a file with a worker implementation')
    this.numThreads = numThreads
    this.workers = []
    this.nextWorker = 0
    this.opts = opts || { maxTasks: 1000 }
    this.filePath = filePath
    // Worker object as key, number of tasks currently in flight on it as value
    this.tasks = new Map()
    for (let i = 1; i <= numThreads; i++) {
      this._newWorker()
    }
  }

  /**
   * Terminate every worker of the pool.
   * Tasks still pending on a worker are rejected when that worker emits 'exit'.
   *
   * @returns {Promise} settled once every terminate() call has completed
   */
  destroy () {
    return Promise.all(this.workers.map(worker => worker.terminate()))
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param {Any} data the input for the task specified
   * @returns {Promise} resolved with the `data` field of the worker reply;
   * rejected when the pool has no workers, when the message cannot be posted,
   * or when the chosen worker emits 'error' or 'exit' before replying
   */
  async execute (data) {
    const worker = this._chooseWorker()
    if (!worker) throw new Error('Cannot execute a task: the pool has no workers')
    const id = generateID()
    // Post before registering anything: if postMessage throws (for example the
    // payload cannot be cloned) no listener and no task counter is left behind.
    // The reply is dispatched by the event loop, so it cannot be emitted before
    // the listeners below are attached in this same synchronous run.
    worker.postMessage({ data: data, _id: id })
    this.tasks.set(worker, this.tasks.get(worker) + 1)
    return this._execute(worker, id)
  }

  /**
   * Wait for the reply carrying the given task id on the worker's channel.
   *
   * @param {Worker} worker the worker the task was posted to
   * @param {Any} id the task id to match against the `_id` field of incoming messages
   * @returns {Promise} resolved with `message.data`, rejected on worker 'error' or 'exit'
   */
  _execute (worker, id) {
    return new Promise((resolve, reject) => {
      // Detach every listener of this task and release its slot in the counter
      const settle = () => {
        worker.port2.removeListener('message', onMessage)
        worker.removeListener('error', onError)
        worker.removeListener('exit', onExit)
        this.tasks.set(worker, this.tasks.get(worker) - 1)
      }
      const onMessage = (message) => {
        // Replies of other tasks travel on the same port: ignore them
        if (message._id !== id) return
        settle()
        resolve(message.data)
      }
      const onError = (error) => {
        settle()
        reject(error)
      }
      const onExit = (exitCode) => {
        settle()
        reject(new Error(`Worker exited with code ${exitCode} before completing the task`))
      }
      worker.port2.on('message', onMessage)
      worker.on('error', onError)
      worker.on('exit', onExit)
    })
  }

  /**
   * Pick the next worker in round robin order.
   * The index is advanced before the worker is read and wraps with a modulo, so
   * it always stays inside the current worker list.
   *
   * @returns {Worker|undefined} the selected worker, undefined when the pool is empty
   */
  _chooseWorker () {
    if (this.workers.length === 0) return undefined
    this.nextWorker = (this.nextWorker + 1) % this.workers.length
    return this.workers[this.nextWorker]
  }

  /**
   * Start a new worker, register it in the pool and open its message channel.
   *
   * @returns {Worker} the worker just created, with `port1` and `port2` attached
   */
  _newWorker () {
    const worker = new Worker(this.filePath, { env: SHARE_ENV })
    worker.on('error', this.opts.errorHandler || empty)
    worker.on('online', this.opts.onlineHandler || empty)
    // TODO a worker that exits is not removed from the pool nor replaced
    worker.on('exit', this.opts.exitHandler || empty)
    this.workers.push(worker)
    // port1 is transferred to the worker, port2 is where replies are received
    const { port1, port2 } = new MessageChannel()
    worker.postMessage({ parent: port1 }, [port1])
    worker.port1 = port1
    worker.port2 = port2
    // Listeners are attached for every pending task and removed when it settles;
    // raise the limits to avoid max listeners warnings.
    const maxListeners = this.opts.maxTasks || 1000
    worker.port2.setMaxListeners(maxListeners)
    // + 1 for the pool-level handler registered above on the same event
    worker.setMaxListeners(maxListeners + 1)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}
99
100module.exports = FixedThreadPool