Refine eslint configuration
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
bdaf31cd 7import type { PoolOptions } from './pool'
bf9549ae 8import type { IPoolInternal, TasksUsage } from './pool-internal'
7c0ba920 9import { PoolEmitter, PoolType } from './pool-internal'
ea7a90d3 10import type { IPoolWorker } from './pool-worker'
a35560ba
S
11import {
12 WorkerChoiceStrategies,
bdaf31cd
JB
13 WorkerChoiceStrategy
14} from './selection-strategies/selection-strategies-types'
15import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 16
bf9549ae
JB
17const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
18 'Worker could not be found in worker tasks usage map'
19
729c563d 20/**
ea7a90d3 21 * Base class that implements some shared logic for all poolifier pools.
729c563d
S
22 *
23 * @template Worker Type of worker which manages this pool.
deb85c12
JB
24 * @template Data Type of data sent to the worker. This can only be serializable data.
25 * @template Response Type of response of execution. This can only be serializable data.
729c563d 26 */
c97c7edb 27export abstract class AbstractPool<
ea7a90d3 28 Worker extends IPoolWorker,
d3c8a1a8
S
29 Data = unknown,
30 Response = unknown
9b2fdd9f 31> implements IPoolInternal<Worker, Data, Response> {
a76fac14 32 /** @inheritDoc */
4a6952ff
JB
33 public readonly workers: Worker[] = []
34
ea7a90d3
JB
35 /** @inheritDoc */
36 public readonly workersTasksUsage: Map<Worker, TasksUsage> = new Map<
bf9549ae
JB
37 Worker,
38 TasksUsage
39 >()
4a6952ff 40
a76fac14 41 /** @inheritDoc */
7c0ba920
JB
42 public readonly emitter?: PoolEmitter
43
a76fac14 44 /** @inheritDoc */
7c0ba920 45 public readonly max?: number
4a6952ff 46
be0676b3
APA
47 /**
48 * The promise map.
49 *
e088a00c 50 * - `key`: This is the message Id of each submitted task.
be0676b3
APA
51 * - `value`: An object that contains the worker, the resolve function and the reject function.
52 *
53 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
54 */
55 protected promiseMap: Map<
56 number,
57 PromiseWorkerResponseWrapper<Worker, Response>
58 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
59
729c563d 60 /**
e088a00c 61 * Id of the next message.
729c563d 62 */
280c2a77 63 protected nextMessageId: number = 0
c97c7edb 64
a35560ba
S
65 /**
66 * Worker choice strategy instance implementing the worker choice algorithm.
67 *
68 * Default to a strategy implementing a round robin algorithm.
69 */
70 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
71 Worker,
72 Data,
73 Response
74 >
75
729c563d
S
76 /**
77 * Constructs a new poolifier pool.
78 *
5c5a1fb7 79 * @param numberOfWorkers Number of workers that this pool should manage.
729c563d 80 * @param filePath Path to the worker-file.
1927ee67 81 * @param opts Options for the pool.
729c563d 82 */
c97c7edb 83 public constructor (
5c5a1fb7 84 public readonly numberOfWorkers: number,
c97c7edb 85 public readonly filePath: string,
1927ee67 86 public readonly opts: PoolOptions<Worker>
c97c7edb 87 ) {
6bd72cd0 88 if (this.isMain() === false) {
c97c7edb
S
89 throw new Error('Cannot start a pool from a worker!')
90 }
8d3782fa 91 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 92 this.checkFilePath(this.filePath)
7c0ba920 93 this.checkPoolOptions(this.opts)
c97c7edb
S
94 this.setupHook()
95
5c5a1fb7 96 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 97 this.createAndSetupWorker()
c97c7edb
S
98 }
99
6bd72cd0 100 if (this.opts.enableEvents === true) {
7c0ba920
JB
101 this.emitter = new PoolEmitter()
102 }
a35560ba
S
103 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
104 this,
4a6952ff
JB
105 () => {
106 const workerCreated = this.createAndSetupWorker()
f3636726 107 this.registerWorkerMessageListener(workerCreated, message => {
4a6952ff
JB
108 if (
109 isKillBehavior(KillBehaviors.HARD, message.kill) ||
bdaf31cd 110 this.getWorkerRunningTasks(workerCreated) === 0
4a6952ff
JB
111 ) {
112 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
f3636726 113 this.destroyWorker(workerCreated) as void
4a6952ff
JB
114 }
115 })
116 return workerCreated
117 },
e843b904 118 this.opts.workerChoiceStrategy
a35560ba 119 )
c97c7edb
S
120 }
121
a35560ba 122 private checkFilePath (filePath: string): void {
c510fea7
APA
123 if (!filePath) {
124 throw new Error('Please specify a file with a worker implementation')
125 }
126 }
127
8d3782fa
JB
128 private checkNumberOfWorkers (numberOfWorkers: number): void {
129 if (numberOfWorkers == null) {
130 throw new Error(
131 'Cannot instantiate a pool without specifying the number of workers'
132 )
b893d997 133 } else if (Number.isSafeInteger(numberOfWorkers) === false) {
8d3782fa
JB
134 throw new Error(
135 'Cannot instantiate a pool with a non integer number of workers'
136 )
137 } else if (numberOfWorkers < 0) {
138 throw new Error(
139 'Cannot instantiate a pool with a negative number of workers'
140 )
7c0ba920 141 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
8d3782fa
JB
142 throw new Error('Cannot instantiate a fixed pool with no worker')
143 }
144 }
145
7c0ba920 146 private checkPoolOptions (opts: PoolOptions<Worker>): void {
e843b904
JB
147 this.opts.workerChoiceStrategy =
148 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
7c0ba920
JB
149 this.opts.enableEvents = opts.enableEvents ?? true
150 }
151
a76fac14 152 /** @inheritDoc */
7c0ba920
JB
153 public abstract get type (): PoolType
154
a76fac14 155 /** @inheritDoc */
7c0ba920
JB
156 public get numberOfRunningTasks (): number {
157 return this.promiseMap.size
a35560ba
S
158 }
159
bf9549ae
JB
160 /** @inheritDoc */
161 public getWorkerIndex (worker: Worker): number {
162 return this.workers.indexOf(worker)
163 }
164
a76fac14 165 /** @inheritDoc */
bdaf31cd 166 public getWorkerRunningTasks (worker: Worker): number | undefined {
bf9549ae 167 return this.workersTasksUsage.get(worker)?.running
bdaf31cd
JB
168 }
169
a76fac14 170 /** @inheritDoc */
bf9549ae
JB
171 public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
172 return this.workersTasksUsage.get(worker)?.avgRunTime
bdaf31cd
JB
173 }
174
a76fac14 175 /** @inheritDoc */
a35560ba
S
176 public setWorkerChoiceStrategy (
177 workerChoiceStrategy: WorkerChoiceStrategy
178 ): void {
b98ec2e6 179 this.opts.workerChoiceStrategy = workerChoiceStrategy
ea7a90d3
JB
180 for (const worker of this.workers) {
181 this.resetWorkerTasksUsage(worker)
182 }
a35560ba
S
183 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
184 workerChoiceStrategy
185 )
186 }
187
a76fac14 188 /** @inheritDoc */
7c0ba920
JB
189 public abstract get busy (): boolean
190
191 protected internalGetBusyStatus (): boolean {
192 return (
193 this.numberOfRunningTasks >= this.numberOfWorkers &&
bdaf31cd 194 this.findFreeWorker() === false
7c0ba920
JB
195 )
196 }
197
a76fac14 198 /** @inheritDoc */
bdaf31cd
JB
199 public findFreeWorker (): Worker | false {
200 for (const worker of this.workers) {
201 if (this.getWorkerRunningTasks(worker) === 0) {
202 // A worker is free, return the matching worker
203 return worker
7c0ba920
JB
204 }
205 }
206 return false
207 }
208
a76fac14 209 /** @inheritDoc */
280c2a77
S
210 public execute (data: Data): Promise<Response> {
211 // Configure worker to handle message with the specified task
212 const worker = this.chooseWorker()
280c2a77
S
213 const messageId = ++this.nextMessageId
214 const res = this.internalExecute(worker, messageId)
14916bf9 215 this.checkAndEmitBusy()
cf597bc5
JB
216 data = data ?? ({} as Data)
217 this.sendToWorker(worker, { data, id: messageId })
280c2a77
S
218 return res
219 }
c97c7edb 220
a76fac14 221 /** @inheritDoc */
c97c7edb 222 public async destroy (): Promise<void> {
45dbbb14 223 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
c97c7edb
S
224 }
225
4a6952ff 226 /**
675bb809 227 * Shutdowns given worker.
4a6952ff
JB
228 *
229 * @param worker A worker within `workers`.
230 */
231 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 232
729c563d 233 /**
280c2a77
S
234 * Setup hook that can be overridden by a Poolifier pool implementation
235 * to run code before workers are created in the abstract constructor.
729c563d 236 */
280c2a77
S
237 protected setupHook (): void {
238 // Can be overridden
239 }
c97c7edb 240
729c563d 241 /**
280c2a77
S
242 * Should return whether the worker is the main worker or not.
243 */
244 protected abstract isMain (): boolean
245
246 /**
bf9549ae
JB
247 * Hook executed before the worker task promise resolution.
248 * Can be overridden.
729c563d 249 *
bf9549ae 250 * @param worker The worker.
729c563d 251 */
bf9549ae
JB
252 protected beforePromiseWorkerResponseHook (worker: Worker): void {
253 this.increaseWorkerRunningTasks(worker)
c97c7edb
S
254 }
255
c01733f1 256 /**
bf9549ae
JB
257 * Hook executed after the worker task promise resolution.
258 * Can be overridden.
c01733f1 259 *
bf9549ae
JB
260 * @param message The received message.
261 * @param promise The Promise response.
c01733f1 262 */
bf9549ae
JB
263 protected afterPromiseWorkerResponseHook (
264 message: MessageValue<Response>,
265 promise: PromiseWorkerResponseWrapper<Worker, Response>
266 ): void {
267 this.decreaseWorkerRunningTasks(promise.worker)
268 this.stepWorkerRunTasks(promise.worker, 1)
269 this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
c01733f1 270 }
271
729c563d
S
272 /**
273 * Removes the given worker from the pool.
274 *
11df3590 275 * @param worker The worker that will be removed.
729c563d 276 */
f2fdaa86
JB
277 protected removeWorker (worker: Worker): void {
278 // Clean worker from data structure
bdaf31cd 279 this.workers.splice(this.getWorkerIndex(worker), 1)
10fcfaf4 280 this.removeWorkerTasksUsage(worker)
f2fdaa86
JB
281 }
282
280c2a77 283 /**
675bb809 284 * Chooses a worker for the next task.
280c2a77
S
285 *
286 * The default implementation uses a round robin algorithm to distribute the load.
287 *
288 * @returns Worker.
289 */
290 protected chooseWorker (): Worker {
a35560ba 291 return this.workerChoiceStrategyContext.execute()
c97c7edb
S
292 }
293
280c2a77 294 /**
675bb809 295 * Sends a message to the given worker.
280c2a77
S
296 *
297 * @param worker The worker which should receive the message.
298 * @param message The message.
299 */
300 protected abstract sendToWorker (
301 worker: Worker,
302 message: MessageValue<Data>
303 ): void
304
4a6952ff 305 /**
bdede008 306 * Registers a listener callback on a given worker.
4a6952ff 307 *
11df3590
JB
308 * @param worker The worker which should register a listener.
309 * @param listener The message listener callback.
4a6952ff
JB
310 */
311 protected abstract registerWorkerMessageListener<
4f7fa42a
S
312 Message extends Data | Response
313 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 314
280c2a77
S
315 protected internalExecute (
316 worker: Worker,
317 messageId: number
318 ): Promise<Response> {
bf9549ae 319 this.beforePromiseWorkerResponseHook(worker)
be0676b3
APA
320 return new Promise<Response>((resolve, reject) => {
321 this.promiseMap.set(messageId, { resolve, reject, worker })
c97c7edb
S
322 })
323 }
324
729c563d
S
325 /**
326 * Returns a newly created worker.
327 */
280c2a77 328 protected abstract createWorker (): Worker
c97c7edb 329
729c563d
S
330 /**
331 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
332 *
333 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
334 *
335 * @param worker The newly created worker.
336 */
280c2a77 337 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 338
4a6952ff
JB
339 /**
340 * Creates a new worker for this pool and sets it up completely.
341 *
342 * @returns New, completely set up worker.
343 */
344 protected createAndSetupWorker (): Worker {
bdacc2d2 345 const worker = this.createWorker()
280c2a77 346
35cf1c03 347 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba
S
348 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
349 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
350 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
45dbbb14 351 worker.once('exit', () => this.removeWorker(worker))
280c2a77 352
c97c7edb 353 this.workers.push(worker)
280c2a77 354
bf9549ae 355 // Init worker tasks usage map
ea7a90d3 356 this.initWorkerTasksUsage(worker)
280c2a77
S
357
358 this.afterWorkerSetup(worker)
359
c97c7edb
S
360 return worker
361 }
be0676b3
APA
362
363 /**
364 * This function is the listener registered for each worker.
365 *
bdacc2d2 366 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
367 */
368 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 369 return message => {
bdacc2d2
JB
370 if (message.id !== undefined) {
371 const promise = this.promiseMap.get(message.id)
372 if (promise !== undefined) {
bf9549ae 373 this.afterPromiseWorkerResponseHook(message, promise)
bdacc2d2
JB
374 if (message.error) promise.reject(message.error)
375 else promise.resolve(message.data as Response)
be0676b3
APA
376 this.promiseMap.delete(message.id)
377 }
378 }
379 }
be0676b3 380 }
7c0ba920
JB
381
382 private checkAndEmitBusy (): void {
6bd72cd0 383 if (this.opts.enableEvents === true && this.busy === true) {
7c0ba920
JB
384 this.emitter?.emit('busy')
385 }
386 }
bf9549ae
JB
387
388 /**
10fcfaf4 389 * Increases the number of tasks that the given worker has applied.
bf9549ae
JB
390 *
391 * @param worker Worker which running tasks is increased.
392 */
393 private increaseWorkerRunningTasks (worker: Worker): void {
394 this.stepWorkerRunningTasks(worker, 1)
395 }
396
397 /**
10fcfaf4 398 * Decreases the number of tasks that the given worker has applied.
bf9549ae
JB
399 *
400 * @param worker Worker which running tasks is decreased.
401 */
402 private decreaseWorkerRunningTasks (worker: Worker): void {
403 this.stepWorkerRunningTasks(worker, -1)
404 }
405
406 /**
10fcfaf4 407 * Steps the number of tasks that the given worker has applied.
bf9549ae
JB
408 *
409 * @param worker Worker which running tasks are stepped.
410 * @param step Number of running tasks step.
411 */
412 private stepWorkerRunningTasks (worker: Worker, step: number): void {
413 const tasksUsage = this.workersTasksUsage.get(worker)
414 if (tasksUsage !== undefined) {
415 tasksUsage.running = tasksUsage.running + step
416 this.workersTasksUsage.set(worker, tasksUsage)
417 } else {
418 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
419 }
420 }
421
422 /**
10fcfaf4 423 * Steps the number of tasks that the given worker has run.
bf9549ae
JB
424 *
425 * @param worker Worker which has run tasks.
426 * @param step Number of run tasks step.
427 */
10fcfaf4 428 private stepWorkerRunTasks (worker: Worker, step: number): void {
bf9549ae
JB
429 const tasksUsage = this.workersTasksUsage.get(worker)
430 if (tasksUsage !== undefined) {
431 tasksUsage.run = tasksUsage.run + step
432 this.workersTasksUsage.set(worker, tasksUsage)
433 } else {
434 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
435 }
436 }
437
438 /**
23135a89 439 * Updates tasks runtime for the given worker.
bf9549ae
JB
440 *
441 * @param worker Worker which run the task.
23135a89 442 * @param taskRunTime Worker task runtime.
bf9549ae
JB
443 */
444 private updateWorkerTasksRunTime (
445 worker: Worker,
446 taskRunTime: number | undefined
10fcfaf4
JB
447 ): void {
448 if (
449 this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
450 .requiredStatistics.runTime === true
451 ) {
452 const tasksUsage = this.workersTasksUsage.get(worker)
675bb809 453 if (tasksUsage !== undefined) {
10fcfaf4 454 tasksUsage.runTime += taskRunTime ?? 0
675bb809
JB
455 if (tasksUsage.run !== 0) {
456 tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
457 }
10fcfaf4
JB
458 this.workersTasksUsage.set(worker, tasksUsage)
459 } else {
460 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
461 }
bf9549ae
JB
462 }
463 }
464
ea7a90d3
JB
465 /**
466 * Initializes tasks usage statistics.
467 *
468 * @param worker The worker.
469 */
470 initWorkerTasksUsage (worker: Worker): void {
471 this.workersTasksUsage.set(worker, {
472 run: 0,
473 running: 0,
474 runTime: 0,
475 avgRunTime: 0
476 })
477 }
478
bf9549ae 479 /**
10fcfaf4 480 * Removes worker tasks usage statistics.
bf9549ae
JB
481 *
482 * @param worker The worker.
483 */
10fcfaf4 484 private removeWorkerTasksUsage (worker: Worker): void {
bf9549ae
JB
485 this.workersTasksUsage.delete(worker)
486 }
ea7a90d3
JB
487
488 /**
489 * Resets worker tasks usage statistics.
490 *
491 * @param worker The worker.
492 */
493 private resetWorkerTasksUsage (worker: Worker): void {
494 this.removeWorkerTasksUsage(worker)
495 this.initWorkerTasksUsage(worker)
496 }
c97c7edb 497}