docs: refine code comment
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf
JB
15 median,
16 round
bbeadd16 17} from '../utils'
59317253 18import { KillBehaviors } from '../worker/worker-options'
c4855468 19import {
65d7a1c9 20 type IPool,
7c5a1080 21 PoolEmitter,
c4855468 22 PoolEvents,
6b27d407 23 type PoolInfo,
c4855468 24 type PoolOptions,
6b27d407
JB
25 type PoolType,
26 PoolTypes,
4b628b48 27 type TasksQueueOptions
c4855468 28} from './pool'
e102732c
JB
29import type {
30 IWorker,
4b628b48 31 IWorkerNode,
e102732c 32 MessageHandler,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
f0d7f803 38 Measurements,
a35560ba 39 WorkerChoiceStrategies,
a20f0ba5
JB
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
bdaf31cd
JB
42} from './selection-strategies/selection-strategies-types'
43import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 44import { version } from './version'
4b628b48 45import { WorkerNode } from './worker-node'
23ccf9d7 46
729c563d 47/**
ea7a90d3 48 * Base class that implements some shared logic for all poolifier pools.
729c563d 49 *
38e795c1 50 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 53 */
c97c7edb 54export abstract class AbstractPool<
f06e48d8 55 Worker extends IWorker,
d3c8a1a8
S
56 Data = unknown,
57 Response = unknown
c4855468 58> implements IPool<Worker, Data, Response> {
afc003b2 59 /** @inheritDoc */
4b628b48 60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 61
afc003b2 62 /** @inheritDoc */
7c0ba920
JB
63 public readonly emitter?: PoolEmitter
64
be0676b3 65 /**
a3445496 66 * The execution response promise map.
be0676b3 67 *
2740a743 68 * - `key`: The message id of each submitted task.
a3445496 69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 70 *
a3445496 71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 72 */
c923ce56
JB
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
075e51d1 87 /**
adc9cc64 88 * Whether the pool is starting or not.
075e51d1
JB
89 */
90 private readonly starting: boolean
afe0d5bf
JB
91 /**
92 * The start timestamp of the pool.
93 */
94 private readonly startTimestamp
95
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs the
 * setup hook and then creates worker nodes until the requested number of
 * non-dynamic worker nodes is reached.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error If `isMain()` reports that the pool is not being started from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Pin `this` on the methods below so they can be handed around as plain functions.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  const countStaticWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length

  // `starting` is read by the worker error handler to skip restarts during startup.
  this.starting = true
  while (countStaticWorkerNodes() < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }
  this.starting = false

  this.startTimestamp = performance.now()
}
149
/**
 * Checks that the worker file path is a non-blank string that points to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error If the path is nullish, not a string, blank, or does not exist on disk.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const fileIsPresent = existsSync(filePath)
  if (!fileIsPresent) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
162
8d3782fa
JB
/**
 * Checks the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws Error If the number of workers is nullish.
 * @throws TypeError If the number of workers is not a safe integer.
 * @throws RangeError If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
180
/**
 * Checks the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError If `min` is greater than `max`, if `max` is zero, or if `min` equals `max`.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  if (max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
198
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * Defaults applied: round robin worker choice strategy, default worker choice
 * strategy options, `restartWorkerOnError` and `enableEvents` set to `true`,
 * `enableTasksQueue` set to `false`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError If `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and normalized when the queue is enabled.
  if (poolOptions.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    poolOptions.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
225
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error If the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
235
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError If the options are not a plain object.
 * @throws Error If `weights` is set but its number of keys differs from the pool maximum size,
 * or if `measurement` is set but is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
    )
  }
}
263
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options and a nullish concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError If the options are set but not a plain object, or if the concurrency is not a safe integer.
 * @throws Error If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
287
/** @inheritDoc */
public get info (): PoolInfo {
  // Read the statistics requirements once.
  // NOTE(review): assumes getTaskStatisticsRequirements() returns the same
  // requirements for the whole duration of this getter - confirm.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()

  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )

  // Denominator of the run time and wait time averages.
  const executedTasksForAverage = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks?.executed ?? 0
  )
  // Average per executed task; 0 instead of NaN (0 / 0) while no task has been executed.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasksForAverage > 0 ? round(aggregate / executedTasksForAverage) : 0

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        // Median of the per worker node medians.
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        // Median of the per worker node medians.
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 424
2431bdb4
JB
/**
 * Whether the pool is ready: at least `minSize` non-dynamic worker nodes are flagged as ready.
 */
private get ready (): boolean {
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  )
  return readyStaticWorkerNodes.length >= this.minSize
}
436
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the aggregated tasks run time plus
 * wait time of all worker nodes, divided by the time elapsed since the pool
 * start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
