Encapsulate logic of cluster and thread worker/pool (#116)
[poolifier.git] / src / pools / thread / dynamic.ts
1 import type { MessageValue } from '../../utility-types'
2 import type { PoolOptions } from '../abstract-pool'
3 import type { ThreadWorkerWithMessageChannel } from './fixed'
4 import { FixedThreadPool } from './fixed'
5
/**
 * A thread pool with a min/max number of threads; it is possible to execute tasks in sync or async mode as you prefer.
 *
 * This thread pool will create new workers when the other ones are busy, until the max number of threads is reached.
 * When the max number of threads is reached, a `FullPool` event is emitted; if you want to listen to this event use the emitter method.
 *
 * @author [Alessandro Pio Ardizio](https://github.com/pioardi)
 * @since 0.0.1
 */
export class DynamicThreadPool<
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  Data = any,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  Response = any
> extends FixedThreadPool<Data, Response> {
  /**
   * @param min Min number of threads that will be always active
   * @param max Max number of threads that will be active
   * @param filename A file path with implementation of `ThreadWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   */
  public constructor (
    min: number,
    public readonly max: number,
    filename: string,
    opts: PoolOptions<ThreadWorkerWithMessageChannel> = { maxTasks: 1000 }
  ) {
    super(min, filename, opts)
  }

  /**
   * Picks the worker that will receive the next task.
   *
   * Selection order:
   * 1. the first worker whose entry in `this.tasks` is `0` (treated as free);
   * 2. if there is none and the pool already holds `max` workers (or more), a `FullPool`
   *    event is emitted and the choice is delegated to the parent pool;
   * 3. otherwise a new worker is created and returned.
   *
   * @returns The worker chosen to run the next task.
   */
  protected chooseWorker (): ThreadWorkerWithMessageChannel {
    // Look for a worker whose counter in `this.tasks` is zero.
    // NOTE(review): the counter is presumably the number of tasks currently running on that worker - confirm in the base pool.
    for (const [candidate, runningTasks] of this.tasks) {
      if (runningTasks === 0) {
        // a worker is free, use it
        return candidate
      }
    }

    // `>=` rather than `===`: if the pool ever holds more than `max` workers
    // (e.g. `min` greater than `max`), it must still stop growing.
    if (this.workers.length >= this.max) {
      this.emitter.emit('FullPool')
      return super.chooseWorker()
    }

    // all workers are busy, create a new worker
    const createdWorker = this.internalNewWorker()
    createdWorker.port2?.on('message', (message: MessageValue<Data>) => {
      if (message.kill) {
        // A message carrying `kill` was received on the worker's port: answer with a kill
        // message, destroy the worker and forget about it.
        this.sendToWorker(createdWorker, { kill: 1 })
        void this.destroyWorker(createdWorker)
        // clean workers from data structures
        const workerIndex = this.workers.indexOf(createdWorker)
        // `indexOf` yields -1 when the worker was already removed; `splice(-1, 1)` would
        // then drop the last, unrelated worker, so only splice on a real match.
        if (workerIndex !== -1) {
          this.workers.splice(workerIndex, 1)
        }
        this.tasks.delete(createdWorker)
      }
    })
    return createdWorker
  }
}