]>
Commit | Line | Data |
---|---|---|
1 | import { isKillBehavior, KillBehaviors } from '../../worker/worker-options' | |
2 | import type { PoolOptions } from '../abstract-pool' | |
3 | import type { ThreadWorkerWithMessageChannel } from './fixed' | |
4 | import { FixedThreadPool } from './fixed' | |
5 | ||
6 | /** | |
7 | * A thread pool with a dynamic number of threads, but a guaranteed minimum number of threads. | |
8 | * | |
9 | * This thread pool creates new threads when the others are busy, up to the maximum number of threads. | |
10 | * When the maximum number of threads is reached, an event is emitted. If you want to listen to this event, use the pool's `emitter`. | |
11 | * | |
12 | * @template Data Type of data sent to the worker. This can only be serializable data. | |
13 | * @template Response Type of response of execution. This can only be serializable data. | |
14 | * | |
15 | * @author [Alessandro Pio Ardizio](https://github.com/pioardi) | |
16 | * @since 0.0.1 | |
17 | */ | |
18 | export class DynamicThreadPool< | |
19 | Data = unknown, | |
20 | Response = unknown | |
21 | > extends FixedThreadPool<Data, Response> { | |
22 | /** | |
23 | * Constructs a new poolifier dynamic thread pool. | |
24 | * | |
25 | * @param min Minimum number of threads which are always active. | |
26 | * @param max Maximum number of threads that can be created by this pool. | |
27 | * @param filePath Path to an implementation of a `ThreadWorker` file, which can be relative or absolute. | |
28 | * @param opts Options for this dynamic thread pool. Default: `{ maxTasks: 1000 }` | |
29 | */ | |
30 | public constructor ( | |
31 | min: number, | |
32 | public readonly max: number, | |
33 | filePath: string, | |
34 | opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 } | |
35 | ) { | |
36 | super(min, filePath, opts) | |
37 | } | |
38 | ||
39 | /** | |
40 | * Choose a thread for the next task. | |
41 | * | |
42 | * It will first check for and return an idle thread. | |
43 | * If all threads are busy, then it will try to create a new one up to the `max` thread count. | |
44 | * If the max thread count is reached, the emitter will emit a `FullPool` event and it will fall back to using a round robin algorithm to distribute the load. | |
45 | * | |
46 | * @returns Thread worker. | |
47 | */ | |
48 | protected chooseWorker (): ThreadWorkerWithMessageChannel { | |
49 | for (const [worker, numberOfTasks] of this.tasks) { | |
50 | if (numberOfTasks === 0) { | |
51 | // A worker is free, use it | |
52 | return worker | |
53 | } | |
54 | } | |
55 | ||
56 | if (this.workers.length === this.max) { | |
57 | this.emitter.emit('FullPool') | |
58 | return super.chooseWorker() | |
59 | } | |
60 | ||
61 | // All workers are busy, create a new worker | |
62 | const workerCreated = this.createAndSetupWorker() | |
63 | this.registerWorkerMessageListener<Data>(workerCreated, message => { | |
64 | const tasksInProgress = this.tasks.get(workerCreated) | |
65 | if ( | |
66 | isKillBehavior(KillBehaviors.HARD, message.kill) || | |
67 | tasksInProgress === 0 | |
68 | ) { | |
69 | // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime) | |
70 | void this.destroyWorker(workerCreated) | |
71 | } | |
72 | }) | |
73 | return workerCreated | |
74 | } | |
75 | } |