Rewrite to TypeScript
[poolifier.git] / src / dynamic.ts
1 import { EventEmitter } from 'events'
2 import type { FixedThreadPoolOptions, WorkerWithMessageChannel } from './fixed'
3 import FixedThreadPool from './fixed'
4
/**
 * Event emitter type used for the pool's `emitter` property.
 * It adds nothing on top of Node's `EventEmitter`.
 */
class MyEmitter extends EventEmitter {
}
6
/**
 * Options accepted by `DynamicThreadPool`.
 * Currently a plain alias of `FixedThreadPoolOptions`.
 */
export type DynamicThreadPoolOptions = FixedThreadPoolOptions
8
/**
 * A thread pool with a minimum and a maximum number of threads. Tasks can be executed in sync or async mode, as you prefer.
 *
 * When every existing worker is busy, the pool creates a new worker, until the maximum number of threads is reached.
 * Once the maximum is reached, a `FullPool` event is emitted each time a worker has to be chosen while all of them are busy;
 * subscribe through the `emitter` property to be notified.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export default class DynamicThreadPool<
  Data = any,
  Response = any
> extends FixedThreadPool<Data, Response> {
  /** Emits `FullPool` when a worker is requested while the pool is at `max` and no worker is free. */
  public readonly emitter: MyEmitter

  /**
   * @param min Min number of threads that will be always active
   * @param max Max number of threads that will be active
   * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    public readonly min: number,
    public readonly max: number,
    public readonly filename: string,
    public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
  ) {
    super(min, filename, opts)

    this.emitter = new MyEmitter()
  }

  /**
   * Chooses the worker that will receive the next task.
   *
   * Resolution order:
   * 1. the first worker in `this.tasks` whose task count is `0`;
   * 2. if the pool already holds `max` (or more) workers, emit `FullPool` and defer to the parent pool's choice;
   * 3. otherwise a newly created worker.
   *
   * @returns The worker to use for the next task.
   */
  protected _chooseWorker (): WorkerWithMessageChannel {
    // A worker with no running task is free: use it straight away.
    for (const [candidate, runningTasks] of this.tasks) {
      if (runningTasks === 0) {
        return candidate
      }
    }

    // `>=` rather than `===`: never grow further if the pool is already above `max` (e.g. when `min > max`).
    if (this.workers.length >= this.max) {
      this.emitter.emit('FullPool')
      return super._chooseWorker()
    }

    // All workers are busy and there is still room: create a new worker.
    const extraWorker = this._newWorker()
    // A message with a truthy `kill` field asks the pool to shut this extra worker down.
    // NOTE(review): presumably sent by the worker itself when it has been idle — confirm against `ThreadWorker`.
    extraWorker.port2.on('message', (message: { kill?: unknown }) => {
      if (message.kill) {
        extraWorker.postMessage({ kill: 1 })
        // `terminate()` returns a promise; `void` marks it as intentionally not awaited.
        void extraWorker.terminate()
        // NOTE(review): the terminated worker is not removed from `this.workers` / `this.tasks` here —
        // confirm the parent class takes care of it, otherwise a dead worker may be chosen again.
      }
    })
    return extraWorker
  }
}
70
// Also assign the class to the CommonJS `module.exports`, so that `require()` of this module yields the class itself.
// NOTE(review): this replaces the `exports` object that `export default` writes to — confirm that
// ES-module style consumers still resolve the default export with the project's compiler settings.
module.exports = DynamicThreadPool