docs: small refinements
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
0d80593b 6 isPlainObject,
bbeadd16
JB
7 median
8} from '../utils'
34a0cfab 9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 10import { CircularArray } from '../circular-array'
29ee7e9a 11import { Queue } from '../queue'
c4855468 12import {
65d7a1c9 13 type IPool,
7c5a1080 14 PoolEmitter,
c4855468 15 PoolEvents,
6b27d407 16 type PoolInfo,
c4855468 17 type PoolOptions,
6b27d407
JB
18 type PoolType,
19 PoolTypes,
184855e6
JB
20 type TasksQueueOptions,
21 type WorkerType
c4855468 22} from './pool'
f06e48d8 23import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
24import {
25 WorkerChoiceStrategies,
a20f0ba5
JB
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
bdaf31cd
JB
28} from './selection-strategies/selection-strategies-types'
29import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 30
729c563d 31/**
ea7a90d3 32 * Base class that implements some shared logic for all poolifier pools.
729c563d 33 *
38e795c1
JB
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 36 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 37 */
c97c7edb 38export abstract class AbstractPool<
f06e48d8 39 Worker extends IWorker,
d3c8a1a8
S
40 Data = unknown,
41 Response = unknown
c4855468 42> implements IPool<Worker, Data, Response> {
afc003b2 43 /** @inheritDoc */
f06e48d8 44 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 45
afc003b2 46 /** @inheritDoc */
7c0ba920
JB
47 public readonly emitter?: PoolEmitter
48
be0676b3 49 /**
a3445496 50 * The execution response promise map.
be0676b3 51 *
2740a743 52 * - `key`: The message id of each submitted task.
a3445496 53 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 54 *
a3445496 55 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 56 */
c923ce56
JB
57 protected promiseResponseMap: Map<
58 string,
59 PromiseResponseWrapper<Worker, Response>
60 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 61
a35560ba 62 /**
51fe3d3c 63 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 64 *
51fe3d3c 65 * Default to a round robin algorithm.
a35560ba
S
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
68 Worker,
69 Data,
70 Response
a35560ba
S
71 >
72
729c563d
S
73 /**
74 * Constructs a new poolifier pool.
75 *
38e795c1 76 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 77 * @param filePath - Path to the worker file.
38e795c1 78 * @param opts - Options for the pool.
729c563d 79 */
c97c7edb 80 public constructor (
5c5a1fb7 81 public readonly numberOfWorkers: number,
c97c7edb 82 public readonly filePath: string,
1927ee67 83 public readonly opts: PoolOptions<Worker>
c97c7edb 84 ) {
78cea37e 85 if (!this.isMain()) {
c97c7edb
S
86 throw new Error('Cannot start a pool from a worker!')
87 }
8d3782fa 88 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 89 this.checkFilePath(this.filePath)
7c0ba920 90 this.checkPoolOptions(this.opts)
1086026a 91
7254e419
JB
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 96
c97c7edb
S
97 this.setupHook()
98
5c5a1fb7 99 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 100 this.createAndSetupWorker()
c97c7edb
S
101 }
102
6bd72cd0 103 if (this.opts.enableEvents === true) {
7c0ba920
JB
104 this.emitter = new PoolEmitter()
105 }
d59df138
JB
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
da309861
JB
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
c97c7edb
S
115 }
116
a35560ba 117 private checkFilePath (filePath: string): void {
ffcbbad8
JB
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
c510fea7
APA
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
8d3782fa
JB
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
78cea37e 131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 132 throw new TypeError(
0d80593b 133 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
134 )
135 } else if (numberOfWorkers < 0) {
473c717a 136 throw new RangeError(
8d3782fa
JB
137 'Cannot instantiate a pool with a negative number of workers'
138 )
6b27d407 139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
7c0ba920 144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
1f68cede 155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 168 }
aee46736
JB
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 175 throw new Error(
aee46736 176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
177 )
178 }
7c0ba920
JB
179 }
180
0d80593b
JB
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
49be33fe
JB
189 if (
190 workerChoiceStrategyOptions.weights != null &&
6b27d407 191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
0d80593b
JB
197 }
198
a20f0ba5
JB
199 private checkValidTasksQueueOptions (
200 tasksQueueOptions: TasksQueueOptions
201 ): void {
0d80593b
JB
202 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
203 throw new TypeError('Invalid tasks queue options: must be a plain object')
204 }
a20f0ba5
JB
205 if ((tasksQueueOptions?.concurrency as number) <= 0) {
206 throw new Error(
207 `Invalid worker tasks concurrency '${
208 tasksQueueOptions.concurrency as number
209 }'`
210 )
211 }
212 }
213
afc003b2 214 /** @inheritDoc */
7c0ba920
JB
215 public abstract get type (): PoolType
216
08f3f44c 217 /** @inheritDoc */
6b27d407
JB
218 public get info (): PoolInfo {
219 return {
220 type: this.type,
184855e6 221 worker: this.worker,
6b27d407
JB
222 minSize: this.minSize,
223 maxSize: this.maxSize,
224 workerNodes: this.workerNodes.length,
225 idleWorkerNodes: this.workerNodes.reduce(
226 (accumulator, workerNode) =>
227 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
228 0
229 ),
230 busyWorkerNodes: this.workerNodes.reduce(
231 (accumulator, workerNode) =>
232 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
233 0
234 ),
235 runningTasks: this.workerNodes.reduce(
236 (accumulator, workerNode) =>
237 accumulator + workerNode.tasksUsage.running,
238 0
239 ),
240 queuedTasks: this.workerNodes.reduce(
241 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
242 0
243 ),
244 maxQueuedTasks: this.workerNodes.reduce(
245 (accumulator, workerNode) =>
246 accumulator + workerNode.tasksQueue.maxSize,
247 0
248 )
249 }
250 }
08f3f44c 251
184855e6
JB
252 /**
253 * Gets the worker type.
254 */
255 protected abstract get worker (): WorkerType
256
c2ade475 257 /**
6b27d407 258 * Pool minimum size.
c2ade475 259 */
6b27d407 260 protected abstract get minSize (): number
ff733df7
JB
261
262 /**
6b27d407 263 * Pool maximum size.
ff733df7 264 */
6b27d407 265 protected abstract get maxSize (): number
a35560ba 266
ffcbbad8 267 /**
f06e48d8 268 * Gets the given worker its worker node key.
ffcbbad8
JB
269 *
270 * @param worker - The worker.
f06e48d8 271 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 272 */
f06e48d8
JB
273 private getWorkerNodeKey (worker: Worker): number {
274 return this.workerNodes.findIndex(
275 workerNode => workerNode.worker === worker
276 )
bf9549ae
JB
277 }
278
afc003b2 279 /** @inheritDoc */
a35560ba 280 public setWorkerChoiceStrategy (
59219cbb
JB
281 workerChoiceStrategy: WorkerChoiceStrategy,
282 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 283 ): void {
aee46736 284 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 285 this.opts.workerChoiceStrategy = workerChoiceStrategy
0ebe2a9f 286 for (const workerNode of this.workerNodes) {
f82cd357
JB
287 this.setWorkerNodeTasksUsage(workerNode, {
288 run: 0,
289 running: 0,
290 runTime: 0,
291 runTimeHistory: new CircularArray(),
292 avgRunTime: 0,
293 medRunTime: 0,
0567595a
JB
294 waitTime: 0,
295 waitTimeHistory: new CircularArray(),
296 avgWaitTime: 0,
297 medWaitTime: 0,
f82cd357
JB
298 error: 0
299 })
ea7a90d3 300 }
a35560ba 301 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
a20f0ba5
JB
302 this.opts.workerChoiceStrategy
303 )
59219cbb
JB
304 if (workerChoiceStrategyOptions != null) {
305 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
306 }
a20f0ba5
JB
307 }
308
309 /** @inheritDoc */
310 public setWorkerChoiceStrategyOptions (
311 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
312 ): void {
0d80593b 313 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
314 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
315 this.workerChoiceStrategyContext.setOptions(
316 this.opts.workerChoiceStrategyOptions
a35560ba
S
317 )
318 }
319
a20f0ba5 320 /** @inheritDoc */
8f52842f
JB
321 public enableTasksQueue (
322 enable: boolean,
323 tasksQueueOptions?: TasksQueueOptions
324 ): void {
a20f0ba5 325 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 326 this.flushTasksQueues()
a20f0ba5
JB
327 }
328 this.opts.enableTasksQueue = enable
8f52842f 329 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
330 }
331
332 /** @inheritDoc */
8f52842f 333 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 334 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
335 this.checkValidTasksQueueOptions(tasksQueueOptions)
336 this.opts.tasksQueueOptions =
337 this.buildTasksQueueOptions(tasksQueueOptions)
a20f0ba5
JB
338 } else {
339 delete this.opts.tasksQueueOptions
340 }
341 }
342
343 private buildTasksQueueOptions (
344 tasksQueueOptions: TasksQueueOptions
345 ): TasksQueueOptions {
346 return {
347 concurrency: tasksQueueOptions?.concurrency ?? 1
348 }
349 }
350
c319c66b
JB
351 /**
352 * Whether the pool is full or not.
353 *
354 * The pool filling boolean status.
355 */
356 protected abstract get full (): boolean
c2ade475 357
c319c66b
JB
358 /**
359 * Whether the pool is busy or not.
360 *
361 * The pool busyness boolean status.
362 */
363 protected abstract get busy (): boolean
7c0ba920 364
c2ade475 365 protected internalBusy (): boolean {
e0ae6100
JB
366 return (
367 this.workerNodes.findIndex(workerNode => {
a4958de2 368 return workerNode.tasksUsage.running === 0
e0ae6100
JB
369 }) === -1
370 )
cb70b19d
JB
371 }
372
afc003b2 373 /** @inheritDoc */
a86b6df1 374 public async execute (data?: Data, name?: string): Promise<Response> {
0567595a 375 const submissionTimestamp = performance.now()
20dcad1a 376 const workerNodeKey = this.chooseWorkerNode()
adc3c320 377 const submittedTask: Task<Data> = {
a86b6df1 378 name,
e5a5c0fc
JB
379 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
380 data: data ?? ({} as Data),
0567595a 381 submissionTimestamp,
adc3c320
JB
382 id: crypto.randomUUID()
383 }
2e81254d 384 const res = new Promise<Response>((resolve, reject) => {
02706357 385 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
386 resolve,
387 reject,
20dcad1a 388 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
389 })
390 })
ff733df7
JB
391 if (
392 this.opts.enableTasksQueue === true &&
7171d33f 393 (this.busy ||
3528c992 394 this.workerNodes[workerNodeKey].tasksUsage.running >=
7171d33f 395 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 396 .concurrency as number))
ff733df7 397 ) {
26a929d7
JB
398 this.enqueueTask(workerNodeKey, submittedTask)
399 } else {
2e81254d 400 this.executeTask(workerNodeKey, submittedTask)
adc3c320 401 }
b0d6ed8f 402 this.workerChoiceStrategyContext.update(workerNodeKey)
ff733df7 403 this.checkAndEmitEvents()
78cea37e 404 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
405 return res
406 }
c97c7edb 407
afc003b2 408 /** @inheritDoc */
c97c7edb 409 public async destroy (): Promise<void> {
1fbcaa7c 410 await Promise.all(
875a7c37
JB
411 this.workerNodes.map(async (workerNode, workerNodeKey) => {
412 this.flushTasksQueue(workerNodeKey)
47aacbaa 413 // FIXME: wait for tasks to be finished
f06e48d8 414 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
415 })
416 )
c97c7edb
S
417 }
418
4a6952ff 419 /**
f06e48d8 420 * Shutdowns the given worker.
4a6952ff 421 *
f06e48d8 422 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
423 */
424 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 425
729c563d 426 /**
2e81254d 427 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 428 * Can be overridden
afc003b2
JB
429 *
430 * @virtual
729c563d 431 */
280c2a77 432 protected setupHook (): void {
d99ba5a8 433 // Intentionally empty
280c2a77 434 }
c97c7edb 435
729c563d 436 /**
280c2a77
S
437 * Should return whether the worker is the main worker or not.
438 */
439 protected abstract isMain (): boolean
440
441 /**
2e81254d 442 * Hook executed before the worker task execution.
bf9549ae 443 * Can be overridden.
729c563d 444 *
f06e48d8 445 * @param workerNodeKey - The worker node key.
729c563d 446 */
f20f344f 447 protected beforeTaskExecutionHook (workerNodeKey: number): void {
09a6305f 448 ++this.workerNodes[workerNodeKey].tasksUsage.running
c97c7edb
S
449 }
450
c01733f1 451 /**
2e81254d 452 * Hook executed after the worker task execution.
bf9549ae 453 * Can be overridden.
c01733f1 454 *
c923ce56 455 * @param worker - The worker.
38e795c1 456 * @param message - The received message.
c01733f1 457 */
2e81254d 458 protected afterTaskExecutionHook (
c923ce56 459 worker: Worker,
2740a743 460 message: MessageValue<Response>
bf9549ae 461 ): void {
f8eb0a2a
JB
462 const workerTasksUsage =
463 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
3032893a
JB
464 --workerTasksUsage.running
465 ++workerTasksUsage.run
2740a743
JB
466 if (message.error != null) {
467 ++workerTasksUsage.error
468 }
f8eb0a2a 469 this.updateRunTimeTasksUsage(workerTasksUsage, message)
74001280 470 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
f8eb0a2a
JB
471 }
472
473 private updateRunTimeTasksUsage (
474 workerTasksUsage: TasksUsage,
475 message: MessageValue<Response>
476 ): void {
97a2abc3 477 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
aee46736 478 workerTasksUsage.runTime += message.runTime ?? 0
c6bd2650
JB
479 if (
480 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
481 workerTasksUsage.run !== 0
482 ) {
3032893a
JB
483 workerTasksUsage.avgRunTime =
484 workerTasksUsage.runTime / workerTasksUsage.run
485 }
3fa4cdd2
JB
486 if (
487 this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
488 message.runTime != null
489 ) {
490 workerTasksUsage.runTimeHistory.push(message.runTime)
78099a15
JB
491 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
492 }
3032893a 493 }
f8eb0a2a
JB
494 }
495
74001280 496 private updateWaitTimeTasksUsage (
f8eb0a2a
JB
497 workerTasksUsage: TasksUsage,
498 message: MessageValue<Response>
499 ): void {
09a6305f
JB
500 if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
501 workerTasksUsage.waitTime += message.waitTime ?? 0
502 if (
503 this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
504 workerTasksUsage.run !== 0
505 ) {
506 workerTasksUsage.avgWaitTime =
507 workerTasksUsage.waitTime / workerTasksUsage.run
508 }
509 if (
510 this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
511 message.waitTime != null
512 ) {
513 workerTasksUsage.waitTimeHistory.push(message.waitTime)
514 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
515 }
0567595a 516 }
c01733f1 517 }
518
280c2a77 519 /**
f06e48d8 520 * Chooses a worker node for the next task.
280c2a77 521 *
20dcad1a 522 * The default worker choice strategy uses a round robin algorithm to distribute the load.
280c2a77 523 *
20dcad1a 524 * @returns The worker node key
280c2a77 525 */
20dcad1a 526 protected chooseWorkerNode (): number {
f06e48d8 527 let workerNodeKey: number
6b27d407 528 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
adc3c320
JB
529 const workerCreated = this.createAndSetupWorker()
530 this.registerWorkerMessageListener(workerCreated, message => {
a4958de2 531 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8
JB
532 if (
533 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13 534 (message.kill != null &&
a4958de2 535 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
17393ac8 536 ) {
ff733df7 537 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
a4958de2 538 this.flushTasksQueue(currentWorkerNodeKey)
47aacbaa 539 // FIXME: wait for tasks to be finished
7c5a1080 540 void (this.destroyWorker(workerCreated) as Promise<void>)
17393ac8
JB
541 }
542 })
adc3c320 543 workerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8 544 } else {
f06e48d8 545 workerNodeKey = this.workerChoiceStrategyContext.execute()
17393ac8 546 }
20dcad1a 547 return workerNodeKey
c97c7edb
S
548 }
549
280c2a77 550 /**
675bb809 551 * Sends a message to the given worker.
280c2a77 552 *
38e795c1
JB
553 * @param worker - The worker which should receive the message.
554 * @param message - The message.
280c2a77
S
555 */
556 protected abstract sendToWorker (
557 worker: Worker,
558 message: MessageValue<Data>
559 ): void
560
4a6952ff 561 /**
f06e48d8 562 * Registers a listener callback on the given worker.
4a6952ff 563 *
38e795c1
JB
564 * @param worker - The worker which should register a listener.
565 * @param listener - The message listener callback.
4a6952ff
JB
566 */
567 protected abstract registerWorkerMessageListener<
4f7fa42a 568 Message extends Data | Response
78cea37e 569 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 570
729c563d
S
571 /**
572 * Returns a newly created worker.
573 */
280c2a77 574 protected abstract createWorker (): Worker
c97c7edb 575
729c563d 576 /**
f06e48d8 577 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 578 *
38e795c1 579 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 580 *
38e795c1 581 * @param worker - The newly created worker.
729c563d 582 */
280c2a77 583 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 584
4a6952ff 585 /**
f06e48d8 586 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
587 *
588 * @returns New, completely set up worker.
589 */
590 protected createAndSetupWorker (): Worker {
bdacc2d2 591 const worker = this.createWorker()
280c2a77 592
35cf1c03 593 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 594 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
595 worker.on('error', error => {
596 if (this.emitter != null) {
597 this.emitter.emit(PoolEvents.error, error)
598 }
599 })
600 if (this.opts.restartWorkerOnError === true) {
601 worker.on('error', () => {
602 this.createAndSetupWorker()
603 })
604 }
a35560ba
S
605 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
606 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 607 worker.once('exit', () => {
f06e48d8 608 this.removeWorkerNode(worker)
a974afa6 609 })
280c2a77 610
f06e48d8 611 this.pushWorkerNode(worker)
280c2a77
S
612
613 this.afterWorkerSetup(worker)
614
c97c7edb
S
615 return worker
616 }
be0676b3
APA
617
618 /**
ff733df7 619 * This function is the listener registered for each worker message.
be0676b3 620 *
bdacc2d2 621 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
622 */
623 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 624 return message => {
b1989cfd 625 if (message.id != null) {
a3445496 626 // Task execution response received
2740a743 627 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 628 if (promiseResponse != null) {
78cea37e 629 if (message.error != null) {
2740a743 630 promiseResponse.reject(message.error)
91ee39ed
JB
631 if (this.emitter != null) {
632 this.emitter.emit(PoolEvents.taskError, {
633 error: message.error,
634 errorData: message.errorData
635 })
636 }
a05c10de 637 } else {
2740a743 638 promiseResponse.resolve(message.data as Response)
a05c10de 639 }
2e81254d 640 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 641 this.promiseResponseMap.delete(message.id)
ff733df7
JB
642 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
643 if (
644 this.opts.enableTasksQueue === true &&
416fd65c 645 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 646 ) {
2e81254d
JB
647 this.executeTask(
648 workerNodeKey,
ff733df7
JB
649 this.dequeueTask(workerNodeKey) as Task<Data>
650 )
651 }
be0676b3
APA
652 }
653 }
654 }
be0676b3 655 }
7c0ba920 656
ff733df7 657 private checkAndEmitEvents (): void {
1f68cede 658 if (this.emitter != null) {
ff733df7 659 if (this.busy) {
6b27d407 660 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 661 }
6b27d407
JB
662 if (this.type === PoolTypes.dynamic && this.full) {
663 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 664 }
164d950a
JB
665 }
666 }
667
0ebe2a9f
JB
668 /**
669 * Sets the given worker node its tasks usage in the pool.
670 *
671 * @param workerNode - The worker node.
672 * @param tasksUsage - The worker node tasks usage.
673 */
674 private setWorkerNodeTasksUsage (
675 workerNode: WorkerNode<Worker, Data>,
676 tasksUsage: TasksUsage
677 ): void {
678 workerNode.tasksUsage = tasksUsage
679 }
680
a05c10de 681 /**
f06e48d8 682 * Pushes the given worker in the pool worker nodes.
ea7a90d3 683 *
38e795c1 684 * @param worker - The worker.
f06e48d8 685 * @returns The worker nodes length.
ea7a90d3 686 */
f06e48d8
JB
687 private pushWorkerNode (worker: Worker): number {
688 return this.workerNodes.push({
ffcbbad8 689 worker,
f82cd357
JB
690 tasksUsage: {
691 run: 0,
692 running: 0,
693 runTime: 0,
694 runTimeHistory: new CircularArray(),
695 avgRunTime: 0,
696 medRunTime: 0,
0567595a
JB
697 waitTime: 0,
698 waitTimeHistory: new CircularArray(),
699 avgWaitTime: 0,
700 medWaitTime: 0,
f82cd357
JB
701 error: 0
702 },
29ee7e9a 703 tasksQueue: new Queue<Task<Data>>()
ea7a90d3
JB
704 })
705 }
c923ce56
JB
706
707 /**
f06e48d8 708 * Sets the given worker in the pool worker nodes.
c923ce56 709 *
f06e48d8 710 * @param workerNodeKey - The worker node key.
c923ce56
JB
711 * @param worker - The worker.
712 * @param tasksUsage - The worker tasks usage.
f06e48d8 713 * @param tasksQueue - The worker task queue.
c923ce56 714 */
f06e48d8
JB
715 private setWorkerNode (
716 workerNodeKey: number,
c923ce56 717 worker: Worker,
f06e48d8 718 tasksUsage: TasksUsage,
29ee7e9a 719 tasksQueue: Queue<Task<Data>>
c923ce56 720 ): void {
f06e48d8 721 this.workerNodes[workerNodeKey] = {
c923ce56 722 worker,
f06e48d8
JB
723 tasksUsage,
724 tasksQueue
c923ce56
JB
725 }
726 }
51fe3d3c
JB
727
728 /**
f06e48d8 729 * Removes the given worker from the pool worker nodes.
51fe3d3c 730 *
f06e48d8 731 * @param worker - The worker.
51fe3d3c 732 */
416fd65c 733 private removeWorkerNode (worker: Worker): void {
f06e48d8 734 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
735 if (workerNodeKey !== -1) {
736 this.workerNodes.splice(workerNodeKey, 1)
737 this.workerChoiceStrategyContext.remove(workerNodeKey)
738 }
51fe3d3c 739 }
adc3c320 740
2e81254d 741 private executeTask (workerNodeKey: number, task: Task<Data>): void {
027c2215 742 this.beforeTaskExecutionHook(workerNodeKey)
2e81254d
JB
743 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
744 }
745
f9f00b5f 746 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 747 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
748 }
749
416fd65c 750 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 751 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
752 }
753
416fd65c 754 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 755 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 756 }
ff733df7 757
416fd65c
JB
758 private flushTasksQueue (workerNodeKey: number): void {
759 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
760 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
761 this.executeTask(
762 workerNodeKey,
763 this.dequeueTask(workerNodeKey) as Task<Data>
764 )
ff733df7 765 }
ff733df7
JB
766 }
767 }
768
ef41a6e6
JB
769 private flushTasksQueues (): void {
770 for (const [workerNodeKey] of this.workerNodes.entries()) {
771 this.flushTasksQueue(workerNodeKey)
772 }
773 }
c97c7edb 774}