1 import { fork
, isMaster
, setupMaster
, Worker
} from
'cluster'
2 import type { JSONValue
, MessageValue
} from
'../../utility-types'
3 import type { PoolOptions
} from
'../abstract-pool'
4 import { AbstractPool
} from
'../abstract-pool'
7 * Options for a poolifier cluster pool.
// Options accepted by the cluster pool: the generic `PoolOptions` specialised to the cluster `Worker` type.
// NOTE(review): the comment delimiters, the member declaration(s) and the closing brace of this
// interface are not visible in this extract. `createWorker` reads `opts.env`, so an `env` member is
// presumably declared below the eslint directive — confirm against the full file.
9 export interface ClusterPoolOptions
extends PoolOptions
<Worker
> {
11 * Key/value pairs to add to worker process environment.
13 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
15 // eslint-disable-next-line @typescript-eslint/no-explicit-any
20 * A cluster pool with a fixed number of workers.
22 * Tasks can be performed in synchronous or asynchronous mode, as you prefer.
24 * This pool selects the workers in a round robin fashion.
26 * @template Data Type of data sent to the worker.
27 * @template Response Type of response of execution.
29 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
// Pool built on Node.js cluster workers: extends `AbstractPool` with `Worker` as the worker type.
// Both type parameters (`Data`, `Response`) are constrained to `JSONValue` and default to it.
32 export class FixedClusterPool
<
33 Data
extends JSONValue
= JSONValue
,
34 Response
extends JSONValue
= JSONValue
35 > extends AbstractPool
<Worker
, Data
, Response
> {
37 * Constructs a new poolifier fixed cluster pool.
39 * @param numberOfWorkers Number of workers for this pool.
40 * @param filePath Path to an implementation of a `ClusterWorker` file, which can be relative or absolute.
41 * @param opts Options for this fixed cluster pool. Default: `{ maxTasks: 1000 }`
// Constructor: passes `numberOfWorkers`, `filePath` and `opts` on to the `AbstractPool` constructor.
// `opts` is declared as a public readonly parameter property and defaults to `{ maxTasks: 1000 }`.
// NOTE(review): the `constructor (` line, the declaration of the `filePath` parameter and the
// `) {` / closing `}` lines are not visible in this extract — confirm against the full file.
44 numberOfWorkers
: number,
46 public readonly opts
: ClusterPoolOptions
= { maxTasks
: 1000 }
48 super(numberOfWorkers
, filePath
, opts
)
// Setup hook of this pool; takes no arguments and returns nothing.
// NOTE(review): the method body is not visible in this extract. `setupMaster` is imported from
// 'cluster' and is not used anywhere in the visible code, so it is presumably called here — confirm.
51 protected setupHook (): void {
// Returns a boolean; by its name, whether the current process is the main one.
// NOTE(review): the method body is not visible in this extract. `isMaster` is imported from
// 'cluster' and is not used anywhere in the visible code, so it is presumably returned here — confirm.
57 protected isMain (): boolean {
// Takes a cluster `Worker` and returns nothing.
// NOTE(review): the method body is not visible in this extract, so how the worker is
// terminated cannot be determined from here — TODO confirm against the full file.
61 protected destroyWorker (worker
: Worker
): void {
// Takes a cluster `Worker` and a `MessageValue<Data>` message and returns nothing.
// NOTE(review): the method body is not visible in this extract, so the delivery mechanism
// cannot be determined from here — TODO confirm against the full file.
65 protected sendToWorker (worker
: Worker
, message
: MessageValue
<Data
>): void {
// Subscribes `listener` to the 'message' event of `worker`.
// `Message` is constrained to `Data | Response`; the listener receives a `MessageValue<Message>`.
// NOTE(review): the body uses `worker`, but the line declaring that parameter, the return-type
// line and the closing brace are not visible in this extract — confirm against the full file.
69 protected registerWorkerMessageListener
<Message
extends Data
| Response
> (
71 listener
: (message
: MessageValue
<Message
>) => void
73 worker
.on('message', listener
)
// Removes `listener` from the 'message' event of `worker`.
// `Message` is constrained to `Data | Response`; the listener type is `(message: MessageValue<Message>) => void`.
// NOTE(review): the body uses `worker`, but the line declaring that parameter, the return-type
// line and the closing brace are not visible in this extract — confirm against the full file.
76 protected unregisterWorkerMessageListener
<Message
extends Data
| Response
> (
78 listener
: (message
: MessageValue
<Message
>) => void
80 worker
.removeListener('message', listener
)
/**
 * Spawns one cluster worker for this pool.
 *
 * The `env` pool option is handed to `fork` unchanged.
 *
 * @returns The newly forked worker.
 */
protected createWorker (): Worker {
  const { env } = this.opts
  return fork(env)
}
/**
 * Raises the listener limit of a worker.
 *
 * @param worker Worker whose maximum listener count is adjusted.
 */
protected afterWorkerSetup (worker: Worker): void {
  // One listener is attached per task and detached again when that task completes;
  // lifting the cap to `maxTasks` (1000 when unset) avoids max-listener warnings.
  const listenerLimit = this.opts.maxTasks ?? 1000
  worker.setMaxListeners(listenerLimit)
}