refactor: limit pool internals public exposure
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
0d80593b 6 isPlainObject,
bbeadd16
JB
7 median
8} from '../utils'
34a0cfab 9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 10import { CircularArray } from '../circular-array'
29ee7e9a 11import { Queue } from '../queue'
c4855468 12import {
65d7a1c9 13 type IPool,
7c5a1080 14 PoolEmitter,
c4855468 15 PoolEvents,
6b27d407 16 type PoolInfo,
c4855468 17 type PoolOptions,
6b27d407
JB
18 type PoolType,
19 PoolTypes,
184855e6
JB
20 type TasksQueueOptions,
21 type WorkerType
c4855468 22} from './pool'
f06e48d8 23import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
24import {
25 WorkerChoiceStrategies,
a20f0ba5
JB
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
bdaf31cd
JB
28} from './selection-strategies/selection-strategies-types'
29import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 30
729c563d 31/**
ea7a90d3 32 * Base class that implements some shared logic for all poolifier pools.
729c563d 33 *
38e795c1
JB
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 36 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 37 */
c97c7edb 38export abstract class AbstractPool<
f06e48d8 39 Worker extends IWorker,
d3c8a1a8
S
40 Data = unknown,
41 Response = unknown
c4855468 42> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task id.
 *
 * - `key`: The id of a submitted task.
 * - `value`: The worker the task was assigned to, together with the `resolve` and `reject` callbacks of the promise returned to the caller.
 *
 * The entry is looked up by message id when a worker response is received, and deleted once the promise is settled.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context delegating the choice of the next worker node to the configured worker choice strategy.
 *
 * Round robin is used when the pool options do not specify a strategy.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>
72
729c563d
S
/**
 * Builds a new pool: validates the arguments, runs the setup hook, creates the
 * workers, then instantiates the optional event emitter and the worker choice
 * strategy context.
 *
 * @param numberOfWorkers - Number of workers created at construction time.
 * @param filePath - Path to the file holding the worker implementation.
 * @param opts - Pool options. Unset options are filled in with defaults.
 * @throws Error if the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation, pool options get their default values here.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Keep `this` attached when these methods are passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)
}
116
/**
 * Checks that a usable worker file path was given.
 *
 * The runtime type is verified as well as the content, since untyped callers
 * can pass any value despite the `string` annotation.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`, `undefined`, not a string, or a string that is empty once trimmed.
 */
