Integrate rollup bundler (#120)
[poolifier.git] / src / pools / cluster / dynamic.ts
CommitLineData
fa699c42 1import { EventEmitter } from 'events'
325f50bc
S
2import type { FixedClusterPoolOptions, WorkerWithMessageChannel } from './fixed'
3import { FixedClusterPool } from './fixed'
f045358d 4
4ade5f1f
S
5class MyEmitter extends EventEmitter {}
6
325f50bc 7export type DynamicClusterPoolOptions = FixedClusterPoolOptions
4ade5f1f
S
8
9/**
325f50bc 10 * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
4ade5f1f 11 *
325f50bc
S
12 * This cluster pool will create new workers when the other ones are busy, until the max number of workers,
13 * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method.
4ade5f1f 14 *
325f50bc
S
15 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
16 * @since 2.0.0
4ade5f1f 17 */
325f50bc
S
18export class DynamicClusterPool<
19 // eslint-disable-next-line @typescript-eslint/no-explicit-any
4ade5f1f 20 Data = any,
325f50bc 21 // eslint-disable-next-line @typescript-eslint/no-explicit-any
4ade5f1f 22 Response = any
325f50bc 23> extends FixedClusterPool<Data, Response> {
4ade5f1f
S
24 public readonly emitter: MyEmitter
25
26 /**
325f50bc
S
27 * @param min Min number of workers that will be always active
28 * @param max Max number of workers that will be active
29 * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine.
4ade5f1f
S
30 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
31 */
32 public constructor (
33 public readonly min: number,
34 public readonly max: number,
35 public readonly filename: string,
325f50bc 36 public readonly opts: DynamicClusterPoolOptions = { maxTasks: 1000 }
4ade5f1f
S
37 ) {
38 super(min, filename, opts)
39
40 this.emitter = new MyEmitter()
41 }
42
fa0f5b28 43 protected chooseWorker (): WorkerWithMessageChannel {
ee99693b 44 let worker: WorkerWithMessageChannel | undefined
4ade5f1f
S
45 for (const entry of this.tasks) {
46 if (entry[1] === 0) {
47 worker = entry[0]
48 break
49 }
50 }
51
52 if (worker) {
53 // a worker is free, use it
54 return worker
55 } else {
56 if (this.workers.length === this.max) {
57 this.emitter.emit('FullPool')
fa0f5b28 58 return super.chooseWorker()
4ade5f1f
S
59 }
60 // all workers are busy create a new worker
fa0f5b28 61 const worker = this.newWorker()
325f50bc 62 worker.on('message', (message: { kill?: number }) => {
4ade5f1f 63 if (message.kill) {
325f50bc
S
64 worker.send({ kill: 1 })
65 worker.kill()
68b2f517
S
66 // clean workers from data structures
67 const workerIndex = this.workers.indexOf(worker)
68 this.workers.splice(workerIndex, 1)
69 this.tasks.delete(worker)
4ade5f1f
S
70 }
71 })
72 return worker
73 }
74 }
75}