refactor: switch Date.now() -> performance.now() where appropriate
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
78099a15 3import { EMPTY_FUNCTION, median } from '../utils'
34a0cfab 4import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
aee46736 5import { PoolEvents, type PoolOptions } from './pool'
b4904890 6import { PoolEmitter } from './pool'
ffcbbad8 7import type { IPoolInternal, TasksUsage, WorkerType } from './pool-internal'
b4904890 8import { PoolType } from './pool-internal'
ea7a90d3 9import type { IPoolWorker } from './pool-worker'
a35560ba
S
10import {
11 WorkerChoiceStrategies,
63220255 12 type WorkerChoiceStrategy
bdaf31cd
JB
13} from './selection-strategies/selection-strategies-types'
14import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
78099a15 15import { CircularArray } from '../circular-array'
c97c7edb 16
729c563d 17/**
ea7a90d3 18 * Base class that implements some shared logic for all poolifier pools.
729c563d 19 *
38e795c1
JB
20 * @typeParam Worker - Type of worker which manages this pool.
21 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
22 * @typeParam Response - Type of response of execution. This can only be serializable data.
729c563d 23 */
c97c7edb 24export abstract class AbstractPool<
ea7a90d3 25 Worker extends IPoolWorker,
d3c8a1a8
S
26 Data = unknown,
27 Response = unknown
9b2fdd9f 28> implements IPoolInternal<Worker, Data, Response> {
afc003b2 29 /** @inheritDoc */
e65c6cd9 30 public readonly workers: Array<WorkerType<Worker>> = []
4a6952ff 31
afc003b2 32 /** @inheritDoc */
7c0ba920
JB
33 public readonly emitter?: PoolEmitter
34
be0676b3 35 /**
2740a743 36 * The promise response map.
be0676b3 37 *
2740a743 38 * - `key`: The message id of each submitted task.
c923ce56 39 * - `value`: An object that contains the worker, the promise resolve and reject callbacks.
be0676b3 40 *
2740a743 41 * When we receive a message from the worker we get a map entry with the promise resolve/reject bound to the message.
be0676b3 42 */
c923ce56
JB
43 protected promiseResponseMap: Map<
44 string,
45 PromiseResponseWrapper<Worker, Response>
46 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 47
a35560ba 48 /**
51fe3d3c 49 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 50 *
51fe3d3c 51 * Default to a round robin algorithm.
a35560ba
S
52 */
53 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
54 Worker,
55 Data,
56 Response
a35560ba
S
57 >
58
729c563d
S
59 /**
60 * Constructs a new poolifier pool.
61 *
38e795c1
JB
62 * @param numberOfWorkers - Number of workers that this pool should manage.
63 * @param filePath - Path to the worker-file.
64 * @param opts - Options for the pool.
729c563d 65 */
c97c7edb 66 public constructor (
5c5a1fb7 67 public readonly numberOfWorkers: number,
c97c7edb 68 public readonly filePath: string,
1927ee67 69 public readonly opts: PoolOptions<Worker>
c97c7edb 70 ) {
78cea37e 71 if (!this.isMain()) {
c97c7edb
S
72 throw new Error('Cannot start a pool from a worker!')
73 }
8d3782fa 74 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 75 this.checkFilePath(this.filePath)
7c0ba920 76 this.checkPoolOptions(this.opts)
1086026a
JB
77
78 this.chooseWorker.bind(this)
79 this.internalExecute.bind(this)
164d950a 80 this.checkAndEmitFull.bind(this)
1086026a
JB
81 this.checkAndEmitBusy.bind(this)
82 this.sendToWorker.bind(this)
83
c97c7edb
S
84 this.setupHook()
85
5c5a1fb7 86 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 87 this.createAndSetupWorker()
c97c7edb
S
88 }
89
6bd72cd0 90 if (this.opts.enableEvents === true) {
7c0ba920
JB
91 this.emitter = new PoolEmitter()
92 }
d59df138
JB
93 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
94 Worker,
95 Data,
96 Response
17393ac8 97 >(this, this.opts.workerChoiceStrategy)
c97c7edb
S
98 }
99
a35560ba 100 private checkFilePath (filePath: string): void {
ffcbbad8
JB
101 if (
102 filePath == null ||
103 (typeof filePath === 'string' && filePath.trim().length === 0)
104 ) {
c510fea7
APA
105 throw new Error('Please specify a file with a worker implementation')
106 }
107 }
108
8d3782fa
JB
109 private checkNumberOfWorkers (numberOfWorkers: number): void {
110 if (numberOfWorkers == null) {
111 throw new Error(
112 'Cannot instantiate a pool without specifying the number of workers'
113 )
78cea37e 114 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 115 throw new TypeError(
8d3782fa
JB
116 'Cannot instantiate a pool with a non integer number of workers'
117 )
118 } else if (numberOfWorkers < 0) {
473c717a 119 throw new RangeError(
8d3782fa
JB
120 'Cannot instantiate a pool with a negative number of workers'
121 )
7c0ba920 122 } else if (this.type === PoolType.FIXED && numberOfWorkers === 0) {
8d3782fa
JB
123 throw new Error('Cannot instantiate a fixed pool with no worker')
124 }
125 }
126
7c0ba920 127 private checkPoolOptions (opts: PoolOptions<Worker>): void {
e843b904
JB
128 this.opts.workerChoiceStrategy =
129 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
aee46736
JB
130 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
131 this.opts.enableEvents = opts.enableEvents ?? true
132 }
133
134 private checkValidWorkerChoiceStrategy (
135 workerChoiceStrategy: WorkerChoiceStrategy
136 ): void {
137 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 138 throw new Error(
aee46736 139 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
140 )
141 }
7c0ba920
JB
142 }
143
afc003b2 144 /** @inheritDoc */
7c0ba920
JB
145 public abstract get type (): PoolType
146
c2ade475 147 /**
51fe3d3c 148 * Number of tasks concurrently running in the pool.
c2ade475
JB
149 */
150 private get numberOfRunningTasks (): number {
2740a743 151 return this.promiseResponseMap.size
a35560ba
S
152 }
153
ffcbbad8 154 /**
b4e75778 155 * Gets the given worker key.
ffcbbad8
JB
156 *
157 * @param worker - The worker.
7cf00f70 158 * @returns The worker key if the worker is found in the pool, `-1` otherwise.
ffcbbad8 159 */
e65c6cd9
JB
160 private getWorkerKey (worker: Worker): number {
161 return this.workers.findIndex(workerItem => workerItem.worker === worker)
bf9549ae
JB
162 }
163
afc003b2 164 /** @inheritDoc */
a35560ba
S
165 public setWorkerChoiceStrategy (
166 workerChoiceStrategy: WorkerChoiceStrategy
167 ): void {
aee46736 168 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 169 this.opts.workerChoiceStrategy = workerChoiceStrategy
c923ce56
JB
170 for (const [index, workerItem] of this.workers.entries()) {
171 this.setWorker(index, workerItem.worker, {
ffcbbad8
JB
172 run: 0,
173 running: 0,
174 runTime: 0,
78099a15 175 runTimeHistory: new CircularArray(),
2740a743 176 avgRunTime: 0,
78099a15 177 medRunTime: 0,
2740a743 178 error: 0
ffcbbad8 179 })
ea7a90d3 180 }
a35560ba
S
181 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
182 workerChoiceStrategy
183 )
184 }
185
afc003b2 186 /** @inheritDoc */
c2ade475
JB
187 public abstract get full (): boolean
188
afc003b2 189 /** @inheritDoc */
7c0ba920
JB
190 public abstract get busy (): boolean
191
c2ade475 192 protected internalBusy (): boolean {
7c0ba920
JB
193 return (
194 this.numberOfRunningTasks >= this.numberOfWorkers &&
bf90656c 195 this.findFreeWorkerKey() === -1
7c0ba920
JB
196 )
197 }
198
afc003b2 199 /** @inheritDoc */
bf90656c
JB
200 public findFreeWorkerKey (): number {
201 return this.workers.findIndex(workerItem => {
c923ce56
JB
202 return workerItem.tasksUsage.running === 0
203 })
7c0ba920
JB
204 }
205
afc003b2 206 /** @inheritDoc */
78cea37e 207 public async execute (data: Data): Promise<Response> {
c923ce56 208 const [workerKey, worker] = this.chooseWorker()
b4e75778 209 const messageId = crypto.randomUUID()
c923ce56 210 const res = this.internalExecute(workerKey, worker, messageId)
164d950a 211 this.checkAndEmitFull()
14916bf9 212 this.checkAndEmitBusy()
a05c10de 213 this.sendToWorker(worker, {
e5a5c0fc
JB
214 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
215 data: data ?? ({} as Data),
b4e75778 216 id: messageId
a05c10de 217 })
78cea37e 218 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
219 return res
220 }
c97c7edb 221
afc003b2 222 /** @inheritDoc */
c97c7edb 223 public async destroy (): Promise<void> {
1fbcaa7c 224 await Promise.all(
e65c6cd9
JB
225 this.workers.map(async workerItem => {
226 await this.destroyWorker(workerItem.worker)
1fbcaa7c
JB
227 })
228 )
c97c7edb
S
229 }
230
4a6952ff 231 /**
afc003b2 232 * Shutdowns given worker in the pool.
4a6952ff 233 *
38e795c1 234 * @param worker - A worker within `workers`.
4a6952ff
JB
235 */
236 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 237
729c563d 238 /**
280c2a77
S
239 * Setup hook that can be overridden by a Poolifier pool implementation
240 * to run code before workers are created in the abstract constructor.
d99ba5a8 241 * Can be overridden
afc003b2
JB
242 *
243 * @virtual
729c563d 244 */
280c2a77 245 protected setupHook (): void {
d99ba5a8 246 // Intentionally empty
280c2a77 247 }
c97c7edb 248
729c563d 249 /**
280c2a77
S
250 * Should return whether the worker is the main worker or not.
251 */
252 protected abstract isMain (): boolean
253
254 /**
bf9549ae
JB
255 * Hook executed before the worker task promise resolution.
256 * Can be overridden.
729c563d 257 *
2740a743 258 * @param workerKey - The worker key.
729c563d 259 */
2740a743
JB
260 protected beforePromiseResponseHook (workerKey: number): void {
261 ++this.workers[workerKey].tasksUsage.running
c97c7edb
S
262 }
263
c01733f1 264 /**
bf9549ae
JB
265 * Hook executed after the worker task promise resolution.
266 * Can be overridden.
c01733f1 267 *
c923ce56 268 * @param worker - The worker.
38e795c1 269 * @param message - The received message.
c01733f1 270 */
2740a743 271 protected afterPromiseResponseHook (
c923ce56 272 worker: Worker,
2740a743 273 message: MessageValue<Response>
bf9549ae 274 ): void {
c923ce56 275 const workerTasksUsage = this.getWorkerTasksUsage(worker) as TasksUsage
3032893a
JB
276 --workerTasksUsage.running
277 ++workerTasksUsage.run
2740a743
JB
278 if (message.error != null) {
279 ++workerTasksUsage.error
280 }
97a2abc3 281 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
aee46736 282 workerTasksUsage.runTime += message.runTime ?? 0
c6bd2650
JB
283 if (
284 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
285 workerTasksUsage.run !== 0
286 ) {
3032893a
JB
287 workerTasksUsage.avgRunTime =
288 workerTasksUsage.runTime / workerTasksUsage.run
289 }
78099a15
JB
290 if (this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime) {
291 workerTasksUsage.runTimeHistory.push(message.runTime ?? 0)
292 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
293 }
3032893a 294 }
c01733f1 295 }
296
280c2a77 297 /**
675bb809 298 * Chooses a worker for the next task.
280c2a77 299 *
51fe3d3c 300 * The default uses a round robin algorithm to distribute the load.
280c2a77 301 *
c923ce56 302 * @returns [worker key, worker].
280c2a77 303 */
c923ce56 304 protected chooseWorker (): [number, Worker] {
17393ac8
JB
305 let workerKey: number
306 if (
307 this.type === PoolType.DYNAMIC &&
308 !this.full &&
309 this.findFreeWorkerKey() === -1
310 ) {
311 const createdWorker = this.createAndSetupWorker()
312 this.registerWorkerMessageListener(createdWorker, message => {
313 if (
314 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13
JB
315 (message.kill != null &&
316 this.getWorkerTasksUsage(createdWorker)?.running === 0)
17393ac8 317 ) {
aee46736 318 // Kill message received from the worker, means that no new tasks are submitted to that worker for a while ( > maxInactiveTime)
17393ac8
JB
319 void this.destroyWorker(createdWorker)
320 }
321 })
322 workerKey = this.getWorkerKey(createdWorker)
323 } else {
324 workerKey = this.workerChoiceStrategyContext.execute()
325 }
c923ce56 326 return [workerKey, this.workers[workerKey].worker]
c97c7edb
S
327 }
328
280c2a77 329 /**
675bb809 330 * Sends a message to the given worker.
280c2a77 331 *
38e795c1
JB
332 * @param worker - The worker which should receive the message.
333 * @param message - The message.
280c2a77
S
334 */
335 protected abstract sendToWorker (
336 worker: Worker,
337 message: MessageValue<Data>
338 ): void
339
4a6952ff 340 /**
bdede008 341 * Registers a listener callback on a given worker.
4a6952ff 342 *
38e795c1
JB
343 * @param worker - The worker which should register a listener.
344 * @param listener - The message listener callback.
4a6952ff
JB
345 */
346 protected abstract registerWorkerMessageListener<
4f7fa42a 347 Message extends Data | Response
78cea37e 348 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 349
729c563d
S
350 /**
351 * Returns a newly created worker.
352 */
280c2a77 353 protected abstract createWorker (): Worker
c97c7edb 354
729c563d
S
355 /**
356 * Function that can be hooked up when a worker has been newly created and moved to the workers registry.
357 *
38e795c1 358 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 359 *
38e795c1 360 * @param worker - The newly created worker.
afc003b2 361 * @virtual
729c563d 362 */
280c2a77 363 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 364
4a6952ff
JB
365 /**
366 * Creates a new worker for this pool and sets it up completely.
367 *
368 * @returns New, completely set up worker.
369 */
370 protected createAndSetupWorker (): Worker {
bdacc2d2 371 const worker = this.createWorker()
280c2a77 372
35cf1c03 373 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba
S
374 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
375 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
376 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6
JB
377 worker.once('exit', () => {
378 this.removeWorker(worker)
379 })
280c2a77 380
c923ce56 381 this.pushWorker(worker, {
ffcbbad8
JB
382 run: 0,
383 running: 0,
384 runTime: 0,
78099a15 385 runTimeHistory: new CircularArray(),
2740a743 386 avgRunTime: 0,
78099a15 387 medRunTime: 0,
2740a743 388 error: 0
ffcbbad8 389 })
280c2a77
S
390
391 this.afterWorkerSetup(worker)
392
c97c7edb
S
393 return worker
394 }
be0676b3
APA
395
396 /**
397 * This function is the listener registered for each worker.
398 *
bdacc2d2 399 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
400 */
401 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 402 return message => {
b1989cfd 403 if (message.id != null) {
aee46736 404 // Task response received
2740a743 405 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 406 if (promiseResponse != null) {
78cea37e 407 if (message.error != null) {
2740a743 408 promiseResponse.reject(message.error)
a05c10de 409 } else {
2740a743 410 promiseResponse.resolve(message.data as Response)
a05c10de 411 }
c923ce56 412 this.afterPromiseResponseHook(promiseResponse.worker, message)
2740a743 413 this.promiseResponseMap.delete(message.id)
be0676b3
APA
414 }
415 }
416 }
be0676b3 417 }
7c0ba920 418
78cea37e 419 private async internalExecute (
2740a743 420 workerKey: number,
c923ce56 421 worker: Worker,
b4e75778 422 messageId: string
78cea37e 423 ): Promise<Response> {
2740a743 424 this.beforePromiseResponseHook(workerKey)
78cea37e 425 return await new Promise<Response>((resolve, reject) => {
c923ce56 426 this.promiseResponseMap.set(messageId, { resolve, reject, worker })
78cea37e
JB
427 })
428 }
429
7c0ba920 430 private checkAndEmitBusy (): void {
78cea37e 431 if (this.opts.enableEvents === true && this.busy) {
aee46736 432 this.emitter?.emit(PoolEvents.busy)
7c0ba920
JB
433 }
434 }
bf9549ae 435
164d950a
JB
436 private checkAndEmitFull (): void {
437 if (
438 this.type === PoolType.DYNAMIC &&
439 this.opts.enableEvents === true &&
440 this.full
441 ) {
aee46736 442 this.emitter?.emit(PoolEvents.full)
164d950a
JB
443 }
444 }
445
c923ce56 446 /**
afc003b2 447 * Gets the given worker tasks usage in the pool.
c923ce56
JB
448 *
449 * @param worker - The worker.
450 * @returns The worker tasks usage.
451 */
452 private getWorkerTasksUsage (worker: Worker): TasksUsage | undefined {
3032893a 453 const workerKey = this.getWorkerKey(worker)
e65c6cd9
JB
454 if (workerKey !== -1) {
455 return this.workers[workerKey].tasksUsage
ffcbbad8 456 }
3032893a 457 throw new Error('Worker could not be found in the pool')
a05c10de
JB
458 }
459
460 /**
51fe3d3c 461 * Pushes the given worker in the pool.
ea7a90d3 462 *
38e795c1 463 * @param worker - The worker.
ffcbbad8 464 * @param tasksUsage - The worker tasks usage.
ea7a90d3 465 */
c923ce56 466 private pushWorker (worker: Worker, tasksUsage: TasksUsage): void {
e65c6cd9 467 this.workers.push({
ffcbbad8
JB
468 worker,
469 tasksUsage
ea7a90d3
JB
470 })
471 }
c923ce56
JB
472
473 /**
51fe3d3c 474 * Sets the given worker in the pool.
c923ce56
JB
475 *
476 * @param workerKey - The worker key.
477 * @param worker - The worker.
478 * @param tasksUsage - The worker tasks usage.
479 */
480 private setWorker (
481 workerKey: number,
482 worker: Worker,
483 tasksUsage: TasksUsage
484 ): void {
485 this.workers[workerKey] = {
486 worker,
487 tasksUsage
488 }
489 }
51fe3d3c
JB
490
491 /**
492 * Removes the given worker from the pool.
493 *
494 * @param worker - The worker that will be removed.
495 */
496 protected removeWorker (worker: Worker): void {
497 const workerKey = this.getWorkerKey(worker)
498 this.workers.splice(workerKey, 1)
499 this.workerChoiceStrategyContext.remove(workerKey)
500 }
c97c7edb 501}