feat: expose worker type in pool information
[poolifier.git] / src / pools / cluster / fixed.ts
1 import type { ClusterSettings, Worker } from 'node:cluster'
2 import cluster from 'node:cluster'
3 import type { MessageValue } from '../../utility-types'
4 import { AbstractPool } from '../abstract-pool'
5 import {
6 type PoolOptions,
7 type PoolType,
8 PoolTypes,
9 type WorkerType,
10 WorkerTypes
11 } from '../pool'
12
13 /**
14 * Options for a poolifier cluster pool.
15 */
16 export interface ClusterPoolOptions extends PoolOptions<Worker> {
17 /**
18 * Key/value pairs to add to worker process environment.
19 *
20 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
21 */
22 // eslint-disable-next-line @typescript-eslint/no-explicit-any
23 env?: any
24 /**
25 * Cluster settings.
26 *
27 * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
28 */
29 settings?: ClusterSettings
30 }
31
32 /**
33 * A cluster pool with a fixed number of workers.
34 *
35 * It is possible to perform tasks in sync or asynchronous mode as you prefer.
36 *
37 * This pool selects the workers in a round robin fashion.
38 *
39 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
40 * @typeParam Response - Type of execution response. This can only be serializable data.
41 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
42 * @since 2.0.0
43 */
44 export class FixedClusterPool<
45 Data = unknown,
46 Response = unknown
47 > extends AbstractPool<Worker, Data, Response> {
48 /**
49 * Constructs a new poolifier fixed cluster pool.
50 *
51 * @param numberOfWorkers - Number of workers for this pool.
52 * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
53 * @param opts - Options for this fixed cluster pool.
54 */
55 public constructor (
56 numberOfWorkers: number,
57 filePath: string,
58 public readonly opts: ClusterPoolOptions = {}
59 ) {
60 super(numberOfWorkers, filePath, opts)
61 }
62
63 /** @inheritDoc */
64 protected setupHook (): void {
65 cluster.setupPrimary({ ...this.opts.settings, exec: this.filePath })
66 }
67
68 /** @inheritDoc */
69 protected isMain (): boolean {
70 return cluster.isPrimary
71 }
72
73 /** @inheritDoc */
74 protected destroyWorker (worker: Worker): void {
75 this.sendToWorker(worker, { kill: 1 })
76 worker.kill()
77 }
78
79 /** @inheritDoc */
80 protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
81 worker.send(message)
82 }
83
84 /** @inheritDoc */
85 protected registerWorkerMessageListener<Message extends Data | Response>(
86 worker: Worker,
87 listener: (message: MessageValue<Message>) => void
88 ): void {
89 worker.on('message', listener)
90 }
91
92 /** @inheritDoc */
93 protected createWorker (): Worker {
94 return cluster.fork(this.opts.env)
95 }
96
97 /** @inheritDoc */
98 protected afterWorkerSetup (worker: Worker): void {
99 // Listen to worker messages.
100 this.registerWorkerMessageListener(worker, super.workerListener())
101 }
102
103 /** @inheritDoc */
104 public get type (): PoolType {
105 return PoolTypes.fixed
106 }
107
108 /** @inheritDoc */
109 protected get worker (): WorkerType {
110 return WorkerTypes.cluster
111 }
112
113 /** @inheritDoc */
114 protected get minSize (): number {
115 return this.numberOfWorkers
116 }
117
118 /** @inheritDoc */
119 protected get maxSize (): number {
120 return this.numberOfWorkers
121 }
122
123 /** @inheritDoc */
124 protected get full (): boolean {
125 return this.workerNodes.length >= this.numberOfWorkers
126 }
127
128 /** @inheritDoc */
129 protected get busy (): boolean {
130 return this.internalBusy()
131 }
132 }