Removed max tasks (#225)
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
a35560ba
S
5import type { IPoolInternal } from './pool-internal'
6import { PoolEmitter } from './pool-internal'
7import type { WorkerChoiceStrategy } from './selection-strategies'
8import {
9 WorkerChoiceStrategies,
10 WorkerChoiceStrategyContext
11} from './selection-strategies'
c97c7edb 12
c510fea7
APA
/**
 * An intentionally empty function.
 *
 * Used as the fallback listener when a pool option handler is not provided.
 */
const EMPTY_FUNCTION: () => void = () => {
  /* Intentionally empty */
}
19
729c563d
S
/**
 * Callback invoked if the worker raised an error.
 *
 * The callback is called with the worker as `this` and the raised error as its only argument.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
729c563d
S
24
/**
 * Callback invoked when the worker has started successfully.
 *
 * The callback is called with the worker as `this` and takes no argument.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
729c563d
S
29
/**
 * Callback invoked when the worker exits.
 *
 * The callback is called with the worker as `this` and the code passed along with the exit event.
 *
 * @template Worker Type of the worker the handler is bound to.
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
34
729c563d
S
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
68
729c563d
S
/**
 * Options for a poolifier pool.
 *
 * Every option is optional.
 *
 * @template Worker Type of worker the handlers are bound to.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
}
90
729c563d
S
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /**
   * The promise map.
   *
   * - `key`: This is the message ID of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter: PoolEmitter

  /**
   * Counter used to generate message IDs.
   *
   * It is pre-incremented when a task is submitted, so the first ID handed out is `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws Error if called from a worker, if `numberOfWorkers` is invalid or if `filePath` is empty.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    // Give the concrete pool a chance to run code before any worker is created
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    this.emitter = new PoolEmitter()
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    )
  }

  /**
   * Checks that a worker file path was given.
   *
   * @param filePath Path to the worker-file.
   * @throws Error if `filePath` is falsy.
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is usable for this pool.
   *
   * @param numberOfWorkers Number of workers requested for the pool.
   * @throws Error if the value is missing, not a safe integer, negative, or zero for a non dynamic pool.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (!this.isDynamic() && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /** @inheritdoc */
  public isDynamic (): boolean {
    return false
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    // Keep the stored options in sync with the strategy actually in use
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    this.increaseWorkersTask(worker)
    const messageId = ++this.nextMessageId
    // Register the promise before sending so that the response can always be matched
    const res = this.internalExecute(worker, messageId)
    // `??` instead of `||`: only null/undefined are replaced by an empty object,
    // falsy but valid payloads (0, '', false) are sent to the worker unchanged
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /** @inheritdoc */
  public abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase by one the number of tasks tracked for the given worker.
   *
   * @param worker Worker whose tasks are increased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease by one the number of tasks tracked for the given worker.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws Error if the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks tracked for the given worker.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws Error if the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not registered in the pool is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would remove the last worker, so an unknown worker must be skipped
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates the choice to the worker choice strategy context of this pool.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /** @inheritdoc */
  public abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Registers a pending task in the promise map.
   *
   * @param worker The worker the task is assigned to.
   * @param messageId The ID of the message carrying the task.
   * @returns A promise whose resolve/reject functions are stored in the promise map under `messageId`.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /** @inheritdoc */
  public createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    // Fall back to a no-op listener when the corresponding option is not set
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // Unregister the worker from the pool once it has exited
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * The returned listener looks up the pending task matching the message ID, decreases the
   * task counter of its worker, settles the promise (rejects when the message carries an
   * error, resolves with the message data otherwise) and removes the entry from the promise map.
   * Messages without an ID or with an unknown ID are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    const listener: (message: MessageValue<Response>) => void = message => {
      // IDs handed out by `execute` start at 1, so a truthiness check does not drop valid IDs
      if (message.id) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
    return listener
  }
}