Merge branch 'master' into fix-worker-readiness
[poolifier.git] / src / pools / cluster / fixed.ts
CommitLineData
import cluster, { type Worker } from 'node:cluster'
import type { MessageValue } from '../../utility-types'
import { AbstractPool } from '../abstract-pool'
import { type PoolOptions, type PoolType, PoolTypes } from '../pool'
import { type WorkerType, WorkerTypes } from '../worker'

/**
 * A cluster pool with a fixed number of workers.
 *
 * All worker communication goes through the `node:cluster` worker handles
 * stored in `this.workerNodes`.
 *
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
export class FixedClusterPool<
  Data = unknown,
  Response = unknown
> extends AbstractPool<Worker, Data, Response> {
  /**
   * Constructs a new poolifier fixed cluster pool.
   *
   * @param numberOfWorkers - Number of workers for this pool.
   * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
   * @param opts - Options for this fixed cluster pool. Defaults to an empty options object.
   * @param maximumNumberOfWorkers - Optional maximum number of workers, forwarded unchanged to the `AbstractPool` constructor.
   */
  public constructor (
    numberOfWorkers: number,
    filePath: string,
    opts: PoolOptions<Worker> = {},
    maximumNumberOfWorkers?: number
  ) {
    super(numberOfWorkers, filePath, opts, maximumNumberOfWorkers)
  }

  /** @inheritDoc */
  protected setupHook (): void {
    // `exec` comes after the spread so the pool's worker file always
    // overrides any `exec` supplied through the user settings.
    cluster.setupPrimary({ ...this.opts.settings, exec: this.filePath })
  }

  /** @inheritDoc */
  protected isMain (): boolean {
    return cluster.isPrimary
  }

  /** @inheritDoc */
  protected sendToWorker (
    workerNodeKey: number,
    message: MessageValue<Data>
  ): void {
    this.workerNodes[workerNodeKey].worker.send({
      ...message,
      // `workerId` comes after the spread so it overrides any `workerId`
      // already present on the message.
      // NOTE(review): the cast assumes the worker info id is always defined
      // at this point - confirm against `getWorkerInfo`.
      workerId: this.getWorkerInfo(workerNodeKey).id as number
    })
  }

  /** @inheritDoc */
  protected sendStartupMessageToWorker (workerNodeKey: number): void {
    // The startup message only carries `ready: false`; `sendToWorker` adds
    // the target worker id.
    this.sendToWorker(workerNodeKey, {
      ready: false
    })
  }

  /** @inheritDoc */
  protected registerWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Persistent subscription to the worker's 'message' event.
    this.workerNodes[workerNodeKey].worker.on('message', listener)
  }

  /** @inheritDoc */
  protected registerOnceWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // One-shot subscription: the listener is removed after the first message.
    this.workerNodes[workerNodeKey].worker.once('message', listener)
  }

  /** @inheritDoc */
  protected deregisterWorkerMessageListener<Message extends Data | Response>(
    workerNodeKey: number,
    listener: (message: MessageValue<Message>) => void
  ): void {
    // Must be given the same function reference that was registered.
    this.workerNodes[workerNodeKey].worker.off('message', listener)
  }

  /** @inheritDoc */
  protected get type (): PoolType {
    return PoolTypes.fixed
  }

  /** @inheritDoc */
  protected get worker (): WorkerType {
    return WorkerTypes.cluster
  }

  /** @inheritDoc */
  protected get busy (): boolean {
    return this.internalBusy()
  }
}