Apply dependencies update
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
bdaf31cd 7import type { PoolOptions } from './pool'
bf9549ae 8import type { IPoolInternal, TasksUsage } from './pool-internal'
7c0ba920 9import { PoolEmitter, PoolType } from './pool-internal'
ea7a90d3 10import type { IPoolWorker } from './pool-worker'
a35560ba
S
11import {
12 WorkerChoiceStrategies,
bdaf31cd
JB
13 WorkerChoiceStrategy
14} from './selection-strategies/selection-strategies-types'
15import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 16
729c563d 17/**
ea7a90d3 18 * Base class that implements some shared logic for all poolifier pools.
729c563d
S
19 *
20 * @template Worker Type of worker which manages this pool.
deb85c12
JB
21 * @template Data Type of data sent to the worker. This can only be serializable data.
22 * @template Response Type of response of execution. This can only be serializable data.
729c563d 23 */
c97c7edb 24export abstract class AbstractPool<
ea7a90d3 25 Worker extends IPoolWorker,
d3c8a1a8
S
26 Data = unknown,
27 Response = unknown
9b2fdd9f 28> implements IPoolInternal<Worker, Data, Response> {
a76fac14 29 /** @inheritDoc */
4a6952ff
JB
30 public readonly workers: Worker[] = []
31
ea7a90d3
JB
32 /** @inheritDoc */
33 public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
bf9549ae
JB
34 Worker,
35 TasksUsage
36 >()
4a6952ff 37
a76fac14 38 /** @inheritDoc */
7c0ba920
JB
39 public readonly emitter?: PoolEmitter
40
a76fac14 41 /** @inheritDoc */
7c0ba920 42 public readonly max?: number
4a6952ff 43
be0676b3
APA
44 /**
45 * The promise map.
46 *
e088a00c 47 * - `key`: This is the message Id of each submitted task.
be0676b3
APA
48 * - `value`: An object that contains the worker, the resolve function and the reject function.
49 *
50 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
51 */
52 protected promiseMap: Map<
53 number,
54 PromiseWorkerResponseWrapper<Worker, Response>
55 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
56
729c563d 57 /**
e088a00c 58 * Id of the next message.
729c563d 59 */
280c2a77 60 protected nextMessageId: number = 0
c97c7edb 61
a35560ba
S
62 /**
63 * Worker choice strategy instance implementing the worker choice algorithm.
64 *
65 * Default to a strategy implementing a round robin algorithm.
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
68 Worker,
69 Data,
70 Response
71 >
72
729c563d
S
73 /**
74 * Constructs a new poolifier pool.
75 *
5c5a1fb7 76 * @param numberOfWorkers Number of workers that this pool should manage.
729c563d 77 * @param filePath Path to the worker-file.
1927ee67 78 * @param opts Options for the pool.
729c563d 79 */
c97c7edb 80 public constructor (
5c5a1fb7 81 public readonly numberOfWorkers: number,
c97c7edb 82 public readonly filePath: string,
1927ee67 83 public readonly opts: PoolOptions<Worker>
c97c7edb 84 ) {
6bd72cd0 85 if (this.isMain() === false) {
c97c7edb
S
86 throw new Error('Cannot start a pool from a worker!')
87 }
8d3782fa 88 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 89 this.checkFilePath(this.filePath)
7c0ba920 90 this.checkPoolOptions(this.opts)
c97c7edb
S
91 this.setupHook()
92
5c5a1fb7 93 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 94 this.createAndSetupWorker()
c97c7edb
S
95 }
96
6bd72cd0 97 if (this.opts.enableEvents === true) {
7c0ba920
JB
98 this.emitter = new PoolEmitter()
99 }
a35560ba
S
100 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
101 this,
4a6952ff
JB
102 () => {
103 const workerCreated = this.createAndSetupWorker()
f3636726 104 this.registerWorkerMessageListener(workerCreated, message => {
4a6952ff
JB
105 if (
106 isKillBehavior(KillBehaviors.HARD, message.kill) ||
bdaf31cd 107 this.getWorkerRunningTasks(workerCreated) === 0
4a6952ff
JB
108 ) {
109 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
f3636726 110 this.destroyWorker(workerCreated) as void
4a6952ff
JB
111 }
112 })
113 return workerCreated
114 },
e843b904 115 this.opts.workerChoiceStrategy
a35560ba 116 )
c97c7edb
S
117 }
118
a35560ba 119 private checkFilePath (filePath: string): void {
c510fea7
APA
120 if (!filePath) {
121 throw new Error('Please specify a file with a worker implementation')
122 }
123 }
124
8d3782fa
JB
125 private checkNumberOfWorkers (numberOfWorkers: number): void {
126 if (numberOfWorkers == null) {
127 throw new Error(
128 'Cannot instantiate a pool without specifying the number of workers'
129 )
b893d997 130 } else if (Number.isSafeInteger(numberOfWorkers) === false) {
8d3782fa
JB
131 throw new Error(
132 'Cannot instantiate a pool with a non integer number of workers'
133 )
134 } else if (numberOfWorkers < 0) {
135 throw new Error(
136 'Cannot instantiate a pool with a negative number of workers'
137 )
7c0ba920 138 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
8d3782fa
JB
139 throw new Error('Cannot instantiate a fixed pool with no worker')
140 }
141 }
142
7c0ba920 143 private checkPoolOptions (opts: PoolOptions<Worker>): void {
e843b904
JB
144 this.opts.workerChoiceStrategy =
145 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
7c0ba920
JB
146 this.opts.enableEvents = opts.enableEvents ?? true
147 }
148
a76fac14 149 /** @inheritDoc */
7c0ba920
JB
150 public abstract get type (): PoolType
151
a76fac14 152 /** @inheritDoc */
7c0ba920
JB
153 public get numberOfRunningTasks (): number {
154 return this.promiseMap.size
a35560ba
S
155 }
156
bf9549ae
JB
157 /** @inheritDoc */
158 public getWorkerIndex (worker: Worker): number {
159 return this.workers.indexOf(worker)
160 }
161
a76fac14 162 /** @inheritDoc */
bdaf31cd 163 public getWorkerRunningTasks (worker: Worker): number | undefined {
bf9549ae 164 return this.workersTasksUsage.get(worker)?.running
bdaf31cd
JB
165 }
166
a76fac14 167 /** @inheritDoc */
bf9549ae
JB
168 public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
169 return this.workersTasksUsage.get(worker)?.avgRunTime
bdaf31cd
JB
170 }
171
a76fac14 172 /** @inheritDoc */
a35560ba
S
173 public setWorkerChoiceStrategy (
174 workerChoiceStrategy: WorkerChoiceStrategy
175 ): void {
b98ec2e6 176 this.opts.workerChoiceStrategy = workerChoiceStrategy
ea7a90d3
JB
177 for (const worker of this.workers) {
178 this.resetWorkerTasksUsage(worker)
179 }
a35560ba
S
180 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
181 workerChoiceStrategy
182 )
183 }
184
a76fac14 185 /** @inheritDoc */
7c0ba920
JB
186 public abstract get busy (): boolean
187
188 protected internalGetBusyStatus (): boolean {
189 return (
190 this.numberOfRunningTasks >= this.numberOfWorkers &&
bdaf31cd 191 this.findFreeWorker() === false
7c0ba920
JB
192 )
193 }
194
a76fac14 195 /** @inheritDoc */
bdaf31cd
JB
196 public findFreeWorker (): Worker | false {
197 for (const worker of this.workers) {
198 if (this.getWorkerRunningTasks(worker) === 0) {
199 // A worker is free, return the matching worker
200 return worker
7c0ba920
JB
201 }
202 }
203 return false
204 }
205
a76fac14 206 /** @inheritDoc */
280c2a77
S
207 public execute (data: Data): Promise<Response> {
208 // Configure worker to handle message with the specified task
209 const worker = this.chooseWorker()
a05c10de 210 const res = this.internalExecute(worker, this.nextMessageId)
14916bf9 211 this.checkAndEmitBusy()
a05c10de
JB
212 this.sendToWorker(worker, {
213 data: data ?? ({} as Data),
214 id: this.nextMessageId
215 })
216 ++this.nextMessageId
280c2a77
S
217 return res
218 }
c97c7edb 219
a76fac14 220 /** @inheritDoc */
c97c7edb 221 public async destroy (): Promise<void> {
45dbbb14 222 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
c97c7edb
S
223 }
224
4a6952ff 225 /**
675bb809 226 * Shutdowns given worker.
4a6952ff
JB
227 *
228 * @param worker A worker within `workers`.
229 */
230 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 231
729c563d 232 /**
280c2a77
S
233 * Setup hook that can be overridden by a Poolifier pool implementation
234 * to run code before workers are created in the abstract constructor.
729c563d 235 */
280c2a77
S
236 protected setupHook (): void {
237 // Can be overridden
238 }
c97c7edb 239
729c563d 240 /**
280c2a77
S
241 * Should return whether the worker is the main worker or not.
242 */
243 protected abstract isMain (): boolean
244
245 /**
bf9549ae
JB
246 * Hook executed before the worker task promise resolution.
247 * Can be overridden.
729c563d 248 *
bf9549ae 249 * @param worker The worker.
729c563d 250 */
bf9549ae
JB
251 protected beforePromiseWorkerResponseHook (worker: Worker): void {
252 this.increaseWorkerRunningTasks(worker)
c97c7edb
S
253 }
254
c01733f1 255 /**
bf9549ae
JB
256 * Hook executed after the worker task promise resolution.
257 * Can be overridden.
c01733f1 258 *
bf9549ae
JB
259 * @param message The received message.
260 * @param promise The Promise response.
c01733f1 261 */
bf9549ae
JB
262 protected afterPromiseWorkerResponseHook (
263 message: MessageValue<Response>,
264 promise: PromiseWorkerResponseWrapper<Worker, Response>
265 ): void {
266 this.decreaseWorkerRunningTasks(promise.worker)
267 this.stepWorkerRunTasks(promise.worker, 1)
268 this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
c01733f1 269 }
270
729c563d
S
271 /**
272 * Removes the given worker from the pool.
273 *
11df3590 274 * @param worker The worker that will be removed.
729c563d 275 */
f2fdaa86
JB
276 protected removeWorker (worker: Worker): void {
277 // Clean worker from data structure
bdaf31cd 278 this.workers.splice(this.getWorkerIndex(worker), 1)
10fcfaf4 279 this.removeWorkerTasksUsage(worker)
f2fdaa86
JB
280 }
281
280c2a77 282 /**
675bb809 283 * Chooses a worker for the next task.
280c2a77
S
284 *
285 * The default implementation uses a round robin algorithm to distribute the load.
286 *
287 * @returns Worker.
288 */
289 protected chooseWorker (): Worker {
a35560ba 290 return this.workerChoiceStrategyContext.execute()
c97c7edb
S
291 }
292
280c2a77 293 /**
675bb809 294 * Sends a message to the given worker.
280c2a77
S
295 *
296 * @param worker The worker which should receive the message.
297 * @param message The message.
298 */
299 protected abstract sendToWorker (
300 worker: Worker,
301 message: MessageValue<Data>
302 ): void
303
4a6952ff 304 /**
bdede008 305 * Registers a listener callback on a given worker.
4a6952ff 306 *
11df3590
JB
307 * @param worker The worker which should register a listener.
308 * @param listener The message listener callback.
4a6952ff
JB
309 */
310 protected abstract registerWorkerMessageListener<
4f7fa42a
S
311 Message extends Data | Response
312 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 313
280c2a77
S
314 protected internalExecute (
315 worker: Worker,
316 messageId: number
317 ): Promise<Response> {
bf9549ae 318 this.beforePromiseWorkerResponseHook(worker)
be0676b3
APA
319 return new Promise<Response>((resolve, reject) => {
320 this.promiseMap.set(messageId, { resolve, reject, worker })
c97c7edb
S
321 })
322 }
323
729c563d
S
324 /**
325 * Returns a newly created worker.
326 */
280c2a77 327 protected abstract createWorker (): Worker
c97c7edb 328
729c563d
S
329 /**
330 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
331 *
332 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
333 *
334 * @param worker The newly created worker.
335 */
280c2a77 336 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 337
4a6952ff
JB
338 /**
339 * Creates a new worker for this pool and sets it up completely.
340 *
341 * @returns New, completely set up worker.
342 */
343 protected createAndSetupWorker (): Worker {
bdacc2d2 344 const worker = this.createWorker()
280c2a77 345
35cf1c03 346 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba
S
347 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
348 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
349 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
45dbbb14 350 worker.once('exit', () => this.removeWorker(worker))
280c2a77 351
c97c7edb 352 this.workers.push(worker)
280c2a77 353
bf9549ae 354 // Init worker tasks usage map
ea7a90d3 355 this.initWorkerTasksUsage(worker)
280c2a77
S
356
357 this.afterWorkerSetup(worker)
358
c97c7edb
S
359 return worker
360 }
be0676b3
APA
361
362 /**
363 * This function is the listener registered for each worker.
364 *
bdacc2d2 365 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
366 */
367 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 368 return message => {
bdacc2d2
JB
369 if (message.id !== undefined) {
370 const promise = this.promiseMap.get(message.id)
371 if (promise !== undefined) {
a05c10de
JB
372 if (message.error) {
373 promise.reject(message.error)
374 } else {
375 promise.resolve(message.data as Response)
376 }
bf9549ae 377 this.afterPromiseWorkerResponseHook(message, promise)
be0676b3
APA
378 this.promiseMap.delete(message.id)
379 }
380 }
381 }
be0676b3 382 }
7c0ba920
JB
383
384 private checkAndEmitBusy (): void {
6bd72cd0 385 if (this.opts.enableEvents === true && this.busy === true) {
7c0ba920
JB
386 this.emitter?.emit('busy')
387 }
388 }
bf9549ae
JB
389
390 /**
10fcfaf4 391 * Increases the number of tasks that the given worker has applied.
bf9549ae
JB
392 *
393 * @param worker Worker which running tasks is increased.
394 */
395 private increaseWorkerRunningTasks (worker: Worker): void {
396 this.stepWorkerRunningTasks(worker, 1)
397 }
398
399 /**
10fcfaf4 400 * Decreases the number of tasks that the given worker has applied.
bf9549ae
JB
401 *
402 * @param worker Worker which running tasks is decreased.
403 */
404 private decreaseWorkerRunningTasks (worker: Worker): void {
405 this.stepWorkerRunningTasks(worker, -1)
406 }
407
408 /**
10fcfaf4 409 * Steps the number of tasks that the given worker has applied.
bf9549ae
JB
410 *
411 * @param worker Worker which running tasks are stepped.
412 * @param step Number of running tasks step.
413 */
414 private stepWorkerRunningTasks (worker: Worker, step: number): void {
a05c10de
JB
415 if (this.checkWorkerTasksUsage(worker) === true) {
416 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
bf9549ae
JB
417 tasksUsage.running = tasksUsage.running + step
418 this.workersTasksUsage.set(worker, tasksUsage)
bf9549ae
JB
419 }
420 }
421
422 /**
10fcfaf4 423 * Steps the number of tasks that the given worker has run.
bf9549ae
JB
424 *
425 * @param worker Worker which has run tasks.
426 * @param step Number of run tasks step.
427 */
10fcfaf4 428 private stepWorkerRunTasks (worker: Worker, step: number): void {
a05c10de
JB
429 if (this.checkWorkerTasksUsage(worker) === true) {
430 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
bf9549ae
JB
431 tasksUsage.run = tasksUsage.run + step
432 this.workersTasksUsage.set(worker, tasksUsage)
bf9549ae
JB
433 }
434 }
435
436 /**
23135a89 437 * Updates tasks runtime for the given worker.
bf9549ae
JB
438 *
439 * @param worker Worker which run the task.
23135a89 440 * @param taskRunTime Worker task runtime.
bf9549ae
JB
441 */
442 private updateWorkerTasksRunTime (
443 worker: Worker,
444 taskRunTime: number | undefined
10fcfaf4
JB
445 ): void {
446 if (
447 this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
a05c10de
JB
448 .requiredStatistics.runTime === true &&
449 this.checkWorkerTasksUsage(worker) === true
10fcfaf4 450 ) {
a05c10de
JB
451 const tasksUsage = this.workersTasksUsage.get(worker) as TasksUsage
452 tasksUsage.runTime += taskRunTime ?? 0
453 if (tasksUsage.run !== 0) {
454 tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
10fcfaf4 455 }
a05c10de
JB
456 this.workersTasksUsage.set(worker, tasksUsage)
457 }
458 }
459
460 /**
461 * Checks if the given worker is registered in the workers tasks usage map.
462 *
463 * @param worker Worker to check.
464 * @returns `true` if the worker is registered in the workers tasks usage map. `false` otherwise.
465 */
466 private checkWorkerTasksUsage (worker: Worker): boolean {
467 const hasTasksUsage = this.workersTasksUsage.has(worker)
468 if (hasTasksUsage === false) {
469 throw new Error('Worker could not be found in workers tasks usage map')
bf9549ae 470 }
a05c10de 471 return hasTasksUsage
bf9549ae
JB
472 }
473
ea7a90d3
JB
474 /**
475 * Initializes tasks usage statistics.
476 *
477 * @param worker The worker.
478 */
479 initWorkerTasksUsage (worker: Worker): void {
480 this.workersTasksUsage.set(worker, {
481 run: 0,
482 running: 0,
483 runTime: 0,
484 avgRunTime: 0
485 })
486 }
487
bf9549ae 488 /**
10fcfaf4 489 * Removes worker tasks usage statistics.
bf9549ae
JB
490 *
491 * @param worker The worker.
492 */
10fcfaf4 493 private removeWorkerTasksUsage (worker: Worker): void {
bf9549ae
JB
494 this.workersTasksUsage.delete(worker)
495 }
ea7a90d3
JB
496
497 /**
498 * Resets worker tasks usage statistics.
499 *
500 * @param worker The worker.
501 */
502 private resetWorkerTasksUsage (worker: Worker): void {
503 this.removeWorkerTasksUsage(worker)
504 this.initWorkerTasksUsage(worker)
505 }
c97c7edb 506}