Only allow primitive JSON for transfer between worker and main worker (#128)
[poolifier.git] / src / pools / cluster / dynamic.ts
CommitLineData
c97c7edb 1import type { Worker } from 'cluster'
d3c8a1a8 2import type { JSONValue, MessageValue } from '../../utility-types'
c97c7edb 3import type { ClusterPoolOptions } from './fixed'
325f50bc 4import { FixedClusterPool } from './fixed'
f045358d 5
4ade5f1f 6/**
325f50bc 7 * A cluster pool with a min/max number of workers, is possible to execute tasks in sync or async mode as you prefer.
4ade5f1f 8 *
325f50bc
S
9 * This cluster pool will create new workers when the other ones are busy, until the max number of workers,
10 * when the max number of workers is reached, an event will be emitted, if you want to listen this event use the emitter method.
4ade5f1f 11 *
325f50bc
S
12 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
13 * @since 2.0.0
4ade5f1f 14 */
325f50bc 15export class DynamicClusterPool<
d3c8a1a8
S
16 Data extends JSONValue = JSONValue,
17 Response extends JSONValue = JSONValue
325f50bc 18> extends FixedClusterPool<Data, Response> {
4ade5f1f 19 /**
325f50bc
S
20 * @param min Min number of workers that will be always active
21 * @param max Max number of workers that will be active
22 * @param filename A file path with implementation of `ClusterWorker` class, relative path is fine.
4ade5f1f
S
23 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
24 */
25 public constructor (
c97c7edb 26 min: number,
4ade5f1f 27 public readonly max: number,
c97c7edb
S
28 filename: string,
29 opts: ClusterPoolOptions = { maxTasks: 1000 }
4ade5f1f
S
30 ) {
31 super(min, filename, opts)
4ade5f1f
S
32 }
33
c97c7edb
S
34 protected chooseWorker (): Worker {
35 let worker: Worker | undefined
4ade5f1f
S
36 for (const entry of this.tasks) {
37 if (entry[1] === 0) {
38 worker = entry[0]
39 break
40 }
41 }
42
43 if (worker) {
44 // a worker is free, use it
45 return worker
46 } else {
47 if (this.workers.length === this.max) {
48 this.emitter.emit('FullPool')
fa0f5b28 49 return super.chooseWorker()
4ade5f1f
S
50 }
51 // all workers are busy create a new worker
c97c7edb
S
52 const worker = this.internalNewWorker()
53 worker.on('message', (message: MessageValue<Data>) => {
4ade5f1f 54 if (message.kill) {
c97c7edb
S
55 this.sendToWorker(worker, { kill: 1 })
56 void this.destroyWorker(worker)
68b2f517
S
57 // clean workers from data structures
58 const workerIndex = this.workers.indexOf(worker)
59 this.workers.splice(workerIndex, 1)
60 this.tasks.delete(worker)
4ade5f1f
S
61 }
62 })
63 return worker
64 }
65 }
66}