private checkFilePath (filePath: string): void {
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
125
8d3782fa
JB
/**
 * Checks that the requested number of workers is acceptable for this pool.
 *
 * @param numberOfWorkers - Number of workers requested.
 * @throws Error if the number is `null` or `undefined`, or if it is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
143
/**
 * Validates the given pool options and stores them, with defaults applied, in `this.opts`.
 *
 * Defaults: round robin worker choice strategy, default worker choice strategy
 * options, worker restart on error enabled, events enabled, tasks queue disabled.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
170
/**
 * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
180
0d80593b
JB
/**
 * Checks the given worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` is set and its number of keys differs from the pool maximum size.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const weights = workerChoiceStrategyOptions.weights
  if (weights == null) {
    return
  }
  if (Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
198
a20f0ba5
JB
/**
 * Checks the given tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are set but are not a plain object.
 * @throws Error if the `concurrency` option is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isSet = tasksQueueOptions != null
  if (isSet && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An unset concurrency compares as `false` here and is therefore accepted.
  const concurrency = tasksQueueOptions?.concurrency as number
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
213
/** @inheritDoc */
public get info (): PoolInfo {
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let runningTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  // Single pass over the worker nodes to gather every counter.
  for (const { tasksUsage, tasksQueue } of this.workerNodes) {
    if (tasksUsage.running === 0) {
      idleWorkerNodes += 1
    }
    if (tasksUsage.running > 0) {
      busyWorkerNodes += 1
    }
    runningTasks += tasksUsage.running
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    runningTasks,
    queuedTasks,
    maxQueuedTasks
  }
}
08f3f44c 248
8881ae32
JB
/**
 * The type of this pool, one of the `PoolTypes` values.
 *
 * Reported as `type` in the pool information.
 */
protected abstract get type (): PoolType

/**
 * The type of worker used by this pool.
 *
 * Reported as `worker` in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum size of this pool.
 */
protected abstract get minSize (): number

/**
 * The maximum size of this pool.
 *
 * Worker choice strategy `weights` must hold exactly this number of entries.
 */
protected abstract get maxSize (): number
a35560ba 270
/**
 * Looks up the position of the given worker in the pool worker nodes.
 *
 * @param worker - The worker to search for.
 * @returns The index of the worker node holding the worker, `-1` if no worker node holds it.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
282
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  // The tasks usage of every worker node is reset when the strategy changes.
  this.workerNodes.forEach(workerNode => {
    this.setWorkerNodeTasksUsage(workerNode, {
      run: 0,
      running: 0,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0
    })
  })
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions == null) {
    return
  }
  this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
312
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
323
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: queued tasks are handed to the workers first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
335
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue options are dropped while the tasks queue is disabled.
    delete this.opts.tasksQueueOptions
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
346
/**
 * Builds the tasks queue options stored by the pool from the given ones.
 *
 * @param tasksQueueOptions - The given tasks queue options, possibly `null` or `undefined`.
 * @returns New tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
354
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * A dynamic pool only creates an additional worker while it is not full.
 */
protected abstract get full (): boolean

/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, tasks submitted to a busy pool are queued.
 */
protected abstract get busy (): boolean
7c0ba920 368
/**
 * Tells whether all the worker nodes are occupied.
 *
 * @returns `true` if no worker node has zero running tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.tasksUsage.running !== 0
  )
}
376
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before anything else so that it reflects the submission time.
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    submissionTimestamp,
    id: crypto.randomUUID()
  }
  // The promise is settled by the worker listener, which finds it by task id.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task if the pool is busy or the chosen worker node already runs
  // as many tasks as the configured concurrency allows.
  const mustBeQueued =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].tasksUsage.running >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustBeQueued) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 411
/** @inheritDoc */
public async destroy (): Promise<void> {
  const shutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Queued tasks are sent to the worker before it is shut down.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(shutdowns)
}
422
/**
 * Shuts down the given worker.
 *
 * @param worker - A worker held by one of the pool worker nodes.
 * @returns Nothing, or a promise to wait on until the worker is shut down.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 429
/**
 * Hook called by the constructor once the arguments are validated and before
 * any worker node is created. Does nothing by default.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No setup is needed by default
}
c97c7edb 439
/**
 * Should return whether the current worker is the main worker or not.
 *
 * The constructor throws when this returns `false`.
 */
protected abstract isMain (): boolean
444
/**
 * Hook called right before a task is sent to a worker: counts one more
 * running task on the worker node.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 */
protected beforeTaskExecutionHook (workerNodeKey: number): void {
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  tasksUsage.running += 1
}
454
/**
 * Hook called once a task execution response is received: updates the tasks
 * usage counters and statistics of the worker node holding the worker.
 * Can be overridden.
 *
 * @param worker - The worker that executed the task.
 * @param message - The message received from the worker.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const tasksUsage = this.workerNodes[workerNodeKey].tasksUsage
  tasksUsage.running -= 1
  tasksUsage.run += 1
  if (message.error != null) {
    tasksUsage.error += 1
  }
  this.updateRunTimeTasksUsage(tasksUsage, message)
  this.updateWaitTimeTasksUsage(tasksUsage, message)
}
476
/**
 * Updates the run time statistics of a worker node tasks usage, restricted to
 * the ones required by the current worker choice strategy.
 *
 * @param workerTasksUsage - The tasks usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  if (!this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
    return
  }
  workerTasksUsage.runTime += message.runTime ?? 0
  const needsAverage =
    this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
    workerTasksUsage.run !== 0
  if (needsAverage) {
    workerTasksUsage.avgRunTime =
      workerTasksUsage.runTime / workerTasksUsage.run
  }
  const needsMedian =
    this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
    message.runTime != null
  if (needsMedian) {
    workerTasksUsage.runTimeHistory.push(message.runTime as number)
    workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
  }
}
499
/**
 * Updates the wait time statistics of a worker node tasks usage, restricted to
 * the ones required by the current worker choice strategy.
 *
 * @param workerTasksUsage - The tasks usage to update.
 * @param message - The message received from the worker.
 */
private updateWaitTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  if (!this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
    return
  }
  workerTasksUsage.waitTime += message.waitTime ?? 0
  const needsAverage =
    this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
    workerTasksUsage.run !== 0
  if (needsAverage) {
    workerTasksUsage.avgWaitTime =
      workerTasksUsage.waitTime / workerTasksUsage.run
  }
  const needsMedian =
    this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
    message.waitTime != null
  if (needsMedian) {
    workerTasksUsage.waitTimeHistory.push(message.waitTime as number)
    workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
  }
}
522
/**
 * Chooses the worker node that will receive the next task.
 *
 * A dynamic pool that is not full and whose worker nodes are all occupied
 * gets a new worker, which is then chosen. Otherwise the choice is delegated
 * to the worker choice strategy.
 *
 * @returns The key of the chosen worker node.
 */
