1 import { fork
, isMaster
, setupMaster
, Worker
} from
'cluster'
2 import type { MessageValue
} from
'../../utility-types'
3 import type { PoolOptions
} from
'../abstract-pool'
4 import { AbstractPool
} from
'../abstract-pool'
6 export interface ClusterPoolOptions
extends PoolOptions
<Worker
> {
8 * Key/value pairs to add to worker process environment.
10 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
12 // eslint-disable-next-line @typescript-eslint/no-explicit-any
17 * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer.
19 * This pool will select the worker in a round robin fashion.
21 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
24 // eslint-disable-next-line @typescript-eslint/no-explicit-any
25 export class FixedClusterPool
<Data
= any, Response
= any> extends AbstractPool
<
31 * @param numWorkers Number of workers for this pool.
32 * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
33 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
38 public readonly opts
: ClusterPoolOptions
= { maxTasks
: 1000 }
40 super(numWorkers
, filePath
, opts
)
43 protected setupHook (): void {
49 protected isMain (): boolean {
53 protected destroyWorker (worker
: Worker
): void {
57 protected sendToWorker (worker
: Worker
, message
: MessageValue
<Data
>): void {
61 protected registerWorkerMessageListener (
63 listener
: (message
: MessageValue
<Response
>) => void
65 port
.on('message', listener
)
68 protected unregisterWorkerMessageListener (
70 listener
: (message
: MessageValue
<Response
>) => void
72 port
.removeListener('message', listener
)
75 protected newWorker (): Worker
{
76 return fork(this.opts
.env
)
79 protected afterNewWorkerPushed (worker
: Worker
): void {
80 // we will attach a listener for every task,
81 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
82 worker
.setMaxListeners(this.opts
.maxTasks
?? 1000)