1 import cluster
, { type Worker
} from
'node:cluster'
3 import type { MessageValue
} from
'../../utility-types.js'
5 import { AbstractPool
} from
'../abstract-pool.js'
6 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool.js'
7 import { type WorkerType
, WorkerTypes
} from
'../worker.js'
/**
 * Options for a poolifier cluster pool.
 *
 * Alias of the generic `PoolOptions` specialised with the `node:cluster`
 * `Worker` type.
 */
export type ClusterPoolOptions = PoolOptions<Worker>
/**
 * A cluster pool with a fixed number of workers.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 */
21 export class FixedClusterPool
<
24 > extends AbstractPool
<Worker
, Data
, Response
> {
/**
 * Constructs a new poolifier fixed cluster pool.
 * @param numberOfWorkers - Number of workers for this pool.
 * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
 * @param opts - Options for this fixed cluster pool.
 * @param maximumNumberOfWorkers - The maximum number of workers for this pool.
 */
33 numberOfWorkers
: number,
35 opts
: ClusterPoolOptions
= {},
36 maximumNumberOfWorkers
?: number
38 super(numberOfWorkers
, filePath
, opts
, maximumNumberOfWorkers
)
42 protected checkAndEmitDynamicWorkerCreationEvents (): void {
47 protected checkAndEmitDynamicWorkerDestructionEvents (): void {
/**
 * Detaches a `'message'` listener from the worker held by a worker node.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - The message listener to remove.
 */
protected deregisterWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  const { worker } = this.workerNodes[workerNodeKey]
  worker.off('message', listener)
}
/**
 * Tells whether the current process is the cluster primary.
 * @returns The value of `cluster.isPrimary`.
 */
protected isMain (): boolean {
  const { isPrimary } = cluster
  return isPrimary
}
/**
 * Attaches a `'message'` listener to the worker held by a worker node using
 * `once`, i.e. a one-shot registration.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - The message listener to register.
 */
protected registerOnceWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  const { worker } = this.workerNodes[workerNodeKey]
  worker.once('message', listener)
}
/**
 * Attaches a `'message'` listener to the worker held by a worker node.
 * @param workerNodeKey - Index of the worker node in `this.workerNodes`.
 * @param listener - The message listener to register.
 */
protected registerWorkerMessageListener<Message extends Data | Response>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void {
  const { worker } = this.workerNodes[workerNodeKey]
  worker.on('message', listener)
}
81 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
82 this.sendToWorker(workerNodeKey
, {
88 protected sendToWorker (
89 workerNodeKey
: number,
90 message
: MessageValue
<Data
>
92 this.workerNodes
[workerNodeKey
]?.worker
.send({
94 workerId
: this.getWorkerInfo(workerNodeKey
)?.id
,
95 } satisfies MessageValue
<Data
>)
/**
 * Configures the cluster primary before workers are created.
 *
 * The pool's `opts.settings` are spread first, then `exec` is set to
 * `this.filePath`, so the pool's file path always wins over any `exec`
 * present in the settings.
 */
protected override setupHook (): void {
  const primarySettings = { ...this.opts.settings, exec: this.filePath }
  cluster.setupPrimary(primarySettings)
}
104 protected shallCreateDynamicWorker (): boolean {
/**
 * Back-pressure flag of this pool.
 * @returns The result of `this.internalBackPressure()`.
 */
protected get backPressure (): boolean {
  const underBackPressure = this.internalBackPressure()
  return underBackPressure
}
/**
 * Busy flag of this pool.
 * @returns The result of `this.internalBusy()`.
 */
protected get busy (): boolean {
  const isBusy = this.internalBusy()
  return isBusy
}
/**
 * Pool type of this pool.
 * @returns Always `PoolTypes.fixed`.
 */
protected get type (): PoolType {
  const { fixed } = PoolTypes
  return fixed
}
/**
 * Worker type used by this pool.
 * @returns Always `WorkerTypes.cluster`.
 */
protected get worker (): WorkerType {
  const { cluster: clusterWorkerType } = WorkerTypes
  return clusterWorkerType
}