test: improve pool options coverage
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
8604aaab
JB
24import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba 31import {
f0d7f803 32 Measurements,
a35560ba 33 WorkerChoiceStrategies,
a20f0ba5
JB
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
bdaf31cd
JB
36} from './selection-strategies/selection-strategies-types'
37import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 38
729c563d 39/**
ea7a90d3 40 * Base class that implements some shared logic for all poolifier pools.
729c563d 41 *
38e795c1
JB
42 * @typeParam Worker - Type of worker which manages this pool.
43 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 44 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 45 */
c97c7edb 46export abstract class AbstractPool<
f06e48d8 47 Worker extends IWorker,
d3c8a1a8
S
48 Data = unknown,
49 Response = unknown
c4855468 50> implements IPool<Worker, Data, Response> {
afc003b2 51 /** @inheritDoc */
f06e48d8 52 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 53
afc003b2 54 /** @inheritDoc */
7c0ba920
JB
55 public readonly emitter?: PoolEmitter
56
be0676b3 57 /**
a3445496 58 * The execution response promise map.
be0676b3 59 *
2740a743 60 * - `key`: The message id of each submitted task.
a3445496 61 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 62 *
a3445496 63 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 64 */
c923ce56
JB
65 protected promiseResponseMap: Map<
66 string,
67 PromiseResponseWrapper<Worker, Response>
68 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 69
a35560ba 70 /**
51fe3d3c 71 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
72 */
73 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
74 Worker,
75 Data,
76 Response
a35560ba
S
77 >
78
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the internal callbacks are bound, the event
 * emitter is created when events are enabled, the worker choice strategy context is built,
 * the setup hook is run and finally `numberOfWorkers` workers are created and set up.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if the pool is not instantiated from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments validation: number of workers, then file path, then options.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the internal callbacks to this pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
122
/**
 * Checks that a usable worker file path was given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`, `undefined`, not a string or a blank string.
 */
private checkFilePath (filePath: string): void {
  // The declared parameter type is not enforced at runtime (e.g. plain JavaScript callers),
  // so every non-string value is rejected here, not only `null` and `undefined`.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
131
/**
 * Checks that the given number of workers is valid for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is not specified, or if it is zero for a fixed pool.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
149
/**
 * Validates the pool options and fills the pool options with default values.
 *
 * Defaults applied: round robin worker choice strategy, default worker choice strategy
 * options, worker restart on error enabled, events enabled, tasks queue disabled.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  if (poolOptions.enableTasksQueue) {
    // Tasks queue options are only validated and normalized when the queue is enabled.
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
176
/**
 * Checks that the given worker choice strategy is one of the known strategies.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
186
/**
 * Checks that the given worker choice strategy options are valid.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size,
 * or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
214
/**
 * Checks that the given tasks queue options are valid.
 *
 * `null` or `undefined` options are accepted, as is a missing concurrency.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are defined but not a plain object, or if the concurrency
 * is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
238
/** @inheritDoc */
public get info (): PoolInfo {
  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    matches: (workerNode: WorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(matches).length
  // Sums a numeric value picked from every worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.workerUsage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.workerUsage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.workerUsage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.workerUsage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(workerNode => workerNode.tasksQueue.size),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.tasksQueue.maxSize
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.workerUsage.tasks.failed
    )
  }
}
08f3f44c 287
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType
294
/**
 * The type of worker used by the pool.
 */
protected abstract get worker (): WorkerType
299
/**
 * The pool minimum size.
 */
protected abstract get minSize (): number
ff733df7
JB
304
/**
 * The pool maximum size.
 */
protected abstract get maxSize (): number
a35560ba 309
/**
 * Looks up the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
321
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets a fresh usage object and is sent the statistics
  // requirements of the newly selected strategy.
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    this.setWorkerNodeTasksUsage(workerNode, this.getWorkerUsage(worker))
    this.setWorkerStatistics(worker)
  }
}
343
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
354
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // The queue is being switched off: flush what is still queued.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
366
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
377
/**
 * Builds the tasks queue options, applying the default values.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, possibly `null` or `undefined`.
 * @returns The tasks queue options with a concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
385
/**
 * Whether the pool is full or not.
 *
 * The pool is full when the number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 394
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 */
protected abstract get busy (): boolean
7c0ba920 401
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks (also when the pool has no
 * worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.workerUsage.tasks.executing !== 0
  )
}
414
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The submission timestamp is taken before the worker node is chosen.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the resolve/reject callbacks under the task id so that the worker
  // message listener can settle the promise when the response comes back.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or when the
  // chosen worker node already runs as many tasks as the configured concurrency.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, task)
  } else {
    this.executeTask(workerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 448
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerTerminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Queued tasks are flushed to the worker before it is terminated.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(workerTerminations)
}
459
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 466
/**
 * Setup hook executed in the abstract constructor before the worker nodes are created.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 476
/**
 * Should return whether the worker is the main worker or not.
 *
 * The constructor refuses to start a pool when this returns `false`.
 */
protected abstract isMain (): boolean
481
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the number of executing tasks of the worker node and updates its wait time usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing++
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
497
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks, run time and event loop utilization usage of the worker node.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
515
/**
 * Updates the tasks counters of the given worker usage after a task execution.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  tasks.executed++
  // A task that reported an error is counted as executed and as failed.
  if (message.taskError != null) {
    tasks.failed++
  }
}
527
/**
 * Updates the run time statistics of the given worker usage after a task execution.
 *
 * Nothing is updated unless the current worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. When every executed task
  // failed that count is zero: skip the update instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
556
/**
 * Updates the wait time statistics of the given worker usage before a task execution.
 *
 * The task wait time is the time elapsed since the task timestamp, or zero when the task
 * has no timestamp. Nothing is updated unless the current worker choice strategy requires
 * the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // The average is computed over the tasks that did not fail. When that count is zero,
  // skip the update instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
587
/**
 * Updates the event loop utilization (ELU) statistics of the given worker usage after a
 * task execution.
 *
 * Nothing is updated unless the current worker choice strategy requires the ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    // The worker usage ELU is always initialized when the worker usage is built, so the
    // task ELU is accumulated directly; no separate "first sample" branch is needed.
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // Utilization is smoothed by averaging the previous value with the new sample.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // The averages are computed over the tasks that did not fail. When every executed task
  // failed that count is zero: skip the update instead of storing NaN or Infinity.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
632
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its node is used
 * directly if the strategy policy says so. Otherwise the worker choice strategy decides.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
651
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full and all its
 * worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
660
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
671
/**
 * Registers a listener callback on the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 681
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 688
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 *
 * Can be used to update the `maxListeners` or to bind the `main-worker`\<-\>`worker` connection if it is not bound by default.
 *
 * @param worker - The newly created worker.
 */
protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 697
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user supplied handlers are registered (an empty function stands in for a missing one),
 * then the worker is pushed to the worker nodes, sent the statistics requirements and
 * handed to the `afterWorkerSetup()` hook.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // A new worker is created and set up when restarting on error is enabled.
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 730
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * A message listener is registered on the worker to destroy it when it sends a hard kill
 * message, or any kill message while it has no executing task (and, with the tasks queue
 * enabled, no queued task).
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is only looked at for a non hard kill message.
    const hasNoExecutingTask = (): boolean =>
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
    const isWorkerIdle = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return hasNoExecutingTask()
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          hasNoExecutingTask() && this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerIdle())
    if (shallDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
757
/**
 * Builds the listener registered for each worker message.
 *
 * The listener ignores messages without an id or without a matching pending promise.
 * Otherwise it settles the promise, runs the after task execution hook, forgets the
 * promise, executes the next queued task of the worker node if any and updates the
 * worker choice strategy context.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    const { taskError } = message
    if (taskError == null) {
      promiseResponse.resolve(message.data as Response)
    } else {
      promiseResponse.reject(taskError.message)
      this.emitter?.emit(PoolEvents.taskError, taskError)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
7c0ba920 794
/**
 * Emits the `busy` and `full` pool events when their conditions are met.
 *
 * Does nothing when the pool has no event emitter. The `full` event is only emitted
 * for a dynamic pool.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
805
/**
 * Replaces the worker usage of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to set on the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
818
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * The worker node starts with a fresh worker usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(worker),
    tasksQueue: new Queue<Task<Data>>()
  }
  return this.workerNodes.push(workerNode)
}
c923ce56 832
8604aaab
JB
833 // /**
834 // * Sets the given worker in the pool worker nodes.
835 // *
836 // * @param workerNodeKey - The worker node key.
837 // * @param worker - The worker.
838 // * @param workerUsage - The worker usage.
839 // * @param tasksQueue - The worker task queue.
840 // */
841 // private setWorkerNode (
842 // workerNodeKey: number,
843 // worker: Worker,
844 // workerUsage: WorkerUsage,
845 // tasksQueue: Queue<Task<Data>>
846 // ): void {
847 // this.workerNodes[workerNodeKey] = {
848 // worker,
849 // workerUsage,
850 // tasksQueue
851 // }
852 // }
51fe3d3c
JB
853
/**
 * Removes the given worker from the pool worker nodes.
 *
 * Nothing happens if the worker is not found in the pool worker nodes.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 866
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
871
/**
 * Adds the given task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
875
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the tasks queue returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
879
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 883
/**
 * Flushes the tasks queue of the given worker node: every queued task is dequeued and executed.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Snapshot the size once: comparing a growing index against the live size, which
  // shrinks on each dequeue, would stop after roughly half of the queued tasks.
  const queuedTasks = this.tasksQueueSize(workerNodeKey)
  for (let flushed = 0; flushed < queuedTasks; flushed++) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
894
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
900
/**
 * Sends to the given worker which statistics the current worker choice strategy requires:
 * the run time and the event loop utilization aggregate requirements.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
8604aaab
JB
912
/**
 * Builds a fresh worker usage for the given worker: zeroed run time, wait time and event
 * loop utilization statistics, each with its own empty history.
 *
 * @param worker - The worker.
 * @returns The initial worker usage.
 */
private getWorkerUsage (worker: Worker): WorkerUsage {
  // Every call returns a new object with a new history array: nothing is shared.
  const zeroedMeasurement = (): WorkerUsage['runTime'] => ({
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  })
  return {
    tasks: this.getTaskStatistics(worker),
    runTime: zeroedMeasurement(),
    waitTime: zeroedMeasurement(),
    elu: {
      idle: zeroedMeasurement(),
      active: zeroedMeasurement(),
      utilization: 0
    }
  }
}
945
/**
 * Builds the initial task statistics for the given worker.
 *
 * @param worker - The worker.
 * @returns Zeroed task statistics whose `queued` property reports the current tasks queue
 * size of the worker node, `0` when the worker is not in the pool worker nodes.
 */
private getTaskStatistics (worker: Worker): TaskStatistics {
  // The queue size is resolved on each read of `queued`. Capturing it once here would
  // freeze the value at creation time, when the worker node may not be registered yet.
  const getQueuedTasks = (): number =>
    this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size ?? 0
  return {
    executed: 0,
    executing: 0,
    get queued (): number {
      return getQueuedTasks()
    },
    failed: 0
  }
}
c97c7edb 958}