chore: v2.5.2
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
0d80593b 6 isPlainObject,
bbeadd16
JB
7 median
8} from '../utils'
34a0cfab 9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 10import { CircularArray } from '../circular-array'
29ee7e9a 11import { Queue } from '../queue'
c4855468 12import {
65d7a1c9 13 type IPool,
7c5a1080 14 PoolEmitter,
c4855468 15 PoolEvents,
6b27d407 16 type PoolInfo,
c4855468 17 type PoolOptions,
6b27d407
JB
18 type PoolType,
19 PoolTypes,
65d7a1c9 20 type TasksQueueOptions
c4855468 21} from './pool'
f06e48d8 22import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
23import {
24 WorkerChoiceStrategies,
a20f0ba5
JB
25 type WorkerChoiceStrategy,
26 type WorkerChoiceStrategyOptions
bdaf31cd
JB
27} from './selection-strategies/selection-strategies-types'
28import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 29
729c563d 30/**
ea7a90d3 31 * Base class that implements some shared logic for all poolifier pools.
729c563d 32 *
38e795c1
JB
33 * @typeParam Worker - Type of worker which manages this pool.
34 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 35 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 36 */
c97c7edb 37export abstract class AbstractPool<
f06e48d8 38 Worker extends IWorker,
d3c8a1a8
S
39 Data = unknown,
40 Response = unknown
c4855468 41> implements IPool<Worker, Data, Response> {
afc003b2 42 /** @inheritDoc */
f06e48d8 43 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 44
afc003b2 45 /** @inheritDoc */
7c0ba920
JB
46 public readonly emitter?: PoolEmitter
47
be0676b3 48 /**
a3445496 49 * The execution response promise map.
be0676b3 50 *
2740a743 51 * - `key`: The message id of each submitted task.
a3445496 52 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 53 *
a3445496 54 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 55 */
c923ce56
JB
56 protected promiseResponseMap: Map<
57 string,
58 PromiseResponseWrapper<Worker, Response>
59 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 60
a35560ba 61 /**
51fe3d3c 62 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 63 *
51fe3d3c 64 * Defaults to a round robin algorithm.
a35560ba
S
65 */
66 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
67 Worker,
68 Data,
69 Response
a35560ba
S
70 >
71
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, runs the setup hook, creates `numberOfWorkers` workers,
 * then instantiates the optional event emitter and the worker choice strategy context.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` returns `false`.
 */
public constructor (
  public readonly numberOfWorkers: number,
  public readonly filePath: string,
  public readonly opts: PoolOptions<Worker>
) {
  const startedFromMain = this.isMain()
  if (!startedFromMain) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; checkPoolOptions() also writes the defaulted options into `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods so they keep their `this` when handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)
}
115
/**
 * Validates the worker file path given to the pool constructor.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, is not a string, or is empty once trimmed.
 */
private checkFilePath (filePath: string): void {
  // Plain JavaScript callers can bypass the `string` annotation, so the runtime
  // type is checked as well: `typeof` also rejects `null` and `undefined`.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
124
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is nullish, or is `0` for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are rejected when no worker is requested.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
142
/**
 * Validates the pool options and writes the defaulted values back into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const strategy = opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
169
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
179
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` is set and its number of keys differs from `this.maxSize`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
197
/**
 * Validates the tasks queue options.
 *
 * Nullish options are accepted; a missing `concurrency` is not an error.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are set but are not a plain object.
 * @throws Error if `concurrency` is lower than or equal to `0`.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isSet = tasksQueueOptions != null
  if (isSet && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An undefined concurrency compares as `false` here, so it passes through.
  const concurrency = tasksQueueOptions?.concurrency as number
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
212
afc003b2 213 /** @inheritDoc */
7c0ba920
JB
214 public abstract get type (): PoolType
215
/** @inheritDoc */
public get info (): PoolInfo {
  const { type, minSize, maxSize, workerNodes } = this

  // Gather every counter in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let runningTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  for (const { tasksUsage, tasksQueue } of workerNodes) {
    if (tasksUsage.running === 0) {
      idleWorkerNodes += 1
    } else if (tasksUsage.running > 0) {
      busyWorkerNodes += 1
    }
    runningTasks += tasksUsage.running
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
  }

  return {
    type,
    minSize,
    maxSize,
    workerNodes: workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    runningTasks,
    queuedTasks,
    maxQueuedTasks
  }
}
08f3f44c 249
c2ade475 250 /**
6b27d407 251 * Pool minimum size.
c2ade475 252 */
6b27d407 253 protected abstract get minSize (): number
ff733df7
JB
254
255 /**
6b27d407 256 * Pool maximum size.
ff733df7 257 */
6b27d407 258 protected abstract get maxSize (): number
a35560ba 259
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The index of the worker in the pool worker nodes, `-1` if it is not found.
 */
