faa68fc74ce96ece284bb953f53db02c47c154e9
[poolifier.git] / src / pools / cluster / fixed.ts
1 import type { ClusterSettings, Worker } from 'node:cluster'
2 import cluster from 'node:cluster'
3 import type { MessageValue } from '../../utility-types'
4 import { AbstractPool } from '../abstract-pool'
5 import type { PoolOptions } from '../pool'
6 import { PoolType } from '../pool'
7
/**
 * Options accepted by a poolifier cluster pool.
 */
export interface ClusterPoolOptions extends PoolOptions<Worker> {
  /**
   * Key/value pairs added to the environment of the worker processes.
   *
   * `FixedClusterPool` hands this value as-is to `cluster.fork()` each time
   * it creates a worker.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  env?: any
  /**
   * Cluster settings.
   *
   * `FixedClusterPool` forwards them to `cluster.setupPrimary()`; the `exec`
   * entry is always replaced by the pool's worker file path.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_settings
   */
  settings?: ClusterSettings
}
26
/**
 * A cluster pool with a fixed number of workers.
 *
 * Tasks can be performed in sync or asynchronous mode, as you prefer.
 *
 * This pool selects the workers in a round robin fashion.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
 * @typeParam Response - Type of response of execution. This can only be serializable data.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
export class FixedClusterPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed cluster pool.
   *
   * @param numberOfWorkers - Number of workers for this pool.
   * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed cluster pool. Defaults to an empty options object.
   */
  public constructor (
    numberOfWorkers: number,
    filePath: string,
    public readonly opts: ClusterPoolOptions = {}
  ) {
    super(numberOfWorkers, filePath, opts)
  }

  /** @inheritDoc */
  protected setupHook (): void {
    // `exec` is listed after the spread so the pool's worker file always
    // takes precedence over an `exec` entry coming from the user settings.
    const primarySettings = {
      ...this.opts.settings,
      exec: this.filePath
    }
    cluster.setupPrimary(primarySettings)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return cluster.isPrimary
  }

  /** @inheritDoc */
  protected destroyWorker (worker: Worker): void {
    // Send the kill message to the worker first, then kill it.
    const killMessage: MessageValue<Data> = { kill: 1 }
    this.sendToWorker(worker, killMessage)
    worker.kill()
  }

  /** @inheritDoc */
  protected sendToWorker (worker: Worker, message: MessageValue<Data>): void {
    worker.send(message)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    worker: Worker,
    listener: (message: MessageValue<Message>) => void
  ): void {
    worker.on('message', listener)
  }

  /** @inheritDoc */
  protected createWorker (): Worker {
    // Fork a new cluster worker with the user-provided environment, if any.
    const { env } = this.opts
    return cluster.fork(env)
  }

  /** @inheritDoc */
  protected afterWorkerSetup (worker: Worker): void {
    // Listen to worker messages using the listener built by the parent class.
    const onWorkerMessage = super.workerListener()
    this.registerWorkerMessageListener(worker, onWorkerMessage)
  }

  /** @inheritDoc */
  public get type (): PoolType {
    return PoolType.FIXED
  }

  /** @inheritDoc */
  protected get full (): boolean {
    // Full once the number of worker nodes equals the configured pool size.
    const workerNodesCount = this.workerNodes.length
    return workerNodesCount === this.numberOfWorkers
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}