docs: add type definition documentation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
f06e48d8 24import type { IWorker, Task, TasksUsage, WorkerNode } from './worker'
a35560ba
S
25import {
26 WorkerChoiceStrategies,
a20f0ba5
JB
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
bdaf31cd
JB
29} from './selection-strategies/selection-strategies-types'
30import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 31
729c563d 32/**
ea7a90d3 33 * Base class that implements some shared logic for all poolifier pools.
729c563d 34 *
38e795c1
JB
35 * @typeParam Worker - Type of worker which manages this pool.
36 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 37 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 38 */
c97c7edb 39export abstract class AbstractPool<
f06e48d8 40 Worker extends IWorker,
d3c8a1a8
S
41 Data = unknown,
42 Response = unknown
c4855468 43> implements IPool<Worker, Data, Response> {
afc003b2 44 /** @inheritDoc */
f06e48d8 45 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 46
afc003b2 47 /** @inheritDoc */
7c0ba920
JB
48 public readonly emitter?: PoolEmitter
49
be0676b3 50 /**
a3445496 51 * The execution response promise map.
be0676b3 52 *
2740a743 53 * - `key`: The message id of each submitted task.
a3445496 54 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 55 *
a3445496 56 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 57 */
c923ce56
JB
58 protected promiseResponseMap: Map<
59 string,
60 PromiseResponseWrapper<Worker, Response>
61 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 62
a35560ba 63 /**
51fe3d3c 64 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 65 *
51fe3d3c 66 * Default to a round robin algorithm.
a35560ba
S
67 */
68 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
69 Worker,
70 Data,
71 Response
a35560ba
S
72 >
73
729c563d
S
74 /**
75 * Constructs a new poolifier pool.
76 *
38e795c1 77 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 78 * @param filePath - Path to the worker file.
38e795c1 79 * @param opts - Options for the pool.
729c563d 80 */
c97c7edb 81 public constructor (
b4213b7f
JB
82 protected readonly numberOfWorkers: number,
83 protected readonly filePath: string,
84 protected readonly opts: PoolOptions<Worker>
c97c7edb 85 ) {
78cea37e 86 if (!this.isMain()) {
c97c7edb
S
87 throw new Error('Cannot start a pool from a worker!')
88 }
8d3782fa 89 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 90 this.checkFilePath(this.filePath)
7c0ba920 91 this.checkPoolOptions(this.opts)
1086026a 92
7254e419
JB
93 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
94 this.executeTask = this.executeTask.bind(this)
95 this.enqueueTask = this.enqueueTask.bind(this)
96 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 97
6bd72cd0 98 if (this.opts.enableEvents === true) {
7c0ba920
JB
99 this.emitter = new PoolEmitter()
100 }
d59df138
JB
101 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
102 Worker,
103 Data,
104 Response
da309861
JB
105 >(
106 this,
107 this.opts.workerChoiceStrategy,
108 this.opts.workerChoiceStrategyOptions
109 )
b6b32453
JB
110
111 this.setupHook()
112
113 for (let i = 1; i <= this.numberOfWorkers; i++) {
114 this.createAndSetupWorker()
115 }
c97c7edb
S
116 }
117
a35560ba 118 private checkFilePath (filePath: string): void {
ffcbbad8
JB
119 if (
120 filePath == null ||
121 (typeof filePath === 'string' && filePath.trim().length === 0)
122 ) {
c510fea7
APA
123 throw new Error('Please specify a file with a worker implementation')
124 }
125 }
126
8d3782fa
JB
127 private checkNumberOfWorkers (numberOfWorkers: number): void {
128 if (numberOfWorkers == null) {
129 throw new Error(
130 'Cannot instantiate a pool without specifying the number of workers'
131 )
78cea37e 132 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 133 throw new TypeError(
0d80593b 134 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
135 )
136 } else if (numberOfWorkers < 0) {
473c717a 137 throw new RangeError(
8d3782fa
JB
138 'Cannot instantiate a pool with a negative number of workers'
139 )
6b27d407 140 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
141 throw new Error('Cannot instantiate a fixed pool with no worker')
142 }
143 }
144
7c0ba920 145 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
146 if (isPlainObject(opts)) {
147 this.opts.workerChoiceStrategy =
148 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
149 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
150 this.opts.workerChoiceStrategyOptions =
151 opts.workerChoiceStrategyOptions ??
152 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
153 this.checkValidWorkerChoiceStrategyOptions(
154 this.opts.workerChoiceStrategyOptions
155 )
1f68cede 156 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
157 this.opts.enableEvents = opts.enableEvents ?? true
158 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
159 if (this.opts.enableTasksQueue) {
160 this.checkValidTasksQueueOptions(
161 opts.tasksQueueOptions as TasksQueueOptions
162 )
163 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
164 opts.tasksQueueOptions as TasksQueueOptions
165 )
166 }
167 } else {
168 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 169 }
aee46736
JB
170 }
171
172 private checkValidWorkerChoiceStrategy (
173 workerChoiceStrategy: WorkerChoiceStrategy
174 ): void {
175 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 176 throw new Error(
aee46736 177 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
178 )
179 }
7c0ba920
JB
180 }
181
0d80593b
JB
182 private checkValidWorkerChoiceStrategyOptions (
183 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
184 ): void {
185 if (!isPlainObject(workerChoiceStrategyOptions)) {
186 throw new TypeError(
187 'Invalid worker choice strategy options: must be a plain object'
188 )
189 }
49be33fe
JB
190 if (
191 workerChoiceStrategyOptions.weights != null &&
6b27d407 192 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
193 ) {
194 throw new Error(
195 'Invalid worker choice strategy options: must have a weight for each worker node'
196 )
197 }
0d80593b
JB
198 }
199
a20f0ba5
JB
200 private checkValidTasksQueueOptions (
201 tasksQueueOptions: TasksQueueOptions
202 ): void {
0d80593b
JB
203 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
204 throw new TypeError('Invalid tasks queue options: must be a plain object')
205 }
a20f0ba5
JB
206 if ((tasksQueueOptions?.concurrency as number) <= 0) {
207 throw new Error(
208 `Invalid worker tasks concurrency '${
209 tasksQueueOptions.concurrency as number
210 }'`
211 )
212 }
213 }
214
08f3f44c 215 /** @inheritDoc */
6b27d407
JB
216 public get info (): PoolInfo {
217 return {
218 type: this.type,
184855e6 219 worker: this.worker,
6b27d407
JB
220 minSize: this.minSize,
221 maxSize: this.maxSize,
222 workerNodes: this.workerNodes.length,
223 idleWorkerNodes: this.workerNodes.reduce(
224 (accumulator, workerNode) =>
225 workerNode.tasksUsage.running === 0 ? accumulator + 1 : accumulator,
226 0
227 ),
228 busyWorkerNodes: this.workerNodes.reduce(
229 (accumulator, workerNode) =>
230 workerNode.tasksUsage.running > 0 ? accumulator + 1 : accumulator,
231 0
232 ),
233 runningTasks: this.workerNodes.reduce(
234 (accumulator, workerNode) =>
235 accumulator + workerNode.tasksUsage.running,
236 0
237 ),
238 queuedTasks: this.workerNodes.reduce(
239 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
240 0
241 ),
242 maxQueuedTasks: this.workerNodes.reduce(
243 (accumulator, workerNode) =>
244 accumulator + workerNode.tasksQueue.maxSize,
245 0
246 )
247 }
248 }
08f3f44c 249
8881ae32
JB
250 /**
251 * Pool type.
252 *
253 * If it is `'dynamic'`, it provides the `max` property.
254 */
255 protected abstract get type (): PoolType
256
184855e6
JB
257 /**
258 * Gets the worker type.
259 */
260 protected abstract get worker (): WorkerType
261
c2ade475 262 /**
6b27d407 263 * Pool minimum size.
c2ade475 264 */
6b27d407 265 protected abstract get minSize (): number
ff733df7
JB
266
267 /**
6b27d407 268 * Pool maximum size.
ff733df7 269 */
6b27d407 270 protected abstract get maxSize (): number
a35560ba 271
ffcbbad8 272 /**
f06e48d8 273 * Gets the given worker its worker node key.
ffcbbad8
JB
274 *
275 * @param worker - The worker.
f06e48d8 276 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 277 */
f06e48d8
JB
278 private getWorkerNodeKey (worker: Worker): number {
279 return this.workerNodes.findIndex(
280 workerNode => workerNode.worker === worker
281 )
bf9549ae
JB
282 }
283
afc003b2 284 /** @inheritDoc */
a35560ba 285 public setWorkerChoiceStrategy (
59219cbb
JB
286 workerChoiceStrategy: WorkerChoiceStrategy,
287 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 288 ): void {
aee46736 289 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 290 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
291 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
292 this.opts.workerChoiceStrategy
293 )
294 if (workerChoiceStrategyOptions != null) {
295 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
296 }
0ebe2a9f 297 for (const workerNode of this.workerNodes) {
f82cd357 298 this.setWorkerNodeTasksUsage(workerNode, {
1ab50fe5 299 ran: 0,
f82cd357
JB
300 running: 0,
301 runTime: 0,
302 runTimeHistory: new CircularArray(),
303 avgRunTime: 0,
304 medRunTime: 0,
0567595a
JB
305 waitTime: 0,
306 waitTimeHistory: new CircularArray(),
307 avgWaitTime: 0,
308 medWaitTime: 0,
62c15a68
JB
309 error: 0,
310 elu: undefined
f82cd357 311 })
b6b32453 312 this.setWorkerStatistics(workerNode.worker)
59219cbb 313 }
a20f0ba5
JB
314 }
315
316 /** @inheritDoc */
317 public setWorkerChoiceStrategyOptions (
318 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
319 ): void {
0d80593b 320 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
321 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
322 this.workerChoiceStrategyContext.setOptions(
323 this.opts.workerChoiceStrategyOptions
a35560ba
S
324 )
325 }
326
a20f0ba5 327 /** @inheritDoc */
8f52842f
JB
328 public enableTasksQueue (
329 enable: boolean,
330 tasksQueueOptions?: TasksQueueOptions
331 ): void {
a20f0ba5 332 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 333 this.flushTasksQueues()
a20f0ba5
JB
334 }
335 this.opts.enableTasksQueue = enable
8f52842f 336 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
337 }
338
339 /** @inheritDoc */
8f52842f 340 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 341 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
342 this.checkValidTasksQueueOptions(tasksQueueOptions)
343 this.opts.tasksQueueOptions =
344 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 345 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
346 delete this.opts.tasksQueueOptions
347 }
348 }
349
350 private buildTasksQueueOptions (
351 tasksQueueOptions: TasksQueueOptions
352 ): TasksQueueOptions {
353 return {
354 concurrency: tasksQueueOptions?.concurrency ?? 1
355 }
356 }
357
c319c66b
JB
358 /**
359 * Whether the pool is full or not.
360 *
361 * The pool filling boolean status.
362 */
dea903a8
JB
363 protected get full (): boolean {
364 return this.workerNodes.length >= this.maxSize
365 }
c2ade475 366
c319c66b
JB
367 /**
368 * Whether the pool is busy or not.
369 *
370 * The pool busyness boolean status.
371 */
372 protected abstract get busy (): boolean
7c0ba920 373
c2ade475 374 protected internalBusy (): boolean {
e0ae6100
JB
375 return (
376 this.workerNodes.findIndex(workerNode => {
a4958de2 377 return workerNode.tasksUsage.running === 0
e0ae6100
JB
378 }) === -1
379 )
cb70b19d
JB
380 }
381
afc003b2 382 /** @inheritDoc */
a86b6df1 383 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 384 const timestamp = performance.now()
20dcad1a 385 const workerNodeKey = this.chooseWorkerNode()
adc3c320 386 const submittedTask: Task<Data> = {
a86b6df1 387 name,
e5a5c0fc
JB
388 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
389 data: data ?? ({} as Data),
b6b32453 390 timestamp,
adc3c320
JB
391 id: crypto.randomUUID()
392 }
2e81254d 393 const res = new Promise<Response>((resolve, reject) => {
02706357 394 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
395 resolve,
396 reject,
20dcad1a 397 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
398 })
399 })
ff733df7
JB
400 if (
401 this.opts.enableTasksQueue === true &&
7171d33f 402 (this.busy ||
3528c992 403 this.workerNodes[workerNodeKey].tasksUsage.running >=
7171d33f 404 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 405 .concurrency as number))
ff733df7 406 ) {
26a929d7
JB
407 this.enqueueTask(workerNodeKey, submittedTask)
408 } else {
2e81254d 409 this.executeTask(workerNodeKey, submittedTask)
adc3c320 410 }
b0d6ed8f 411 this.workerChoiceStrategyContext.update(workerNodeKey)
ff733df7 412 this.checkAndEmitEvents()
78cea37e 413 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
414 return res
415 }
c97c7edb 416
afc003b2 417 /** @inheritDoc */
c97c7edb 418 public async destroy (): Promise<void> {
1fbcaa7c 419 await Promise.all(
875a7c37
JB
420 this.workerNodes.map(async (workerNode, workerNodeKey) => {
421 this.flushTasksQueue(workerNodeKey)
47aacbaa 422 // FIXME: wait for tasks to be finished
f06e48d8 423 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
424 })
425 )
c97c7edb
S
426 }
427
4a6952ff 428 /**
f06e48d8 429 * Shutdowns the given worker.
4a6952ff 430 *
f06e48d8 431 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
432 */
433 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 434
729c563d 435 /**
2e81254d 436 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 437 * Can be overridden
afc003b2
JB
438 *
439 * @virtual
729c563d 440 */
280c2a77 441 protected setupHook (): void {
d99ba5a8 442 // Intentionally empty
280c2a77 443 }
c97c7edb 444
729c563d 445 /**
280c2a77
S
446 * Should return whether the worker is the main worker or not.
447 */
448 protected abstract isMain (): boolean
449
450 /**
2e81254d 451 * Hook executed before the worker task execution.
bf9549ae 452 * Can be overridden.
729c563d 453 *
f06e48d8 454 * @param workerNodeKey - The worker node key.
729c563d 455 */
f20f344f 456 protected beforeTaskExecutionHook (workerNodeKey: number): void {
09a6305f 457 ++this.workerNodes[workerNodeKey].tasksUsage.running
c97c7edb
S
458 }
459
c01733f1 460 /**
2e81254d 461 * Hook executed after the worker task execution.
bf9549ae 462 * Can be overridden.
c01733f1 463 *
c923ce56 464 * @param worker - The worker.
38e795c1 465 * @param message - The received message.
c01733f1 466 */
2e81254d 467 protected afterTaskExecutionHook (
c923ce56 468 worker: Worker,
2740a743 469 message: MessageValue<Response>
bf9549ae 470 ): void {
f8eb0a2a
JB
471 const workerTasksUsage =
472 this.workerNodes[this.getWorkerNodeKey(worker)].tasksUsage
3032893a 473 --workerTasksUsage.running
1ab50fe5 474 ++workerTasksUsage.ran
82f36766 475 if (message.taskError != null) {
2740a743
JB
476 ++workerTasksUsage.error
477 }
f8eb0a2a 478 this.updateRunTimeTasksUsage(workerTasksUsage, message)
74001280 479 this.updateWaitTimeTasksUsage(workerTasksUsage, message)
62c15a68 480 this.updateEluTasksUsage(workerTasksUsage, message)
f8eb0a2a
JB
481 }
482
483 private updateRunTimeTasksUsage (
484 workerTasksUsage: TasksUsage,
485 message: MessageValue<Response>
486 ): void {
b6b32453 487 if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
d715b7bc 488 workerTasksUsage.runTime += message.taskPerformance?.runTime ?? 0
c6bd2650 489 if (
b6b32453 490 this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
1ab50fe5 491 workerTasksUsage.ran !== 0
c6bd2650 492 ) {
3032893a 493 workerTasksUsage.avgRunTime =
1ab50fe5 494 workerTasksUsage.runTime / workerTasksUsage.ran
3032893a 495 }
3fa4cdd2 496 if (
b6b32453 497 this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
d715b7bc 498 message.taskPerformance?.runTime != null
3fa4cdd2 499 ) {
d715b7bc 500 workerTasksUsage.runTimeHistory.push(message.taskPerformance.runTime)
78099a15
JB
501 workerTasksUsage.medRunTime = median(workerTasksUsage.runTimeHistory)
502 }
3032893a 503 }
f8eb0a2a
JB
504 }
505
74001280 506 private updateWaitTimeTasksUsage (
f8eb0a2a
JB
507 workerTasksUsage: TasksUsage,
508 message: MessageValue<Response>
509 ): void {
b6b32453 510 if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
d715b7bc 511 workerTasksUsage.waitTime += message.taskPerformance?.waitTime ?? 0
09a6305f 512 if (
b6b32453 513 this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
1ab50fe5 514 workerTasksUsage.ran !== 0
09a6305f
JB
515 ) {
516 workerTasksUsage.avgWaitTime =
1ab50fe5 517 workerTasksUsage.waitTime / workerTasksUsage.ran
09a6305f
JB
518 }
519 if (
b6b32453 520 this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
d715b7bc 521 message.taskPerformance?.waitTime != null
09a6305f 522 ) {
d715b7bc 523 workerTasksUsage.waitTimeHistory.push(message.taskPerformance.waitTime)
09a6305f
JB
524 workerTasksUsage.medWaitTime = median(workerTasksUsage.waitTimeHistory)
525 }
0567595a 526 }
c01733f1 527 }
528
62c15a68
JB
529 private updateEluTasksUsage (
530 workerTasksUsage: TasksUsage,
531 message: MessageValue<Response>
532 ): void {
b6b32453 533 if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
d715b7bc
JB
534 if (
535 workerTasksUsage.elu != null &&
536 message.taskPerformance?.elu != null
537 ) {
62c15a68 538 workerTasksUsage.elu = {
d715b7bc
JB
539 idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
540 active:
541 workerTasksUsage.elu.active + message.taskPerformance.elu.active,
62c15a68 542 utilization:
d715b7bc
JB
543 (workerTasksUsage.elu.utilization +
544 message.taskPerformance.elu.utilization) /
545 2
62c15a68 546 }
d715b7bc
JB
547 } else if (message.taskPerformance?.elu != null) {
548 workerTasksUsage.elu = message.taskPerformance.elu
62c15a68
JB
549 }
550 }
551 }
552
280c2a77 553 /**
f06e48d8 554 * Chooses a worker node for the next task.
280c2a77 555 *
20dcad1a 556 * The default worker choice strategy uses a round robin algorithm to distribute the load.
280c2a77 557 *
20dcad1a 558 * @returns The worker node key
280c2a77 559 */
20dcad1a 560 protected chooseWorkerNode (): number {
f06e48d8 561 let workerNodeKey: number
6b27d407 562 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
adc3c320
JB
563 const workerCreated = this.createAndSetupWorker()
564 this.registerWorkerMessageListener(workerCreated, message => {
a4958de2 565 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8
JB
566 if (
567 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13 568 (message.kill != null &&
a4958de2 569 this.workerNodes[currentWorkerNodeKey].tasksUsage.running === 0)
17393ac8 570 ) {
ff733df7 571 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
a4958de2 572 this.flushTasksQueue(currentWorkerNodeKey)
47aacbaa 573 // FIXME: wait for tasks to be finished
7c5a1080 574 void (this.destroyWorker(workerCreated) as Promise<void>)
17393ac8
JB
575 }
576 })
adc3c320 577 workerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8 578 } else {
f06e48d8 579 workerNodeKey = this.workerChoiceStrategyContext.execute()
17393ac8 580 }
20dcad1a 581 return workerNodeKey
c97c7edb
S
582 }
583
280c2a77 584 /**
675bb809 585 * Sends a message to the given worker.
280c2a77 586 *
38e795c1
JB
587 * @param worker - The worker which should receive the message.
588 * @param message - The message.
280c2a77
S
589 */
590 protected abstract sendToWorker (
591 worker: Worker,
592 message: MessageValue<Data>
593 ): void
594
4a6952ff 595 /**
f06e48d8 596 * Registers a listener callback on the given worker.
4a6952ff 597 *
38e795c1
JB
598 * @param worker - The worker which should register a listener.
599 * @param listener - The message listener callback.
4a6952ff
JB
600 */
601 protected abstract registerWorkerMessageListener<
4f7fa42a 602 Message extends Data | Response
78cea37e 603 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 604
729c563d
S
605 /**
606 * Returns a newly created worker.
607 */
280c2a77 608 protected abstract createWorker (): Worker
c97c7edb 609
729c563d 610 /**
f06e48d8 611 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 612 *
38e795c1 613 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 614 *
38e795c1 615 * @param worker - The newly created worker.
729c563d 616 */
280c2a77 617 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 618
4a6952ff 619 /**
f06e48d8 620 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
621 *
622 * @returns New, completely set up worker.
623 */
624 protected createAndSetupWorker (): Worker {
bdacc2d2 625 const worker = this.createWorker()
280c2a77 626
35cf1c03 627 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 628 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
629 worker.on('error', error => {
630 if (this.emitter != null) {
631 this.emitter.emit(PoolEvents.error, error)
632 }
633 })
5baee0d7
JB
634 worker.on('error', () => {
635 if (this.opts.restartWorkerOnError === true) {
1f68cede 636 this.createAndSetupWorker()
5baee0d7
JB
637 }
638 })
a35560ba
S
639 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
640 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 641 worker.once('exit', () => {
f06e48d8 642 this.removeWorkerNode(worker)
a974afa6 643 })
280c2a77 644
f06e48d8 645 this.pushWorkerNode(worker)
280c2a77 646
b6b32453
JB
647 this.setWorkerStatistics(worker)
648
280c2a77
S
649 this.afterWorkerSetup(worker)
650
c97c7edb
S
651 return worker
652 }
be0676b3
APA
653
654 /**
ff733df7 655 * This function is the listener registered for each worker message.
be0676b3 656 *
bdacc2d2 657 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
658 */
659 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 660 return message => {
b1989cfd 661 if (message.id != null) {
a3445496 662 // Task execution response received
2740a743 663 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 664 if (promiseResponse != null) {
82f36766
JB
665 if (message.taskError != null) {
666 promiseResponse.reject(message.taskError.message)
91ee39ed 667 if (this.emitter != null) {
82f36766 668 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 669 }
a05c10de 670 } else {
2740a743 671 promiseResponse.resolve(message.data as Response)
a05c10de 672 }
2e81254d 673 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 674 this.promiseResponseMap.delete(message.id)
ff733df7
JB
675 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
676 if (
677 this.opts.enableTasksQueue === true &&
416fd65c 678 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 679 ) {
2e81254d
JB
680 this.executeTask(
681 workerNodeKey,
ff733df7
JB
682 this.dequeueTask(workerNodeKey) as Task<Data>
683 )
684 }
be0676b3
APA
685 }
686 }
687 }
be0676b3 688 }
7c0ba920 689
ff733df7 690 private checkAndEmitEvents (): void {
1f68cede 691 if (this.emitter != null) {
ff733df7 692 if (this.busy) {
6b27d407 693 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 694 }
6b27d407
JB
695 if (this.type === PoolTypes.dynamic && this.full) {
696 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 697 }
164d950a
JB
698 }
699 }
700
0ebe2a9f
JB
701 /**
702 * Sets the given worker node its tasks usage in the pool.
703 *
704 * @param workerNode - The worker node.
705 * @param tasksUsage - The worker node tasks usage.
706 */
707 private setWorkerNodeTasksUsage (
708 workerNode: WorkerNode<Worker, Data>,
709 tasksUsage: TasksUsage
710 ): void {
711 workerNode.tasksUsage = tasksUsage
712 }
713
a05c10de 714 /**
f06e48d8 715 * Pushes the given worker in the pool worker nodes.
ea7a90d3 716 *
38e795c1 717 * @param worker - The worker.
f06e48d8 718 * @returns The worker nodes length.
ea7a90d3 719 */
f06e48d8
JB
720 private pushWorkerNode (worker: Worker): number {
721 return this.workerNodes.push({
ffcbbad8 722 worker,
f82cd357 723 tasksUsage: {
1ab50fe5 724 ran: 0,
f82cd357
JB
725 running: 0,
726 runTime: 0,
727 runTimeHistory: new CircularArray(),
728 avgRunTime: 0,
729 medRunTime: 0,
0567595a
JB
730 waitTime: 0,
731 waitTimeHistory: new CircularArray(),
732 avgWaitTime: 0,
733 medWaitTime: 0,
62c15a68
JB
734 error: 0,
735 elu: undefined
f82cd357 736 },
29ee7e9a 737 tasksQueue: new Queue<Task<Data>>()
ea7a90d3
JB
738 })
739 }
c923ce56
JB
740
741 /**
f06e48d8 742 * Sets the given worker in the pool worker nodes.
c923ce56 743 *
f06e48d8 744 * @param workerNodeKey - The worker node key.
c923ce56
JB
745 * @param worker - The worker.
746 * @param tasksUsage - The worker tasks usage.
f06e48d8 747 * @param tasksQueue - The worker task queue.
c923ce56 748 */
f06e48d8
JB
749 private setWorkerNode (
750 workerNodeKey: number,
c923ce56 751 worker: Worker,
f06e48d8 752 tasksUsage: TasksUsage,
29ee7e9a 753 tasksQueue: Queue<Task<Data>>
c923ce56 754 ): void {
f06e48d8 755 this.workerNodes[workerNodeKey] = {
c923ce56 756 worker,
f06e48d8
JB
757 tasksUsage,
758 tasksQueue
c923ce56
JB
759 }
760 }
51fe3d3c
JB
761
762 /**
f06e48d8 763 * Removes the given worker from the pool worker nodes.
51fe3d3c 764 *
f06e48d8 765 * @param worker - The worker.
51fe3d3c 766 */
416fd65c 767 private removeWorkerNode (worker: Worker): void {
f06e48d8 768 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
769 if (workerNodeKey !== -1) {
770 this.workerNodes.splice(workerNodeKey, 1)
771 this.workerChoiceStrategyContext.remove(workerNodeKey)
772 }
51fe3d3c 773 }
adc3c320 774
2e81254d 775 private executeTask (workerNodeKey: number, task: Task<Data>): void {
027c2215 776 this.beforeTaskExecutionHook(workerNodeKey)
2e81254d
JB
777 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
778 }
779
f9f00b5f 780 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 781 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
782 }
783
416fd65c 784 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 785 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
786 }
787
416fd65c 788 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 789 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 790 }
ff733df7 791
416fd65c
JB
792 private flushTasksQueue (workerNodeKey: number): void {
793 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
794 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
795 this.executeTask(
796 workerNodeKey,
797 this.dequeueTask(workerNodeKey) as Task<Data>
798 )
ff733df7 799 }
ff733df7
JB
800 }
801 }
802
ef41a6e6
JB
803 private flushTasksQueues (): void {
804 for (const [workerNodeKey] of this.workerNodes.entries()) {
805 this.flushTasksQueue(workerNodeKey)
806 }
807 }
b6b32453
JB
808
809 private setWorkerStatistics (worker: Worker): void {
810 this.sendToWorker(worker, {
811 statistics: {
812 runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
813 waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
814 elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
815 }
816 })
817 }
c97c7edb 818}