private getWorkerNodeKey (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
271
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy

  // Every worker node gets a fresh, zeroed tasks usage.
  for (const workerNode of this.workerNodes) {
    const resetTasksUsage: TasksUsage = {
      run: 0,
      running: 0,
      runTime: 0,
      runTimeHistory: new CircularArray(),
      avgRunTime: 0,
      medRunTime: 0,
      waitTime: 0,
      waitTimeHistory: new CircularArray(),
      avgWaitTime: 0,
      medWaitTime: 0,
      error: 0
    }
    this.setWorkerNodeTasksUsage(workerNode, resetTasksUsage)
  }

  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )

  // The options are only replaced when the caller supplies them.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
}
301
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate before storing so invalid options never reach `this.opts`.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: storedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(storedOptions)
}
312
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching from enabled to disabled: flush what is still queued first.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
324
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  // Without an enabled tasks queue there is nothing to configure: drop the options.
  if (this.opts.enableTasksQueue !== true) {
    delete this.opts.tasksQueueOptions
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  const builtOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtOptions
}
335
/**
 * Builds the tasks queue options, defaulting `concurrency` to `1` when it is not set.
 *
 * @param tasksQueueOptions - The tasks queue options supplied by the caller, possibly nullish.
 * @returns A new tasks queue options object holding only `concurrency`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
343
c319c66b
JB
344 /**
345 * Whether the pool is full or not.
346 *
347 * The pool filling boolean status.
348 */
349 protected abstract get full (): boolean
c2ade475 350
c319c66b
JB
351 /**
352 * Whether the pool is busy or not.
353 *
354 * The pool busyness boolean status.
355 */
356 protected abstract get busy (): boolean
7c0ba920 357
/**
 * Tells whether no worker node is idle.
 *
 * @returns `true` if no worker node has a `running` tasks count of `0` (also `true` when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    ({ tasksUsage }) => tasksUsage.running !== 0
  )
}
365
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    submissionTimestamp,
    id: taskId
  }

  // Register the promise callbacks under the task id before the task is handed over,
  // so the worker message listener can settle the promise.
  const response = new Promise<Response>((resolve, reject) => {
    const { worker } = this.workerNodes[workerNodeKey]
    this.promiseResponseMap.set(taskId, { resolve, reject, worker })
  })

  // Queue the task only when queueing is enabled and either the pool is busy or the
  // chosen worker node already runs as many tasks as the configured concurrency.
  const mustBeQueued =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].tasksUsage.running >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustBeQueued) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }

  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 400
/** @inheritDoc */
public async destroy (): Promise<void> {
  // For each worker node: flush its tasks queue, then destroy its worker.
  const shutdowns = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(shutdowns)
}
410
4a6952ff 411 /**
f06e48d8 412 * Shuts down the given worker.
4a6952ff 413 *
f06e48d8 414 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
415 */
416 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 417
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // No-op by default.
}
c97c7edb 427
729c563d 428 /**
280c2a77
S
429 * Should return whether the worker is the main worker or not.
430 */
431 protected abstract isMain (): boolean
432
/**
 * Hook executed before the worker task execution: increments the worker node `running` tasks count.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 */
