Commit | Line | Data |
---|---|---|
ee99693b S |
1 | /* eslint-disable @typescript-eslint/strict-boolean-expressions */ |
2 | ||
f045358d S |
3 | import FixedThreadPool, { |
4 | FixedThreadPoolOptions, | |
5 | WorkerWithMessageChannel | |
6 | } from './fixed' | |
7 | ||
4ade5f1f | 8 | import { EventEmitter } from 'events' |
4ade5f1f S |
9 | |
/**
 * Event emitter type used by the pool to publish its events.
 * It adds nothing on top of Node's `EventEmitter`.
 */
class MyEmitter extends EventEmitter {
}
11 | ||
/**
 * Options accepted by `DynamicThreadPool`.
 * Currently identical to the options of the fixed-size pool.
 */
export type DynamicThreadPoolOptions = FixedThreadPoolOptions
13 | ||
/**
 * A thread pool with a min/max number of threads. Tasks can be executed in sync or async mode, as you prefer.
 *
 * This thread pool creates new workers when the existing ones are busy, up to the max number of threads.
 * When a worker has to be chosen, none is idle and the max number of threads has been reached,
 * a `'FullPool'` event is emitted on `emitter`; subscribe with `pool.emitter.on('FullPool', ...)` to be notified.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export default class DynamicThreadPool<
  Data = any,
  Response = any
> extends FixedThreadPool<Data, Response> {
  /** Emitter on which the `'FullPool'` event is published. */
  public readonly emitter: MyEmitter

  /**
   * @param min Min number of threads that will be always active
   * @param max Max number of threads that will be active
   * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    public readonly min: number,
    public readonly max: number,
    public readonly filename: string,
    public readonly opts: DynamicThreadPoolOptions = { maxTasks: 1000 }
  ) {
    // The base pool is sized with `min`; extra workers are added on demand in `_chooseWorker`.
    super(min, filename, opts)

    this.emitter = new MyEmitter()
  }

  /**
   * Picks the worker that will receive the next task.
   *
   * Selection order:
   * 1. the first worker in `tasks` whose task counter is `0`;
   * 2. if there is none and the pool already holds `max` (or more) workers, emit `'FullPool'`
   *    and delegate to the base pool's `_chooseWorker`;
   * 3. otherwise create a new worker and return it.
   *
   * @returns The worker chosen for the next task.
   */
  protected _chooseWorker (): WorkerWithMessageChannel {
    for (const [candidate, runningTasks] of this.tasks) {
      if (runningTasks === 0) {
        // a worker is free, use it
        return candidate
      }
    }

    // `>=` rather than `===`: if the pool ever holds more than `max` workers
    // (e.g. it was built with min > max) it must still stop growing.
    if (this.workers.length >= this.max) {
      this.emitter.emit('FullPool')
      return super._chooseWorker()
    }

    // all workers are busy, create a new worker
    const worker = this._newWorker()
    // If the worker has no `port2`, no listener is attached and it is never removed by this handler.
    // NOTE(review): the `kill` message is presumably sent by the worker itself when it has been idle
    // for too long; confirm against the worker implementation.
    worker.port2?.on('message', (message: { kill?: number }) => {
      if (message.kill) {
        worker.postMessage({ kill: 1 })
        // eslint-disable-next-line no-void
        void worker.terminate()
        // clean workers from data structures
        const workerIndex = this.workers.indexOf(worker)
        // Guard against -1: `splice(-1, 1)` would remove the last, unrelated worker.
        if (workerIndex !== -1) {
          this.workers.splice(workerIndex, 1)
        }
        this.tasks.delete(worker)
      }
    })
    return worker
  }
}
80 | ||
// CommonJS interop: make `require()` of this module return the class itself.
// NOTE(review): this replaces the whole exports object, so it assumes a CommonJS build target; confirm.
module.exports = DynamicThreadPool