1 import cluster
, { type ClusterSettings
, type Worker
} from
'node:cluster'
2 import type { MessageValue
} from
'../../utility-types'
3 import { AbstractPool
} from
'../abstract-pool'
4 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
5 import { type WorkerType
, WorkerTypes
} from
'../worker'
8 * Options for a poolifier cluster pool.
10 export interface ClusterPoolOptions
extends PoolOptions
<Worker
> {
12 * Key/value pairs to add to worker process environment.
14 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
16 env
?: Record
<string, unknown
>
20 * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
22 settings
?: ClusterSettings
26 * A cluster pool with a fixed number of workers.
28 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
29 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
30 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
33 export class FixedClusterPool
<
36 > extends AbstractPool
<Worker
, Data
, Response
> {
38 * Constructs a new poolifier fixed cluster pool.
40 * @param numberOfWorkers - Number of workers for this pool.
41 * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
42 * @param opts - Options for this fixed cluster pool.
45 numberOfWorkers
: number,
47 protected readonly opts
: ClusterPoolOptions
= {}
49 super(numberOfWorkers
, filePath
, opts
)
53 protected setupHook (): void {
54 cluster
.setupPrimary({ ...this.opts
.settings
, exec
: this.filePath
})
58 protected isMain (): boolean {
59 return cluster
.isPrimary
63 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
64 this.flushTasksQueue(workerNodeKey
)
65 // FIXME: wait for tasks to be finished
66 const worker
= this.workerNodes
[workerNodeKey
].worker
67 const workerExitPromise
= new Promise
<void>(resolve
=> {
68 worker
.on('exit', () => {
72 worker
.on('disconnect', () => {
75 this.sendToWorker(workerNodeKey
, { kill
: true, workerId
: worker
.id
})
77 await workerExitPromise
81 protected sendToWorker (
82 workerNodeKey
: number,
83 message
: MessageValue
<Data
>
85 this.workerNodes
[workerNodeKey
].worker
.send(message
)
89 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
90 this.sendToWorker(workerNodeKey
, {
92 workerId
: this.workerNodes
[workerNodeKey
].worker
.id
97 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
98 workerNodeKey
: number,
99 listener
: (message
: MessageValue
<Message
>) => void
101 this.workerNodes
[workerNodeKey
].worker
.on('message', listener
)
105 protected createWorker (): Worker
{
106 return cluster
.fork(this.opts
.env
)
110 protected get
type (): PoolType
{
111 return PoolTypes
.fixed
115 protected get
worker (): WorkerType
{
116 return WorkerTypes
.cluster
120 protected get
minSize (): number {
121 return this.numberOfWorkers
125 protected get
maxSize (): number {
126 return this.numberOfWorkers
130 protected get
busy (): boolean {
131 return this.internalBusy()