docs: cleanup dynamic worker activity check comments
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf
JB
15 median,
16 round
bbeadd16 17} from '../utils'
59317253 18import { KillBehaviors } from '../worker/worker-options'
c4855468 19import {
65d7a1c9 20 type IPool,
7c5a1080 21 PoolEmitter,
c4855468 22 PoolEvents,
6b27d407 23 type PoolInfo,
c4855468 24 type PoolOptions,
6b27d407
JB
25 type PoolType,
26 PoolTypes,
4b628b48 27 type TasksQueueOptions
c4855468 28} from './pool'
e102732c
JB
29import type {
30 IWorker,
4b628b48 31 IWorkerNode,
e102732c 32 MessageHandler,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
f0d7f803 38 Measurements,
a35560ba 39 WorkerChoiceStrategies,
a20f0ba5
JB
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
bdaf31cd
JB
42} from './selection-strategies/selection-strategies-types'
43import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 44import { version } from './version'
4b628b48 45import { WorkerNode } from './worker-node'
23ccf9d7 46
729c563d 47/**
ea7a90d3 48 * Base class that implements some shared logic for all poolifier pools.
729c563d 49 *
38e795c1 50 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 53 */
c97c7edb 54export abstract class AbstractPool<
f06e48d8 55 Worker extends IWorker,
d3c8a1a8
S
56 Data = unknown,
57 Response = unknown
c4855468 58> implements IPool<Worker, Data, Response> {
afc003b2 59 /** @inheritDoc */
4b628b48 60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 61
afc003b2 62 /** @inheritDoc */
7c0ba920
JB
63 public readonly emitter?: PoolEmitter
64
be0676b3 65 /**
a3445496 66 * The execution response promise map.
be0676b3 67 *
2740a743 68 * - `key`: The message id of each submitted task.
a3445496 69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 70 *
a3445496 71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 72 */
c923ce56
JB
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
afe0d5bf
JB
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
729c563d
S
92 /**
93 * Constructs a new poolifier pool.
94 *
38e795c1 95 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 96 * @param filePath - Path to the worker file.
38e795c1 97 * @param opts - Options for the pool.
729c563d 98 */
c97c7edb 99 public constructor (
b4213b7f
JB
100 protected readonly numberOfWorkers: number,
101 protected readonly filePath: string,
102 protected readonly opts: PoolOptions<Worker>
c97c7edb 103 ) {
78cea37e 104 if (!this.isMain()) {
c97c7edb
S
105 throw new Error('Cannot start a pool from a worker!')
106 }
8d3782fa 107 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 108 this.checkFilePath(this.filePath)
7c0ba920 109 this.checkPoolOptions(this.opts)
1086026a 110
7254e419
JB
111 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
112 this.executeTask = this.executeTask.bind(this)
113 this.enqueueTask = this.enqueueTask.bind(this)
114 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 115
6bd72cd0 116 if (this.opts.enableEvents === true) {
7c0ba920
JB
117 this.emitter = new PoolEmitter()
118 }
d59df138
JB
119 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
120 Worker,
121 Data,
122 Response
da309861
JB
123 >(
124 this,
125 this.opts.workerChoiceStrategy,
126 this.opts.workerChoiceStrategyOptions
127 )
b6b32453
JB
128
129 this.setupHook()
130
b97d82d8
JB
131 while (
132 this.workerNodes.reduce(
133 (accumulator, workerNode) =>
134 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
135 0
136 ) < this.numberOfWorkers
137 ) {
b6b32453
JB
138 this.createAndSetupWorker()
139 }
afe0d5bf
JB
140
141 this.startTimestamp = performance.now()
c97c7edb
S
142 }
143
a35560ba 144 private checkFilePath (filePath: string): void {
ffcbbad8
JB
145 if (
146 filePath == null ||
3d6dd312 147 typeof filePath !== 'string' ||
ffcbbad8
JB
148 (typeof filePath === 'string' && filePath.trim().length === 0)
149 ) {
c510fea7
APA
150 throw new Error('Please specify a file with a worker implementation')
151 }
3d6dd312
JB
152 if (!existsSync(filePath)) {
153 throw new Error(`Cannot find the worker file '${filePath}'`)
154 }
c510fea7
APA
155 }
156
8d3782fa
JB
157 private checkNumberOfWorkers (numberOfWorkers: number): void {
158 if (numberOfWorkers == null) {
159 throw new Error(
160 'Cannot instantiate a pool without specifying the number of workers'
161 )
78cea37e 162 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 163 throw new TypeError(
0d80593b 164 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
165 )
166 } else if (numberOfWorkers < 0) {
473c717a 167 throw new RangeError(
8d3782fa
JB
168 'Cannot instantiate a pool with a negative number of workers'
169 )
6b27d407 170 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
171 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
172 }
173 }
174
175 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
176 if (this.type === PoolTypes.dynamic) {
177 if (min > max) {
178 throw new RangeError(
179 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
180 )
b97d82d8 181 } else if (max === 0) {
079de991 182 throw new RangeError(
b97d82d8 183 'Cannot instantiate a dynamic pool with a pool size equal to zero'
079de991
JB
184 )
185 } else if (min === max) {
186 throw new RangeError(
187 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
188 )
189 }
8d3782fa
JB
190 }
191 }
192
7c0ba920 193 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
194 if (isPlainObject(opts)) {
195 this.opts.workerChoiceStrategy =
196 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
197 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
198 this.opts.workerChoiceStrategyOptions =
199 opts.workerChoiceStrategyOptions ??
200 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
201 this.checkValidWorkerChoiceStrategyOptions(
202 this.opts.workerChoiceStrategyOptions
203 )
1f68cede 204 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
205 this.opts.enableEvents = opts.enableEvents ?? true
206 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
207 if (this.opts.enableTasksQueue) {
208 this.checkValidTasksQueueOptions(
209 opts.tasksQueueOptions as TasksQueueOptions
210 )
211 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
212 opts.tasksQueueOptions as TasksQueueOptions
213 )
214 }
215 } else {
216 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 217 }
aee46736
JB
218 }
219
220 private checkValidWorkerChoiceStrategy (
221 workerChoiceStrategy: WorkerChoiceStrategy
222 ): void {
223 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 224 throw new Error(
aee46736 225 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
226 )
227 }
7c0ba920
JB
228 }
229
0d80593b
JB
230 private checkValidWorkerChoiceStrategyOptions (
231 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
232 ): void {
233 if (!isPlainObject(workerChoiceStrategyOptions)) {
234 throw new TypeError(
235 'Invalid worker choice strategy options: must be a plain object'
236 )
237 }
49be33fe
JB
238 if (
239 workerChoiceStrategyOptions.weights != null &&
6b27d407 240 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
241 ) {
242 throw new Error(
243 'Invalid worker choice strategy options: must have a weight for each worker node'
244 )
245 }
f0d7f803
JB
246 if (
247 workerChoiceStrategyOptions.measurement != null &&
248 !Object.values(Measurements).includes(
249 workerChoiceStrategyOptions.measurement
250 )
251 ) {
252 throw new Error(
253 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
254 )
255 }
0d80593b
JB
256 }
257
a20f0ba5
JB
258 private checkValidTasksQueueOptions (
259 tasksQueueOptions: TasksQueueOptions
260 ): void {
0d80593b
JB
261 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
262 throw new TypeError('Invalid tasks queue options: must be a plain object')
263 }
f0d7f803
JB
264 if (
265 tasksQueueOptions?.concurrency != null &&
266 !Number.isSafeInteger(tasksQueueOptions.concurrency)
267 ) {
268 throw new TypeError(
269 'Invalid worker tasks concurrency: must be an integer'
270 )
271 }
272 if (
273 tasksQueueOptions?.concurrency != null &&
274 tasksQueueOptions.concurrency <= 0
275 ) {
a20f0ba5 276 throw new Error(
f0d7f803 277 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
278 )
279 }
280 }
281
08f3f44c 282 /** @inheritDoc */
6b27d407
JB
283 public get info (): PoolInfo {
284 return {
23ccf9d7 285 version,
6b27d407 286 type: this.type,
184855e6 287 worker: this.worker,
2431bdb4
JB
288 ready: this.ready,
289 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
290 minSize: this.minSize,
291 maxSize: this.maxSize,
c05f0d50
JB
292 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
293 .runTime.aggregate &&
1305e9a8
JB
294 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
295 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
296 workerNodes: this.workerNodes.length,
297 idleWorkerNodes: this.workerNodes.reduce(
298 (accumulator, workerNode) =>
f59e1027 299 workerNode.usage.tasks.executing === 0
a4e07f72
JB
300 ? accumulator + 1
301 : accumulator,
6b27d407
JB
302 0
303 ),
304 busyWorkerNodes: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
f59e1027 306 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
307 0
308 ),
a4e07f72 309 executedTasks: this.workerNodes.reduce(
6b27d407 310 (accumulator, workerNode) =>
f59e1027 311 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
312 0
313 ),
314 executingTasks: this.workerNodes.reduce(
315 (accumulator, workerNode) =>
f59e1027 316 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
317 0
318 ),
319 queuedTasks: this.workerNodes.reduce(
df593701 320 (accumulator, workerNode) =>
f59e1027 321 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
322 0
323 ),
324 maxQueuedTasks: this.workerNodes.reduce(
325 (accumulator, workerNode) =>
b25a42cd 326 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 327 0
a4e07f72
JB
328 ),
329 failedTasks: this.workerNodes.reduce(
330 (accumulator, workerNode) =>
f59e1027 331 accumulator + workerNode.usage.tasks.failed,
a4e07f72 332 0
1dcf8b7b
JB
333 ),
334 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
335 .runTime.aggregate && {
336 runTime: {
98e72cda
JB
337 minimum: round(
338 Math.min(
339 ...this.workerNodes.map(
340 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
341 )
1dcf8b7b
JB
342 )
343 ),
98e72cda
JB
344 maximum: round(
345 Math.max(
346 ...this.workerNodes.map(
347 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
348 )
1dcf8b7b 349 )
98e72cda
JB
350 ),
351 average: round(
352 this.workerNodes.reduce(
353 (accumulator, workerNode) =>
354 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
355 0
356 ) /
357 this.workerNodes.reduce(
358 (accumulator, workerNode) =>
359 accumulator + (workerNode.usage.tasks?.executed ?? 0),
360 0
361 )
362 ),
363 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
364 .runTime.median && {
365 median: round(
366 median(
367 this.workerNodes.map(
368 workerNode => workerNode.usage.runTime?.median ?? 0
369 )
370 )
371 )
372 })
1dcf8b7b
JB
373 }
374 }),
375 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
376 .waitTime.aggregate && {
377 waitTime: {
98e72cda
JB
378 minimum: round(
379 Math.min(
380 ...this.workerNodes.map(
381 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
382 )
1dcf8b7b
JB
383 )
384 ),
98e72cda
JB
385 maximum: round(
386 Math.max(
387 ...this.workerNodes.map(
388 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
389 )
1dcf8b7b 390 )
98e72cda
JB
391 ),
392 average: round(
393 this.workerNodes.reduce(
394 (accumulator, workerNode) =>
395 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
396 0
397 ) /
398 this.workerNodes.reduce(
399 (accumulator, workerNode) =>
400 accumulator + (workerNode.usage.tasks?.executed ?? 0),
401 0
402 )
403 ),
404 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
405 .waitTime.median && {
406 median: round(
407 median(
408 this.workerNodes.map(
409 workerNode => workerNode.usage.waitTime?.median ?? 0
410 )
411 )
412 )
413 })
1dcf8b7b
JB
414 }
415 })
6b27d407
JB
416 }
417 }
08f3f44c 418
2431bdb4 419 private get starting (): boolean {
b97d82d8
JB
420 return (
421 this.workerNodes.reduce(
422 (accumulator, workerNode) =>
423 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
424 0
425 ) < this.minSize
426 )
2431bdb4
JB
427 }
428
429 private get ready (): boolean {
430 return (
b97d82d8
JB
431 this.workerNodes.reduce(
432 (accumulator, workerNode) =>
433 !workerNode.info.dynamic && workerNode.info.ready
434 ? accumulator + 1
435 : accumulator,
436 0
437 ) >= this.minSize
2431bdb4
JB
438 )
439 }
440
afe0d5bf
JB
441 /**
442 * Gets the approximate pool utilization.
443 *
444 * @returns The pool utilization.
445 */
446 private get utilization (): number {
8e5ca040 447 const poolTimeCapacity =
fe7d90db 448 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
449 const totalTasksRunTime = this.workerNodes.reduce(
450 (accumulator, workerNode) =>
71514351 451 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
452 0
453 )
454 const totalTasksWaitTime = this.workerNodes.reduce(
455 (accumulator, workerNode) =>
71514351 456 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
457 0
458 )
8e5ca040 459 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
460 }
461
8881ae32
JB
462 /**
463 * Pool type.
464 *
465 * If it is `'dynamic'`, it provides the `max` property.
466 */
467 protected abstract get type (): PoolType
468
184855e6
JB
469 /**
470 * Gets the worker type.
471 */
472 protected abstract get worker (): WorkerType
473
c2ade475 474 /**
6b27d407 475 * Pool minimum size.
c2ade475 476 */
6b27d407 477 protected abstract get minSize (): number
ff733df7
JB
478
479 /**
6b27d407 480 * Pool maximum size.
ff733df7 481 */
6b27d407 482 protected abstract get maxSize (): number
a35560ba 483
f59e1027
JB
484 /**
485 * Get the worker given its id.
486 *
487 * @param workerId - The worker id.
488 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
489 */
490 private getWorkerById (workerId: number): Worker | undefined {
491 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
492 ?.worker
493 }
494
6b813701
JB
495 /**
496 * Checks if the worker id sent in the received message from a worker is valid.
497 *
498 * @param message - The received message.
499 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
500 */
21f710aa
JB
501 private checkMessageWorkerId (message: MessageValue<Response>): void {
502 if (
503 message.workerId != null &&
504 this.getWorkerById(message.workerId) == null
505 ) {
506 throw new Error(
507 `Worker message received from unknown worker '${message.workerId}'`
508 )
509 }
510 }
511
ffcbbad8 512 /**
f06e48d8 513 * Gets the given worker its worker node key.
ffcbbad8
JB
514 *
515 * @param worker - The worker.
f59e1027 516 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 517 */
f06e48d8
JB
518 private getWorkerNodeKey (worker: Worker): number {
519 return this.workerNodes.findIndex(
520 workerNode => workerNode.worker === worker
521 )
bf9549ae
JB
522 }
523
afc003b2 524 /** @inheritDoc */
a35560ba 525 public setWorkerChoiceStrategy (
59219cbb
JB
526 workerChoiceStrategy: WorkerChoiceStrategy,
527 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 528 ): void {
aee46736 529 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 530 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
531 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
532 this.opts.workerChoiceStrategy
533 )
534 if (workerChoiceStrategyOptions != null) {
535 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
536 }
8a1260a3 537 for (const workerNode of this.workerNodes) {
4b628b48 538 workerNode.resetUsage()
b6b32453 539 this.setWorkerStatistics(workerNode.worker)
59219cbb 540 }
a20f0ba5
JB
541 }
542
543 /** @inheritDoc */
544 public setWorkerChoiceStrategyOptions (
545 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
546 ): void {
0d80593b 547 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
548 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
549 this.workerChoiceStrategyContext.setOptions(
550 this.opts.workerChoiceStrategyOptions
a35560ba
S
551 )
552 }
553
a20f0ba5 554 /** @inheritDoc */
8f52842f
JB
555 public enableTasksQueue (
556 enable: boolean,
557 tasksQueueOptions?: TasksQueueOptions
558 ): void {
a20f0ba5 559 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 560 this.flushTasksQueues()
a20f0ba5
JB
561 }
562 this.opts.enableTasksQueue = enable
8f52842f 563 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
564 }
565
566 /** @inheritDoc */
8f52842f 567 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 568 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
569 this.checkValidTasksQueueOptions(tasksQueueOptions)
570 this.opts.tasksQueueOptions =
571 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 572 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
573 delete this.opts.tasksQueueOptions
574 }
575 }
576
577 private buildTasksQueueOptions (
578 tasksQueueOptions: TasksQueueOptions
579 ): TasksQueueOptions {
580 return {
581 concurrency: tasksQueueOptions?.concurrency ?? 1
582 }
583 }
584
c319c66b
JB
585 /**
586 * Whether the pool is full or not.
587 *
588 * The pool filling boolean status.
589 */
dea903a8
JB
590 protected get full (): boolean {
591 return this.workerNodes.length >= this.maxSize
592 }
c2ade475 593
c319c66b
JB
594 /**
595 * Whether the pool is busy or not.
596 *
597 * The pool busyness boolean status.
598 */
599 protected abstract get busy (): boolean
7c0ba920 600
6c6afb84
JB
601 /**
602 * Whether worker nodes are executing at least one task.
603 *
604 * @returns Worker nodes busyness boolean status.
605 */
c2ade475 606 protected internalBusy (): boolean {
e0ae6100
JB
607 return (
608 this.workerNodes.findIndex(workerNode => {
f59e1027 609 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
610 }) === -1
611 )
cb70b19d
JB
612 }
613
afc003b2 614 /** @inheritDoc */
a86b6df1 615 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 616 const timestamp = performance.now()
20dcad1a 617 const workerNodeKey = this.chooseWorkerNode()
adc3c320 618 const submittedTask: Task<Data> = {
ff128cc9 619 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
620 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
621 data: data ?? ({} as Data),
b6b32453 622 timestamp,
21f710aa 623 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 624 id: randomUUID()
adc3c320 625 }
2e81254d 626 const res = new Promise<Response>((resolve, reject) => {
02706357 627 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
628 resolve,
629 reject,
20dcad1a 630 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
631 })
632 })
ff733df7
JB
633 if (
634 this.opts.enableTasksQueue === true &&
7171d33f 635 (this.busy ||
f59e1027 636 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 637 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 638 .concurrency as number))
ff733df7 639 ) {
26a929d7
JB
640 this.enqueueTask(workerNodeKey, submittedTask)
641 } else {
2e81254d 642 this.executeTask(workerNodeKey, submittedTask)
adc3c320 643 }
ff733df7 644 this.checkAndEmitEvents()
78cea37e 645 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
646 return res
647 }
c97c7edb 648
afc003b2 649 /** @inheritDoc */
c97c7edb 650 public async destroy (): Promise<void> {
1fbcaa7c 651 await Promise.all(
875a7c37
JB
652 this.workerNodes.map(async (workerNode, workerNodeKey) => {
653 this.flushTasksQueue(workerNodeKey)
47aacbaa 654 // FIXME: wait for tasks to be finished
920278a2
JB
655 const workerExitPromise = new Promise<void>(resolve => {
656 workerNode.worker.on('exit', () => {
657 resolve()
658 })
659 })
f06e48d8 660 await this.destroyWorker(workerNode.worker)
920278a2 661 await workerExitPromise
1fbcaa7c
JB
662 })
663 )
c97c7edb
S
664 }
665
4a6952ff 666 /**
6c6afb84 667 * Terminates the given worker.
4a6952ff 668 *
f06e48d8 669 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
670 */
671 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 672
729c563d 673 /**
6677a3d3
JB
674 * Setup hook to execute code before worker nodes are created in the abstract constructor.
675 * Can be overridden.
afc003b2
JB
676 *
677 * @virtual
729c563d 678 */
280c2a77 679 protected setupHook (): void {
d99ba5a8 680 // Intentionally empty
280c2a77 681 }
c97c7edb 682
729c563d 683 /**
280c2a77
S
684 * Should return whether the worker is the main worker or not.
685 */
686 protected abstract isMain (): boolean
687
688 /**
2e81254d 689 * Hook executed before the worker task execution.
bf9549ae 690 * Can be overridden.
729c563d 691 *
f06e48d8 692 * @param workerNodeKey - The worker node key.
1c6fe997 693 * @param task - The task to execute.
729c563d 694 */
1c6fe997
JB
695 protected beforeTaskExecutionHook (
696 workerNodeKey: number,
697 task: Task<Data>
698 ): void {
f59e1027 699 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
700 ++workerUsage.tasks.executing
701 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 702 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
703 task.name as string
704 ) as WorkerUsage
eb8afc8a
JB
705 ++taskWorkerUsage.tasks.executing
706 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
707 }
708
c01733f1 709 /**
2e81254d 710 * Hook executed after the worker task execution.
bf9549ae 711 * Can be overridden.
c01733f1 712 *
c923ce56 713 * @param worker - The worker.
38e795c1 714 * @param message - The received message.
c01733f1 715 */
2e81254d 716 protected afterTaskExecutionHook (
c923ce56 717 worker: Worker,
2740a743 718 message: MessageValue<Response>
bf9549ae 719 ): void {
ff128cc9
JB
720 const workerNodeKey = this.getWorkerNodeKey(worker)
721 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
722 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
723 this.updateRunTimeWorkerUsage(workerUsage, message)
724 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 725 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 726 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 727 ) as WorkerUsage
eb8afc8a
JB
728 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
729 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
730 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
731 }
732
733 private updateTaskStatisticsWorkerUsage (
734 workerUsage: WorkerUsage,
735 message: MessageValue<Response>
736 ): void {
a4e07f72
JB
737 const workerTaskStatistics = workerUsage.tasks
738 --workerTaskStatistics.executing
98e72cda
JB
739 if (message.taskError == null) {
740 ++workerTaskStatistics.executed
741 } else {
a4e07f72 742 ++workerTaskStatistics.failed
2740a743 743 }
f8eb0a2a
JB
744 }
745
a4e07f72
JB
746 private updateRunTimeWorkerUsage (
747 workerUsage: WorkerUsage,
f8eb0a2a
JB
748 message: MessageValue<Response>
749 ): void {
87de9ff5
JB
750 if (
751 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 752 .aggregate
87de9ff5 753 ) {
f7510105 754 const taskRunTime = message.taskPerformance?.runTime ?? 0
71514351
JB
755 workerUsage.runTime.aggregate =
756 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
f7510105
JB
757 workerUsage.runTime.minimum = Math.min(
758 taskRunTime,
71514351 759 workerUsage.runTime?.minimum ?? Infinity
f7510105
JB
760 )
761 workerUsage.runTime.maximum = Math.max(
762 taskRunTime,
71514351 763 workerUsage.runTime?.maximum ?? -Infinity
f7510105 764 )
c6bd2650 765 if (
932fc8be
JB
766 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
767 .average &&
a4e07f72 768 workerUsage.tasks.executed !== 0
c6bd2650 769 ) {
a4e07f72 770 workerUsage.runTime.average =
98e72cda 771 workerUsage.runTime.aggregate / workerUsage.tasks.executed
3032893a 772 }
3fa4cdd2 773 if (
932fc8be
JB
774 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
775 .median &&
d715b7bc 776 message.taskPerformance?.runTime != null
3fa4cdd2 777 ) {
a4e07f72
JB
778 workerUsage.runTime.history.push(message.taskPerformance.runTime)
779 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 780 }
3032893a 781 }
f8eb0a2a
JB
782 }
783
a4e07f72
JB
784 private updateWaitTimeWorkerUsage (
785 workerUsage: WorkerUsage,
1c6fe997 786 task: Task<Data>
f8eb0a2a 787 ): void {
1c6fe997
JB
788 const timestamp = performance.now()
789 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
790 if (
791 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 792 .aggregate
87de9ff5 793 ) {
71514351
JB
794 workerUsage.waitTime.aggregate =
795 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
f7510105
JB
796 workerUsage.waitTime.minimum = Math.min(
797 taskWaitTime,
71514351 798 workerUsage.waitTime?.minimum ?? Infinity
f7510105
JB
799 )
800 workerUsage.waitTime.maximum = Math.max(
801 taskWaitTime,
71514351 802 workerUsage.waitTime?.maximum ?? -Infinity
f7510105 803 )
09a6305f 804 if (
87de9ff5 805 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 806 .waitTime.average &&
a4e07f72 807 workerUsage.tasks.executed !== 0
09a6305f 808 ) {
a4e07f72 809 workerUsage.waitTime.average =
98e72cda 810 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
09a6305f
JB
811 }
812 if (
87de9ff5 813 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
71514351 814 .waitTime.median
09a6305f 815 ) {
1c6fe997 816 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 817 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 818 }
0567595a 819 }
c01733f1 820 }
821
a4e07f72 822 private updateEluWorkerUsage (
5df69fab 823 workerUsage: WorkerUsage,
62c15a68
JB
824 message: MessageValue<Response>
825 ): void {
5df69fab
JB
826 if (
827 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
828 .aggregate
829 ) {
f7510105 830 if (message.taskPerformance?.elu != null) {
71514351
JB
831 workerUsage.elu.idle.aggregate =
832 (workerUsage.elu.idle?.aggregate ?? 0) +
833 message.taskPerformance.elu.idle
834 workerUsage.elu.active.aggregate =
835 (workerUsage.elu.active?.aggregate ?? 0) +
836 message.taskPerformance.elu.active
f7510105
JB
837 if (workerUsage.elu.utilization != null) {
838 workerUsage.elu.utilization =
839 (workerUsage.elu.utilization +
840 message.taskPerformance.elu.utilization) /
841 2
842 } else {
843 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
844 }
845 workerUsage.elu.idle.minimum = Math.min(
846 message.taskPerformance.elu.idle,
71514351 847 workerUsage.elu.idle?.minimum ?? Infinity
f7510105
JB
848 )
849 workerUsage.elu.idle.maximum = Math.max(
850 message.taskPerformance.elu.idle,
71514351 851 workerUsage.elu.idle?.maximum ?? -Infinity
f7510105
JB
852 )
853 workerUsage.elu.active.minimum = Math.min(
854 message.taskPerformance.elu.active,
71514351 855 workerUsage.elu.active?.minimum ?? Infinity
f7510105
JB
856 )
857 workerUsage.elu.active.maximum = Math.max(
858 message.taskPerformance.elu.active,
71514351 859 workerUsage.elu.active?.maximum ?? -Infinity
f7510105 860 )
71514351
JB
861 if (
862 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
863 .average &&
864 workerUsage.tasks.executed !== 0
865 ) {
71514351 866 workerUsage.elu.idle.average =
98e72cda 867 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
71514351 868 workerUsage.elu.active.average =
98e72cda 869 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
71514351
JB
870 }
871 if (
872 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
61e4a75e 873 .median
71514351
JB
874 ) {
875 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
876 workerUsage.elu.active.history.push(
877 message.taskPerformance.elu.active
878 )
879 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
880 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
881 }
62c15a68
JB
882 }
883 }
884 }
885
280c2a77 886 /**
f06e48d8 887 * Chooses a worker node for the next task.
280c2a77 888 *
6c6afb84 889 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 890 *
20dcad1a 891 * @returns The worker node key
280c2a77 892 */
6c6afb84 893 private chooseWorkerNode (): number {
930dcf12 894 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
895 const worker = this.createAndSetupDynamicWorker()
896 if (
897 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
898 ) {
899 return this.getWorkerNodeKey(worker)
900 }
17393ac8 901 }
930dcf12
JB
902 return this.workerChoiceStrategyContext.execute()
903 }
904
6c6afb84
JB
905 /**
906 * Conditions for dynamic worker creation.
907 *
908 * @returns Whether to create a dynamic worker or not.
909 */
910 private shallCreateDynamicWorker (): boolean {
930dcf12 911 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
912 }
913
280c2a77 914 /**
675bb809 915 * Sends a message to the given worker.
280c2a77 916 *
38e795c1
JB
917 * @param worker - The worker which should receive the message.
918 * @param message - The message.
280c2a77
S
919 */
920 protected abstract sendToWorker (
921 worker: Worker,
922 message: MessageValue<Data>
923 ): void
924
729c563d 925 /**
41344292 926 * Creates a new worker.
6c6afb84
JB
927 *
928 * @returns Newly created worker.
729c563d 929 */
280c2a77 930 protected abstract createWorker (): Worker
c97c7edb 931
4a6952ff 932 /**
f06e48d8 933 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
934 *
935 * @returns New, completely set up worker.
936 */
937 protected createAndSetupWorker (): Worker {
bdacc2d2 938 const worker = this.createWorker()
280c2a77 939
35cf1c03 940 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 941 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 942 worker.on('error', error => {
9b106837
JB
943 const workerNodeKey = this.getWorkerNodeKey(worker)
944 const workerInfo = this.getWorkerInfo(workerNodeKey)
945 workerInfo.ready = false
1f68cede
JB
946 if (this.emitter != null) {
947 this.emitter.emit(PoolEvents.error, error)
948 }
2431bdb4 949 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 950 if (workerInfo.dynamic) {
8a1260a3
JB
951 this.createAndSetupDynamicWorker()
952 } else {
953 this.createAndSetupWorker()
954 }
5baee0d7 955 }
19dbc45b 956 if (this.opts.enableTasksQueue === true) {
9b106837 957 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 958 }
5baee0d7 959 })
a35560ba
S
960 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
961 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 962 worker.once('exit', () => {
f06e48d8 963 this.removeWorkerNode(worker)
a974afa6 964 })
280c2a77 965
b0a4db63 966 this.addWorkerNode(worker)
280c2a77
S
967
968 this.afterWorkerSetup(worker)
969
c97c7edb
S
970 return worker
971 }
be0676b3 972
930dcf12
JB
973 /**
974 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
975 *
976 * @returns New, completely set up dynamic worker.
977 */
978 protected createAndSetupDynamicWorker (): Worker {
979 const worker = this.createAndSetupWorker()
980 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 981 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
982 if (
983 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
984 (message.kill != null &&
985 ((this.opts.enableTasksQueue === false &&
f59e1027 986 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 987 (this.opts.enableTasksQueue === true &&
f59e1027 988 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 989 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
990 ) {
991 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
992 void (this.destroyWorker(worker) as Promise<void>)
993 }
994 })
21f710aa 995 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
b0a4db63 996 workerInfo.dynamic = true
b97d82d8
JB
997 if (this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker) {
998 workerInfo.ready = true
999 }
21f710aa 1000 this.sendToWorker(worker, {
b0a4db63 1001 checkActive: true,
21f710aa
JB
1002 workerId: workerInfo.id as number
1003 })
930dcf12
JB
1004 return worker
1005 }
1006
a2ed5053
JB
1007 /**
1008 * Registers a listener callback on the given worker.
1009 *
1010 * @param worker - The worker which should register a listener.
1011 * @param listener - The message listener callback.
1012 */
1013 private registerWorkerMessageListener<Message extends Data | Response>(
1014 worker: Worker,
1015 listener: (message: MessageValue<Message>) => void
1016 ): void {
1017 worker.on('message', listener as MessageHandler<Worker>)
1018 }
1019
1020 /**
1021 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1022 * Can be overridden.
1023 *
1024 * @param worker - The newly created worker.
1025 */
1026 protected afterWorkerSetup (worker: Worker): void {
1027 // Listen to worker messages.
1028 this.registerWorkerMessageListener(worker, this.workerListener())
1029 // Send startup message to worker.
d2c73f82
JB
1030 this.sendWorkerStartupMessage(worker)
1031 // Setup worker task statistics computation.
1032 this.setWorkerStatistics(worker)
1033 }
1034
1035 private sendWorkerStartupMessage (worker: Worker): void {
a2ed5053
JB
1036 this.sendToWorker(worker, {
1037 ready: false,
1038 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1039 })
a2ed5053
JB
1040 }
1041
1042 private redistributeQueuedTasks (workerNodeKey: number): void {
1043 while (this.tasksQueueSize(workerNodeKey) > 0) {
1044 let targetWorkerNodeKey: number = workerNodeKey
1045 let minQueuedTasks = Infinity
1046 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1047 const workerInfo = this.getWorkerInfo(workerNodeId)
1048 if (
1049 workerNodeId !== workerNodeKey &&
1050 workerInfo.ready &&
1051 workerNode.usage.tasks.queued === 0
1052 ) {
1053 targetWorkerNodeKey = workerNodeId
1054 break
1055 }
1056 if (
1057 workerNodeId !== workerNodeKey &&
1058 workerInfo.ready &&
1059 workerNode.usage.tasks.queued < minQueuedTasks
1060 ) {
1061 minQueuedTasks = workerNode.usage.tasks.queued
1062 targetWorkerNodeKey = workerNodeId
1063 }
1064 }
1065 this.enqueueTask(
1066 targetWorkerNodeKey,
1067 this.dequeueTask(workerNodeKey) as Task<Data>
1068 )
1069 }
1070 }
1071
be0676b3 1072 /**
ff733df7 1073 * This function is the listener registered for each worker message.
be0676b3 1074 *
bdacc2d2 1075 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1076 */
1077 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1078 return message => {
21f710aa 1079 this.checkMessageWorkerId(message)
d2c73f82 1080 if (message.ready != null) {
7c8381eb
JB
1081 // Worker ready message received
1082 this.handleWorkerReadyMessage(message)
f59e1027 1083 } else if (message.id != null) {
a3445496 1084 // Task execution response received
6b272951
JB
1085 this.handleTaskExecutionResponse(message)
1086 }
1087 }
1088 }
1089
7c8381eb 1090 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
21f710aa
JB
1091 const worker = this.getWorkerById(message.workerId)
1092 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1093 message.ready as boolean
2431bdb4
JB
1094 if (this.emitter != null && this.ready) {
1095 this.emitter.emit(PoolEvents.ready, this.info)
1096 }
6b272951
JB
1097 }
1098
1099 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1100 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1101 if (promiseResponse != null) {
1102 if (message.taskError != null) {
1103 if (this.emitter != null) {
1104 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 1105 }
6b272951
JB
1106 promiseResponse.reject(message.taskError.message)
1107 } else {
1108 promiseResponse.resolve(message.data as Response)
1109 }
1110 this.afterTaskExecutionHook(promiseResponse.worker, message)
1111 this.promiseResponseMap.delete(message.id as string)
1112 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1113 if (
1114 this.opts.enableTasksQueue === true &&
1115 this.tasksQueueSize(workerNodeKey) > 0
1116 ) {
1117 this.executeTask(
1118 workerNodeKey,
1119 this.dequeueTask(workerNodeKey) as Task<Data>
1120 )
be0676b3 1121 }
6b272951 1122 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1123 }
be0676b3 1124 }
7c0ba920 1125
ff733df7 1126 private checkAndEmitEvents (): void {
1f68cede 1127 if (this.emitter != null) {
ff733df7 1128 if (this.busy) {
2845f2a5 1129 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1130 }
6b27d407 1131 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1132 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1133 }
164d950a
JB
1134 }
1135 }
1136
8a1260a3
JB
1137 /**
1138 * Gets the worker information.
1139 *
1140 * @param workerNodeKey - The worker node key.
1141 */
1142 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1143 return this.workerNodes[workerNodeKey].info
1144 }
1145
a05c10de 1146 /**
b0a4db63 1147 * Adds the given worker in the pool worker nodes.
ea7a90d3 1148 *
38e795c1 1149 * @param worker - The worker.
f06e48d8 1150 * @returns The worker nodes length.
ea7a90d3 1151 */
b0a4db63 1152 private addWorkerNode (worker: Worker): number {
cc3ab78b 1153 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
b97d82d8 1154 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1155 if (this.starting) {
1156 workerNode.info.ready = true
1157 }
cc3ab78b 1158 return this.workerNodes.push(workerNode)
ea7a90d3 1159 }
c923ce56 1160
51fe3d3c 1161 /**
f06e48d8 1162 * Removes the given worker from the pool worker nodes.
51fe3d3c 1163 *
f06e48d8 1164 * @param worker - The worker.
51fe3d3c 1165 */
416fd65c 1166 private removeWorkerNode (worker: Worker): void {
f06e48d8 1167 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1168 if (workerNodeKey !== -1) {
1169 this.workerNodes.splice(workerNodeKey, 1)
1170 this.workerChoiceStrategyContext.remove(workerNodeKey)
1171 }
51fe3d3c 1172 }
adc3c320 1173
b0a4db63
JB
1174 /**
1175 * Executes the given task on the given worker.
1176 *
1177 * @param worker - The worker.
1178 * @param task - The task to execute.
1179 */
2e81254d 1180 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1181 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1182 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1183 }
1184
f9f00b5f 1185 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1186 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1187 }
1188
416fd65c 1189 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1190 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1191 }
1192
416fd65c 1193 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1194 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1195 }
1196
416fd65c 1197 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1198 while (this.tasksQueueSize(workerNodeKey) > 0) {
1199 this.executeTask(
1200 workerNodeKey,
1201 this.dequeueTask(workerNodeKey) as Task<Data>
1202 )
ff733df7 1203 }
4b628b48 1204 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1205 }
1206
ef41a6e6
JB
1207 private flushTasksQueues (): void {
1208 for (const [workerNodeKey] of this.workerNodes.entries()) {
1209 this.flushTasksQueue(workerNodeKey)
1210 }
1211 }
b6b32453
JB
1212
1213 private setWorkerStatistics (worker: Worker): void {
1214 this.sendToWorker(worker, {
1215 statistics: {
87de9ff5
JB
1216 runTime:
1217 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1218 .runTime.aggregate,
87de9ff5 1219 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1220 .elu.aggregate
21f710aa
JB
1221 },
1222 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
b6b32453
JB
1223 })
1224 }
c97c7edb 1225}