Strict boolean check
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
be0676b3
APA
1import type {
2 MessageValue,
3 PromiseWorkerResponseWrapper
4} from '../utility-types'
6e9d10db 5import { EMPTY_FUNCTION } from '../utils'
4a6952ff 6import { isKillBehavior, KillBehaviors } from '../worker/worker-options'
bdaf31cd
JB
7import type { AbstractPoolWorker } from './abstract-pool-worker'
8import type { PoolOptions } from './pool'
bf9549ae 9import type { IPoolInternal, TasksUsage } from './pool-internal'
7c0ba920 10import { PoolEmitter, PoolType } from './pool-internal'
a35560ba
S
11import {
12 WorkerChoiceStrategies,
bdaf31cd
JB
13 WorkerChoiceStrategy
14} from './selection-strategies/selection-strategies-types'
15import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 16
bf9549ae
JB
17const WORKER_NOT_FOUND_TASKS_USAGE_MAP =
18 'Worker could not be found in worker tasks usage map'
19
729c563d
S
20/**
21 * Base class containing some shared logic for all poolifier pools.
22 *
23 * @template Worker Type of worker which manages this pool.
deb85c12
JB
24 * @template Data Type of data sent to the worker. This can only be serializable data.
25 * @template Response Type of response of execution. This can only be serializable data.
729c563d 26 */
c97c7edb 27export abstract class AbstractPool<
bdaf31cd 28 Worker extends AbstractPoolWorker,
d3c8a1a8
S
29 Data = unknown,
30 Response = unknown
9b2fdd9f 31> implements IPoolInternal<Worker, Data, Response> {
a76fac14 32 /** @inheritDoc */
4a6952ff
JB
33 public readonly workers: Worker[] = []
34
bf9549ae
JB
35 /**
36 * The workers tasks usage map.
37 *
38 * `key`: The `Worker`
39 * `value`: Worker tasks usage statistics.
40 */
41 protected workersTasksUsage: Map<Worker, TasksUsage> = new Map<
42 Worker,
43 TasksUsage
44 >()
4a6952ff 45
a76fac14 46 /** @inheritDoc */
7c0ba920
JB
47 public readonly emitter?: PoolEmitter
48
a76fac14 49 /** @inheritDoc */
7c0ba920 50 public readonly max?: number
4a6952ff 51
be0676b3
APA
52 /**
53 * The promise map.
54 *
e088a00c 55 * - `key`: This is the message Id of each submitted task.
be0676b3
APA
56 * - `value`: An object that contains the worker, the resolve function and the reject function.
57 *
58 * When we receive a message from the worker we get a map entry and resolve/reject the promise based on the message.
59 */
60 protected promiseMap: Map<
61 number,
62 PromiseWorkerResponseWrapper<Worker, Response>
63 > = new Map<number, PromiseWorkerResponseWrapper<Worker, Response>>()
64
729c563d 65 /**
e088a00c 66 * Id of the next message.
729c563d 67 */
280c2a77 68 protected nextMessageId: number = 0
c97c7edb 69
a35560ba
S
70 /**
71 * Worker choice strategy instance implementing the worker choice algorithm.
72 *
73 * Default to a strategy implementing a round robin algorithm.
74 */
75 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
76 Worker,
77 Data,
78 Response
79 >
80
729c563d
S
81 /**
82 * Constructs a new poolifier pool.
83 *
5c5a1fb7 84 * @param numberOfWorkers Number of workers that this pool should manage.
729c563d 85 * @param filePath Path to the worker-file.
1927ee67 86 * @param opts Options for the pool.
729c563d 87 */
c97c7edb 88 public constructor (
5c5a1fb7 89 public readonly numberOfWorkers: number,
c97c7edb 90 public readonly filePath: string,
1927ee67 91 public readonly opts: PoolOptions<Worker>
c97c7edb
S
92 ) {
93 if (!this.isMain()) {
94 throw new Error('Cannot start a pool from a worker!')
95 }
8d3782fa 96 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 97 this.checkFilePath(this.filePath)
7c0ba920 98 this.checkPoolOptions(this.opts)
c97c7edb
S
99 this.setupHook()
100
5c5a1fb7 101 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 102 this.createAndSetupWorker()
c97c7edb
S
103 }
104
7c0ba920
JB
105 if (this.opts.enableEvents) {
106 this.emitter = new PoolEmitter()
107 }
a35560ba
S
108 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext(
109 this,
4a6952ff
JB
110 () => {
111 const workerCreated = this.createAndSetupWorker()
f3636726 112 this.registerWorkerMessageListener(workerCreated, message => {
4a6952ff
JB
113 if (
114 isKillBehavior(KillBehaviors.HARD, message.kill) ||
bdaf31cd 115 this.getWorkerRunningTasks(workerCreated) === 0
4a6952ff
JB
116 ) {
117 // Kill received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
f3636726 118 this.destroyWorker(workerCreated) as void
4a6952ff
JB
119 }
120 })
121 return workerCreated
122 },
e843b904 123 this.opts.workerChoiceStrategy
a35560ba 124 )
c97c7edb
S
125 }
126
a35560ba 127 private checkFilePath (filePath: string): void {
c510fea7
APA
128 if (!filePath) {
129 throw new Error('Please specify a file with a worker implementation')
130 }
131 }
132
8d3782fa
JB
133 private checkNumberOfWorkers (numberOfWorkers: number): void {
134 if (numberOfWorkers == null) {
135 throw new Error(
136 'Cannot instantiate a pool without specifying the number of workers'
137 )
b893d997 138 } else if (Number.isSafeInteger(numberOfWorkers) === false) {
8d3782fa
JB
139 throw new Error(
140 'Cannot instantiate a pool with a non integer number of workers'
141 )
142 } else if (numberOfWorkers < 0) {
143 throw new Error(
144 'Cannot instantiate a pool with a negative number of workers'
145 )
7c0ba920 146 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
8d3782fa
JB
147 throw new Error('Cannot instantiate a fixed pool with no worker')
148 }
149 }
150
7c0ba920 151 private checkPoolOptions (opts: PoolOptions<Worker>): void {
e843b904
JB
152 this.opts.workerChoiceStrategy =
153 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
7c0ba920
JB
154 this.opts.enableEvents = opts.enableEvents ?? true
155 }
156
a76fac14 157 /** @inheritDoc */
7c0ba920
JB
158 public abstract get type (): PoolType
159
a76fac14 160 /** @inheritDoc */
7c0ba920
JB
161 public get numberOfRunningTasks (): number {
162 return this.promiseMap.size
a35560ba
S
163 }
164
bf9549ae
JB
165 /** @inheritDoc */
166 public getWorkerIndex (worker: Worker): number {
167 return this.workers.indexOf(worker)
168 }
169
a76fac14 170 /** @inheritDoc */
bdaf31cd 171 public getWorkerRunningTasks (worker: Worker): number | undefined {
bf9549ae 172 return this.workersTasksUsage.get(worker)?.running
bdaf31cd
JB
173 }
174
a76fac14 175 /** @inheritDoc */
bf9549ae
JB
176 public getWorkerAverageTasksRunTime (worker: Worker): number | undefined {
177 return this.workersTasksUsage.get(worker)?.avgRunTime
bdaf31cd
JB
178 }
179
a76fac14 180 /** @inheritDoc */
a35560ba
S
181 public setWorkerChoiceStrategy (
182 workerChoiceStrategy: WorkerChoiceStrategy
183 ): void {
b98ec2e6 184 this.opts.workerChoiceStrategy = workerChoiceStrategy
a35560ba
S
185 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
186 workerChoiceStrategy
187 )
188 }
189
a76fac14 190 /** @inheritDoc */
7c0ba920
JB
191 public abstract get busy (): boolean
192
193 protected internalGetBusyStatus (): boolean {
194 return (
195 this.numberOfRunningTasks >= this.numberOfWorkers &&
bdaf31cd 196 this.findFreeWorker() === false
7c0ba920
JB
197 )
198 }
199
a76fac14 200 /** @inheritDoc */
bdaf31cd
JB
201 public findFreeWorker (): Worker | false {
202 for (const worker of this.workers) {
203 if (this.getWorkerRunningTasks(worker) === 0) {
204 // A worker is free, return the matching worker
205 return worker
7c0ba920
JB
206 }
207 }
208 return false
209 }
210
a76fac14 211 /** @inheritDoc */
280c2a77
S
212 public execute (data: Data): Promise<Response> {
213 // Configure worker to handle message with the specified task
214 const worker = this.chooseWorker()
280c2a77
S
215 const messageId = ++this.nextMessageId
216 const res = this.internalExecute(worker, messageId)
14916bf9 217 this.checkAndEmitBusy()
cf597bc5
JB
218 data = data ?? ({} as Data)
219 this.sendToWorker(worker, { data, id: messageId })
280c2a77
S
220 return res
221 }
c97c7edb 222
a76fac14 223 /** @inheritDoc */
c97c7edb 224 public async destroy (): Promise<void> {
45dbbb14 225 await Promise.all(this.workers.map(worker => this.destroyWorker(worker)))
c97c7edb
S
226 }
227
4a6952ff 228 /**
675bb809 229 * Shutdowns given worker.
4a6952ff
JB
230 *
231 * @param worker A worker within `workers`.
232 */
233 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 234
729c563d 235 /**
280c2a77
S
236 * Setup hook that can be overridden by a Poolifier pool implementation
237 * to run code before workers are created in the abstract constructor.
729c563d 238 */
280c2a77
S
239 protected setupHook (): void {
240 // Can be overridden
241 }
c97c7edb 242
729c563d 243 /**
280c2a77
S
244 * Should return whether the worker is the main worker or not.
245 */
246 protected abstract isMain (): boolean
247
248 /**
bf9549ae
JB
249 * Hook executed before the worker task promise resolution.
250 * Can be overridden.
729c563d 251 *
bf9549ae 252 * @param worker The worker.
729c563d 253 */
bf9549ae
JB
254 protected beforePromiseWorkerResponseHook (worker: Worker): void {
255 this.increaseWorkerRunningTasks(worker)
c97c7edb
S
256 }
257
c01733f1 258 /**
bf9549ae
JB
259 * Hook executed after the worker task promise resolution.
260 * Can be overridden.
c01733f1 261 *
bf9549ae
JB
262 * @param message The received message.
263 * @param promise The Promise response.
c01733f1 264 */
bf9549ae
JB
265 protected afterPromiseWorkerResponseHook (
266 message: MessageValue<Response>,
267 promise: PromiseWorkerResponseWrapper<Worker, Response>
268 ): void {
269 this.decreaseWorkerRunningTasks(promise.worker)
270 this.stepWorkerRunTasks(promise.worker, 1)
271 this.updateWorkerTasksRunTime(promise.worker, message.taskRunTime)
c01733f1 272 }
273
729c563d
S
274 /**
275 * Removes the given worker from the pool.
276 *
277 * @param worker Worker that will be removed.
278 */
f2fdaa86
JB
279 protected removeWorker (worker: Worker): void {
280 // Clean worker from data structure
bdaf31cd 281 this.workers.splice(this.getWorkerIndex(worker), 1)
10fcfaf4 282 this.removeWorkerTasksUsage(worker)
f2fdaa86
JB
283 }
284
280c2a77 285 /**
675bb809 286 * Chooses a worker for the next task.
280c2a77
S
287 *
288 * The default implementation uses a round robin algorithm to distribute the load.
289 *
290 * @returns Worker.
291 */
292 protected chooseWorker (): Worker {
a35560ba 293 return this.workerChoiceStrategyContext.execute()
c97c7edb
S
294 }
295
280c2a77 296 /**
675bb809 297 * Sends a message to the given worker.
280c2a77
S
298 *
299 * @param worker The worker which should receive the message.
300 * @param message The message.
301 */
302 protected abstract sendToWorker (
303 worker: Worker,
304 message: MessageValue<Data>
305 ): void
306
4a6952ff 307 /**
bdede008 308 * Registers a listener callback on a given worker.
4a6952ff
JB
309 *
310 * @param worker A worker.
311 * @param listener A message listener callback.
312 */
313 protected abstract registerWorkerMessageListener<
4f7fa42a
S
314 Message extends Data | Response
315 > (worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 316
280c2a77
S
317 protected internalExecute (
318 worker: Worker,
319 messageId: number
320 ): Promise<Response> {
bf9549ae 321 this.beforePromiseWorkerResponseHook(worker)
be0676b3
APA
322 return new Promise<Response>((resolve, reject) => {
323 this.promiseMap.set(messageId, { resolve, reject, worker })
c97c7edb
S
324 })
325 }
326
729c563d
S
327 /**
328 * Returns a newly created worker.
329 */
280c2a77 330 protected abstract createWorker (): Worker
c97c7edb 331
729c563d
S
332 /**
333 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
334 *
335 * Can be used to update the `maxListeners` or binding the `main-worker`<->`worker` connection if not bind by default.
336 *
337 * @param worker The newly created worker.
338 */
280c2a77 339 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 340
4a6952ff
JB
341 /**
342 * Creates a new worker for this pool and sets it up completely.
343 *
344 * @returns New, completely set up worker.
345 */
346 protected createAndSetupWorker (): Worker {
bdacc2d2 347 const worker = this.createWorker()
280c2a77 348
35cf1c03 349 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba
S
350 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
351 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
352 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
45dbbb14 353 worker.once('exit', () => this.removeWorker(worker))
280c2a77 354
c97c7edb 355 this.workers.push(worker)
280c2a77 356
bf9549ae
JB
357 // Init worker tasks usage map
358 this.workersTasksUsage.set(worker, {
359 run: 0,
360 running: 0,
361 runTime: 0,
362 avgRunTime: 0
363 })
280c2a77
S
364
365 this.afterWorkerSetup(worker)
366
c97c7edb
S
367 return worker
368 }
be0676b3
APA
369
370 /**
371 * This function is the listener registered for each worker.
372 *
bdacc2d2 373 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
374 */
375 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 376 return message => {
bdacc2d2
JB
377 if (message.id !== undefined) {
378 const promise = this.promiseMap.get(message.id)
379 if (promise !== undefined) {
bf9549ae 380 this.afterPromiseWorkerResponseHook(message, promise)
bdacc2d2
JB
381 if (message.error) promise.reject(message.error)
382 else promise.resolve(message.data as Response)
be0676b3
APA
383 this.promiseMap.delete(message.id)
384 }
385 }
386 }
be0676b3 387 }
7c0ba920
JB
388
389 private checkAndEmitBusy (): void {
390 if (this.opts.enableEvents && this.busy) {
391 this.emitter?.emit('busy')
392 }
393 }
bf9549ae
JB
394
395 /**
10fcfaf4 396 * Increases the number of tasks that the given worker has applied.
bf9549ae
JB
397 *
398 * @param worker Worker which running tasks is increased.
399 */
400 private increaseWorkerRunningTasks (worker: Worker): void {
401 this.stepWorkerRunningTasks(worker, 1)
402 }
403
404 /**
10fcfaf4 405 * Decreases the number of tasks that the given worker has applied.
bf9549ae
JB
406 *
407 * @param worker Worker which running tasks is decreased.
408 */
409 private decreaseWorkerRunningTasks (worker: Worker): void {
410 this.stepWorkerRunningTasks(worker, -1)
411 }
412
413 /**
10fcfaf4 414 * Steps the number of tasks that the given worker has applied.
bf9549ae
JB
415 *
416 * @param worker Worker which running tasks are stepped.
417 * @param step Number of running tasks step.
418 */
419 private stepWorkerRunningTasks (worker: Worker, step: number): void {
420 const tasksUsage = this.workersTasksUsage.get(worker)
421 if (tasksUsage !== undefined) {
422 tasksUsage.running = tasksUsage.running + step
423 this.workersTasksUsage.set(worker, tasksUsage)
424 } else {
425 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
426 }
427 }
428
429 /**
10fcfaf4 430 * Steps the number of tasks that the given worker has run.
bf9549ae
JB
431 *
432 * @param worker Worker which has run tasks.
433 * @param step Number of run tasks step.
434 */
10fcfaf4 435 private stepWorkerRunTasks (worker: Worker, step: number): void {
bf9549ae
JB
436 const tasksUsage = this.workersTasksUsage.get(worker)
437 if (tasksUsage !== undefined) {
438 tasksUsage.run = tasksUsage.run + step
439 this.workersTasksUsage.set(worker, tasksUsage)
440 } else {
441 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
442 }
443 }
444
445 /**
23135a89 446 * Updates tasks runtime for the given worker.
bf9549ae
JB
447 *
448 * @param worker Worker which run the task.
23135a89 449 * @param taskRunTime Worker task runtime.
bf9549ae
JB
450 */
451 private updateWorkerTasksRunTime (
452 worker: Worker,
453 taskRunTime: number | undefined
10fcfaf4
JB
454 ): void {
455 if (
456 this.workerChoiceStrategyContext.getWorkerChoiceStrategy()
457 .requiredStatistics.runTime === true
458 ) {
459 const tasksUsage = this.workersTasksUsage.get(worker)
675bb809 460 if (tasksUsage !== undefined) {
10fcfaf4 461 tasksUsage.runTime += taskRunTime ?? 0
675bb809
JB
462 if (tasksUsage.run !== 0) {
463 tasksUsage.avgRunTime = tasksUsage.runTime / tasksUsage.run
464 }
10fcfaf4
JB
465 this.workersTasksUsage.set(worker, tasksUsage)
466 } else {
467 throw new Error(WORKER_NOT_FOUND_TASKS_USAGE_MAP)
468 }
bf9549ae
JB
469 }
470 }
471
472 /**
10fcfaf4 473 * Removes worker tasks usage statistics.
bf9549ae
JB
474 *
475 * @param worker The worker.
476 */
10fcfaf4 477 private removeWorkerTasksUsage (worker: Worker): void {
bf9549ae
JB
478 this.workersTasksUsage.delete(worker)
479 }
c97c7edb 480}