Commit | Line | Data |
---|---|---|
325f50bc S |
1 | import type { SendHandle } from 'child_process' |
2 | import { fork, isMaster, setupMaster, Worker } from 'cluster' | |
3 | import type { MessageValue } from '../../utility-types' | |
4ade5f1f | 4 | |
/**
 * Worker type handled by the cluster pool.
 *
 * Currently a plain alias of the `Worker` class exported by the `cluster` module.
 */
export type WorkerWithMessageChannel = Worker // & Draft<MessageChannel>
4ade5f1f | 6 | |
export interface FixedClusterPoolOptions {
  /**
   * Listener registered for the `error` event of every worker of the pool.
   */
  errorHandler?: (this: Worker, e: Error) => void
  /**
   * Listener registered for the `online` event of every worker of the pool.
   */
  onlineHandler?: (this: Worker) => void
  /**
   * Listener registered for the `exit` event of every worker of the pool.
   */
  exitHandler?: (this: Worker, code: number) => void
  /**
   * Value applied as `maxListeners` on each worker (workers are event emitters).
   * Its only purpose is to silence the listener-count warnings that would
   * otherwise be printed, since one `message` listener is attached per pending task.
   *
   * @default 1000
   */
  maxTasks?: number
  /**
   * Key/value pairs added to the environment of each worker process.
   *
   * @see https://nodejs.org/api/cluster.html#cluster_cluster_fork_env
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  env?: any
}
34 | ||
/**
 * A cluster pool with a static number of workers.
 *
 * Each call to `execute` picks a worker in round-robin order, sends it the
 * task data and returns a promise that settles when that worker answers with
 * a message carrying the same task id.
 *
 * @template Data Type of the payload sent to a worker.
 * @template Response Type of the value a worker answers with.
 * @author [Christopher Quadflieg](https://github.com/Shinigami92)
 * @since 2.0.0
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class FixedClusterPool<Data = any, Response = any> {
  /** Workers forked by this pool, in creation order. */
  public readonly workers: WorkerWithMessageChannel[] = []

  /**
   * Index in `workers` of the worker returned by the last `chooseWorker` call
   * (`0` before the first call).
   */
  public nextWorker: number = 0

  /** Number of tasks currently in flight, keyed by the worker object itself. */
  public readonly tasks: Map<WorkerWithMessageChannel, number> = new Map<
    WorkerWithMessageChannel,
    number
  >()

  /** Id of the last submitted task; incremented for every `execute` call so replies can be matched. */
  protected id: number = 0

  /**
   * @param numWorkers Number of workers for this pool.
   * @param filePath A file path with implementation of `ClusterWorker` class, relative path is fine.
   * @param opts An object with possible options for example `errorHandler`, `onlineHandler`. Default: `{ maxTasks: 1000 }`
   * @throws Error when called from a worker process, or when `filePath` is empty.
   */
  public constructor (
    public readonly numWorkers: number,
    public readonly filePath: string,
    public readonly opts: FixedClusterPoolOptions = { maxTasks: 1000 }
  ) {
    if (!isMaster) {
      throw new Error('Cannot start a cluster pool from a worker!')
    }
    // TODO christopher 2021-02-09: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    // Every subsequent fork() will run the given worker file.
    setupMaster({
      exec: this.filePath
    })

    for (let i = 1; i <= this.numWorkers; i++) {
      this.newWorker()
    }
  }

  /**
   * Kills every worker of the pool.
   */
  public destroy (): void {
    for (const worker of this.workers) {
      worker.kill()
    }
  }

  /**
   * Execute the task specified into the constructor with the data parameter.
   *
   * @param data The input for the task specified. `null`/`undefined` is sent as `{}`.
   * @returns Promise that is resolved when the task is done.
   * @throws Error when the chosen worker is not registered in the tasks map
   *   (e.g. the pool has no workers).
   */
  public execute (data: Data): Promise<Response> {
    const worker: WorkerWithMessageChannel = this.chooseWorker()
    const runningTasks = this.tasks.get(worker)
    if (runningTasks === undefined) {
      throw Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, runningTasks + 1)

    const id: number = ++this.id
    // Register the reply listener before sending so the answer cannot be missed.
    const res: Promise<Response> = this.internalExecute(worker, id)
    // `??` instead of `||`: falsy payloads such as 0, '' or false must reach the worker unchanged.
    worker.send({ data: data ?? {}, id: id })
    return res
  }

  /**
   * Waits for the reply of `worker` to the task identified by `id`.
   *
   * @param worker The worker the task was sent to.
   * @param id The id of the task whose reply is awaited.
   * @returns Promise resolved with the reply data, or rejected with the reply
   *   error (or with an Error when the worker is unknown to the tasks map).
   */
  protected internalExecute (
    worker: WorkerWithMessageChannel,
    id: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (
        message: MessageValue<Response>,
        handle: SendHandle
      ) => void = message => {
        // Replies belonging to other tasks of the same worker are ignored.
        if (message.id !== id) return

        worker.removeListener('message', listener)
        const runningTasks = this.tasks.get(worker)
        if (runningTasks === undefined) {
          // Reject rather than throw: an exception thrown from an event
          // listener would surface as an uncaught exception and leave this
          // promise pending forever.
          reject(new Error('Worker could not be found in tasks map'))
          return
        }
        // The task is finished, so the worker has one task less in flight.
        this.tasks.set(worker, runningTasks - 1)

        if (message.error) reject(message.error)
        else resolve(message.data as Response)
      }
      worker.on('message', listener)
    })
  }

  /**
   * Selects the next worker in round-robin order and remembers its index in
   * `nextWorker`.
   *
   * @returns The selected worker (`undefined` at runtime if the pool has no workers).
   */
  protected chooseWorker (): WorkerWithMessageChannel {
    const candidate = this.nextWorker + 1
    // Wrap around at the end of the list; `>=` also recovers from an
    // out-of-range index instead of incrementing forever.
    this.nextWorker = candidate >= this.workers.length ? 0 : candidate
    return this.workers[this.nextWorker]
  }

  /**
   * Forks a new worker, wires the configured event handlers and registers it
   * in `workers` and `tasks`.
   *
   * @returns The newly created worker.
   */
  protected newWorker (): WorkerWithMessageChannel {
    const worker: WorkerWithMessageChannel = fork(this.opts.env)
    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a worker exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))
    this.workers.push(worker)
    // we will attach a listener for every task,
    // when task is completed the listener will be removed but to avoid warnings we are increasing the max listeners size
    worker.setMaxListeners(this.opts.maxTasks ?? 1000)
    // init tasks map
    this.tasks.set(worker, 0)
    return worker
  }
}