docs: fix GH project version
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
2740a743 2import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
3import {
4 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
5 EMPTY_FUNCTION,
0d80593b 6 isPlainObject,
bbeadd16
JB
7 median
8} from '../utils'
34a0cfab 9import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 10import { CircularArray } from '../circular-array'
29ee7e9a 11import { Queue } from '../queue'
c4855468 12import {
65d7a1c9 13 type IPool,
7c5a1080 14 PoolEmitter,
c4855468 15 PoolEvents,
6b27d407 16 type PoolInfo,
c4855468 17 type PoolOptions,
6b27d407
JB
18 type PoolType,
19 PoolTypes,
184855e6
JB
20 type TasksQueueOptions,
21 type WorkerType
c4855468 22} from './pool'
f06e48d8 23import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
24import {
25 WorkerChoiceStrategies,
a20f0ba5
JB
26 type WorkerChoiceStrategy,
27 type WorkerChoiceStrategyOptions
bdaf31cd
JB
28} from './selection-strategies/selection-strategies-types'
29import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 30
729c563d 31/**
ea7a90d3 32 * Base class that implements some shared logic for all poolifier pools.
729c563d 33 *
38e795c1
JB
34 * @typeParam Worker - Type of worker which manages this pool.
35 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 36 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 37 */
c97c7edb 38export abstract class AbstractPool<
f06e48d8 39 Worker extends IWorker,
d3c8a1a8
S
40 Data = unknown,
41 Response = unknown
c4855468 42> implements IPool<Worker, Data, Response> {
afc003b2 43 /** @inheritDoc */
f06e48d8 44 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 45
afc003b2 46 /** @inheritDoc */
7c0ba920
JB
47 public readonly emitter?: PoolEmitter
48
be0676b3 49 /**
a3445496 50 * The execution response promise map.
be0676b3 51 *
2740a743 52 * - `key`: The message id of each submitted task.
a3445496 53 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 54 *
a3445496 55 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 56 */
c923ce56
JB
57 protected promiseResponseMap: Map<
58 string,
59 PromiseResponseWrapper<Worker, Response>
60 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 61
a35560ba 62 /**
51fe3d3c 63 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 64 *
51fe3d3c 65 * Default to a round robin algorithm.
a35560ba
S
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
68 Worker,
69 Data,
70 Response
a35560ba
S
71 >
72
729c563d
S
73 /**
74 * Constructs a new poolifier pool.
75 *
38e795c1 76 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 77 * @param filePath - Path to the worker file.
38e795c1 78 * @param opts - Options for the pool.
729c563d 79 */
c97c7edb 80 public constructor (
b4213b7f
JB
81 protected readonly numberOfWorkers: number,
82 protected readonly filePath: string,
83 protected readonly opts: PoolOptions<Worker>
c97c7edb 84 ) {
78cea37e 85 if (!this.isMain()) {
c97c7edb
S
86 throw new Error('Cannot start a pool from a worker!')
87 }
8d3782fa 88 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 89 this.checkFilePath(this.filePath)
7c0ba920 90 this.checkPoolOptions(this.opts)
1086026a 91
7254e419
JB
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 96
c97c7edb
S
97 this.setupHook()
98
5c5a1fb7 99 for (let i = 1; i <= this.numberOfWorkers; i++) {
280c2a77 100 this.createAndSetupWorker()
c97c7edb
S
101 }
102
6bd72cd0 103 if (this.opts.enableEvents === true) {
7c0ba920
JB
104 this.emitter = new PoolEmitter()
105 }
d59df138
JB
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
da309861
JB
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
c97c7edb
S
115 }
116
a35560ba 117 private checkFilePath (filePath: string): void {
ffcbbad8
JB
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
c510fea7
APA
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
8d3782fa
JB
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
78cea37e 131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 132 throw new TypeError(
0d80593b 133 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
134 )
135 } else if (numberOfWorkers < 0) {
473c717a 136 throw new RangeError(
8d3782fa
JB
137 'Cannot instantiate a pool with a negative number of workers'
138 )
6b27d407 139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
7c0ba920 144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
1f68cede 155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 168 }
aee46736
JB
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 175 throw new Error(
aee46736 176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
177 )
178 }
7c0ba920
JB
179 }
180
0d80593b
JB
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
49be33fe
JB
189 if (
190 workerChoiceStrategyOptions.weights != null &&
6b27d407 191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
0d80593b
JB
197 }
198
a20f0ba5
JB
199 private checkValidTasksQueueOptions (
200 tasksQueueOptions: TasksQueueOptions
201 ): void {
0d80593b
JB
202 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
203 throw new TypeError('Invalid tasks queue options: must be a plain object')
204 }
a20f0ba5
JB
205 if ((tasksQueueOptions?.concurrency as number) <= 0) {
206 throw new Error(
207 `Invalid worker tasks concurrency '${
208 tasksQueueOptions.concurrency as number
209 }'`
210 )
211 }
212 }
213
08f3f44c 214 /** @inheritDoc */
6b27d407
JB
215 public get info (): PoolInfo {
216 return {
217 type: this.type,
184855e6 218 worker: this.worker,
6b27d407
JB
219 minSize: this.minSize,
220 maxSize: this.maxSize,
221 workerNodes: this.workerNodes.length,
222 idleWorkerNodes: this.workerNodes.reduce(
223 (accumulator, workerNode) =>
224 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
225 0
226 ),
227 busyWorkerNodes: this.workerNodes.reduce(
228 (accumulator, workerNode) =>
229 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
230 0
231 ),
232 runningTasks: this.workerNodes.reduce(
233 (accumulator, workerNode) =>
234 accumulator + workerNode.tasksUsage.running,
235 0
236 ),
237 queuedTasks: this.workerNodes.reduce(
238 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
239 0
240 ),
241 maxQueuedTasks: this.workerNodes.reduce(
242 (accumulator, workerNode) =>
243 accumulator + workerNode.tasksQueue.maxSize,
244 0
245 )
246 }
247 }
08f3f44c 248
8881ae32
JB
249 /**
250 * Pool type.
251 *
252 * If it is `'dynamic'`, it provides the `max` property.
253 */
254 protected abstract get type (): PoolType
255
184855e6
JB
256 /**
257 * Gets the worker type.
258 */
259 protected abstract get worker (): WorkerType
260
c2ade475 261 /**
6b27d407 262 * Pool minimum size.
c2ade475 263 */
6b27d407 264 protected abstract get minSize (): number
ff733df7
JB
265
266 /**
6b27d407 267 * Pool maximum size.
ff733df7 268 */
6b27d407 269 protected abstract get maxSize (): number
a35560ba 270
ffcbbad8 271 /**
f06e48d8 272 * Gets the given worker its worker node key.
ffcbbad8
JB
273 *
274 * @param worker - The worker.
f06e48d8 275 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 276 */
f06e48d8
JB
277 private getWorkerNodeKey (worker: Worker): number {
278 return this.workerNodes.findIndex(
279 workerNode => workerNode.worker === worker
280 )
bf9549ae
JB
281 }
282
afc003b2 283 /** @inheritDoc */
a35560ba 284 public setWorkerChoiceStrategy (
59219cbb
JB
285 workerChoiceStrategy: WorkerChoiceStrategy,
286 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 287 ): void {
aee46736 288 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 289 this.opts.workerChoiceStrategy = workerChoiceStrategy
0ebe2a9f 290 for (const workerNode of this.workerNodes) {
f82cd357 291 this.setWorkerNodeTasksUsage(workerNode, {
1ab50fe5 292 ran: 0,
f82cd357
JB
293 running: 0,
294 runTime: 0,
295 runTimeHistory: new CircularArray(),
296 avgRunTime: 0,
297 medRunTime: 0,
0567595a
JB
298 waitTime: 0,
299 waitTimeHistory: new CircularArray(),
300 avgWaitTime: 0,
301 medWaitTime: 0,
f82cd357
JB
302 error: 0
303 })
ea7a90d3 304 }
a35560ba 305 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
a20f0ba5
JB
306 this.opts.workerChoiceStrategy
307 )
59219cbb
JB
308 if (workerChoiceStrategyOptions != null) {
309 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
310 }
a20f0ba5
JB
311 }
312
313 /** @inheritDoc */
314 public setWorkerChoiceStrategyOptions (
315 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
316 ): void {
0d80593b 317 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
318 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
319 this.workerChoiceStrategyContext.setOptions(
320 this.opts.workerChoiceStrategyOptions
a35560ba
S
321 )
322 }
323
a20f0ba5 324 /** @inheritDoc */
8f52842f
JB
325 public enableTasksQueue (
326 enable: boolean,
327 tasksQueueOptions?: TasksQueueOptions
328 ): void {
a20f0ba5 329 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 330 this.flushTasksQueues()
a20f0ba5
JB
331 }
332 this.opts.enableTasksQueue = enable
8f52842f 333 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
334 }
335
336 /** @inheritDoc */
8f52842f 337 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 338 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
339 this.checkValidTasksQueueOptions(tasksQueueOptions)
340 this.opts.tasksQueueOptions =
341 this.buildTasksQueueOptions(tasksQueueOptions)
a20f0ba5
JB
342 } else {
343 delete this.opts.tasksQueueOptions
344 }
345 }
346
347 private buildTasksQueueOptions (
348 tasksQueueOptions: TasksQueueOptions
349 ): TasksQueueOptions {
350 return {
351 concurrency: tasksQueueOptions?.concurrency ?? 1
352 }
353 }
354
c319c66b
JB
355 /**
356 * Whether the pool is full or not.
357 *
358 * The pool filling boolean status.
359 */
dea903a8
JB
360 protected get full (): boolean {
361 return this.workerNodes.length >= this.maxSize
362 }
c2ade475 363
c319c66b
JB
364 /**
365 * Whether the pool is busy or not.
366 *
367 * The pool busyness boolean status.
368 */
369 protected abstract get busy (): boolean
7c0ba920 370
c2ade475 371 protected internalBusy (): boolean {
e0ae6100
JB
372 return (
373 this.workerNodes.findIndex(workerNode => {
a4958de2 374 return workerNode.tasksUsage.running === 0
e0ae6100
JB
375 }) === -1
376 )
cb70b19d
JB
377 }
378
afc003b2 379 /** @inheritDoc */
a86b6df1 380 public async execute (data?: Data, name?: string): Promise<Response> {
0567595a 381 const submissionTimestamp = performance.now()
20dcad1a 382 const workerNodeKey = this.chooseWorkerNode()
adc3c320 383 const submittedTask: Task<Data> = {
a86b6df1 384 name,
e5a5c0fc
JB
385 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
386 data: data ?? ({} as Data),
0567595a 387 submissionTimestamp,
adc3c320
JB
388 id: crypto.randomUUID()
389 }
2e81254d 390 const res = new Promise<Response>((resolve, reject) => {
02706357 391 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
392 resolve,
393 reject,
20dcad1a 394 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
395 })
396 })
ff733df7
JB
397 if (
398 this.opts.enableTasksQueue === true &&
7171d33f 399 (this.busy ||
3528c992 400 this.workerNodes[workerNodeKey].tasksUsage.running >=
7171d33f 401 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 402 .concurrency as number))
ff733df7 403 ) {
26a929d7
JB
404 this.enqueueTask(workerNodeKey, submittedTask)
405 } else {
2e81254d 406 this.executeTask(workerNodeKey, submittedTask)
adc3c320 407 }
b0d6ed8f 408 this.workerChoiceStrategyContext.update(workerNodeKey)
ff733df7 409 this.checkAndEmitEvents()
78cea37e 410 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
411 return res
412 }
c97c7edb 413
afc003b2 414 /** @inheritDoc */
c97c7edb 415 public async destroy (): Promise<void> {
1fbcaa7c 416 await Promise.all(
875a7c37
JB
417 this.workerNodes.map(async (workerNode, workerNodeKey) => {
418 this.flushTasksQueue(workerNodeKey)
47aacbaa 419 // FIXME: wait for tasks to be finished
f06e48d8 420 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
421 })
422 )
c97c7edb
S
423 }
424
4a6952ff 425 /**
f06e48d8 426 * Shutdowns the given worker.
4a6952ff 427 *
f06e48d8 428 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
429 */
430 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 431
729c563d 432 /**
2e81254d 433 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 434 * Can be overridden
afc003b2
JB
435 *
436 * @virtual
729c563d 437 */
280c2a77 438 protected setupHook (): void {
d99ba5a8 439 // Intentionally empty
280c2a77 440 }
c97c7edb 441
729c563d 442 /**
280c2a77
S
443 * Should return whether the worker is the main worker or not.
444 */
445 protected abstract isMain (): boolean
446
447 /**
2e81254d 448 * Hook executed before the worker task execution.
bf9549ae 449 * Can be overridden.
729c563d 450 *
f06e48d8 451 * @param workerNodeKey - The worker node key.
729c563d 452 */
f20f344f 453 protected beforeTaskExecutionHook (workerNodeKey: number): void {
09a6305f 454 ++this.workerNodes[workerNodeKey].tasksUsage.running
c97c7edb
S
455 }
456
c01733f1 457 /**
2e81254d 458 * Hook executed after the worker task execution.
bf9549ae 459 * Can be overridden.
c01733f1 460 *
c923ce56 461 * @param worker - The worker.
38e795c1 462 * @param message - The received message.
c01733f1 463 */
2e81254d 464 protected afterTaskExecutionHook (
c923ce56 465 worker: Worker,
2740a743 466 message: MessageValue<Response>
bf9549ae 467 ): void {
f8eb0a2a
JB
468 const workerTasksUsage =
469 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
3032893a 470 --workerTasksUsage.running
1ab50fe5 471 ++workerTasksUsage.ran
2740a743
JB
472 if (message.error != null) {
473 ++workerTasksUsage.error
474 }
f8eb0a2a 475 this.updateRunTimeTasksUsage(workerTasksUsage, message)
74001280 476 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
f8eb0a2a
JB
477 }
478
479 private updateRunTimeTasksUsage (
480 workerTasksUsage: TasksUsage,
481 message: MessageValue<Response>
482 ): void {
97a2abc3 483 if (this.workerChoiceStrategyContext.getRequiredStatistics().runTime) {
aee46736 484 workerTasksUsage.runTime += message.runTime ?? 0
c6bd2650
JB
485 if (
486 this.workerChoiceStrategyContext.getRequiredStatistics().avgRunTime &&
1ab50fe5 487 workerTasksUsage.ran !== 0
c6bd2650 488 ) {
3032893a 489 workerTasksUsage.avgRunTime =
1ab50fe5 490 workerTasksUsage.runTime / workerTasksUsage.ran
3032893a 491 }
3fa4cdd2
JB
492 if (
493 this.workerChoiceStrategyContext.getRequiredStatistics().medRunTime &&
494 message.runTime != null
495 ) {
496 workerTasksUsage.runTimeHistory.push(message.runTime)
78099a15
JB
497 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
498 }
3032893a 499 }
f8eb0a2a
JB
500 }
501
74001280 502 private updateWaitTimeTasksUsage (
f8eb0a2a
JB
503 workerTasksUsage: TasksUsage,
504 message: MessageValue<Response>
505 ): void {
09a6305f
JB
506 if (this.workerChoiceStrategyContext.getRequiredStatistics().waitTime) {
507 workerTasksUsage.waitTime += message.waitTime ?? 0
508 if (
509 this.workerChoiceStrategyContext.getRequiredStatistics().avgWaitTime &&
1ab50fe5 510 workerTasksUsage.ran !== 0
09a6305f
JB
511 ) {
512 workerTasksUsage.avgWaitTime =
1ab50fe5 513 workerTasksUsage.waitTime / workerTasksUsage.ran
09a6305f
JB
514 }
515 if (
516 this.workerChoiceStrategyContext.getRequiredStatistics().medWaitTime &&
517 message.waitTime != null
518 ) {
519 workerTasksUsage.waitTimeHistory.push(message.waitTime)
520 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
521 }
0567595a 522 }
c01733f1 523 }
524
280c2a77 525 /**
f06e48d8 526 * Chooses a worker node for the next task.
280c2a77 527 *
20dcad1a 528 * The default worker choice strategy uses a round robin algorithm to distribute the load.
280c2a77 529 *
20dcad1a 530 * @returns The worker node key
280c2a77 531 */
20dcad1a 532 protected chooseWorkerNode (): number {
f06e48d8 533 let workerNodeKey: number
6b27d407 534 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
adc3c320
JB
535 const workerCreated = this.createAndSetupWorker()
536 this.registerWorkerMessageListener(workerCreated, message => {
a4958de2 537 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8
JB
538 if (
539 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13 540 (message.kill != null &&
a4958de2 541 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
17393ac8 542 ) {
ff733df7 543 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
a4958de2 544 this.flushTasksQueue(currentWorkerNodeKey)
47aacbaa 545 // FIXME: wait for tasks to be finished
7c5a1080 546 void (this.destroyWorker(workerCreated) as Promise<void>)
17393ac8
JB
547 }
548 })
adc3c320 549 workerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8 550 } else {
f06e48d8 551 workerNodeKey = this.workerChoiceStrategyContext.execute()
17393ac8 552 }
20dcad1a 553 return workerNodeKey
c97c7edb
S
554 }
555
280c2a77 556 /**
675bb809 557 * Sends a message to the given worker.
280c2a77 558 *
38e795c1
JB
559 * @param worker - The worker which should receive the message.
560 * @param message - The message.
280c2a77
S
561 */
562 protected abstract sendToWorker (
563 worker: Worker,
564 message: MessageValue<Data>
565 ): void
566
4a6952ff 567 /**
f06e48d8 568 * Registers a listener callback on the given worker.
4a6952ff 569 *
38e795c1
JB
570 * @param worker - The worker which should register a listener.
571 * @param listener - The message listener callback.
4a6952ff
JB
572 */
573 protected abstract registerWorkerMessageListener<
4f7fa42a 574 Message extends Data | Response
78cea37e 575 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 576
729c563d
S
577 /**
578 * Returns a newly created worker.
579 */
280c2a77 580 protected abstract createWorker (): Worker
c97c7edb 581
729c563d 582 /**
f06e48d8 583 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 584 *
38e795c1 585 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 586 *
38e795c1 587 * @param worker - The newly created worker.
729c563d 588 */
280c2a77 589 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 590
4a6952ff 591 /**
f06e48d8 592 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
593 *
594 * @returns New, completely set up worker.
595 */
596 protected createAndSetupWorker (): Worker {
bdacc2d2 597 const worker = this.createWorker()
280c2a77 598
35cf1c03 599 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 600 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
601 worker.on('error', error => {
602 if (this.emitter != null) {
603 this.emitter.emit(PoolEvents.error, error)
604 }
605 })
606 if (this.opts.restartWorkerOnError === true) {
607 worker.on('error', () => {
608 this.createAndSetupWorker()
609 })
610 }
a35560ba
S
611 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
612 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 613 worker.once('exit', () => {
f06e48d8 614 this.removeWorkerNode(worker)
a974afa6 615 })
280c2a77 616
f06e48d8 617 this.pushWorkerNode(worker)
280c2a77
S
618
619 this.afterWorkerSetup(worker)
620
c97c7edb
S
621 return worker
622 }
be0676b3
APA
623
624 /**
ff733df7 625 * This function is the listener registered for each worker message.
be0676b3 626 *
bdacc2d2 627 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
628 */
629 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 630 return message => {
b1989cfd 631 if (message.id != null) {
a3445496 632 // Task execution response received
2740a743 633 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 634 if (promiseResponse != null) {
78cea37e 635 if (message.error != null) {
2740a743 636 promiseResponse.reject(message.error)
91ee39ed
JB
637 if (this.emitter != null) {
638 this.emitter.emit(PoolEvents.taskError, {
639 error: message.error,
640 errorData: message.errorData
641 })
642 }
a05c10de 643 } else {
2740a743 644 promiseResponse.resolve(message.data as Response)
a05c10de 645 }
2e81254d 646 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 647 this.promiseResponseMap.delete(message.id)
ff733df7
JB
648 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
649 if (
650 this.opts.enableTasksQueue === true &&
416fd65c 651 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 652 ) {
2e81254d
JB
653 this.executeTask(
654 workerNodeKey,
ff733df7
JB
655 this.dequeueTask(workerNodeKey) as Task<Data>
656 )
657 }
be0676b3
APA
658 }
659 }
660 }
be0676b3 661 }
7c0ba920 662
ff733df7 663 private checkAndEmitEvents (): void {
1f68cede 664 if (this.emitter != null) {
ff733df7 665 if (this.busy) {
6b27d407 666 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 667 }
6b27d407
JB
668 if (this.type === PoolTypes.dynamic && this.full) {
669 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 670 }
164d950a
JB
671 }
672 }
673
0ebe2a9f
JB
674 /**
675 * Sets the given worker node its tasks usage in the pool.
676 *
677 * @param workerNode - The worker node.
678 * @param tasksUsage - The worker node tasks usage.
679 */
680 private setWorkerNodeTasksUsage (
681 workerNode: WorkerNode<Worker, Data>,
682 tasksUsage: TasksUsage
683 ): void {
684 workerNode.tasksUsage = tasksUsage
685 }
686
a05c10de 687 /**
f06e48d8 688 * Pushes the given worker in the pool worker nodes.
ea7a90d3 689 *
38e795c1 690 * @param worker - The worker.
f06e48d8 691 * @returns The worker nodes length.
ea7a90d3 692 */
f06e48d8
JB
693 private pushWorkerNode (worker: Worker): number {
694 return this.workerNodes.push({
ffcbbad8 695 worker,
f82cd357 696 tasksUsage: {
1ab50fe5 697 ran: 0,
f82cd357
JB
698 running: 0,
699 runTime: 0,
700 runTimeHistory: new CircularArray(),
701 avgRunTime: 0,
702 medRunTime: 0,
0567595a
JB
703 waitTime: 0,
704 waitTimeHistory: new CircularArray(),
705 avgWaitTime: 0,
706 medWaitTime: 0,
f82cd357
JB
707 error: 0
708 },
29ee7e9a 709 tasksQueue: new Queue<Task<Data>>()
ea7a90d3
JB
710 })
711 }
c923ce56
JB
712
713 /**
f06e48d8 714 * Sets the given worker in the pool worker nodes.
c923ce56 715 *
f06e48d8 716 * @param workerNodeKey - The worker node key.
c923ce56
JB
717 * @param worker - The worker.
718 * @param tasksUsage - The worker tasks usage.
f06e48d8 719 * @param tasksQueue - The worker task queue.
c923ce56 720 */
f06e48d8
JB
721 private setWorkerNode (
722 workerNodeKey: number,
c923ce56 723 worker: Worker,
f06e48d8 724 tasksUsage: TasksUsage,
29ee7e9a 725 tasksQueue: Queue<Task<Data>>
c923ce56 726 ): void {
f06e48d8 727 this.workerNodes[workerNodeKey] = {
c923ce56 728 worker,
f06e48d8
JB
729 tasksUsage,
730 tasksQueue
c923ce56
JB
731 }
732 }
51fe3d3c
JB
733
734 /**
f06e48d8 735 * Removes the given worker from the pool worker nodes.
51fe3d3c 736 *
f06e48d8 737 * @param worker - The worker.
51fe3d3c 738 */
416fd65c 739 private removeWorkerNode (worker: Worker): void {
f06e48d8 740 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
741 if (workerNodeKey !== -1) {
742 this.workerNodes.splice(workerNodeKey, 1)
743 this.workerChoiceStrategyContext.remove(workerNodeKey)
744 }
51fe3d3c 745 }
adc3c320 746
2e81254d 747 private executeTask (workerNodeKey: number, task: Task<Data>): void {
027c2215 748 this.beforeTaskExecutionHook(workerNodeKey)
2e81254d
JB
749 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
750 }
751
f9f00b5f 752 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 753 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
754 }
755
416fd65c 756 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 757 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
758 }
759
416fd65c 760 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 761 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 762 }
ff733df7 763
416fd65c
JB
764 private flushTasksQueue (workerNodeKey: number): void {
765 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
766 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
767 this.executeTask(
768 workerNodeKey,
769 this.dequeueTask(workerNodeKey) as Task<Data>
770 )
ff733df7 771 }
ff733df7
JB
772 }
773 }
774
ef41a6e6
JB
775 private flushTasksQueues (): void {
776 for (const [workerNodeKey] of this.workerNodes.entries()) {
777 this.flushTasksQueue(workerNodeKey)
778 }
779 }
c97c7edb 780}