332daa4d2b4cfa33225f3ff10a557d5f1a2fbde7
[poolifier.git] / src / pools / cluster / fixed.ts
1 import type { SendHandle } from 'child_process'
2 import { fork, isMaster, setupMaster, Worker } from 'cluster'
3 import type { MessageValue } from '../../utility-types'
4
5 export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
6
7 export interface FixedClusterPoolOptions {
8 /**
9 * A function that will listen for error event on each worker.
10 */
11 errorHandler?: (this: Worker, e: Error) => void
12 /**
13 * A function that will listen for online event on each worker.
14 */
15 onlineHandler?: (this: Worker) => void
16 /**
17 * A function that will listen for exit event on each worker.
18 */
19 exitHandler?: (this: Worker, code: number) => void
20 /**
21 * This is just to avoid not useful warnings message, is used to set `maxListeners` on event emitters (workers are event emitters).
22 *
23 * @default 1000
24 */
25 maxTasks?: number
26 /**
27 * Key/value pairs to add to worker process environment.
28 *
29 * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
30 */
31 // eslint-disable-next-line @typescript-eslint/no-explicit-any
32 env?: any
33 }
34
35 /**
36 * A cluster pool with a static number of workers, is possible to execute tasks in sync or async mode as you prefer.
37 *
38 * This pool will select the worker in a round robin fashion.
39 *
40 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
41 * @since 2.0.0
42 */
43 // eslint-disable-next-line @typescript-eslint/no-explicit-any
44 export class FixedClusterPool<Data = any, Response = any> {
45 public readonly workers: WorkerWithMessageChannel[] = []
46 public nextWorker: number = 0
47
48 // workerId as key and an integer value
49 public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
50 WorkerWithMessageChannel,
51 number
52 >()
53
54 protected id: number = 0
55
56 /**
57 * @param numWorkers Number of workers for this pool.
58 * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
59 * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
60 */
61 public constructor (
62 public readonly numWorkers: number,
63 public readonly filePath: string,
64 public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
65 ) {
66 if (!isMaster) {
67 throw new Error('Cannot start a cluster pool from a worker!')
68 }
69 // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
70 if (!this.filePath) {
71 throw new Error('Please specify a file with a worker implementation')
72 }
73
74 setupMaster({
75 exec: this.filePath
76 })
77
78 for (let i = 1; i <= this.numWorkers; i++) {
79 this.newWorker()
80 }
81 }
82
83 public destroy (): void {
84 for (const worker of this.workers) {
85 worker.kill()
86 }
87 }
88
89 /**
90 * Execute the task specified into the constructor with the data parameter.
91 *
92 * @param data The input for the task specified.
93 * @returns Promise that is resolved when the task is done.
94 */
95 public execute (data: Data): Promise<Response> {
96 // configure worker to handle message with the specified task
97 const worker: WorkerWithMessageChannel = this.chooseWorker()
98 // console.log('FixedClusterPool#execute choosen worker:', worker)
99 const previousWorkerIndex = this.tasks.get(worker)
100 if (previousWorkerIndex !== undefined) {
101 this.tasks.set(worker, previousWorkerIndex + 1)
102 } else {
103 throw Error('Worker could not be found in tasks map')
104 }
105 const id: number = ++this.id
106 const res: Promise<Response> = this.internalExecute(worker, id)
107 // console.log('FixedClusterPool#execute send data to worker:', worker)
108 worker.send({ data: data || {}, id: id })
109 return res
110 }
111
112 protected internalExecute (
113 worker: WorkerWithMessageChannel,
114 id: number
115 ): Promise<Response> {
116 return new Promise((resolve, reject) => {
117 const listener: (
118 message: MessageValue<Response>,
119 handle: SendHandle
120 ) => void = message => {
121 // console.log('FixedClusterPool#internalExecute listener:', message)
122 if (message.id === id) {
123 worker.removeListener('message', listener)
124 const previousWorkerIndex = this.tasks.get(worker)
125 if (previousWorkerIndex !== undefined) {
126 this.tasks.set(worker, previousWorkerIndex + 1)
127 } else {
128 throw Error('Worker could not be found in tasks map')
129 }
130 if (message.error) reject(message.error)
131 else resolve(message.data as Response)
132 }
133 }
134 worker.on('message', listener)
135 })
136 }
137
138 protected chooseWorker (): WorkerWithMessageChannel {
139 if (this.workers.length - 1 === this.nextWorker) {
140 this.nextWorker = 0
141 return this.workers[this.nextWorker]
142 } else {
143 this.nextWorker++
144 return this.workers[this.nextWorker]
145 }
146 }
147
148 protected newWorker (): WorkerWithMessageChannel {
149 const worker: WorkerWithMessageChannel = fork(this.opts.env)
150 worker.on('error', this.opts.errorHandler ?? (() => {}))
151 worker.on('online', this.opts.onlineHandler ?? (() => {}))
152 // TODO handle properly when a worker exit
153 worker.on('exit', this.opts.exitHandler ?? (() => {}))
154 this.workers.push(worker)
155 // we will attach a listener for every task,
156 // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
157 worker.setMaxListeners(this.opts.maxTasks ?? 1000)
158 // init tasks map
159 this.tasks.set(worker, 0)
160 return worker
161 }
162 }