docs: refinements
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
e102732c
JB
24import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba 31import {
f0d7f803 32 Measurements,
a35560ba 33 WorkerChoiceStrategies,
a20f0ba5
JB
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
bdaf31cd
JB
36} from './selection-strategies/selection-strategies-types'
37import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 38
/**
 * Base class that implements some shared logic for all poolifier pools.
 *
 * @typeParam Worker - Type of worker which manages this pool.
 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
 */
export abstract class AbstractPool<
  Worker extends IWorker,
  Data = unknown,
  Response = unknown
> implements IPool<Worker, Data, Response> {
  /** @inheritDoc */
  public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

  /** @inheritDoc */
  public readonly emitter?: PoolEmitter

  /**
   * The execution response promise map.
   *
   * - `key`: The message id of each submitted task.
   * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
   *
   * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
   */
  protected promiseResponseMap: Map<
  string,
  PromiseResponseWrapper<Worker, Response>
  > = new Map<string, PromiseResponseWrapper<Worker, Response>>()

  /**
   * Worker choice strategy context referencing a worker choice algorithm implementation.
   */
  protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >

  /**
   * Constructs a new poolifier pool.
   *
   * Validates the arguments, creates the event emitter (when events are enabled) and the
   * worker choice strategy context, runs the setup hook and finally creates
   * `numberOfWorkers` workers.
   *
   * @param numberOfWorkers - Number of workers that this pool should manage.
   * @param filePath - Path to the worker file.
   * @param opts - Options for the pool.
   * @throws Error if called from a context where `isMain()` is false, or if an argument is invalid.
   */
  public constructor (
    protected readonly numberOfWorkers: number,
    protected readonly filePath: string,
    protected readonly opts: PoolOptions<Worker>
  ) {
    if (!this.isMain()) {
      throw new Error('Cannot start a pool from a worker!')
    }
    this.checkNumberOfWorkers(this.numberOfWorkers)
    this.checkFilePath(this.filePath)
    this.checkPoolOptions(this.opts)

    // Bind the methods so they keep the pool as `this` when passed around as callbacks.
    this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
    this.executeTask = this.executeTask.bind(this)
    this.enqueueTask = this.enqueueTask.bind(this)
    this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

    if (this.opts.enableEvents === true) {
      this.emitter = new PoolEmitter()
    }
    this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
    >(
      this,
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )

    this.setupHook()

    for (let i = 1; i <= this.numberOfWorkers; i++) {
      this.createAndSetupWorker()
    }
  }

  /**
   * Checks that a worker file path has been given.
   *
   * @param filePath - Path to the worker file.
   * @throws Error if the path is `null`/`undefined` or a blank string.
   */
  private checkFilePath (filePath: string): void {
    if (
      filePath == null ||
      (typeof filePath === 'string' && filePath.trim().length === 0)
    ) {
      throw new Error('Please specify a file with a worker implementation')
    }
  }

  /**
   * Checks that the number of workers is usable for this pool type.
   *
   * @param numberOfWorkers - Number of workers requested.
   * @throws Error if the number is missing, or is zero for a fixed pool.
   * @throws TypeError if the number is not a safe integer.
   * @throws RangeError if the number is negative.
   */
  private checkNumberOfWorkers (numberOfWorkers: number): void {
    if (numberOfWorkers == null) {
      throw new Error(
        'Cannot instantiate a pool without specifying the number of workers'
      )
    } else if (!Number.isSafeInteger(numberOfWorkers)) {
      throw new TypeError(
        'Cannot instantiate a pool with a non safe integer number of workers'
      )
    } else if (numberOfWorkers < 0) {
      throw new RangeError(
        'Cannot instantiate a pool with a negative number of workers'
      )
    } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
      throw new Error('Cannot instantiate a fixed pool with no worker')
    }
  }

  /**
   * Validates the pool options and fills in the defaults directly on `this.opts`.
   *
   * @param opts - Options for the pool.
   * @throws TypeError if `opts` is not a plain object.
   */
  private checkPoolOptions (opts: PoolOptions<Worker>): void {
    if (isPlainObject(opts)) {
      this.opts.workerChoiceStrategy =
        opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
      this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
      this.opts.workerChoiceStrategyOptions =
        opts.workerChoiceStrategyOptions ??
        DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
      this.checkValidWorkerChoiceStrategyOptions(
        this.opts.workerChoiceStrategyOptions
      )
      this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
      this.opts.enableEvents = opts.enableEvents ?? true
      this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
      if (this.opts.enableTasksQueue) {
        this.checkValidTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
        this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
          opts.tasksQueueOptions as TasksQueueOptions
        )
      }
    } else {
      throw new TypeError('Invalid pool options: must be a plain object')
    }
  }

  /**
   * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
   *
   * @param workerChoiceStrategy - The worker choice strategy to check.
   * @throws Error if the strategy is unknown.
   */
  private checkValidWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy
  ): void {
    if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
      throw new Error(
        `Invalid worker choice strategy '${workerChoiceStrategy}'`
      )
    }
  }

  /**
   * Checks the worker choice strategy options.
   *
   * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
   * @throws TypeError if the options are not a plain object.
   * @throws Error if `weights` does not have exactly `maxSize` entries or `measurement` is not a `Measurements` value.
   */
  private checkValidWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    if (!isPlainObject(workerChoiceStrategyOptions)) {
      throw new TypeError(
        'Invalid worker choice strategy options: must be a plain object'
      )
    }
    if (
      workerChoiceStrategyOptions.weights != null &&
      Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
    ) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
    if (
      workerChoiceStrategyOptions.measurement != null &&
      !Object.values(Measurements).includes(
        workerChoiceStrategyOptions.measurement
      )
    ) {
      throw new Error(
        `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
      )
    }
  }

  /**
   * Checks the tasks queue options. `null`/`undefined` options are accepted.
   *
   * @param tasksQueueOptions - The tasks queue options to check.
   * @throws TypeError if the options are not a plain object or `concurrency` is not a safe integer.
   * @throws Error if `concurrency` is lower than or equal to zero.
   */
  private checkValidTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): void {
    if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
      throw new TypeError('Invalid tasks queue options: must be a plain object')
    }
    if (
      tasksQueueOptions?.concurrency != null &&
      !Number.isSafeInteger(tasksQueueOptions.concurrency)
    ) {
      throw new TypeError(
        'Invalid worker tasks concurrency: must be an integer'
      )
    }
    if (
      tasksQueueOptions?.concurrency != null &&
      tasksQueueOptions.concurrency <= 0
    ) {
      throw new Error(
        `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
      )
    }
  }

  /** @inheritDoc */
  public get info (): PoolInfo {
    return {
      type: this.type,
      worker: this.worker,
      minSize: this.minSize,
      maxSize: this.maxSize,
      workerNodes: this.workerNodes.length,
      idleWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.usage.tasks.executing === 0
            ? accumulator + 1
            : accumulator,
        0
      ),
      busyWorkerNodes: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
        0
      ),
      executedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.executed,
        0
      ),
      executingTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.executing,
        0
      ),
      queuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.queued,
        0
      ),
      maxQueuedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.maxQueued,
        0
      ),
      failedTasks: this.workerNodes.reduce(
        (accumulator, workerNode) =>
          accumulator + workerNode.usage.tasks.failed,
        0
      )
    }
  }

  /**
   * Pool type.
   *
   * If it is `'dynamic'`, it provides the `max` property.
   */
  protected abstract get type (): PoolType

  /**
   * Gets the worker type.
   */
  protected abstract get worker (): WorkerType

  /**
   * Pool minimum size.
   */
  protected abstract get minSize (): number

  /**
   * Pool maximum size.
   */
  protected abstract get maxSize (): number

  /**
   * Gets the worker node key of the given worker.
   *
   * @param worker - The worker.
   * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
   */
  private getWorkerNodeKey (worker: Worker): number {
    return this.workerNodes.findIndex(
      workerNode => workerNode.worker === worker
    )
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategy (
    workerChoiceStrategy: WorkerChoiceStrategy,
    workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy
    )
    if (workerChoiceStrategyOptions != null) {
      this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    }
    // Every worker node restarts from a fresh usage object and is told which
    // statistics the new strategy requires.
    for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
      this.setWorkerNodeTasksUsage(
        workerNode,
        this.getWorkerUsage(workerNodeKey)
      )
      this.setWorkerStatistics(workerNode.worker)
    }
  }

  /** @inheritDoc */
  public setWorkerChoiceStrategyOptions (
    workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
  ): void {
    this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
    this.workerChoiceStrategyContext.setOptions(
      this.opts.workerChoiceStrategyOptions
    )
  }

  /** @inheritDoc */
  public enableTasksQueue (
    enable: boolean,
    tasksQueueOptions?: TasksQueueOptions
  ): void {
    // Turning the queue off: run what is still queued before disabling it.
    if (this.opts.enableTasksQueue === true && !enable) {
      this.flushTasksQueues()
    }
    this.opts.enableTasksQueue = enable
    this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
  }

  /** @inheritDoc */
  public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
    if (this.opts.enableTasksQueue === true) {
      this.checkValidTasksQueueOptions(tasksQueueOptions)
      this.opts.tasksQueueOptions =
        this.buildTasksQueueOptions(tasksQueueOptions)
    } else if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
  }

  /**
   * Builds the effective tasks queue options.
   *
   * @param tasksQueueOptions - The user supplied tasks queue options, possibly `null`/`undefined`.
   * @returns Tasks queue options where `concurrency` defaults to `1`.
   */
  private buildTasksQueueOptions (
    tasksQueueOptions: TasksQueueOptions
  ): TasksQueueOptions {
    return {
      concurrency: tasksQueueOptions?.concurrency ?? 1
    }
  }

  /**
   * Whether the pool is full or not.
   *
   * The pool filling boolean status: `true` once the number of worker nodes has reached `maxSize`.
   */
  protected get full (): boolean {
    return this.workerNodes.length >= this.maxSize
  }

  /**
   * Whether the pool is busy or not.
   *
   * The pool busyness boolean status.
   */
  protected abstract get busy (): boolean

  /**
   * Whether every worker node is executing at least one task.
   *
   * @returns Worker nodes busyness boolean status; `true` when no worker node is idle.
   */
  protected internalBusy (): boolean {
    return this.workerNodes.every(
      workerNode => workerNode.usage.tasks.executing !== 0
    )
  }

  /** @inheritDoc */
  public async execute (data?: Data, name?: string): Promise<Response> {
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const submittedTask: Task<Data> = {
      name,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      timestamp,
      id: crypto.randomUUID()
    }
    // The resolve/reject callbacks are parked in the map until the worker
    // answers with a message carrying the same id.
    const res = new Promise<Response>((resolve, reject) => {
      this.promiseResponseMap.set(submittedTask.id as string, {
        resolve,
        reject,
        worker: this.workerNodes[workerNodeKey].worker
      })
    })
    if (
      this.opts.enableTasksQueue === true &&
      (this.busy ||
        this.workerNodes[workerNodeKey].usage.tasks.executing >=
          ((this.opts.tasksQueueOptions as TasksQueueOptions)
            .concurrency as number))
    ) {
      this.enqueueTask(workerNodeKey, submittedTask)
    } else {
      this.executeTask(workerNodeKey, submittedTask)
    }
    this.checkAndEmitEvents()
    // eslint-disable-next-line @typescript-eslint/return-await
    return res
  }

  /** @inheritDoc */
  public async destroy (): Promise<void> {
    await Promise.all(
      this.workerNodes.map(async (workerNode, workerNodeKey) => {
        this.flushTasksQueue(workerNodeKey)
        // FIXME: wait for tasks to be finished
        await this.destroyWorker(workerNode.worker)
      })
    )
  }

  /**
   * Terminates the given worker.
   *
   * @param worker - A worker within `workerNodes`.
   */
  protected abstract destroyWorker (worker: Worker): void | Promise<void>

  /**
   * Setup hook to execute code before worker nodes are created in the abstract constructor.
   * Can be overridden.
   *
   * @virtual
   */
  protected setupHook (): void {
    // Intentionally empty
  }

  /**
   * Should return whether the worker is the main worker or not.
   */
  protected abstract isMain (): boolean

  /**
   * Hook executed before the worker task execution.
   * Can be overridden.
   *
   * Increments the number of executing tasks of the worker node and records the task wait time.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  protected beforeTaskExecutionHook (
    workerNodeKey: number,
    task: Task<Data>
  ): void {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    ++workerUsage.tasks.executing
    this.updateWaitTimeWorkerUsage(workerUsage, task)
  }

  /**
   * Hook executed after the worker task execution.
   * Can be overridden.
   *
   * Updates the task counters, run time and ELU statistics of the worker node.
   *
   * @param worker - The worker.
   * @param message - The received message.
   */
  protected afterTaskExecutionHook (
    worker: Worker,
    message: MessageValue<Response>
  ): void {
    const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
    this.updateTaskStatisticsWorkerUsage(workerUsage, message)
    this.updateRunTimeWorkerUsage(workerUsage, message)
    this.updateEluWorkerUsage(workerUsage, message)
  }

  /**
   * Updates the task counters once a task response has been received.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message; a non-null `taskError` counts the task as failed.
   */
  private updateTaskStatisticsWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    const workerTaskStatistics = workerUsage.tasks
    --workerTaskStatistics.executing
    ++workerTaskStatistics.executed
    if (message.taskError != null) {
      ++workerTaskStatistics.failed
    }
  }

  /**
   * Updates the run time statistics required by the current worker choice strategy.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message carrying the task performance.
   */
  private updateRunTimeWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
        .aggregate
    ) {
      workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
      // Averages are computed over the tasks that did not fail. The divisor is
      // checked itself so that a worker whose tasks all failed does not end up
      // with an Infinity/NaN average.
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
          .average &&
        successfulTasks > 0
      ) {
        workerUsage.runTime.average =
          workerUsage.runTime.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
          .median &&
        message.taskPerformance?.runTime != null
      ) {
        workerUsage.runTime.history.push(message.taskPerformance.runTime)
        workerUsage.runTime.median = median(workerUsage.runTime.history)
      }
    }
  }

  /**
   * Updates the wait time statistics required by the current worker choice strategy.
   *
   * The wait time is the delay between the task `timestamp` and now; a task without
   * timestamp counts as a zero wait.
   *
   * @param workerUsage - The worker usage to update.
   * @param task - The task about to be executed.
   */
  private updateWaitTimeWorkerUsage (
    workerUsage: WorkerUsage,
    task: Task<Data>
  ): void {
    const timestamp = performance.now()
    const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
        .aggregate
    ) {
      workerUsage.waitTime.aggregate += taskWaitTime
      // Same divisor guard as for the run time: never divide by zero.
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.average &&
        successfulTasks > 0
      ) {
        workerUsage.waitTime.average =
          workerUsage.waitTime.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .waitTime.median
      ) {
        workerUsage.waitTime.history.push(taskWaitTime)
        workerUsage.waitTime.median = median(workerUsage.waitTime.history)
      }
    }
  }

  /**
   * Updates the event loop utilization (ELU) statistics required by the current
   * worker choice strategy.
   *
   * @param workerUsage - The worker usage to update.
   * @param message - The received message carrying the task performance.
   */
  private updateEluWorkerUsage (
    workerUsage: WorkerUsage,
    message: MessageValue<Response>
  ): void {
    if (
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
        .aggregate
    ) {
      if (message.taskPerformance?.elu != null) {
        workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
        workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
        // Running mean of the previous utilization and the new sample.
        workerUsage.elu.utilization =
          (workerUsage.elu.utilization +
            message.taskPerformance.elu.utilization) /
          2
      }
      // Same divisor guard as for the run time: never divide by zero.
      const successfulTasks =
        workerUsage.tasks.executed - workerUsage.tasks.failed
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
          .average &&
        successfulTasks > 0
      ) {
        workerUsage.elu.idle.average =
          workerUsage.elu.idle.aggregate / successfulTasks
        workerUsage.elu.active.average =
          workerUsage.elu.active.aggregate / successfulTasks
      }
      if (
        this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
          .median &&
        message.taskPerformance?.elu != null
      ) {
        workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
        workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
        workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
        workerUsage.elu.active.median = median(workerUsage.elu.active.history)
      }
    }
  }

  /**
   * Chooses a worker node for the next task.
   *
   * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
   *
   * When a dynamic worker has just been created and the strategy policy asks for it,
   * that new worker is chosen directly.
   *
   * @returns The worker node key
   */
  private chooseWorkerNode (): number {
    if (this.shallCreateDynamicWorker()) {
      const worker = this.createAndSetupDynamicWorker()
      if (
        this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
      ) {
        return this.getWorkerNodeKey(worker)
      }
    }
    return this.workerChoiceStrategyContext.execute()
  }

  /**
   * Conditions for dynamic worker creation: the pool is dynamic, not full and
   * every existing worker node is executing a task.
   *
   * @returns Whether to create a dynamic worker or not.
   */
  private shallCreateDynamicWorker (): boolean {
    return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  }

  /**
   * Sends a message to the given worker.
   *
   * @param worker - The worker which should receive the message.
   * @param message - The message.
   */
  protected abstract sendToWorker (
    worker: Worker,
    message: MessageValue<Data>
  ): void

  /**
   * Registers a listener callback on the given worker.
   *
   * @param worker - The worker which should register a listener.
   * @param listener - The message listener callback.
   */
  private registerWorkerMessageListener<Message extends Data | Response>(
    worker: Worker,
    listener: (message: MessageValue<Message>) => void
  ): void {
    worker.on('message', listener as MessageHandler<Worker>)
  }

  /**
   * Creates a new worker.
   *
   * @returns Newly created worker.
   */
  protected abstract createWorker (): Worker

  /**
   * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
   * Can be overridden.
   *
   * @param worker - The newly created worker.
   */
  protected afterWorkerSetup (worker: Worker): void {
    // Listen to worker messages.
    this.registerWorkerMessageListener(worker, this.workerListener())
  }

  /**
   * Creates a new worker and sets it up completely in the pool worker nodes.
   *
   * @returns New, completely set up worker.
   */
  protected createAndSetupWorker (): Worker {
    const worker = this.createWorker()

    worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
    worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
    worker.on('error', error => {
      if (this.emitter != null) {
        this.emitter.emit(PoolEvents.error, error)
      }
      if (this.opts.restartWorkerOnError === true) {
        this.createAndSetupWorker()
      }
    })
    worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
    worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
    // An exited worker no longer belongs to the pool worker nodes.
    worker.once('exit', () => {
      this.removeWorkerNode(worker)
    })

    this.pushWorkerNode(worker)

    this.setWorkerStatistics(worker)

    this.afterWorkerSetup(worker)

    return worker
  }

  /**
   * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
   *
   * The worker gets an extra message listener that destroys it when it sends a kill
   * message, either unconditionally (`HARD` kill behavior) or once it has no executing
   * (and, with the tasks queue enabled, no queued) task left.
   *
   * @returns New, completely set up dynamic worker.
   */
  protected createAndSetupDynamicWorker (): Worker {
    const worker = this.createAndSetupWorker()
    this.registerWorkerMessageListener(worker, message => {
      const workerNodeKey = this.getWorkerNodeKey(worker)
      if (
        isKillBehavior(KillBehaviors.HARD, message.kill) ||
        (message.kill != null &&
          ((this.opts.enableTasksQueue === false &&
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
            (this.opts.enableTasksQueue === true &&
              this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
              this.tasksQueueSize(workerNodeKey) === 0)))
      ) {
        // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
        void (this.destroyWorker(worker) as Promise<void>)
      }
    })
    return worker
  }

  /**
   * This function is the listener registered for each worker message.
   *
   * For a message whose id matches a pending task, the listener settles the task promise,
   * runs the after task execution hook, forgets the pending entry, starts the next queued
   * task of that worker node (when the tasks queue is enabled) and updates the worker
   * choice strategy context.
   *
   * @returns The listener function to execute when a message is received from a worker.
   */
  protected workerListener (): (message: MessageValue<Response>) => void {
    return message => {
      if (message.id != null) {
        // Task execution response received
        const promiseResponse = this.promiseResponseMap.get(message.id)
        if (promiseResponse != null) {
          if (message.taskError != null) {
            if (this.emitter != null) {
              this.emitter.emit(PoolEvents.taskError, message.taskError)
            }
            promiseResponse.reject(message.taskError.message)
          } else {
            promiseResponse.resolve(message.data as Response)
          }
          this.afterTaskExecutionHook(promiseResponse.worker, message)
          this.promiseResponseMap.delete(message.id)
          const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
          if (
            this.opts.enableTasksQueue === true &&
            this.tasksQueueSize(workerNodeKey) > 0
          ) {
            this.executeTask(
              workerNodeKey,
              this.dequeueTask(workerNodeKey) as Task<Data>
            )
          }
          this.workerChoiceStrategyContext.update(workerNodeKey)
        }
      }
    }
  }

  /**
   * Emits the `busy` event when the pool is busy and the `full` event when a dynamic
   * pool is full. Does nothing when events are disabled.
   */
  private checkAndEmitEvents (): void {
    if (this.emitter != null) {
      if (this.busy) {
        this.emitter.emit(PoolEvents.busy, this.info)
      }
      if (this.type === PoolTypes.dynamic && this.full) {
        this.emitter.emit(PoolEvents.full, this.info)
      }
    }
  }

  /**
   * Sets the tasks usage of the given worker node in the pool.
   *
   * @param workerNode - The worker node.
   * @param workerUsage - The worker usage.
   */
  private setWorkerNodeTasksUsage (
    workerNode: WorkerNode<Worker, Data>,
    workerUsage: WorkerUsage
  ): void {
    workerNode.usage = workerUsage
  }

  /**
   * Pushes the given worker in the pool worker nodes.
   *
   * @param worker - The worker.
   * @returns The worker nodes length.
   */
  private pushWorkerNode (worker: Worker): number {
    this.workerNodes.push({
      worker,
      usage: this.getWorkerUsage(),
      tasksQueue: new Queue<Task<Data>>()
    })
    // Now that the node has a key, swap in a usage object whose queue counters
    // read that node's tasks queue.
    const workerNodeKey = this.getWorkerNodeKey(worker)
    this.setWorkerNodeTasksUsage(
      this.workerNodes[workerNodeKey],
      this.getWorkerUsage(workerNodeKey)
    )
    return this.workerNodes.length
  }

  // /**
  //  * Sets the given worker in the pool worker nodes.
  //  *
  //  * @param workerNodeKey - The worker node key.
  //  * @param worker - The worker.
  //  * @param workerUsage - The worker usage.
  //  * @param tasksQueue - The worker task queue.
  //  */
  // private setWorkerNode (
  //   workerNodeKey: number,
  //   worker: Worker,
  //   workerUsage: WorkerUsage,
  //   tasksQueue: Queue<Task<Data>>
  // ): void {
  //   this.workerNodes[workerNodeKey] = {
  //     worker,
  //     usage: workerUsage,
  //     tasksQueue
  //   }
  // }

  /**
   * Removes the given worker from the pool worker nodes.
   *
   * Does nothing when the worker is not in the pool worker nodes.
   *
   * @param worker - The worker.
   */
  private removeWorkerNode (worker: Worker): void {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    if (workerNodeKey !== -1) {
      this.workerNodes.splice(workerNodeKey, 1)
      this.workerChoiceStrategyContext.remove(workerNodeKey)
    }
  }

  /**
   * Runs the before task execution hook, then sends the task to the worker of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to execute.
   */
  private executeTask (workerNodeKey: number, task: Task<Data>): void {
    this.beforeTaskExecutionHook(workerNodeKey, task)
    this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
  }

  /**
   * Enqueues the task in the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @param task - The task to enqueue.
   * @returns The value returned by the queue `enqueue()` call.
   */
  private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
    return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
  }

  /**
   * Dequeues the next task from the tasks queue of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The dequeued task, possibly `undefined`.
   */
  private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
    return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
  }

  /**
   * Gets the tasks queue size of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The tasks queue `size`.
   */
  private tasksQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.size
  }

  /**
   * Gets the tasks queue `maxSize` of the given worker node.
   *
   * @param workerNodeKey - The worker node key.
   * @returns The tasks queue `maxSize`.
   */
  private tasksMaxQueueSize (workerNodeKey: number): number {
    return this.workerNodes[workerNodeKey].tasksQueue.maxSize
  }

  /**
   * Executes every task still queued on the given worker node, then clears its tasks queue.
   *
   * @param workerNodeKey - The worker node key.
   */
  private flushTasksQueue (workerNodeKey: number): void {
    // Drain until empty. A counted loop bounded by the live queue size would
    // stop half way, since the size shrinks on every dequeue, and the final
    // clear() would then drop the tasks that were never executed.
    while (this.tasksQueueSize(workerNodeKey) > 0) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerNodes[workerNodeKey].tasksQueue.clear()
  }

  /**
   * Flushes the tasks queue of every worker node.
   */
  private flushTasksQueues (): void {
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.flushTasksQueue(workerNodeKey)
    }
  }

  /**
   * Tells the given worker which statistics (run time, ELU) the current worker
   * choice strategy requires.
   *
   * @param worker - The worker.
   */
  private setWorkerStatistics (worker: Worker): void {
    this.sendToWorker(worker, {
      statistics: {
        runTime:
          this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
            .runTime.aggregate,
        elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
          .elu.aggregate
      }
    })
  }

  /**
   * Builds a zeroed worker usage object.
   *
   * The `queued` and `maxQueued` counters are getters reading the tasks queue of the
   * given worker node each time they are accessed; without a worker node key they are `0`.
   *
   * @param workerNodeKey - The worker node key the queue counters are bound to, if any.
   * @returns The initial worker usage.
   */
  private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
    const getTasksQueueSize = (workerNodeKey?: number): number => {
      return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
    }
    const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
      return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
    }
    return {
      tasks: {
        executed: 0,
        executing: 0,
        get queued (): number {
          return getTasksQueueSize(workerNodeKey)
        },
        get maxQueued (): number {
          return getTasksMaxQueueSize(workerNodeKey)
        },
        failed: 0
      },
      runTime: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      waitTime: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      elu: {
        idle: {
          aggregate: 0,
          average: 0,
          median: 0,
          history: new CircularArray()
        },
        active: {
          aggregate: 0,
          average: 0,
          median: 0,
          history: new CircularArray()
        },
        utilization: 0
      }
    }
  }
}