1 import cluster
, { type Worker
} from
'node:cluster'
2 import type { MessageValue
} from
'../../utility-types'
3 import { AbstractPool
} from
'../abstract-pool'
4 import { type PoolOptions
, type PoolType
, PoolTypes
} from
'../pool'
5 import { type WorkerType
, WorkerTypes
} from
'../worker'
8 * A cluster pool with a fixed number of workers.
10 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
11 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
12 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
15 export class FixedClusterPool
<
18 > extends AbstractPool
<Worker
, Data
, Response
> {
20 * Constructs a new poolifier fixed cluster pool.
22 * @param numberOfWorkers - Number of workers for this pool.
23 * @param filePath - Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
24 * @param opts - Options for this fixed cluster pool.
27 numberOfWorkers
: number,
29 protected readonly opts
: PoolOptions
<Worker
> = {}
31 super(numberOfWorkers
, filePath
, opts
)
35 protected setupHook (): void {
36 cluster
.setupPrimary({ ...this.opts
.settings
, exec
: this.filePath
})
40 protected isMain (): boolean {
41 return cluster
.isPrimary
45 protected async destroyWorkerNode (workerNodeKey
: number): Promise
<void> {
46 this.flagWorkerNodeAsNotReady(workerNodeKey
)
47 this.flushTasksQueue(workerNodeKey
)
48 // FIXME: wait for tasks to be finished
49 const workerNode
= this.workerNodes
[workerNodeKey
]
50 const waitWorkerExit
= new Promise
<void>(resolve
=> {
51 workerNode
.registerOnceWorkerEventHandler('exit', () => {
55 workerNode
.registerOnceWorkerEventHandler('disconnect', () => {
56 workerNode
.worker
.kill()
58 await this.sendKillMessageToWorker(workerNodeKey
)
59 workerNode
.removeAllListeners()
60 workerNode
.worker
.disconnect()
65 protected sendToWorker (
66 workerNodeKey
: number,
67 message
: MessageValue
<Data
>
69 this.workerNodes
[workerNodeKey
].worker
.send({
71 workerId
: this.getWorkerInfo(workerNodeKey
).id
as number
76 protected sendStartupMessageToWorker (workerNodeKey
: number): void {
77 this.sendToWorker(workerNodeKey
, {
83 protected registerWorkerMessageListener
<Message
extends Data
| Response
>(
84 workerNodeKey
: number,
85 listener
: (message
: MessageValue
<Message
>) => void
87 this.workerNodes
[workerNodeKey
].worker
.on('message', listener
)
91 protected registerOnceWorkerMessageListener
<Message
extends Data
| Response
>(
92 workerNodeKey
: number,
93 listener
: (message
: MessageValue
<Message
>) => void
95 this.workerNodes
[workerNodeKey
].worker
.once('message', listener
)
99 protected deregisterWorkerMessageListener
<Message
extends Data
| Response
>(
100 workerNodeKey
: number,
101 listener
: (message
: MessageValue
<Message
>) => void
103 this.workerNodes
[workerNodeKey
].worker
.off('message', listener
)
107 protected get
type (): PoolType
{
108 return PoolTypes
.fixed
112 protected get
worker (): WorkerType
{
113 return WorkerTypes
.cluster
117 protected get
busy (): boolean {
118 return this.internalBusy()