Merge branch 'elu-strategy' of github.com:poolifier/poolifier into elu-strategy
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
a4e07f72 24import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
a35560ba
S
25import {
26 WorkerChoiceStrategies,
a20f0ba5
JB
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
bdaf31cd
JB
29} from './selection-strategies/selection-strategies-types'
30import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 31
729c563d 32/**
ea7a90d3 33 * Base class that implements some shared logic for all poolifier pools.
729c563d 34 *
38e795c1
JB
35 * @typeParam Worker - Type of worker which manages this pool.
36 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 37 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 38 */
c97c7edb 39export abstract class AbstractPool<
f06e48d8 40 Worker extends IWorker,
d3c8a1a8
S
41 Data = unknown,
42 Response = unknown
c4855468 43> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 46
/** @inheritDoc */
public readonly emitter?: PoolEmitter
49
/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was bound to, plus the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * When a worker message carrying an id is received, the matching entry is looked up to settle the promise and is then deleted.
 */
protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
> = new Map()
c97c7edb 62
/**
 * Context wrapping the worker choice strategy used to pick a worker node for each task.
 *
 * Created in the constructor from the pool options; the strategy defaults to round robin when none is given.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
>
73
/**
 * Builds a new poolifier pool.
 *
 * Validates the arguments, creates the optional event emitter and the worker choice
 * strategy context, runs the setup hook and finally spawns the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error when called from a worker rather than from the main context.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; each check throws on invalid input.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be passed around detached.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)

  this.setupHook()

  // Spawn the initial set of workers.
  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
117
/**
 * Validates the worker file path.
 *
 * @param filePath - Path to the worker file.
 * @throws Error when the path is `null`/`undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isBlankString =
    typeof filePath === 'string' && filePath.trim().length === 0
  if (filePath == null || isBlankString) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
126
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error when the number is missing, or is zero for a fixed pool.
 * @throws TypeError when the number is not a safe integer.
 * @throws RangeError when the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
144
/**
 * Validates the pool options and fills in defaults on `this.opts`.
 *
 * Defaults applied: round robin strategy, default strategy options,
 * `restartWorkerOnError` and `enableEvents` to `true`, `enableTasksQueue` to `false`.
 *
 * @param opts - The pool options.
 * @throws TypeError when `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  if (this.opts.enableTasksQueue) {
    // Queue options are only validated and normalized when the queue is enabled.
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
171
/**
 * Ensures the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error when the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (!knownStrategies.includes(workerChoiceStrategy)) {
    throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
  }
}
181
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The strategy options to validate.
 * @throws TypeError when the options are not a plain object.
 * @throws Error when `weights` is set but its number of keys differs from the pool maximum size.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
199
/**
 * Validates the tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError when the options are defined but not a plain object.
 * @throws Error when `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  // An undefined concurrency compares false against 0 and is therefore accepted.
  const concurrency = tasksQueueOptions?.concurrency as number
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
214
/** @inheritDoc */
public get info (): PoolInfo {
  const type = this.type
  const worker = this.worker
  const minSize = this.minSize
  const maxSize = this.maxSize

  // Aggregate the per worker node counters in a single pass.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const { workerUsage, tasksQueue } of this.workerNodes) {
    const { executed, executing, failed } = workerUsage.tasks
    if (executing === 0) {
      ++idleWorkerNodes
    } else if (executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += executed
    executingTasks += executing
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
    failedTasks += failed
  }

  return {
    type,
    worker,
    minSize,
    maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
08f3f44c 263
/**
 * Pool type.
 *
 * Compared against `PoolTypes.fixed` and `PoolTypes.dynamic` to select
 * type-specific behavior (argument validation, worker creation, events).
 */
protected abstract get type (): PoolType
270
/**
 * Worker type, reported through the pool `info`.
 */
protected abstract get worker (): WorkerType
275
/**
 * Pool minimum size, reported through the pool `info`.
 */
protected abstract get minSize (): number
ff733df7
JB
280
/**
 * Pool maximum size.
 *
 * Used to decide whether the pool is full and to validate the number of strategy weights.
 */
protected abstract get maxSize (): number
a35560ba 285
/**
 * Looks up the worker node key (its index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  return this.workerNodes.findIndex(
    ({ worker: candidate }) => candidate === worker
  )
}
297
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and push the new statistics requirements to its worker.
  for (const workerNode of this.workerNodes) {
    const queued =
      this.opts.enableTasksQueue === true ? workerNode.tasksQueue.size : 0
    const freshUsage: WorkerUsage = {
      tasks: {
        executed: 0,
        executing: 0,
        queued,
        failed: 0
      },
      runTime: {
        aggregation: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      waitTime: {
        aggregation: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      elu: undefined
    }
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(workerNode.worker)
  }
}
339
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Keep the pool options and the strategy context in sync.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const { workerChoiceStrategyOptions: appliedOptions } = this.opts
  this.workerChoiceStrategyContext.setOptions(appliedOptions)
}
350
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // The queue is being switched off: hand the queued tasks over to the workers first.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
362
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any stale queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
373
/**
 * Builds normalized tasks queue options, defaulting `concurrency` to `1`.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options (may be `null`/`undefined`).
 * @returns The tasks queue options with `concurrency` always set.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
381
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes has reached the maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 390
/**
 * Whether the pool is busy or not.
 *
 * Read when deciding whether to enqueue a submitted task and whether to emit the `busy` event.
 */
protected abstract get busy (): boolean
7c0ba920 397
/**
 * Tells whether every worker node is currently executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
405
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken first so the timestamp reflects the submission time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the settle callbacks under the task id before the task is dispatched.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task when queueing is on and either the pool is busy or the
  // chosen worker node already runs as many tasks as the configured concurrency.
  const mustEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 440
/** @inheritDoc */
public async destroy (): Promise<void> {
  const pendingDestructions = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(pendingDestructions)
}
451
/**
 * Shuts down the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 458
/**
 * Hook run by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 468
/**
 * Should return whether the current context is the main one or not.
 *
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
473
/**
 * Hook run before a task is sent to a worker: bumps the executing counter and,
 * when the tasks queue is enabled, refreshes the queued counter.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 */
protected beforeTaskExecutionHook (workerNodeKey: number): void {
  const { tasks } = this.workerNodes[workerNodeKey].workerUsage
  ++tasks.executing
  if (this.opts.enableTasksQueue === true) {
    tasks.queued = this.tasksQueueSize(workerNodeKey)
  }
}
487
/**
 * Hook run after a task execution response is received: updates the task counters
 * and the run time, wait time and ELU statistics of the worker node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const usage = this.workerNodes[workerNodeKey].workerUsage
  const taskCounters = usage.tasks
  --taskCounters.executing
  ++taskCounters.executed
  if (message.taskError != null) {
    ++taskCounters.failed
  }

  this.updateRunTimeWorkerUsage(usage, message)
  this.updateWaitTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
512
/**
 * Updates the run time statistics of a worker usage, as far as the current
 * worker choice strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.runTime) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  const { runTime, tasks } = workerUsage
  runTime.aggregation += taskRunTime ?? 0
  if (requirements.avgRunTime && tasks.executed !== 0) {
    runTime.average = runTime.aggregation / tasks.executed
  }
  if (requirements.medRunTime && taskRunTime != null) {
    runTime.history.push(taskRunTime)
    runTime.median = median(runTime.history)
  }
}
539
/**
 * Updates the wait time statistics of a worker usage, as far as the current
 * worker choice strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!requirements.waitTime) {
    return
  }
  const taskWaitTime = message.taskPerformance?.waitTime
  const { waitTime, tasks } = workerUsage
  waitTime.aggregation += taskWaitTime ?? 0
  if (requirements.avgWaitTime && tasks.executed !== 0) {
    waitTime.average = waitTime.aggregation / tasks.executed
  }
  if (requirements.medWaitTime && taskWaitTime != null) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
566
/**
 * Updates the event loop utilization (ELU) of a worker usage when the current
 * worker choice strategy requires it.
 *
 * The first sample is stored as is. Later samples add up `idle` and `active`,
 * and `utilization` becomes the mean of the stored and the new value.
 *
 * @param workerTasksUsage - The worker usage to update.
 * @param message - The received message carrying the task performance.
 */
private updateEluWorkerUsage (
  workerTasksUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (!this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const currentElu = workerTasksUsage.elu
  if (currentElu == null) {
    workerTasksUsage.elu = taskElu
    return
  }
  workerTasksUsage.elu = {
    idle: currentElu.idle + taskElu.idle,
    active: currentElu.active + taskElu.active,
    utilization: (currentElu.utilization + taskElu.utilization) / 2
  }
}
590
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic pool that is not full and whose worker nodes are all executing tasks
 * gets a brand new worker; otherwise the worker choice strategy decides.
 * The default worker choice strategy uses a round robin algorithm to distribute the load.
 *
 * @returns The worker node key
 */
protected chooseWorkerNode (): number {
  const mustCreateWorker =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!mustCreateWorker) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (
      hardKill ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
          0)
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      this.flushTasksQueue(currentWorkerNodeKey)
      // FIXME: wait for tasks to be finished
      void (this.destroyWorker(workerCreated) as Promise<void>)
    }
  })
  return this.getWorkerNodeKey(workerCreated)
}
622
/**
 * Sends a message to the given worker.
 *
 * Used both to dispatch tasks and to push the statistics requirements.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
633
/**
 * Registers a message listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 643
/**
 * Creates and returns a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 648
/**
 * Hook called once a newly created worker has been added to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 657
/**
 * Creates a new worker, wires its event handlers, adds it to the pool worker
 * nodes and sends it the statistics requirements.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler = EMPTY_FUNCTION,
    errorHandler = EMPTY_FUNCTION,
    onlineHandler = EMPTY_FUNCTION,
    exitHandler = EMPTY_FUNCTION
  } = {
    messageHandler: this.opts.messageHandler ?? undefined,
    errorHandler: this.opts.errorHandler ?? undefined,
    onlineHandler: this.opts.onlineHandler ?? undefined,
    exitHandler: this.opts.exitHandler ?? undefined
  }

  // Listener registration order is preserved: user handlers first, pool handlers after.
  worker.on('message', messageHandler)
  worker.on('error', errorHandler)
  worker.on('error', error => {
    // Forward worker errors to the pool emitter, when events are enabled.
    this.emitter?.emit(PoolEvents.error, error)
  })
  worker.on('error', () => {
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler)
  worker.on('exit', exitHandler)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
692
/**
 * Builds the listener handling the task execution responses sent by a worker.
 *
 * For a message whose id matches a pending task, the listener settles the task
 * promise, updates the worker usage, forgets the pending entry and, when the
 * tasks queue is enabled, runs the next queued task of that worker node.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const pending = this.promiseResponseMap.get(message.id)
    if (pending == null) {
      return
    }
    if (message.taskError != null) {
      pending.reject(message.taskError.message)
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.taskError, message.taskError)
      }
    } else {
      pending.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(pending.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(pending.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
  }
}
7c0ba920 728
/**
 * Emits the `busy` and `full` pool events when applicable.
 * Does nothing when no emitter was created.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event only applies to dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
739
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
752
/**
 * Appends a worker node for the given worker, with zeroed usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerUsage: WorkerUsage = {
    tasks: {
      executed: 0,
      executing: 0,
      queued: 0,
      failed: 0
    },
    runTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregation: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: undefined
  }
  const tasksQueue = new Queue<Task<Data>>()
  return this.workerNodes.push({ worker, workerUsage, tasksQueue })
}
c923ce56
JB
787
/**
 * Overwrites the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param worker - The worker.
 * @param workerUsage - The worker usage.
 * @param tasksQueue - The worker task queue.
 */
private setWorkerNode (
  workerNodeKey: number,
  worker: Worker,
  workerUsage: WorkerUsage,
  tasksQueue: Queue<Task<Data>>
): void {
  const workerNode = { worker, workerUsage, tasksQueue }
  this.workerNodes[workerNodeKey] = workerNode
}
51fe3d3c
JB
808
/**
 * Removes the worker node of the given worker from the pool and from the
 * worker choice strategy context. Does nothing when the worker is unknown.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 821
/**
 * Runs the before-execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
826
/**
 * Appends a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
830
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
834
/**
 * Gets the number of tasks queued on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 838
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and sent to the worker for execution.
 *
 * The queue is drained with a `while` loop on its live size. A counted loop
 * (`i < size`, `i++`) stops halfway because the size shrinks on each dequeue
 * while the counter grows, leaving about half of the tasks in the queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
849
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
855
/**
 * Sends the given worker the statistics (run time, wait time, ELU) required by
 * the current worker choice strategy.
 *
 * @param worker - The worker to notify.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, waitTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: { runTime, waitTime, elu }
  })
}
c97c7edb 870}