refactor: convert helper to arrow function
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
0d80593b 6 isPlainObject,
bbeadd16
JB
7 median
8} from '../utils'
34a0cfab 9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 10import { CircularArray } from '../circular-array'
29ee7e9a 11import { Queue } from '../queue'
c4855468 12import {
65d7a1c9 13 type IPool,
7c5a1080 14 PoolEmitter,
c4855468 15 PoolEvents,
6b27d407 16 type PoolInfo,
c4855468 17 type PoolOptions,
6b27d407
JB
18 type PoolType,
19 PoolTypes,
184855e6
JB
20 type TasksQueueOptions,
21 type WorkerType
c4855468 22} from './pool'
f06e48d8 23import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
24import {
25 WorkerChoiceStrategies,
a20f0ba5
JB
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
bdaf31cd
JB
28} from './selection-strategies/selection-strategies-types'
29import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 30
729c563d 31/**
ea7a90d3 32 * Base class that implements some shared logic for all poolifier pools.
729c563d 33 *
38e795c1
JB
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 36 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 37 */
c97c7edb 38export abstract class AbstractPool<
f06e48d8 39 Worker extends IWorker,
d3c8a1a8
S
40 Data = unknown,
41 Response = unknown
c4855468 42> implements IPool<Worker, Data, Response> {
afc003b2 43 /** @inheritDoc */
f06e48d8 44 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 45
afc003b2 46 /** @inheritDoc */
7c0ba920
JB
47 public readonly emitter?: PoolEmitter
48
be0676b3 49 /**
a3445496 50 * The execution response promise map.
be0676b3 51 *
2740a743 52 * - `key`: The message id of each submitted task.
a3445496 53 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 54 *
a3445496 55 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 56 */
c923ce56
JB
57 protected promiseResponseMap: Map<
58 string,
59 PromiseResponseWrapper<Worker, Response>
60 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 61
a35560ba 62 /**
51fe3d3c 63 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 64 *
51fe3d3c 65 * Default to a round robin algorithm.
a35560ba
S
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
68 Worker,
69 Data,
70 Response
a35560ba
S
71 >
72
729c563d
S
73 /**
74 * Constructs a new poolifier pool.
75 *
38e795c1 76 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 77 * @param filePath - Path to the worker file.
38e795c1 78 * @param opts - Options for the pool.
729c563d 79 */
c97c7edb 80 public constructor (
5c5a1fb7 81 public readonly numberOfWorkers: number,
c97c7edb 82 public readonly filePath: string,
1927ee67 83 public readonly opts: PoolOptions<Worker>
c97c7edb 84 ) {
78cea37e 85 if (!this.isMain()) {
c97c7edb
S
86 throw new Error('Cannot start a pool from a worker!')
87 }
8d3782fa 88 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 89 this.checkFilePath(this.filePath)
7c0ba920 90 this.checkPoolOptions(this.opts)
1086026a 91
7254e419
JB
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 96
c97c7edb
S
97 this.setupHook()
98
5c5a1fb7 99 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 100 this.createAndSetupWorker()
c97c7edb
S
101 }
102
6bd72cd0 103 if (this.opts.enableEvents === true) {
7c0ba920
JB
104 this.emitter = new PoolEmitter()
105 }
d59df138
JB
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
da309861
JB
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
c97c7edb
S
115 }
116
a35560ba 117 private checkFilePath (filePath: string): void {
ffcbbad8
JB
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
c510fea7
APA
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
8d3782fa
JB
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
78cea37e 131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 132 throw new TypeError(
0d80593b 133 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
134 )
135 } else if (numberOfWorkers < 0) {
473c717a 136 throw new RangeError(
8d3782fa
JB
137 'Cannot instantiate a pool with a negative number of workers'
138 )
6b27d407 139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
7c0ba920 144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
1f68cede 155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 168 }
aee46736
JB
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 175 throw new Error(
aee46736 176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
177 )
178 }
7c0ba920
JB
179 }
180
0d80593b
JB
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
49be33fe
JB
189 if (
190 workerChoiceStrategyOptions.weights != null &&
6b27d407 191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
0d80593b
JB
197 }
198
a20f0ba5
JB
199 private checkValidTasksQueueOptions (
200 tasksQueueOptions: TasksQueueOptions
201 ): void {
0d80593b
JB
202 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
203 throw new TypeError('Invalid tasks queue options: must be a plain object')
204 }
a20f0ba5
JB
205 if ((tasksQueueOptions?.concurrency as number) <= 0) {
206 throw new Error(
207 `Invalid worker tasks concurrency '${
208 tasksQueueOptions.concurrency as number
209 }'`
210 )
211 }
212 }
213
afc003b2 214 /** @inheritDoc */
7c0ba920
JB
215 public abstract get type (): PoolType
216
08f3f44c 217 /** @inheritDoc */
6b27d407
JB
218 public get info (): PoolInfo {
219 return {
220 type: this.type,
184855e6 221 worker: this.worker,
6b27d407
JB
222 minSize: this.minSize,
223 maxSize: this.maxSize,
224 workerNodes: this.workerNodes.length,
225 idleWorkerNodes: this.workerNodes.reduce(
226 (accumulator, workerNode) =>
227 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
228 0
229 ),
230 busyWorkerNodes: this.workerNodes.reduce(
231 (accumulator, workerNode) =>
232 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
233 0
234 ),
235 runningTasks: this.workerNodes.reduce(
236 (accumulator, workerNode) =>
237 accumulator + workerNode.tasksUsage.running,
238 0
239 ),
240 queuedTasks: this.workerNodes.reduce(
241 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
242 0
243 ),
244 maxQueuedTasks: this.workerNodes.reduce(
245 (accumulator, workerNode) =>
246 accumulator + workerNode.tasksQueue.maxSize,
247 0
248 )
249 }
250 }
08f3f44c 251
184855e6
JB
252 /**
253 * Gets the worker type.
254 */
255 protected abstract get worker (): WorkerType
256
c2ade475 257 /**
6b27d407 258 * Pool minimum size.
c2ade475 259 */
6b27d407 260 protected abstract get minSize (): number
ff733df7
JB
261
262 /**
6b27d407 263 * Pool maximum size.
ff733df7 264 */
6b27d407 265 protected abstract get maxSize (): number
a35560ba 266
ffcbbad8 267 /**
f06e48d8 268 * Gets the given worker its worker node key.
ffcbbad8
JB
269 *
270 * @param worker - The worker.
f06e48d8 271 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 272 */
f06e48d8
JB
273 private getWorkerNodeKey (worker: Worker): number {
274 return this.workerNodes.findIndex(
275 workerNode => workerNode.worker === worker
276 )
bf9549ae
JB
277 }
278
afc003b2 279 /** @inheritDoc */
a35560ba 280 public setWorkerChoiceStrategy (
59219cbb
JB
281 workerChoiceStrategy: WorkerChoiceStrategy,
282 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 283 ): void {
aee46736 284 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 285 this.opts.workerChoiceStrategy = workerChoiceStrategy
0ebe2a9f 286 for (const workerNode of this.workerNodes) {
f82cd357
JB
287 this.setWorkerNodeTasksUsage(workerNode, {
288 run: 0,
289 running: 0,
290 runTime: 0,
291 runTimeHistory: new CircularArray(),
292 avgRunTime: 0,
293 medRunTime: 0,
0567595a
JB
294 waitTime: 0,
295 waitTimeHistory: new CircularArray(),
296 avgWaitTime: 0,
297 medWaitTime: 0,
f82cd357
JB
298 error: 0
299 })
ea7a90d3 300 }
a35560ba 301 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
a20f0ba5
JB
302 this.opts.workerChoiceStrategy
303 )
59219cbb
JB
304 if (workerChoiceStrategyOptions != null) {
305 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
306 }
a20f0ba5
JB
307 }
308
309 /** @inheritDoc */
310 public setWorkerChoiceStrategyOptions (
311 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
312 ): void {
0d80593b 313 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
314 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
315 this.workerChoiceStrategyContext.setOptions(
316 this.opts.workerChoiceStrategyOptions
a35560ba
S
317 )
318 }
319
a20f0ba5 320 /** @inheritDoc */
8f52842f
JB
321 public enableTasksQueue (
322 enable: boolean,
323 tasksQueueOptions?: TasksQueueOptions
324 ): void {
a20f0ba5 325 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 326 this.flushTasksQueues()
a20f0ba5
JB
327 }
328 this.opts.enableTasksQueue = enable
8f52842f 329 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
330 }
331
332 /** @inheritDoc */
8f52842f 333 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 334 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
335 this.checkValidTasksQueueOptions(tasksQueueOptions)
336 this.opts.tasksQueueOptions =
337 this.buildTasksQueueOptions(tasksQueueOptions)
a20f0ba5
JB
338 } else {
339 delete this.opts.tasksQueueOptions
340 }
341 }
342
343 private buildTasksQueueOptions (
344 tasksQueueOptions: TasksQueueOptions
345 ): TasksQueueOptions {
346 return {
347 concurrency: tasksQueueOptions?.concurrency ?? 1
348 }
349 }
350
c319c66b
JB
351 /**
352 * Whether the pool is full or not.
353 *
354 * The pool filling boolean status.
355 */
356 protected abstract get full (): boolean
c2ade475 357
c319c66b
JB
358 /**
359 * Whether the pool is busy or not.
360 *
361 * The pool busyness boolean status.
362 */
363 protected abstract get busy (): boolean
7c0ba920 364
c2ade475 365 protected internalBusy (): boolean {
e0ae6100
JB
366 return (
367 this.workerNodes.findIndex(workerNode => {
a4958de2 368 return workerNode.tasksUsage.running === 0
e0ae6100
JB
369 }) === -1
370 )
cb70b19d
JB
371 }
372
afc003b2 373 /** @inheritDoc */
a86b6df1 374 public async execute (data?: Data, name?: string): Promise<Response> {
0567595a 375 const submissionTimestamp = performance.now()
20dcad1a 376 const workerNodeKey = this.chooseWorkerNode()
adc3c320 377 const submittedTask: Task<Data> = {
a86b6df1 378 name,
e5a5c0fc
JB
379 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
380 data: data ?? ({} as Data),
0567595a 381 submissionTimestamp,
adc3c320
JB
382 id: crypto.randomUUID()
383 }
2e81254d 384 const res = new Promise<Response>((resolve, reject) => {
02706357 385 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
386 resolve,
387 reject,
20dcad1a 388 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
389 })
390 })
ff733df7
JB
391 if (
392 this.opts.enableTasksQueue === true &&
7171d33f 393 (this.busy ||
3528c992 394 this.workerNodes[workerNodeKey].tasksUsage.running >=
7171d33f 395 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 396 .concurrency as number))
ff733df7 397 ) {
26a929d7
JB
398 this.enqueueTask(workerNodeKey, submittedTask)
399 } else {
2e81254d 400 this.executeTask(workerNodeKey, submittedTask)
adc3c320 401 }
b0d6ed8f 402 this.workerChoiceStrategyContext.update(workerNodeKey)
ff733df7 403 this.checkAndEmitEvents()
78cea37e 404 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
405 return res
406 }
c97c7edb 407
afc003b2 408 /** @inheritDoc */
c97c7edb 409 public async destroy (): Promise<void> {
1fbcaa7c 410 await Promise.all(
875a7c37
JB
411 this.workerNodes.map(async (workerNode, workerNodeKey) => {
412 this.flushTasksQueue(workerNodeKey)
f06e48d8 413 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
414 })
415 )
c97c7edb
S
416 }
417
4a6952ff 418 /**
f06e48d8 419 * Shutdowns the given worker.
4a6952ff 420 *
f06e48d8 421 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
422 */
423 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 424
729c563d 425 /**
2e81254d 426 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 427 * Can be overridden
afc003b2
JB
428 *
429 * @virtual
729c563d 430 */
280c2a77 431 protected setupHook (): void {
d99ba5a8 432 // Intentionally empty
280c2a77 433 }
c97c7edb 434
729c563d 435 /**
280c2a77
S
436 * Should return whether the worker is the main worker or not.
437 */
438 protected abstract isMain (): boolean
439
440 /**
2e81254d 441 * Hook executed before the worker task execution.
bf9549ae 442 * Can be overridden.
729c563d 443 *
f06e48d8 444 * @param workerNodeKey - The worker node key.
729c563d 445 */
f20f344f 446 protected beforeTaskExecutionHook (workerNodeKey: number): void {
09a6305f 447 ++this.workerNodes[workerNodeKey].tasksUsage.running
c97c7edb
S
448 }
449
c01733f1 450 /**
2e81254d 451 * Hook executed after the worker task execution.
bf9549ae 452 * Can be overridden.
c01733f1 453 *
c923ce56 454 * @param worker - The worker.
38e795c1 455 * @param message - The received message.
c01733f1 456 */
2e81254d 457 protected afterTaskExecutionHook (
c923ce56 458 worker: Worker,
2740a743 459 message: MessageValue<Response>
bf9549ae 460 ): void {
f8eb0a2a
JB
461 const workerTasksUsage =
462 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
3032893a
JB
463 --workerTasksUsage.running
464 ++workerTasksUsage.run
2740a743
JB
465 if (message.error != null) {
466 ++workerTasksUsage.error
467 }
f8eb0a2a 468 this.updateRunTimeTasksUsage(workerTasksUsage, message)
74001280 469 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
f8eb0a2a
JB
470 }
471
472 private updateRunTimeTasksUsage (
473 workerTasksUsage: TasksUsage,
474 message: MessageValue<Response>
475 ): void {
97a2abc3 476 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
aee46736 477 workerTasksUsage.runTime += message.runTime ?? 0
c6bd2650
JB
478 if (
479 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
480 workerTasksUsage.run !== 0
481 ) {
3032893a
JB
482 workerTasksUsage.avgRunTime =
483 workerTasksUsage.runTime / workerTasksUsage.run
484 }
3fa4cdd2
JB
485 if (
486 this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
487 message.runTime != null
488 ) {
489 workerTasksUsage.runTimeHistory.push(message.runTime)
78099a15
JB
490 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
491 }
3032893a 492 }
f8eb0a2a
JB
493 }
494
74001280 495 private updateWaitTimeTasksUsage (
f8eb0a2a
JB
496 workerTasksUsage: TasksUsage,
497 message: MessageValue<Response>
498 ): void {
09a6305f
JB
499 if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
500 workerTasksUsage.waitTime += message.waitTime ?? 0
501 if (
502 this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
503 workerTasksUsage.run !== 0
504 ) {
505 workerTasksUsage.avgWaitTime =
506 workerTasksUsage.waitTime / workerTasksUsage.run
507 }
508 if (
509 this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
510 message.waitTime != null
511 ) {
512 workerTasksUsage.waitTimeHistory.push(message.waitTime)
513 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
514 }
0567595a 515 }
c01733f1 516 }
517
280c2a77 518 /**
f06e48d8 519 * Chooses a worker node for the next task.
280c2a77 520 *
20dcad1a 521 * The default worker choice strategy uses a round robin algorithm to distribute the load.
280c2a77 522 *
20dcad1a 523 * @returns The worker node key
280c2a77 524 */
20dcad1a 525 protected chooseWorkerNode (): number {
f06e48d8 526 let workerNodeKey: number
6b27d407 527 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
adc3c320
JB
528 const workerCreated = this.createAndSetupWorker()
529 this.registerWorkerMessageListener(workerCreated, message => {
a4958de2 530 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8
JB
531 if (
532 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13 533 (message.kill != null &&
a4958de2 534 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
17393ac8 535 ) {
ff733df7 536 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
a4958de2 537 this.flushTasksQueue(currentWorkerNodeKey)
7c5a1080 538 void (this.destroyWorker(workerCreated) as Promise<void>)
17393ac8
JB
539 }
540 })
adc3c320 541 workerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8 542 } else {
f06e48d8 543 workerNodeKey = this.workerChoiceStrategyContext.execute()
17393ac8 544 }
20dcad1a 545 return workerNodeKey
c97c7edb
S
546 }
547
280c2a77 548 /**
675bb809 549 * Sends a message to the given worker.
280c2a77 550 *
38e795c1
JB
551 * @param worker - The worker which should receive the message.
552 * @param message - The message.
280c2a77
S
553 */
554 protected abstract sendToWorker (
555 worker: Worker,
556 message: MessageValue<Data>
557 ): void
558
4a6952ff 559 /**
f06e48d8 560 * Registers a listener callback on the given worker.
4a6952ff 561 *
38e795c1
JB
562 * @param worker - The worker which should register a listener.
563 * @param listener - The message listener callback.
4a6952ff
JB
564 */
565 protected abstract registerWorkerMessageListener<
4f7fa42a 566 Message extends Data | Response
78cea37e 567 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 568
729c563d
S
569 /**
570 * Returns a newly created worker.
571 */
280c2a77 572 protected abstract createWorker (): Worker
c97c7edb 573
729c563d 574 /**
f06e48d8 575 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 576 *
38e795c1 577 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 578 *
38e795c1 579 * @param worker - The newly created worker.
729c563d 580 */
280c2a77 581 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 582
4a6952ff 583 /**
f06e48d8 584 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
585 *
586 * @returns New, completely set up worker.
587 */
588 protected createAndSetupWorker (): Worker {
bdacc2d2 589 const worker = this.createWorker()
280c2a77 590
35cf1c03 591 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 592 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
593 worker.on('error', error => {
594 if (this.emitter != null) {
595 this.emitter.emit(PoolEvents.error, error)
596 }
597 })
598 if (this.opts.restartWorkerOnError === true) {
599 worker.on('error', () => {
600 this.createAndSetupWorker()
601 })
602 }
a35560ba
S
603 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
604 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 605 worker.once('exit', () => {
f06e48d8 606 this.removeWorkerNode(worker)
a974afa6 607 })
280c2a77 608
f06e48d8 609 this.pushWorkerNode(worker)
280c2a77
S
610
611 this.afterWorkerSetup(worker)
612
c97c7edb
S
613 return worker
614 }
be0676b3
APA
615
616 /**
ff733df7 617 * This function is the listener registered for each worker message.
be0676b3 618 *
bdacc2d2 619 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
620 */
621 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 622 return message => {
b1989cfd 623 if (message.id != null) {
a3445496 624 // Task execution response received
2740a743 625 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 626 if (promiseResponse != null) {
78cea37e 627 if (message.error != null) {
2740a743 628 promiseResponse.reject(message.error)
91ee39ed
JB
629 if (this.emitter != null) {
630 this.emitter.emit(PoolEvents.taskError, {
631 error: message.error,
632 errorData: message.errorData
633 })
634 }
a05c10de 635 } else {
2740a743 636 promiseResponse.resolve(message.data as Response)
a05c10de 637 }
2e81254d 638 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 639 this.promiseResponseMap.delete(message.id)
ff733df7
JB
640 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
641 if (
642 this.opts.enableTasksQueue === true &&
416fd65c 643 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 644 ) {
2e81254d
JB
645 this.executeTask(
646 workerNodeKey,
ff733df7
JB
647 this.dequeueTask(workerNodeKey) as Task<Data>
648 )
649 }
be0676b3
APA
650 }
651 }
652 }
be0676b3 653 }
7c0ba920 654
ff733df7 655 private checkAndEmitEvents (): void {
1f68cede 656 if (this.emitter != null) {
ff733df7 657 if (this.busy) {
6b27d407 658 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 659 }
6b27d407
JB
660 if (this.type === PoolTypes.dynamic && this.full) {
661 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 662 }
164d950a
JB
663 }
664 }
665
0ebe2a9f
JB
666 /**
667 * Sets the given worker node its tasks usage in the pool.
668 *
669 * @param workerNode - The worker node.
670 * @param tasksUsage - The worker node tasks usage.
671 */
672 private setWorkerNodeTasksUsage (
673 workerNode: WorkerNode<Worker, Data>,
674 tasksUsage: TasksUsage
675 ): void {
676 workerNode.tasksUsage = tasksUsage
677 }
678
a05c10de 679 /**
f06e48d8 680 * Pushes the given worker in the pool worker nodes.
ea7a90d3 681 *
38e795c1 682 * @param worker - The worker.
f06e48d8 683 * @returns The worker nodes length.
ea7a90d3 684 */
f06e48d8
JB
685 private pushWorkerNode (worker: Worker): number {
686 return this.workerNodes.push({
ffcbbad8 687 worker,
f82cd357
JB
688 tasksUsage: {
689 run: 0,
690 running: 0,
691 runTime: 0,
692 runTimeHistory: new CircularArray(),
693 avgRunTime: 0,
694 medRunTime: 0,
0567595a
JB
695 waitTime: 0,
696 waitTimeHistory: new CircularArray(),
697 avgWaitTime: 0,
698 medWaitTime: 0,
f82cd357
JB
699 error: 0
700 },
29ee7e9a 701 tasksQueue: new Queue<Task<Data>>()
ea7a90d3
JB
702 })
703 }
c923ce56
JB
704
705 /**
f06e48d8 706 * Sets the given worker in the pool worker nodes.
c923ce56 707 *
f06e48d8 708 * @param workerNodeKey - The worker node key.
c923ce56
JB
709 * @param worker - The worker.
710 * @param tasksUsage - The worker tasks usage.
f06e48d8 711 * @param tasksQueue - The worker task queue.
c923ce56 712 */
f06e48d8
JB
713 private setWorkerNode (
714 workerNodeKey: number,
c923ce56 715 worker: Worker,
f06e48d8 716 tasksUsage: TasksUsage,
29ee7e9a 717 tasksQueue: Queue<Task<Data>>
c923ce56 718 ): void {
f06e48d8 719 this.workerNodes[workerNodeKey] = {
c923ce56 720 worker,
f06e48d8
JB
721 tasksUsage,
722 tasksQueue
c923ce56
JB
723 }
724 }
51fe3d3c
JB
725
726 /**
f06e48d8 727 * Removes the given worker from the pool worker nodes.
51fe3d3c 728 *
f06e48d8 729 * @param worker - The worker.
51fe3d3c 730 */
416fd65c 731 private removeWorkerNode (worker: Worker): void {
f06e48d8 732 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
733 if (workerNodeKey !== -1) {
734 this.workerNodes.splice(workerNodeKey, 1)
735 this.workerChoiceStrategyContext.remove(workerNodeKey)
736 }
51fe3d3c 737 }
adc3c320 738
2e81254d 739 private executeTask (workerNodeKey: number, task: Task<Data>): void {
027c2215 740 this.beforeTaskExecutionHook(workerNodeKey)
2e81254d
JB
741 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
742 }
743
f9f00b5f 744 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 745 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
746 }
747
416fd65c 748 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 749 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
750 }
751
416fd65c 752 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 753 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 754 }
ff733df7 755
416fd65c
JB
756 private flushTasksQueue (workerNodeKey: number): void {
757 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
758 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
759 this.executeTask(
760 workerNodeKey,
761 this.dequeueTask(workerNodeKey) as Task<Data>
762 )
ff733df7 763 }
ff733df7
JB
764 }
765 }
766
ef41a6e6
JB
767 private flushTasksQueues (): void {
768 for (const [workerNodeKey] of this.workerNodes.entries()) {
769 this.flushTasksQueue(workerNodeKey)
770 }
771 }
c97c7edb 772}