667fa07a29780a4837c8f5a7f3af7643c8bb61e4
[poolifier.git] / src / dynamic.ts
1 import FixedThreadPool, {
2 FixedThreadPoolOptions,
3 WorkerWithMessageChannel
4 } from './fixed'
5
6 import { EventEmitter } from 'events'
7
8 class MyEmitter extends EventEmitter {}
9
10 export type DynamicThreadPoolOptions = FixedThreadPoolOptions
11
12 /**
13 * A thread pool with a min/max number of threads, is possible to execute tasks in sync or async mode as you prefer.
14 *
15 * This thread pool will create new workers when the other ones are busy, until the max number of threads,
16 * when the max number of threads is reached, an event will be emitted, if you want to listen this event use the emitter method.
17 *
18 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
19 * @since 0.0.1
20 */
21 export default class DynamicThreadPool<
22 Data = any,
23 Response = any
24 > extends FixedThreadPool<Data, Response> {
25 public readonly emitter: MyEmitter
26
27 /**
28 * @param min Min number of threads that will be always active
29 * @param max Max number of threads that will be active
30 * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
31 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
32 */
33 public constructor (
34 public readonly min: number,
35 public readonly max: number,
36 public readonly filename: string,
37 public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
38 ) {
39 super(min, filename, opts)
40
41 this.emitter = new MyEmitter()
42 }
43
44 protected _chooseWorker (): WorkerWithMessageChannel {
45 let worker: WorkerWithMessageChannel
46 for (const entry of this.tasks) {
47 if (entry[1] === 0) {
48 worker = entry[0]
49 break
50 }
51 }
52
53 if (worker) {
54 // a worker is free, use it
55 return worker
56 } else {
57 if (this.workers.length === this.max) {
58 this.emitter.emit('FullPool')
59 return super._chooseWorker()
60 }
61 // all workers are busy create a new worker
62 const worker = this._newWorker()
63 worker.port2.on('message', (message) => {
64 if (message.kill) {
65 worker.postMessage({ kill: 1 })
66 worker.terminate()
67 // clean workers from data structures
68 const workerIndex = this.workers.indexOf(worker)
69 this.workers.splice(workerIndex, 1)
70 this.tasks.delete(worker)
71 }
72 })
73 return worker
74 }
75 }
76 }
77
78 module.exports = DynamicThreadPool