Commit | Line | Data |
---|---|---|
f045358d S |
1 | import FixedThreadPool, { |
2 | FixedThreadPoolOptions, | |
3 | WorkerWithMessageChannel | |
4 | } from './fixed' | |
5 | ||
4ade5f1f | 6 | import { EventEmitter } from 'events' |
4ade5f1f S |
7 | |
/**
 * Event emitter type used by the pool for its notifications.
 * It adds no members of its own on top of Node's `EventEmitter`.
 */
class MyEmitter extends EventEmitter {}
9 | ||
/**
 * Options accepted by the dynamic pool constructor.
 * Currently an alias of `FixedThreadPoolOptions`, with no extra fields.
 */
export type DynamicThreadPoolOptions = FixedThreadPoolOptions
11 | ||
/**
 * A thread pool with a minimum and a maximum number of threads; tasks can be executed in sync or async mode, as you prefer.
 *
 * The pool creates a new worker whenever all existing workers are busy, until `max` workers exist.
 * Once the maximum is reached, a `FullPool` event is emitted on `emitter` and the choice of worker
 * is delegated to `FixedThreadPool._chooseWorker`. Subscribe to `emitter` to be notified of that event.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export default class DynamicThreadPool<
  Data = any,
  Response = any
> extends FixedThreadPool<Data, Response> {
  /** Emits `FullPool` each time a worker is requested while the pool is already at its maximum size. */
  public readonly emitter: MyEmitter

  /**
   * @param min Min number of threads that will be always active
   * @param max Max number of threads that will be active
   * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    public readonly min: number,
    public readonly max: number,
    public readonly filename: string,
    public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
  ) {
    super(min, filename, opts)

    this.emitter = new MyEmitter()
  }

  /**
   * Picks the worker that will receive the next task.
   *
   * Order of preference:
   * 1. the first worker whose entry in `this.tasks` is `0`;
   * 2. if the pool already holds `max` (or more) workers, emit `FullPool` and
   *    delegate to the parent implementation;
   * 3. otherwise a newly created worker.
   *
   * @returns The worker chosen for the next task.
   */
  protected _chooseWorker (): WorkerWithMessageChannel {
    // Returning from inside the loop avoids reading a possibly unassigned local.
    for (const [candidate, runningTasks] of this.tasks) {
      if (runningTasks === 0) {
        // a worker is free, use it
        return candidate
      }
    }

    // `>=` rather than `===`: if the pool ever holds more than `max` workers
    // (e.g. `min > max`), it must still stop growing.
    if (this.workers.length >= this.max) {
      this.emitter.emit('FullPool')
      return super._chooseWorker()
    }

    // all workers are busy, create a new worker
    const worker = this._newWorker()
    worker.port2.on('message', (message) => {
      // NOTE(review): the `kill` message is presumably sent by the worker itself
      // when it wants to be removed — confirm against the worker implementation.
      if (message.kill) {
        worker.postMessage({ kill: 1 })
        worker.terminate()
        // clean workers from data structures
        const workerIndex = this.workers.indexOf(worker)
        // `splice(-1, 1)` would remove the last, unrelated worker, so only
        // splice when the worker is actually still registered.
        if (workerIndex !== -1) {
          this.workers.splice(workerIndex, 1)
        }
        this.tasks.delete(worker)
      }
    })
    return worker
  }
}
77 | ||
78 | module.exports = DynamicThreadPool |