refactor: imports cleanup
[poolifier.git] / src / pools / cluster / fixed.ts
1 import cluster, { type ClusterSettings, type Worker } from 'node:cluster'
2 import type { MessageValue } from '../../utility-types'
3 import { AbstractPool } from '../abstract-pool'
4 import {
5 type PoolOptions,
6 type PoolType,
7 PoolTypes,
8 type WorkerType,
9 WorkerTypes
10 } from '../pool'
11
/**
 * Options accepted by a poolifier cluster pool, on top of the generic pool options.
 */
export interface ClusterPoolOptions extends PoolOptions<Worker> {
  /**
   * Environment key/value pairs handed to `cluster.fork()` for each worker process.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  env?: any
  /**
   * Cluster settings forwarded to `cluster.setupPrimary()`.
   * The `exec` entry is always replaced by the pool's worker file path.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
   */
  settings?: ClusterSettings
}
30
/**
 * A cluster pool holding a fixed number of workers.
 *
 * Tasks can be performed in sync or asynchronous mode, as you prefer.
 *
 * This pool selects the workers in a round robin fashion.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of execution response. This can only be serializable data.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
export class FixedClusterPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Builds a new poolifier fixed cluster pool.
   *
   * @param numberOfWorkers - How many workers this pool holds.
   * @param filePath - Relative or absolute path to a file implementing a `ClusterWorker`.
   * @param opts - Options for this fixed cluster pool. Defaults to an empty object.
   */
  public constructor (
    numberOfWorkers: number,
    filePath: string,
    public readonly opts: ClusterPoolOptions = {}
  ) {
    super(numberOfWorkers, filePath, opts)
  }

  /** @inheritDoc */
  protected setupHook (): void {
    // `exec` comes last so the pool's worker file always wins over any
    // `exec` value supplied through the user settings.
    const primarySettings: ClusterSettings = {
      ...this.opts.settings,
      exec: this.filePath
    }
    cluster.setupPrimary(primarySettings)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    const { isPrimary } = cluster
    return isPrimary
  }

  /** @inheritDoc */
  protected destroyWorker (worker: Worker): void {
    // Send the kill message to the worker first, then kill its process.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    worker.kill()
  }

  /** @inheritDoc */
  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
    worker.send(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: Worker,
    listener: (message: MessageValue<Message>) => void
  ): void {
    worker.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // The optional user-provided environment is forwarded to the forked process.
    const { env } = this.opts
    return cluster.fork(env)
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: Worker): void {
    // Listen to worker messages.
    const messageListener = super.workerListener()
    this.registerWorkerMessageListener(worker, messageListener)
  }

  /** @inheritDoc */
  public get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.cluster
  }

  /** @inheritDoc */
  protected get minSize (): number {
    // A fixed pool has identical lower and upper bounds.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get maxSize (): number {
    // A fixed pool has identical lower and upper bounds.
    return this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get full (): boolean {
    const currentWorkerCount = this.workerNodes.length
    return currentWorkerCount >= this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}