protected chooseWorkerNode (): number {
  const shouldGrow =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!shouldGrow) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    // The key is resolved on each message since worker nodes can be removed meanwhile.
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      this.flushTasksQueue(currentWorkerNodeKey)
      // FIXME: wait for tasks to be finished
      void (this.destroyWorker(workerCreated) as Promise<void>)
    }
  })
  return this.getWorkerNodeKey(workerCreated)
}
553
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker receiving the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void

/**
 * Registers a message listener callback on the given worker.
 *
 * @param worker - The worker on which the listener is registered.
 * @param listener - The callback invoked with each received message.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void

/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker

/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 588
/**
 * Creates a worker, attaches the pool event handlers to it and adds it to the
 * pool worker nodes.
 *
 * @returns The new worker, fully set up.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler,
    errorHandler,
    onlineHandler,
    exitHandler,
    restartWorkerOnError
  } = this.opts

  // Handlers are attached in a fixed order, listeners of a same event being
  // invoked in their registration order.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
  })
  if (restartWorkerOnError === true) {
    // A new worker is created when this one reports an error.
    worker.on('error', () => {
      this.createAndSetupWorker()
    })
  }
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
621
/**
 * Builds the listener handling the task execution responses sent by a worker.
 *
 * The listener settles the promise bound to the message id, updates the tasks
 * usage of the worker node, and when the tasks queue is enabled, executes the
 * next task queued on that worker node.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      // Not a task execution response
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.error == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      promiseResponse.reject(message.error)
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.taskError, {
          error: message.error,
          errorData: message.errorData
        })
      }
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }
}
7c0ba920 660
/**
 * Emits the `busy` and `full` pool events, with the pool information as
 * payload, when the pool has an emitter and is in the matching state.
 * The `full` event is only emitted by dynamic pools.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
671
0ebe2a9f
JB
/**
 * Replaces the tasks usage of the given worker node.
 *
 * @param workerNode - The worker node to update.
 * @param tasksUsage - The tasks usage assigned to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  tasksUsage: TasksUsage
): void {
  workerNode.tasksUsage = tasksUsage
}
684
/**
 * Appends a worker node holding the given worker, with a zeroed tasks usage
 * and an empty tasks queue, to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The new length of the pool worker nodes.
 */
private pushWorkerNode (worker: Worker): number {
  const tasksUsage: TasksUsage = {
    run: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: new CircularArray(),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: new CircularArray(),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  }
  const tasksQueue = new Queue<Task<Data>>()
  return this.workerNodes.push({ worker, tasksUsage, tasksQueue })
}
c923ce56
JB
710
/**
 * Stores a worker node built from the given parts at the given position of
 * the pool worker nodes, overwriting the worker node found there.
 *
 * @param workerNodeKey - The worker node key.
 * @param worker - The worker.
 * @param tasksUsage - The worker tasks usage.
 * @param tasksQueue - The worker tasks queue.
 */
private setWorkerNode (
  workerNodeKey: number,
  worker: Worker,
  tasksUsage: TasksUsage,
  tasksQueue: Queue<Task<Data>>
): void {
  const workerNode = { worker, tasksUsage, tasksQueue }
  this.workerNodes[workerNodeKey] = workerNode
}
51fe3d3c
JB
731
/**
 * Removes the worker node holding the given worker from the pool worker nodes
 * and from the worker choice strategy context. Does nothing if the worker is
 * not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 744
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
749
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
753
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
757
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 761
416fd65c
JB
/**
 * Executes every task waiting in the tasks queue of the given worker node,
 * leaving the queue empty.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain on the live queue size: an index counting up while each iteration
  // shrinks the queue would stop after about half of the tasks.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
772
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 778}