chore(deps-dev): bump tatami-ng to 0.6.0
[poolifier.git] / src / pools / cluster / fixed.ts
1 import cluster, { type Worker } from 'node:cluster'
2
3 import type { MessageValue } from '../../utility-types.js'
4
5 import { AbstractPool } from '../abstract-pool.js'
6 import { type PoolOptions, type PoolType, PoolTypes } from '../pool.js'
7 import { type WorkerType, WorkerTypes } from '../worker.js'
8
9 /**
10 * Options for a poolifier cluster pool.
11 */
12 export type ClusterPoolOptions = PoolOptions<Worker>
13
14 /**
15 * A cluster pool with a fixed number of workers.
16 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
17 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
18 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
19 * @since 2.0.0
20 */
21 export class FixedClusterPool<
22 Data = unknown,
23 Response = unknown
24 > extends AbstractPool<Worker, Data, Response> {
25 /**
26 * Constructs a new poolifier fixed cluster pool.
27 * @param numberOfWorkers - Number of workers for this pool.
28 * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
29 * @param opts - Options for this fixed cluster pool.
30 * @param maximumNumberOfWorkers - The maximum number of workers for this pool.
31 */
32 public constructor (
33 numberOfWorkers: number,
34 filePath: string,
35 opts: ClusterPoolOptions = {},
36 maximumNumberOfWorkers?: number
37 ) {
38 super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers)
39 }
40
41 /** @inheritDoc */
42 protected checkAndEmitDynamicWorkerCreationEvents (): void {
43 /* noop */
44 }
45
46 /** @inheritDoc */
47 protected checkAndEmitDynamicWorkerDestructionEvents (): void {
48 /* noop */
49 }
50
51 /** @inheritDoc */
52 protected deregisterWorkerMessageListener<Message extends Data | Response>(
53 workerNodeKey: number,
54 listener: (message: MessageValue<Message>) => void
55 ): void {
56 this.workerNodes[workerNodeKey].worker.off('message', listener)
57 }
58
59 /** @inheritDoc */
60 protected isMain (): boolean {
61 return cluster.isPrimary
62 }
63
64 /** @inheritDoc */
65 protected registerOnceWorkerMessageListener<Message extends Data | Response>(
66 workerNodeKey: number,
67 listener: (message: MessageValue<Message>) => void
68 ): void {
69 this.workerNodes[workerNodeKey].worker.once('message', listener)
70 }
71
72 /** @inheritDoc */
73 protected registerWorkerMessageListener<Message extends Data | Response>(
74 workerNodeKey: number,
75 listener: (message: MessageValue<Message>) => void
76 ): void {
77 this.workerNodes[workerNodeKey].worker.on('message', listener)
78 }
79
80 /** @inheritDoc */
81 protected sendStartupMessageToWorker (workerNodeKey: number): void {
82 this.sendToWorker(workerNodeKey, {
83 ready: false,
84 })
85 }
86
87 /** @inheritDoc */
88 protected sendToWorker (
89 workerNodeKey: number,
90 message: MessageValue<Data>
91 ): void {
92 this.workerNodes[workerNodeKey]?.worker.send({
93 ...message,
94 workerId: this.getWorkerInfo(workerNodeKey)?.id,
95 } satisfies MessageValue<Data>)
96 }
97
98 /** @inheritDoc */
99 protected override setupHook (): void {
100 cluster.setupPrimary({ ...this.opts.settings, exec: this.filePath })
101 }
102
103 /** @inheritDoc */
104 protected shallCreateDynamicWorker (): boolean {
105 return false
106 }
107
108 /** @inheritDoc */
109 protected get backPressure (): boolean {
110 return this.internalBackPressure()
111 }
112
113 /** @inheritDoc */
114 protected get busy (): boolean {
115 return this.internalBusy()
116 }
117
118 /** @inheritDoc */
119 protected get type (): PoolType {
120 return PoolTypes.fixed
121 }
122
123 /** @inheritDoc */
124 protected get worker (): WorkerType {
125 return WorkerTypes.cluster
126 }
127 }