65f6ce54c738e1b857a7c1d49f68811ca57f5697
1 import { fork
, isMaster
, setupMaster
, Worker
} from
'cluster'
2 import type { JSONValue
, MessageValue
} from
'../../utility-types'
3 import type { PoolOptions
} from
'../abstract-pool'
4 import { AbstractPool
} from
'../abstract-pool'
6 export interface ClusterPoolOptions
extends PoolOptions
<Worker
> {
8 * Key/value pairs to add to worker process environment.
10 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
12 // eslint-disable-next-line @typescript-eslint/no-explicit-any
17 * A cluster pool with a static number of workers; tasks can be executed in sync or async mode, as you prefer.
19 * This pool will select the worker in a round robin fashion.
21 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
24 export class FixedClusterPool
<
25 Data
extends JSONValue
= JSONValue
,
26 Response
extends JSONValue
= JSONValue
27 > extends AbstractPool
<Worker
, Data
, Response
> {
29 * @param numWorkers Number of workers for this pool.
30 * @param filePath Path to a file containing an implementation of the `ClusterWorker` class; a relative path is fine.
31 * @param opts An object with possible options, for example `errorHandler` and `onlineHandler`. Default: `{ maxTasks: 1000 }`
36 public readonly opts
: ClusterPoolOptions
= { maxTasks
: 1000 }
38 super(numWorkers
, filePath
, opts
)
41 protected setupHook (): void {
47 protected isMain (): boolean {
51 protected destroyWorker (worker
: Worker
): void {
53 // FIXME: The tests are currently failing, so these must be changed first
56 protected sendToWorker (worker
: Worker
, message
: MessageValue
<Data
>): void {
60 protected registerWorkerMessageListener (
62 listener
: (message
: MessageValue
<Response
>) => void
64 port
.on('message', listener
)
67 protected unregisterWorkerMessageListener (
69 listener
: (message
: MessageValue
<Response
>) => void
71 port
.removeListener('message', listener
)
74 protected newWorker (): Worker
{
75 return fork(this.opts
.env
)
78 protected afterNewWorkerPushed (worker
: Worker
): void {
79 // We attach a listener for every task.
80 // When a task is completed its listener is removed, but to avoid warnings we increase the max listeners limit.
81 worker
.setMaxListeners(this.opts
.maxTasks
?? 1000)