Improve property namings (#145)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
c97c7edb
S
1import EventEmitter from 'events'
2import type { MessageValue } from '../utility-types'
3import type { IPool } from './pool'
4
729c563d
S
/**
 * Signature of the callback a pool registers for a worker's `'error'` event.
 *
 * The handler is typed with the worker as `this` and receives the raised error.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type ErrorHandler<Worker> = (this: Worker, error: Error) => void
729c563d
S
9
/**
 * Signature of the callback a pool registers for a worker's `'online'` event.
 *
 * The handler is typed with the worker as `this` and takes no arguments.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
14
/**
 * Signature of the callback a pool registers for a worker's `'exit'` event.
 *
 * The handler is typed with the worker as `this` and receives the exit code.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type ExitHandler<Worker> = (this: Worker, exitCode: number) => void
19
729c563d
S
/**
 * Minimal listener contract a worker must fulfil to be managed by a pool:
 * it has to accept handlers for the `'error'`, `'online'` and `'exit'` events.
 */
export interface IWorker {
  /** Registers a handler for the `'error'` event. */
  on(event: 'error', handler: ErrorHandler<this>): void
  /** Registers a handler for the `'online'` event. */
  on(event: 'online', handler: OnlineHandler<this>): void
  /** Registers a handler for the `'exit'` event. */
  on(event: 'exit', handler: ExitHandler<this>): void
}
28
729c563d
S
/**
 * Optional settings accepted by a poolifier pool.
 *
 * @template Worker Type of the workers the handlers are attached to.
 */
export interface PoolOptions<Worker> {
  /**
   * Listener attached to the `'error'` event of each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * Listener attached to the `'online'` event of each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * Listener attached to the `'exit'` event of each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * Value used to set `maxListeners` on event emitters (workers are event emitters).
   *
   * Its only purpose is to avoid non-useful warning messages.
   *
   * @default 1000
   * @see [Node events emitter.setMaxListeners(n)](https://nodejs.org/api/events.html#events_emitter_setmaxlisteners_n)
   */
  maxTasks?: number
}
55
729c563d
S
/**
 * Event emitter type used internally by poolifier pools.
 *
 * It adds nothing on top of `EventEmitter`; it only gives the pool's emitter its own name.
 */
class PoolEmitter extends EventEmitter {}
60
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * Concrete pools supply the worker-specific parts (creating, messaging and
 * destroying workers); this class keeps the worker list, the per-worker task
 * counters and the request/response bookkeeping.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker.
 * @template Response Type of response of execution.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Data, Response> {
  /**
   * List of currently available workers.
   */
  public readonly workers: Worker[] = []

  /**
   * Index for the next worker.
   */
  public nextWorkerIndex: number = 0

  /**
   * - `key`: The `Worker`
   * - `value`: Number of tasks currently in progress on that worker
   *   (incremented by `execute`, decremented when the matching response arrives)
   */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /**
   * Emitter on which events can be listened to.
   *
   * Events that can currently be listened to:
   *
   * - `'FullPool'`
   *
   * This base class only creates the emitter; it does not emit on it itself.
   */
  public readonly emitter: PoolEmitter

  /**
   * ID of the next message.
   */
  protected nextMessageId: number = 0

  /**
   * Constructs a new poolifier pool.
   *
   * Runs `setupHook()` first, then creates `numberOfWorkers` workers, then
   * creates the emitter.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool. Default: `{ maxTasks: 1000 }`
   * @throws {Error} If `isMain()` returns `false` or `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker> = { maxTasks: 1000 }
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    // TODO christopher 2021-02-07: Improve this check e.g. with a pattern or blank check
    if (!this.filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
  }

  /**
   * Number of workers that this pool should manage.
   *
   * @returns Number of workers that this pool manages.
   * @deprecated Only here for backward compatibility.
   */
  // eslint-disable-next-line spellcheck/spell-checker
  public get numWorkers (): number {
    return this.numberOfWorkers
  }

  /**
   * Index for the next worker.
   *
   * @returns Index for the next worker.
   * @deprecated Only here for backward compatibility.
   */
  public get nextWorker (): number {
    return this.nextWorkerIndex
  }

  /**
   * Hands a task to a worker chosen by `chooseWorker()`.
   *
   * @param data Payload for the task. A `null`/`undefined` payload is sent as `{}`;
   * every other value (including `0`, `''` and `false`) is sent unchanged.
   * @returns Promise settled with the worker's response for this task.
   */
  public execute (data: Data): Promise<Response> {
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // The response listener is registered before the message is sent
    const res = this.internalExecute(worker, messageId)
    // `??` instead of `||` so that valid falsy payloads are not replaced by `{}`
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /**
   * Destroys every worker of this pool, one after the other.
   */
  public async destroy (): Promise<void> {
    // Iterate over a snapshot: if `destroyWorker` removes the worker from
    // `workers`, iterating the live array would skip the following entry.
    for (const worker of [...this.workers]) {
      await this.destroyWorker(worker)
    }
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of in-progress tasks of the given worker by one.
   *
   * @param worker Worker whose task counter is increased.
   * @throws {Error} If the worker has no entry in `tasks`.
   */
  protected increaseWorkersTask (worker: Worker): void {
    const numberOfTasksTheWorkerHas = this.tasks.get(worker)
    if (numberOfTasksTheWorkerHas === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksTheWorkerHas + 1)
  }

  /**
   * Decrease the number of in-progress tasks of the given worker by one.
   *
   * @param worker Worker whose task counter is decreased.
   * @throws {Error} If the worker has no entry in `tasks`.
   */
  protected decreaseWorkersTask (worker: Worker): void {
    const numberOfTasksTheWorkerHas = this.tasks.get(worker)
    if (numberOfTasksTheWorkerHas === undefined) {
      throw new Error('Worker could not be found in tasks map')
    }
    this.tasks.set(worker, numberOfTasksTheWorkerHas - 1)
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not part of `workers` leaves the list untouched.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would delete the last worker, so only splice on a real hit
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * The default implementation uses a round robin algorithm to distribute the load.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    const workerCount = this.workers.length
    // Wrap before indexing: `nextWorkerIndex` can point past the end after
    // `removeWorker` shrank the list, and `% 0` would turn it into `NaN`.
    const index = workerCount === 0 ? 0 : this.nextWorkerIndex % workerCount
    const chosenWorker = this.workers[index]
    this.nextWorkerIndex = workerCount === 0 ? 0 : (index + 1) % workerCount
    return chosenWorker
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Attach a listener for messages coming from the given worker.
   *
   * @param port The worker to listen on.
   * @param listener Callback invoked with each message.
   */
  protected abstract registerWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Detach a listener previously attached with `registerWorkerMessageListener`.
   *
   * @param port The worker the listener was attached to.
   * @param listener The listener to detach.
   */
  protected abstract unregisterWorkerMessageListener (
    port: Worker,
    listener: (message: MessageValue<Response>) => void
  ): void

  /**
   * Waits for the response that carries the given message ID.
   *
   * Messages with another ID are ignored. On a match the listener is detached,
   * the worker's in-progress task counter is decreased and the promise settles.
   *
   * @param worker Worker the task was assigned to.
   * @param messageId ID of the message whose response is awaited.
   * @returns Promise rejected with `message.error` if it is set, otherwise resolved with `message.data`.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise((resolve, reject) => {
      const listener: (message: MessageValue<Response>) => void = message => {
        if (message.id !== messageId) return
        this.unregisterWorkerMessageListener(worker, listener)
        // The task is finished: undo the increment done in `execute`
        this.decreaseWorkersTask(worker)
        if (message.error) reject(message.error)
        else resolve(message.data as Response)
      }
      this.registerWorkerMessageListener(worker, listener)
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or to bind the `main-worker`<->`worker` connection if it is not bound by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The worker gets the configured (or no-op) `'error'`, `'online'` and `'exit'`
   * handlers, is appended to `workers`, starts with a task counter of `0`, and
   * is finally passed to `afterWorkerSetup`.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? (() => {}))
    worker.on('online', this.opts.onlineHandler ?? (() => {}))
    // TODO handle properly when a worker exit
    worker.on('exit', this.opts.exitHandler ?? (() => {}))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }
}