protected beforeTaskExecutionHook (workerNodeKey: number): void {
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  ++tasksUsage.running
}
442
/**
 * Hook executed after the worker task execution: updates the tasks usage of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { tasksUsage } = this.workerNodes[workerNodeKey]
  // One task less running, one more run.
  --tasksUsage.running
  ++tasksUsage.run
  const hasFailed = message.error != null
  if (hasFailed) {
    ++tasksUsage.error
  }
  this.updateRunTimeTasksUsage(tasksUsage, message)
  this.updateWaitTimeTasksUsage(tasksUsage, message)
}
464
/**
 * Updates the run time statistics of a worker tasks usage.
 *
 * Nothing is updated unless the worker choice strategy context requires the `runTime` statistic.
 *
 * @param workerTasksUsage - The worker tasks usage to update.
 * @param message - The received message, read for its `runTime`.
 */
private updateRunTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  const requiredStatistics =
    this.workerChoiceStrategyContext.getRequiredStatistics()
  if (!requiredStatistics.runTime) {
    return
  }
  workerTasksUsage.runTime += message.runTime ?? 0
  // The average is skipped while no task has run, which avoids a division by zero.
  if (requiredStatistics.avgRunTime && workerTasksUsage.run !== 0) {
    workerTasksUsage.avgRunTime =
      workerTasksUsage.runTime / workerTasksUsage.run
  }
  // The median is computed over the run time history, fed only by messages carrying a run time.
  if (requiredStatistics.medRunTime && message.runTime != null) {
    workerTasksUsage.runTimeHistory.push(message.runTime)
    workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
  }
}
487
/**
 * Updates the wait time statistics of a worker tasks usage.
 *
 * Nothing is updated unless the worker choice strategy context requires the `waitTime` statistic.
 *
 * @param workerTasksUsage - The worker tasks usage to update.
 * @param message - The received message, read for its `waitTime`.
 */
private updateWaitTimeTasksUsage (
  workerTasksUsage: TasksUsage,
  message: MessageValue<Response>
): void {
  const requiredStatistics =
    this.workerChoiceStrategyContext.getRequiredStatistics()
  if (!requiredStatistics.waitTime) {
    return
  }
  workerTasksUsage.waitTime += message.waitTime ?? 0
  // The average is skipped while no task has run, which avoids a division by zero.
  if (requiredStatistics.avgWaitTime && workerTasksUsage.run !== 0) {
    workerTasksUsage.avgWaitTime =
      workerTasksUsage.waitTime / workerTasksUsage.run
  }
  // The median is computed over the wait time history, fed only by messages carrying a wait time.
  if (requiredStatistics.medWaitTime && message.waitTime != null) {
    workerTasksUsage.waitTimeHistory.push(message.waitTime)
    workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
  }
}
510
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic pool that is not full and has no idle worker node creates a new worker and
 * chooses it; otherwise the choice is delegated to the worker choice strategy context.
 *
 * @returns The worker node key.
 */