457
8881ae32
JB
458 /**
459 * Pool type.
460 *
461 * If it is `'dynamic'`, it provides the `max` property.
462 */
463 protected abstract get type (): PoolType
464
184855e6
JB
465 /**
466 * Gets the worker type.
467 */
468 protected abstract get worker (): WorkerType
469
c2ade475 470 /**
6b27d407 471 * Pool minimum size.
c2ade475 472 */
6b27d407 473 protected abstract get minSize (): number
ff733df7
JB
474
475 /**
6b27d407 476 * Pool maximum size.
ff733df7 477 */
6b27d407 478 protected abstract get maxSize (): number
a35560ba 479
f59e1027
JB
/**
 * Gets the worker whose worker node info carries the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
490
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker,
 * when present, matches a worker of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is set and unknown to the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
507
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
519
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node gets its usage reset and its statistics set again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
538
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then hand the stored value to the strategy context.
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(
    poolOptions.workerChoiceStrategyOptions
  )
}
549
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the tasks queues when switching from enabled to disabled.
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
561
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Tasks queue disabled: drop any previously stored tasks queue options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
572
/**
 * Builds normalized tasks queue options, with the concurrency defaulting to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options to normalize. May be nullish.
 * @returns Tasks queue options holding only the resolved concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
580
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 589
c319c66b
JB
590 /**
591 * Whether the pool is busy or not.
592 *
593 * The pool busyness boolean status.
594 */
595 protected abstract get busy (): boolean
7c0ba920 596
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
609
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the resolve/reject callbacks under the task id before the task is dispatched.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already executes `concurrency` tasks.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
c97c7edb 644
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the tasks queue of one worker node, destroys its worker and waits for its exit.
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before the worker is destroyed.
    const workerExitPromise = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExitPromise
  }
  const pendingDestructions = this.workerNodes.map(
    async (workerNode, workerNodeKey) =>
      await destroyWorkerNode(workerNode, workerNodeKey)
  )
  await Promise.all(pendingDestructions)
}
661
4a6952ff 662 /**
6c6afb84 663 * Terminates the given worker.
4a6952ff 664 *
f06e48d8 665 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
666 */
667 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 668
/**
 * Setup hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 678
729c563d 679 /**
280c2a77
S
680 * Should return whether the worker is the main worker or not.
681 */
682 protected abstract isMain (): boolean
683
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time, first on
 * the worker node usage, then on the worker node usage of the task name.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const nodeUsage = workerNode.usage
  ++nodeUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  const perTaskUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++perTaskUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(perTaskUsage, task)
}
704
/**
 * Hook executed after the worker task execution.
 * Updates the tasks statistics, the run time and the ELU, first on the worker
 * node usage, then on the worker node usage of the task name carried by the
 * message (the default task name when the message carries none).
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const applyMessageTo = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  applyMessageTo(workerNode.usage)
  const perTaskUsage = workerNode.getTaskWorkerUsage(
    message.taskPerformance?.name ?? DEFAULT_TASK_NAME
  ) as WorkerUsage
  applyMessageTo(perTaskUsage)
}
728
/**
 * Updates the tasks counters of a worker usage once a task has finished:
 * one less executing task, and one more executed or failed task depending on
 * whether the message carries a task error.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
741
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage from a received message.
 * Does nothing unless the current strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message. A missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  ) {
    return
  }
  const { runTime, tasks } = workerUsage
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  // The average is left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .average &&
    tasks.executed !== 0
  ) {
    runTime.average = runTime.aggregate / tasks.executed
  }
  // The median is only fed with run times actually carried by the message.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .median &&
    message.taskPerformance?.runTime != null
  ) {
    runTime.history.push(message.taskPerformance.runTime)
    runTime.median = median(runTime.history)
  }
}
779
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 * The task wait time is the time elapsed since the task timestamp (`0` when
 * the task has no timestamp). Does nothing unless the current strategy
 * requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task to execute.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate
  ) {
    return
  }
  const { waitTime, tasks } = workerUsage
  waitTime.aggregate = (waitTime.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  // The average is left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .average &&
    tasks.executed !== 0
  ) {
    waitTime.average = waitTime.aggregate / tasks.executed
  }
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .median
  ) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
817
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a received message.
 * Does nothing unless the current strategy requires the ELU aggregate and the
 * message carries ELU task performance data.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  ) {
    return
  }
  if (message.taskPerformance?.elu == null) {
    return
  }
  const taskElu = message.taskPerformance.elu
  const { elu, tasks } = workerUsage
  elu.idle.aggregate = (elu.idle.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active.aggregate ?? 0) + taskElu.active
  // The first sample is taken as is, later samples are averaged with the stored value.
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active.maximum ?? -Infinity
  )
  // The averages are left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .average &&
    tasks.executed !== 0
  ) {
    elu.idle.average = elu.idle.aggregate / tasks.executed
    elu.active.average = elu.active.aggregate / tasks.executed
  }
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu.median
  ) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
881
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first; its worker
 * node is chosen if the strategy policy asks to use dynamic workers.
 * Otherwise the choice is delegated to the worker choice strategy context.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
900
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * `internalBusy()` reports the worker nodes as busy.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
909
280c2a77 910 /**
675bb809 911 * Sends a message to the given worker.
280c2a77 912 *
38e795c1
JB
913 * @param worker - The worker which should receive the message.
914 * @param message - The message.
280c2a77
S
915 */
916 protected abstract sendToWorker (
917 worker: Worker,
918 message: MessageValue<Data>
919 ): void
920
729c563d 921 /**
41344292 922 * Creates a new worker.
6c6afb84
JB
923 *
924 * @returns Newly created worker.
729c563d 925 */
280c2a77 926 protected abstract createWorker (): Worker
c97c7edb 927
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * registers the user supplied and internal event listeners, adds the worker
 * node to the pool, then runs `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Internal error handling: flag the worker as not ready, emit the error,
  // optionally start a replacement worker, then redistribute the queued tasks.
  worker.on('error', error => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    workerInfo.ready = false
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    // No restart while the pool is still starting.
    const shallRestart =
      this.opts.restartWorkerOnError === true && !this.starting
    if (shallRestart) {
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(workerNodeKey)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool once its worker has exited.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.addWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 968
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * On top of the regular setup, a message listener destroys the worker when it
 * sends a kill message, the worker node is flagged as dynamic (and as ready when
 * the strategy policy uses dynamic workers), and the worker is asked to check
 * its activity.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const nodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily: the worker node is inspected only for a non hard kill request.
    const hasNoPendingTasks = (): boolean => {
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue !== true && enableTasksQueue !== false) {
        return false
      }
      if (this.workerNodes[nodeKey].usage.tasks.executing !== 0) {
        return false
      }
      return enableTasksQueue === false || this.tasksQueueSize(nodeKey) === 0
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTasks())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  if (useDynamicWorker) {
    dynamicWorkerInfo.ready = true
  }
  this.sendToWorker(worker, {
    checkActive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
1002
a2ed5053
JB
/**
 * Attaches a `message` event listener callback to the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
1015
/**
 * Hook run once a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // 1. Start listening to the worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // 2. Notify the worker of its startup.
  this.sendWorkerStartupMessage(worker)
  // 3. Tell the worker which task statistics to compute.
  this.setWorkerStatistics(worker)
}
1030
/**
 * Sends the startup message (`ready: false` plus the worker id) to the given worker.
 *
 * @param worker - The worker which should receive the startup message.
 */
private sendWorkerStartupMessage (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { id } = this.getWorkerInfo(workerNodeKey)
  this.sendToWorker(worker, {
    ready: false,
    workerId: id as number
  })
}
1037
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * Each task goes to the first ready worker node with an empty queue or, failing
 * that, to the ready worker node with the fewest queued tasks. The target is
 * chosen again for every task since the queues grow while tasks are moved.
 *
 * If no other ready worker node exists, redistribution stops and the remaining
 * tasks stay queued on the given worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible target: re-enqueueing on the source worker node would
      // never drain its queue and this loop would spin forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1067
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message
 * either as a worker ready response or as a task execution response.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null) {
      // Worker ready response received
      this.handleWorkerReadyResponse(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1085
/**
 * Stores the readiness reported by a worker and emits the pool ready event
 * when an emitter exists and the pool is ready.
 *
 * @param message - The worker ready response message.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1094
/**
 * Settles the promise bound to an executed task, runs the after task execution
 * hook, runs the next queued task of the worker node if any, then updates the
 * worker choice strategy context.
 *
 * Messages whose id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  if (message.taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    promiseResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(promiseResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker just finished a task: feed it with its next queued task.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1121
/**
 * Emits the pool busy event when the pool is busy and the pool full event
 * when a dynamic pool is full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1132
8a1260a3
JB
/**
 * Gets the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1141
/**
 * Wraps the given worker in a worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private addWorkerNode (worker: Worker): number {
  const addedNode = new WorkerNode<Worker, Data>(worker, this.worker)
  if (this.starting) {
    // Worker nodes created at pool startup are flagged as ready right away.
    addedNode.info.ready = true
  }
  const workerNodesLength = this.workerNodes.push(addedNode)
  return workerNodesLength
}
c923ce56 1156
/**
 * Removes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const removedNodeKey = this.getWorkerNodeKey(worker)
  if (removedNodeKey === -1) {
    return
  }
  this.workerNodes.splice(removedNodeKey, 1)
  this.workerChoiceStrategyContext.remove(removedNodeKey)
}
adc3c320 1169
b0a4db63
JB
/**
 * Executes the given task on the worker of the given worker node:
 * runs the before task execution hook, then sends the task to the worker.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1180
/**
 * Enqueues the given task on the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1184
/**
 * Dequeues a task from the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1188
/**
 * Gets the tasks queue size of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1192
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let queuedTasks = this.tasksQueueSize(workerNodeKey)
  while (queuedTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    queuedTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1202
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1208
/**
 * Sends to the given worker the task statistics it has to compute, taken from
 * the `aggregate` flags of the worker choice strategy task statistics requirements.
 *
 * @param worker - The worker which should receive the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
c97c7edb 1221}