332daa4d2b4cfa33225f3ff10a557d5f1a2fbde7
1 import type { SendHandle
} from
'child_process'
2 import { fork
, isMaster
, setupMaster
, Worker
} from
'cluster'
3 import type { MessageValue
} from
'../../utility-types'
5 export type WorkerWithMessageChannel
= Worker
// & Draft<MessageChannel>
7 export interface FixedClusterPoolOptions
{
9 * A function that will listen for error event on each worker.
11 errorHandler
?: (this: Worker
, e
: Error) => void
13 * A function that will listen for online event on each worker.
15 onlineHandler
?: (this: Worker
) => void
17 * A function that will listen for exit event on each worker.
19 exitHandler
?: (this: Worker
, code
: number) => void
21 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
27 * Key/value pairs to add to worker process environment.
29 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
31 // eslint-disable-next-line @typescript-eslint/no-explicit-any
36 * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer.
38 * This pool will select the worker in a round robin fashion.
40 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
43 // eslint-disable-next-line @typescript-eslint/no-explicit-any
44 export class FixedClusterPool
<Data
= any, Response
= any> {
45 public readonly workers
: WorkerWithMessageChannel
[] = []
46 public nextWorker
: number = 0
48 // workerId as key and an integer value
49 public readonly tasks
: Map
<WorkerWithMessageChannel
, number> = new Map
<
50 WorkerWithMessageChannel
,
54 protected id
: number = 0
57 * @param numWorkers Number of workers for this pool.
58 * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
59 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
62 public readonly numWorkers
: number,
63 public readonly filePath
: string,
64 public readonly opts
: FixedClusterPoolOptions
= { maxTasks
: 1000 }
67 throw new Error('Cannot start a cluster pool from a worker!')
69 // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
71 throw new Error('Please specify a file with a worker implementation')
78 for (let i
= 1; i
<= this.numWorkers
; i
++) {
83 public destroy (): void {
84 for (const worker
of this.workers
) {
90 * Execute the task specified into the constructor with the data parameter.
92 * @param data The input for the task specified.
93 * @returns Promise that is resolved when the task is done.
95 public execute (data
: Data
): Promise
<Response
> {
96 // configure worker to handle message with the specified task
97 const worker
: WorkerWithMessageChannel
= this.chooseWorker()
98 // console.log('FixedClusterPool#execute choosen worker:', worker)
99 const previousWorkerIndex
= this.tasks
.get(worker
)
100 if (previousWorkerIndex
!== undefined) {
101 this.tasks
.set(worker
, previousWorkerIndex
+ 1)
103 throw Error('Worker could not be found in tasks map')
105 const id
: number = ++this.id
106 const res
: Promise
<Response
> = this.internalExecute(worker
, id
)
107 // console.log('FixedClusterPool#execute send data to worker:', worker)
108 worker
.send({ data
: data
|| {}, id
: id
})
112 protected internalExecute (
113 worker
: WorkerWithMessageChannel
,
115 ): Promise
<Response
> {
116 return new Promise((resolve
, reject
) => {
118 message
: MessageValue
<Response
>,
120 ) => void = message
=> {
121 // console.log('FixedClusterPool#internalExecute listener:', message)
122 if (message
.id
=== id
) {
123 worker
.removeListener('message', listener
)
124 const previousWorkerIndex
= this.tasks
.get(worker
)
125 if (previousWorkerIndex
!== undefined) {
126 this.tasks
.set(worker
, previousWorkerIndex
+ 1)
128 throw Error('Worker could not be found in tasks map')
130 if (message
.error
) reject(message
.error
)
131 else resolve(message
.data
as Response
)
134 worker
.on('message', listener
)
138 protected chooseWorker (): WorkerWithMessageChannel
{
139 if (this.workers
.length
- 1 === this.nextWorker
) {
141 return this.workers
[this.nextWorker
]
144 return this.workers
[this.nextWorker
]
148 protected newWorker (): WorkerWithMessageChannel
{
149 const worker
: WorkerWithMessageChannel
= fork(this.opts
.env
)
150 worker
.on('error', this.opts
.errorHandler
?? (() => {}))
151 worker
.on('online', this.opts
.onlineHandler
?? (() => {}))
152 // TODO handle properly when a worker exit
153 worker
.on('exit', this.opts
.exitHandler
?? (() => {}))
154 this.workers
.push(worker
)
155 // we will attach a listener for every task,
156 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
157 worker
.setMaxListeners(this.opts
.maxTasks
?? 1000)
159 this.tasks
.set(worker
, 0)