4a8cb4f1eeeafeb59adf1e30c5d678328cb19b70
[poolifier.git] / src / pools / cluster / fixed.ts
1 import cluster, { type Worker } from 'node:cluster'
2
3 import type { MessageValue } from '../../utility-types.js'
4 import { AbstractPool } from '../abstract-pool.js'
5 import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
6 import { type WorkerType, WorkerTypes } from '../worker.js'
7
8 /**
9 * Options for a poolifier cluster pool: the generic pool options specialized for `node:cluster` workers.
10 */
11 export type ClusterPoolOptions = PoolOptions<Worker>
12
/**
 * A cluster pool whose number of workers is fixed: it never requests the creation of dynamic workers.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
export class FixedClusterPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed cluster pool.
   *
   * @param numberOfWorkers - Number of workers for this pool.
   * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed cluster pool. Defaults to an empty options object.
   * @param maximumNumberOfWorkers - Optional maximum number of workers, handed unchanged to the parent pool constructor.
   */
  public constructor (
    numberOfWorkers: number,
    filePath: string,
    opts: ClusterPoolOptions = {},
    maximumNumberOfWorkers?: number
  ) {
    super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers)
  }

  /** @inheritDoc */
  protected setupHook (): void {
    // `exec` is spread last so the pool worker file always wins over any
    // `exec` entry present in the user supplied settings.
    const primarySettings = { ...this.opts.settings, exec: this.filePath }
    cluster.setupPrimary(primarySettings)
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    const { isPrimary } = cluster
    return isPrimary
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>
  ): void {
    const workerNode = this.workerNodes[workerNodeKey]
    // Nothing is sent when no worker node is registered at this key.
    if (workerNode == null) {
      return
    }
    // The outgoing message is a copy of the given one, stamped with the id
    // found in the worker info of this node (possibly undefined).
    workerNode.worker.send({
      ...message,
      workerId: this.getWorkerInfo(workerNodeKey)?.id
    } satisfies MessageValue<Data>)
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    // The startup message only carries `ready: false`.
    const startupMessage: MessageValue<Data> = { ready: false }
    this.sendToWorker(workerNodeKey, startupMessage)
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Persistent subscription to the 'message' event of the node's worker.
    const workerNode = this.workerNodes[workerNodeKey]
    workerNode.worker.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // One-shot subscription to the 'message' event of the node's worker.
    const workerNode = this.workerNodes[workerNodeKey]
    workerNode.worker.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Removes a previously attached 'message' listener from the node's worker.
    const workerNode = this.workerNodes[workerNodeKey]
    workerNode.worker.off('message', listener)
  }

  /** @inheritDoc */
  protected shallCreateDynamicWorker (): boolean {
    // A fixed pool never asks for an extra worker.
    return false
  }

  /** @inheritDoc */
  protected checkAndEmitDynamicWorkerCreationEvents (): void {
    // Intentionally empty: this pool reports no dynamic worker creation.
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.cluster
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    // Busyness is entirely decided by the inherited `internalBusy()` helper.
    return this.internalBusy()
  }
}