protected chooseWorkerNode (): number {
  const mustCreateWorker =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!mustCreateWorker) {
    return this.workerChoiceStrategyContext.execute()
  }

  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    // Destroy on a hard kill, or on any other kill message once the worker runs no task.
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      this.flushTasksQueue(currentWorkerNodeKey)
      void (this.destroyWorker(workerCreated) as Promise<void>)
    }
  })
  return this.getWorkerNodeKey(workerCreated)
}
540
280c2a77 541 /**
675bb809 542 * Sends a message to the given worker.
280c2a77 543 *
38e795c1
JB
544 * @param worker - The worker which should receive the message.
545 * @param message - The message.
280c2a77
S
546 */
547 protected abstract sendToWorker (
548 worker: Worker,
549 message: MessageValue<Data>
550 ): void
551
4a6952ff 552 /**
f06e48d8 553 * Registers a listener callback on the given worker.
4a6952ff 554 *
38e795c1
JB
555 * @param worker - The worker which should register a listener.
556 * @param listener - The message listener callback.
4a6952ff
JB
557 */
558 protected abstract registerWorkerMessageListener<
4f7fa42a 559 Message extends Data | Response
78cea37e 560 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 561
729c563d
S
562 /**
563 * Returns a newly created worker.
564 */
280c2a77 565 protected abstract createWorker (): Worker
c97c7edb 566
729c563d 567 /**
f06e48d8 568 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 569 *
38e795c1 570 * Can be used to update the `maxListeners` or bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
729c563d 571 *
38e795c1 572 * @param worker - The newly created worker.
729c563d 573 */
280c2a77 574 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 575
/**
 * Creates a new worker, registers its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler,
    errorHandler,
    onlineHandler,
    exitHandler,
    restartWorkerOnError
  } = this.opts

  // The handlers are registered in this exact order.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // The emitter is looked up when the error fires, not when the handler is registered.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
  })
  if (restartWorkerOnError === true) {
    // A worker error triggers the creation of a new worker.
    worker.on('error', () => {
      this.createAndSetupWorker()
    })
  }
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
608
/**
 * Builds the listener to register for each worker message.
 *
 * The listener ignores messages without an `id` and messages whose `id` has no entry in
 * the promise response map. For the others it settles the pending promise, updates the
 * tasks usage, and executes the next queued task of that worker node if any.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }

    if (message.error == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      promiseResponse.reject(message.error)
      this.emitter?.emit(PoolEvents.taskError, {
        error: message.error,
        errorData: message.errorData
      })
    }

    const { worker } = promiseResponse
    this.afterTaskExecutionHook(worker, message)
    this.promiseResponseMap.delete(message.id)

    // The worker just finished a task: hand it the next queued one, if any.
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
  }
}
7c0ba920 647
/**
 * Emits the `busy` and `full` pool events, with the pool info as payload, when they apply.
 *
 * Does nothing when the pool has no emitter. The `full` event is only emitted by dynamic pools.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
658
/**
 * Replaces the tasks usage of the given worker node.
 *
 * @param workerNode - The worker node to update.
 * @param tasksUsage - The tasks usage to assign to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  tasksUsage: TasksUsage
): void {
  // Plain assignment: the given object is stored as is, not copied.
  workerNode.tasksUsage = tasksUsage
}
671
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The node starts with a zeroed tasks usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const tasksUsage: TasksUsage = {
    run: 0,
    running: 0,
    runTime: 0,
    runTimeHistory: new CircularArray(),
    avgRunTime: 0,
    medRunTime: 0,
    waitTime: 0,
    waitTimeHistory: new CircularArray(),
    avgWaitTime: 0,
    medWaitTime: 0,
    error: 0
  }
  const tasksQueue = new Queue<Task<Data>>()
  return this.workerNodes.push({ worker, tasksUsage, tasksQueue })
}
c923ce56
JB
697
/**
 * Stores a worker node at the given key in the pool worker nodes, replacing any node already there.
 *
 * @param workerNodeKey - The worker node key.
 * @param worker - The worker.
 * @param tasksUsage - The worker tasks usage.
 * @param tasksQueue - The worker task queue.
 */
private setWorkerNode (
  workerNodeKey: number,
  worker: Worker,
  tasksUsage: TasksUsage,
  tasksQueue: Queue<Task<Data>>
): void {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    tasksUsage,
    tasksQueue
  }
  this.workerNodes[workerNodeKey] = workerNode
}
51fe3d3c
JB
718
/**
 * Removes the worker node of the given worker from the pool worker nodes.
 *
 * Does nothing when the worker is not found. The worker choice strategy context is told
 * which key was removed.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 731
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
736
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
740
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` call: a task, or `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
744
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 748
/**
 * Flushes the tasks queue of the given worker node: every queued task is dequeued and executed.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Loop on the live queue size. Counting an index up against a size that shrinks on
  // every dequeue would stop half way and leave tasks in the queue.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
759
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 765}