6393fe2eb599daa0971f5d66072a5c2f890737eb
[poolifier.git] / src / pools / abstract-pool.ts
1 import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4 } from '../utility-types'
5 import { EMPTY_FUNCTION } from '../utils'
6 import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
7 import type { PoolOptions } from './pool'
8 import type { IPoolInternal, TasksUsage } from './pool-internal'
9 import { PoolEmitter, PoolType } from './pool-internal'
10 import type { IPoolWorker } from './pool-worker'
11 import {
12 WorkerChoiceStrategies,
13 WorkerChoiceStrategy
14 } from './selection-strategies/selection-strategies-types'
15 import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
16
17 /**
18 * Base class that implements some shared logic for all poolifier pools.
19 *
20 * @template Worker Type of worker which manages this pool.
21 * @template Data Type of data sent to the worker. This can only be serializable data.
22 * @template Response Type of response of execution. This can only be serializable data.
23 */
24 export abstract class AbstractPool<
25 Worker extends IPoolWorker,
26 Data = unknown,
27 Response = unknown
28 > implements IPoolInternal<Worker, Data, Response> {
29 /** @inheritDoc */
30 public readonly workers: Worker[] = []
31
32 /** @inheritDoc */
33 public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
34 Worker,
35 TasksUsage
36 >()
37
38 /** @inheritDoc */
39 public readonly emitter?: PoolEmitter
40
41 /** @inheritDoc */
42 public readonly max?: number
43
44 /**
45 * The promise map.
46 *
47 * - `key`: This is the message Id of each submitted task.
48 * - `value`: An object that contains the worker, the resolve function and the reject function.
49 *
50 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
51 */
52 protected promiseMap: Map<
53 number,
54 PromiseWorkerResponseWrapper<Worker, Response>
55 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
56
57 /**
58 * Id of the next message.
59 */
60 protected nextMessageId: number = 0
61
62 /**
63 * Worker choice strategy instance implementing the worker choice algorithm.
64 *
65 * Default to a strategy implementing a round robin algorithm.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
73 /**
74 * Constructs a new poolifier pool.
75 *
76 * @param numberOfWorkers Number of workers that this pool should manage.
77 * @param filePath Path to the worker-file.
78 * @param opts Options for the pool.
79 */
80 public constructor (
81 public readonly numberOfWorkers: number,
82 public readonly filePath: string,
83 public readonly opts: PoolOptions<Worker>
84 ) {
85 if (this.isMain() === false) {
86 throw new Error('Cannot start a pool from a worker!')
87 }
88 this.checkNumberOfWorkers(this.numberOfWorkers)
89 this.checkFilePath(this.filePath)
90 this.checkPoolOptions(this.opts)
91 this.setupHook()
92
93 for (let i = 1; i <= this.numberOfWorkers; i++) {
94 this.createAndSetupWorker()
95 }
96
97 if (this.opts.enableEvents === true) {
98 this.emitter = new PoolEmitter()
99 }
100 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
101 this,
102 () => {
103 const workerCreated = this.createAndSetupWorker()
104 this.registerWorkerMessageListener(workerCreated, message => {
105 if (
106 isKillBehavior(KillBehaviors.HARD, message.kill) ||
107 this.getWorkerRunningTasks(workerCreated) === 0
108 ) {
109 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
110 this.destroyWorker(workerCreated) as void
111 }
112 })
113 return workerCreated
114 },
115 this.opts.workerChoiceStrategy
116 )
117 }
118
119 private checkFilePath (filePath: string): void {
120 if (!filePath) {
121 throw new Error('Please specify a file with a worker implementation')
122 }
123 }
124
125 private checkNumberOfWorkers (numberOfWorkers: number): void {
126 if (numberOfWorkers == null) {
127 throw new Error(
128 'Cannot instantiate a pool without specifying the number of workers'
129 )
130 } else if (Number.isSafeInteger(numberOfWorkers) === false) {
131 throw new Error(
132 'Cannot instantiate a pool with a non integer number of workers'
133 )
134 } else if (numberOfWorkers < 0) {
135 throw new Error(
136 'Cannot instantiate a pool with a negative number of workers'
137 )
138 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
139 throw new Error('Cannot instantiate a fixed pool with no worker')
140 }
141 }
142
143 private checkPoolOptions (opts: PoolOptions<Worker>): void {
144 this.opts.workerChoiceStrategy =
145 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
146 this.opts.enableEvents = opts.enableEvents ?? true
147 }
148
149 /** @inheritDoc */
150 public abstract get type (): PoolType
151
152 /** @inheritDoc */
153 public get numberOfRunningTasks (): number {
154 return this.promiseMap.size
155 }
156
157 /** @inheritDoc */
158 public getWorkerIndex (worker: Worker): number {
159 return this.workers.indexOf(worker)
160 }
161
162 /** @inheritDoc */
163 public getWorkerRunningTasks (worker: Worker): number | undefined {
164 return this.workersTasksUsage.get(worker)?.running
165 }
166
167 /** @inheritDoc */
168 public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
169 return this.workersTasksUsage.get(worker)?.avgRunTime
170 }
171
172 /** @inheritDoc */
173 public setWorkerChoiceStrategy (
174 workerChoiceStrategy: WorkerChoiceStrategy
175 ): void {
176 this.opts.workerChoiceStrategy = workerChoiceStrategy
177 for (const worker of this.workers) {
178 this.resetWorkerTasksUsage(worker)
179 }
180 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
181 workerChoiceStrategy
182 )
183 }
184
185 /** @inheritDoc */
186 public abstract get busy (): boolean
187
188 protected internalGetBusyStatus (): boolean {
189 return (
190 this.numberOfRunningTasks >= this.numberOfWorkers &&
191 this.findFreeWorker() === false
192 )
193 }
194
195 /** @inheritDoc */
196 public findFreeWorker (): Worker | false {
197 for (const worker of this.workers) {
198 if (this.getWorkerRunningTasks(worker) === 0) {
199 // A worker is free, return the matching worker
200 return worker
201 }
202 }
203 return false
204 }
205
206 /** @inheritDoc */
207 public execute (data: Data): Promise<Response> {
208 // Configure worker to handle message with the specified task
209 const worker = this.chooseWorker()
210 const res = this.internalExecute(worker, this.nextMessageId)
211 this.checkAndEmitBusy()
212 this.sendToWorker(worker, {
213 data: data ?? ({} as Data),
214 id: this.nextMessageId
215 })
216 ++this.nextMessageId
217 return res
218 }
219
220 /** @inheritDoc */
221 public async destroy (): Promise<void> {
222 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
223 }
224
225 /**
226 * Shutdowns given worker.
227 *
228 * @param worker A worker within `workers`.
229 */
230 protected abstract destroyWorker (worker: Worker): void | Promise<void>
231
232 /**
233 * Setup hook that can be overridden by a Poolifier pool implementation
234 * to run code before workers are created in the abstract constructor.
235 */
236 protected setupHook (): void {
237 // Can be overridden
238 }
239
240 /**
241 * Should return whether the worker is the main worker or not.
242 */
243 protected abstract isMain (): boolean
244
245 /**
246 * Hook executed before the worker task promise resolution.
247 * Can be overridden.
248 *
249 * @param worker The worker.
250 */
251 protected beforePromiseWorkerResponseHook (worker: Worker): void {
252 this.increaseWorkerRunningTasks(worker)
253 }
254
255 /**
256 * Hook executed after the worker task promise resolution.
257 * Can be overridden.
258 *
259 * @param message The received message.
260 * @param promise The Promise response.
261 */
262 protected afterPromiseWorkerResponseHook (
263 message: MessageValue<Response>,
264 promise: PromiseWorkerResponseWrapper<Worker, Response>
265 ): void {
266 this.decreaseWorkerRunningTasks(promise.worker)
267 this.stepWorkerRunTasks(promise.worker, 1)
268 this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
269 }
270
271 /**
272 * Removes the given worker from the pool.
273 *
274 * @param worker The worker that will be removed.
275 */
276 protected removeWorker (worker: Worker): void {
277 // Clean worker from data structure
278 this.workers.splice(this.getWorkerIndex(worker), 1)
279 this.removeWorkerTasksUsage(worker)
280 }
281
282 /**
283 * Chooses a worker for the next task.
284 *
285 * The default implementation uses a round robin algorithm to distribute the load.
286 *
287 * @returns Worker.
288 */
289 protected chooseWorker (): Worker {
290 return this.workerChoiceStrategyContext.execute()
291 }
292
293 /**
294 * Sends a message to the given worker.
295 *
296 * @param worker The worker which should receive the message.
297 * @param message The message.
298 */
299 protected abstract sendToWorker (
300 worker: Worker,
301 message: MessageValue<Data>
302 ): void
303
304 /**
305 * Registers a listener callback on a given worker.
306 *
307 * @param worker The worker which should register a listener.
308 * @param listener The message listener callback.
309 */
310 protected abstract registerWorkerMessageListener<
311 Message extends Data | Response
312 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
313
314 protected internalExecute (
315 worker: Worker,
316 messageId: number
317 ): Promise<Response> {
318 this.beforePromiseWorkerResponseHook(worker)
319 return new Promise<Response>((resolve, reject) => {
320 this.promiseMap.set(messageId, { resolve, reject, worker })
321 })
322 }
323
324 /**
325 * Returns a newly created worker.
326 */
327 protected abstract createWorker (): Worker
328
329 /**
330 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
331 *
332 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
333 *
334 * @param worker The newly created worker.
335 */
336 protected abstract afterWorkerSetup (worker: Worker): void
337
338 /**
339 * Creates a new worker for this pool and sets it up completely.
340 *
341 * @returns New, completely set up worker.
342 */
343 protected createAndSetupWorker (): Worker {
344 const worker = this.createWorker()
345
346 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
347 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
348 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
349 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
350 worker.once('exit', () => this.removeWorker(worker))
351
352 this.workers.push(worker)
353
354 // Init worker tasks usage map
355 this.initWorkerTasksUsage(worker)
356
357 this.afterWorkerSetup(worker)
358
359 return worker
360 }
361
362 /**
363 * This function is the listener registered for each worker.
364 *
365 * @returns The listener function to execute when a message is received from a worker.
366 */
367 protected workerListener (): (message: MessageValue<Response>) => void {
368 return message => {
369 if (message.id !== undefined) {
370 const promise = this.promiseMap.get(message.id)
371 if (promise !== undefined) {
372 if (message.error) {
373 promise.reject(message.error)
374 } else {
375 promise.resolve(message.data as Response)
376 }
377 this.afterPromiseWorkerResponseHook(message, promise)
378 this.promiseMap.delete(message.id)
379 }
380 }
381 }
382 }
383
384 private checkAndEmitBusy (): void {
385 if (this.opts.enableEvents === true && this.busy === true) {
386 this.emitter?.emit('busy')
387 }
388 }
389
390 /**
391 * Increases the number of tasks that the given worker has applied.
392 *
393 * @param worker Worker which running tasks is increased.
394 */
395 private increaseWorkerRunningTasks (worker: Worker): void {
396 this.stepWorkerRunningTasks(worker, 1)
397 }
398
399 /**
400 * Decreases the number of tasks that the given worker has applied.
401 *
402 * @param worker Worker which running tasks is decreased.
403 */
404 private decreaseWorkerRunningTasks (worker: Worker): void {
405 this.stepWorkerRunningTasks(worker, -1)
406 }
407
408 /**
409 * Steps the number of tasks that the given worker has applied.
410 *
411 * @param worker Worker which running tasks are stepped.
412 * @param step Number of running tasks step.
413 */
414 private stepWorkerRunningTasks (worker: Worker, step: number): void {
415 if (this.checkWorkerTasksUsage(worker) === true) {
416 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
417 tasksUsage.running = tasksUsage.running + step
418 this.workersTasksUsage.set(worker, tasksUsage)
419 }
420 }
421
422 /**
423 * Steps the number of tasks that the given worker has run.
424 *
425 * @param worker Worker which has run tasks.
426 * @param step Number of run tasks step.
427 */
428 private stepWorkerRunTasks (worker: Worker, step: number): void {
429 if (this.checkWorkerTasksUsage(worker) === true) {
430 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
431 tasksUsage.run = tasksUsage.run + step
432 this.workersTasksUsage.set(worker, tasksUsage)
433 }
434 }
435
436 /**
437 * Updates tasks runtime for the given worker.
438 *
439 * @param worker Worker which run the task.
440 * @param taskRunTime Worker task runtime.
441 */
442 private updateWorkerTasksRunTime (
443 worker: Worker,
444 taskRunTime: number | undefined
445 ): void {
446 if (
447 this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
448 .requiredStatistics.runTime === true &&
449 this.checkWorkerTasksUsage(worker) === true
450 ) {
451 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
452 tasksUsage.runTime += taskRunTime ?? 0
453 if (tasksUsage.run !== 0) {
454 tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
455 }
456 this.workersTasksUsage.set(worker, tasksUsage)
457 }
458 }
459
460 /**
461 * Checks if the given worker is registered in the workers tasks usage map.
462 *
463 * @param worker Worker to check.
464 * @returns `true` if the worker is registered in the workers tasks usage map. `false` otherwise.
465 */
466 private checkWorkerTasksUsage (worker: Worker): boolean {
467 const hasTasksUsage = this.workersTasksUsage.has(worker)
468 if (hasTasksUsage === false) {
469 throw new Error('Worker could not be found in workers tasks usage map')
470 }
471 return hasTasksUsage
472 }
473
474 /**
475 * Initializes tasks usage statistics.
476 *
477 * @param worker The worker.
478 */
479 initWorkerTasksUsage (worker: Worker): void {
480 this.workersTasksUsage.set(worker, {
481 run: 0,
482 running: 0,
483 runTime: 0,
484 avgRunTime: 0
485 })
486 }
487
488 /**
489 * Removes worker tasks usage statistics.
490 *
491 * @param worker The worker.
492 */
493 private removeWorkerTasksUsage (worker: Worker): void {
494 this.workersTasksUsage.delete(worker)
495 }
496
497 /**
498 * Resets worker tasks usage statistics.
499 *
500 * @param worker The worker.
501 */
502 private resetWorkerTasksUsage (worker: Worker): void {
503 this.removeWorkerTasksUsage(worker)
504 this.initWorkerTasksUsage(worker)
505 }
506 }