Dedupe worker attributes (#364)
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { EMPTY_FUNCTION } from '../utils'
6 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
7 import type { IPoolInternal } from './pool-internal'
8 import { PoolEmitter, PoolType } from './pool-internal'
9 import type { WorkerChoiceStrategy } from './selection-strategies'
10 import {
11 WorkerChoiceStrategies,
12 WorkerChoiceStrategyContext
13 } from './selection-strategies'
14
/**
 * Callback invoked if the worker raised an error.
 *
 * The callback is typed with `this` bound to the worker it is registered on.
 *
 * @template Worker Type of the worker the handler is attached to.
 * @param e The error raised by the worker.
 */
export type ErrorHandler<Worker> = (this: Worker, e: Error) => void
19
/**
 * Callback invoked when the worker has started successfully.
 *
 * The callback is typed with `this` bound to the worker it is registered on
 * and receives no arguments.
 *
 * @template Worker Type of the worker the handler is attached to.
 */
export type OnlineHandler<Worker> = (this: Worker) => void
24
/**
 * Callback invoked when the worker exits.
 *
 * The callback is typed with `this` bound to the worker it is registered on.
 *
 * @template Worker Type of the worker the handler is attached to.
 * @param code Numeric code reported with the exit event (presumably the
 * worker exit code - confirm against the concrete worker implementation).
 */
export type ExitHandler<Worker> = (this: Worker, code: number) => void
29
/**
 * Basic interface that describes the minimum required implementation of listener events for a pool-worker.
 *
 * A pool only relies on these `on` / `once` overloads to attach its error,
 * online and exit handlers to a worker.
 */
export interface IWorker {
  /**
   * Register a listener to the error event.
   *
   * @param event `'error'`.
   * @param handler The error handler.
   */
  on(event: 'error', handler: ErrorHandler<this>): void
  /**
   * Register a listener to the online event.
   *
   * @param event `'online'`.
   * @param handler The online handler.
   */
  on(event: 'online', handler: OnlineHandler<this>): void
  /**
   * Register a listener to the exit event.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  on(event: 'exit', handler: ExitHandler<this>): void
  /**
   * Register a listener to the exit event that will only be performed once.
   *
   * @param event `'exit'`.
   * @param handler The exit handler.
   */
  once(event: 'exit', handler: ExitHandler<this>): void
}
63
/**
 * Options for a poolifier pool.
 *
 * @template Worker Type of worker the event handlers are registered on.
 */
export interface PoolOptions<Worker> {
  /**
   * A function that will listen for error event on each worker.
   */
  errorHandler?: ErrorHandler<Worker>
  /**
   * A function that will listen for online event on each worker.
   */
  onlineHandler?: OnlineHandler<Worker>
  /**
   * A function that will listen for exit event on each worker.
   */
  exitHandler?: ExitHandler<Worker>
  /**
   * The worker choice strategy to use in this pool.
   *
   * When omitted, `AbstractPool` falls back to `WorkerChoiceStrategies.ROUND_ROBIN`.
   */
  workerChoiceStrategy?: WorkerChoiceStrategy
  /**
   * Pool events emission.
   *
   * Default to true.
   */
  enableEvents?: boolean
}
91
/**
 * Base class containing some shared logic for all poolifier pools.
 *
 * It owns the worker registry (`workers`), the per-worker count of tasks in
 * progress (`tasks`) and the map of pending promises keyed by message id.
 * Concrete pools supply the worker-specific primitives (`createWorker`,
 * `sendToWorker`, `registerWorkerMessageListener`, `destroyWorker`, ...).
 *
 * @template Worker Type of worker which manages this pool.
 * @template Data Type of data sent to the worker. This can only be serializable data.
 * @template Response Type of response of execution. This can only be serializable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPoolInternal<Worker, Data, Response> {
  /** @inheritdoc */
  public readonly workers: Worker[] = []

  /** @inheritdoc */
  public readonly tasks: Map<Worker, number> = new Map<Worker, number>()

  /** @inheritdoc */
  public readonly emitter?: PoolEmitter

  /** @inheritdoc */
  public readonly max?: number

  /**
   * The promise map.
   *
   * - `key`: This is the message Id of each submitted task.
   * - `value`: An object that contains the worker, the resolve function and the reject function.
   *
   * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
   */
  protected promiseMap: Map<
    number,
    PromiseWorkerResponseWrapper<Worker, Response>
  > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()

  /**
   * Id of the last message sent.
   *
   * It is incremented before each use, so the first submitted task gets id `1`.
   */
  protected nextMessageId: number = 0

  /**
   * Worker choice strategy instance implementing the worker choice algorithm.
   *
   * Default to a strategy implementing a round robin algorithm.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, runs `setupHook()`, eagerly creates
   * `numberOfWorkers` workers, then creates the event emitter (when events
   * are enabled) and the worker choice strategy context.
   *
   * @param numberOfWorkers Number of workers that this pool should manage.
   * @param filePath Path to the worker-file.
   * @param opts Options for the pool.
   * @throws {Error} If called from a worker, or if `numberOfWorkers` / `filePath` are invalid.
   */
  public constructor (
    public readonly numberOfWorkers: number,
    public readonly filePath: string,
    public readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)
    // Gives the concrete pool a chance to run code before any worker exists
    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }

    if (this.opts.enableEvents) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
      this,
      // Factory handed to the strategy context to create an extra worker on demand
      () => {
        const workerCreated = this.createAndSetupWorker()
        this.registerWorkerMessageListener(workerCreated, async message => {
          const tasksInProgress = this.tasks.get(workerCreated)
          // NOTE(review): the second operand is true for ANY message received
          // while the worker has no task in progress, not only for kill
          // messages - confirm this is the intended destroy condition.
          if (
            isKillBehavior(KillBehaviors.HARD, message.kill) ||
            tasksInProgress === 0
          ) {
            // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
            await this.destroyWorker(workerCreated)
          }
        })
        return workerCreated
      },
      this.opts.workerChoiceStrategy
    )
  }

  /**
   * Ensures a worker file path was given.
   *
   * @param filePath Path to the worker-file.
   * @throws {Error} If `filePath` is falsy (empty string, `null`, `undefined`).
   */
  private checkFilePath (filePath: string): void {
    if (!filePath) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Validates the requested number of workers.
   *
   * @param numberOfWorkers Number of workers requested for this pool.
   * @throws {Error} If the value is nullish, not a safe integer, negative,
   * or zero for a pool whose `type` is `PoolType.FIXED`.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new Error(
        'Cannot instantiate a pool with a non integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new Error(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Fills in the defaults of the pool options, writing them back to `this.opts`.
   *
   * - `workerChoiceStrategy` defaults to `WorkerChoiceStrategies.ROUND_ROBIN`.
   * - `enableEvents` defaults to `true`.
   *
   * @param opts Options given to the constructor.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    this.opts.workerChoiceStrategy =
      opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
    this.opts.enableEvents = opts.enableEvents ?? true
  }

  /** @inheritdoc */
  public abstract get type (): PoolType

  /** @inheritdoc */
  public get numberOfRunningTasks (): number {
    // Every submitted, not yet settled task has exactly one entry in the promise map
    return this.promiseMap.size
  }

  /** @inheritdoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    // Keep the options and the live strategy context in sync
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      workerChoiceStrategy
    )
  }

  /** @inheritdoc */
  public abstract get busy (): boolean

  /**
   * Shared busy computation available to concrete pools.
   *
   * @returns `true` when at least `numberOfWorkers` tasks are running and no
   * worker has zero tasks in progress.
   */
  protected internalGetBusyStatus (): boolean {
    return (
      this.numberOfRunningTasks >= this.numberOfWorkers &&
      this.findFreeTasksMapEntry() === false
    )
  }

  /** @inheritdoc */
  public findFreeTasksMapEntry (): [Worker, number] | false {
    for (const [worker, numberOfTasks] of this.tasks) {
      if (numberOfTasks === 0) {
        // A worker is free, return the matching tasks map entry
        return [worker, numberOfTasks]
      }
    }
    return false
  }

  /** @inheritdoc */
  public execute (data: Data): Promise<Response> {
    // Configure worker to handle message with the specified task
    const worker = this.chooseWorker()
    const messageId = ++this.nextMessageId
    const res = this.internalExecute(worker, messageId)
    this.checkAndEmitBusy()
    // Only a nullish payload is replaced by an empty object: `??` keeps
    // legitimate falsy payloads such as `0`, `''` or `false` intact.
    this.sendToWorker(worker, { data: data ?? ({} as Data), id: messageId })
    return res
  }

  /** @inheritdoc */
  public async destroy (): Promise<void> {
    await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
  }

  /**
   * Shut down given worker.
   *
   * @param worker A worker within `workers`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook that can be overridden by a Poolifier pool implementation
   * to run code before workers are created in the abstract constructor.
   */
  protected setupHook (): void {
    // Can be overridden
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Increase the number of tasks that the given workers has applied.
   *
   * @param worker Worker whose tasks are increased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected increaseWorkersTask (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, 1)
  }

  /**
   * Decrease the number of tasks that the given workers has applied.
   *
   * @param worker Worker whose tasks are decreased.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  protected decreaseWorkersTasks (worker: Worker): void {
    this.stepWorkerNumberOfTasks(worker, -1)
  }

  /**
   * Step the number of tasks that the given workers has applied.
   *
   * @param worker Worker whose tasks are set.
   * @param step Worker number of tasks step.
   * @throws {Error} If the worker is not registered in the tasks map.
   */
  private stepWorkerNumberOfTasks (worker: Worker, step: number): void {
    const numberOfTasksInProgress = this.tasks.get(worker)
    if (numberOfTasksInProgress !== undefined) {
      this.tasks.set(worker, numberOfTasksInProgress + step)
    } else {
      throw new Error('Worker could not be found in tasks map')
    }
  }

  /**
   * Removes the given worker from the pool.
   *
   * A worker that is not (or no longer) registered is ignored.
   *
   * @param worker Worker that will be removed.
   */
  protected removeWorker (worker: Worker): void {
    // Clean worker from data structure
    const workerIndex = this.workers.indexOf(worker)
    // `splice(-1, 1)` would remove the LAST worker, so only splice on a real hit
    if (workerIndex !== -1) {
      this.workers.splice(workerIndex, 1)
    }
    this.tasks.delete(worker)
  }

  /**
   * Choose a worker for the next task.
   *
   * Delegates to the current worker choice strategy context.
   *
   * @returns Worker.
   */
  protected chooseWorker (): Worker {
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Send a message to the given worker.
   *
   * @param worker The worker which should receive the message.
   * @param message The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Register a listener callback on a given worker.
   *
   * @param worker A worker.
   * @param listener A message listener callback.
   */
  protected abstract registerWorkerMessageListener<
    Message extends Data | Response
  > (worker: Worker, listener: (message: MessageValue<Message>) => void): void

  /**
   * Books a task on the given worker and creates the promise that will be
   * settled by `workerListener()` when the matching response arrives.
   *
   * @param worker Worker the task is assigned to.
   * @param messageId Id of the message carrying the task.
   * @returns Promise settled with the worker response for `messageId`.
   */
  protected internalExecute (
    worker: Worker,
    messageId: number
  ): Promise<Response> {
    this.increaseWorkersTask(worker)
    return new Promise<Response>((resolve, reject) => {
      this.promiseMap.set(messageId, { resolve, reject, worker })
    })
  }

  /**
   * Returns a newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
   *
   * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
   *
   * @param worker The newly created worker.
   */
  protected abstract afterWorkerSetup (worker: Worker): void

  /**
   * Creates a new worker for this pool and sets it up completely.
   *
   * The worker gets the configured error/online/exit handlers, is removed
   * from the pool on its first exit event, and is registered in `workers`
   * and `tasks` before `afterWorkerSetup()` is called.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker: Worker = this.createWorker()

    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    worker.once('exit', () => this.removeWorker(worker))

    this.workers.push(worker)

    // Init tasks map
    this.tasks.set(worker, 0)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * This function is the listener registered for each worker.
   *
   * Messages without an id, or whose id has no pending promise, are ignored.
   *
   * @returns The listener function to execute when a message is sent from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      // Explicit nullish check so that an id of `0` is not treated as "no id"
      if (message.id != null) {
        const value = this.promiseMap.get(message.id)
        if (value) {
          this.decreaseWorkersTasks(value.worker)
          if (message.error) value.reject(message.error)
          else value.resolve(message.data as Response)
          this.promiseMap.delete(message.id)
        }
      }
    }
  }

  /**
   * Emits the `'busy'` event when events are enabled and the pool is busy.
   */
  private checkAndEmitBusy (): void {
    if (this.opts.enableEvents && this.busy) {
      this.emitter?.emit('busy')
    }
  }
}