docs: refine worker node code comments
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16 4import {
ff128cc9 5 DEFAULT_TASK_NAME,
bbeadd16
JB
6 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
7 EMPTY_FUNCTION,
59317253 8 isKillBehavior,
0d80593b 9 isPlainObject,
afe0d5bf
JB
10 median,
11 round
bbeadd16 12} from '../utils'
59317253 13import { KillBehaviors } from '../worker/worker-options'
c4855468 14import {
65d7a1c9 15 type IPool,
7c5a1080 16 PoolEmitter,
c4855468 17 PoolEvents,
6b27d407 18 type PoolInfo,
c4855468 19 type PoolOptions,
6b27d407
JB
20 type PoolType,
21 PoolTypes,
4b628b48 22 type TasksQueueOptions
c4855468 23} from './pool'
e102732c
JB
24import type {
25 IWorker,
4b628b48 26 IWorkerNode,
e102732c
JB
27 MessageHandler,
28 Task,
8a1260a3 29 WorkerInfo,
4b628b48 30 WorkerType,
e102732c
JB
31 WorkerUsage
32} from './worker'
a35560ba 33import {
f0d7f803 34 Measurements,
a35560ba 35 WorkerChoiceStrategies,
a20f0ba5
JB
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
bdaf31cd
JB
38} from './selection-strategies/selection-strategies-types'
39import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 40import { version } from './version'
4b628b48 41import { WorkerNode } from './worker-node'
23ccf9d7 42
729c563d 43/**
ea7a90d3 44 * Base class that implements some shared logic for all poolifier pools.
729c563d 45 *
38e795c1 46 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
47 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
48 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 49 */
c97c7edb 50export abstract class AbstractPool<
f06e48d8 51 Worker extends IWorker,
d3c8a1a8
S
52 Data = unknown,
53 Response = unknown
c4855468 54> implements IPool<Worker, Data, Response> {
afc003b2 55 /** @inheritDoc */
4b628b48 56 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 57
afc003b2 58 /** @inheritDoc */
7c0ba920
JB
59 public readonly emitter?: PoolEmitter
60
be0676b3 61 /**
a3445496 62 * The execution response promise map.
be0676b3 63 *
2740a743 64 * - `key`: The message id of each submitted task.
a3445496 65 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 66 *
a3445496 67 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 68 */
c923ce56
JB
69 protected promiseResponseMap: Map<
70 string,
71 PromiseResponseWrapper<Worker, Response>
72 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 73
a35560ba 74 /**
51fe3d3c 75 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
76 */
77 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
78 Worker,
79 Data,
80 Response
a35560ba
S
81 >
82
afe0d5bf
JB
83 /**
84 * The start timestamp of the pool.
85 */
86 private readonly startTimestamp
87
729c563d
S
88 /**
89 * Constructs a new poolifier pool.
90 *
38e795c1 91 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 92 * @param filePath - Path to the worker file.
38e795c1 93 * @param opts - Options for the pool.
729c563d 94 */
c97c7edb 95 public constructor (
b4213b7f
JB
96 protected readonly numberOfWorkers: number,
97 protected readonly filePath: string,
98 protected readonly opts: PoolOptions<Worker>
c97c7edb 99 ) {
78cea37e 100 if (!this.isMain()) {
c97c7edb
S
101 throw new Error('Cannot start a pool from a worker!')
102 }
8d3782fa 103 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 104 this.checkFilePath(this.filePath)
7c0ba920 105 this.checkPoolOptions(this.opts)
1086026a 106
7254e419
JB
107 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
108 this.executeTask = this.executeTask.bind(this)
109 this.enqueueTask = this.enqueueTask.bind(this)
110 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 111
6bd72cd0 112 if (this.opts.enableEvents === true) {
7c0ba920
JB
113 this.emitter = new PoolEmitter()
114 }
d59df138
JB
115 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
116 Worker,
117 Data,
118 Response
da309861
JB
119 >(
120 this,
121 this.opts.workerChoiceStrategy,
122 this.opts.workerChoiceStrategyOptions
123 )
b6b32453
JB
124
125 this.setupHook()
126
afe0d5bf 127 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
128 this.createAndSetupWorker()
129 }
afe0d5bf
JB
130
131 this.startTimestamp = performance.now()
c97c7edb
S
132 }
133
a35560ba 134 private checkFilePath (filePath: string): void {
ffcbbad8
JB
135 if (
136 filePath == null ||
137 (typeof filePath === 'string' && filePath.trim().length === 0)
138 ) {
c510fea7
APA
139 throw new Error('Please specify a file with a worker implementation')
140 }
141 }
142
8d3782fa
JB
143 private checkNumberOfWorkers (numberOfWorkers: number): void {
144 if (numberOfWorkers == null) {
145 throw new Error(
146 'Cannot instantiate a pool without specifying the number of workers'
147 )
78cea37e 148 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 149 throw new TypeError(
0d80593b 150 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
151 )
152 } else if (numberOfWorkers < 0) {
473c717a 153 throw new RangeError(
8d3782fa
JB
154 'Cannot instantiate a pool with a negative number of workers'
155 )
6b27d407 156 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
157 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
158 }
159 }
160
161 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
162 if (this.type === PoolTypes.dynamic) {
163 if (min > max) {
164 throw new RangeError(
165 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
166 )
167 } else if (min === 0 && max === 0) {
168 throw new RangeError(
169 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
170 )
171 } else if (min === max) {
172 throw new RangeError(
173 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
174 )
175 }
8d3782fa
JB
176 }
177 }
178
7c0ba920 179 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
180 if (isPlainObject(opts)) {
181 this.opts.workerChoiceStrategy =
182 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
183 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
184 this.opts.workerChoiceStrategyOptions =
185 opts.workerChoiceStrategyOptions ??
186 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
187 this.checkValidWorkerChoiceStrategyOptions(
188 this.opts.workerChoiceStrategyOptions
189 )
1f68cede 190 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
191 this.opts.enableEvents = opts.enableEvents ?? true
192 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
193 if (this.opts.enableTasksQueue) {
194 this.checkValidTasksQueueOptions(
195 opts.tasksQueueOptions as TasksQueueOptions
196 )
197 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
198 opts.tasksQueueOptions as TasksQueueOptions
199 )
200 }
201 } else {
202 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 203 }
aee46736
JB
204 }
205
206 private checkValidWorkerChoiceStrategy (
207 workerChoiceStrategy: WorkerChoiceStrategy
208 ): void {
209 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 210 throw new Error(
aee46736 211 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
212 )
213 }
7c0ba920
JB
214 }
215
0d80593b
JB
216 private checkValidWorkerChoiceStrategyOptions (
217 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
218 ): void {
219 if (!isPlainObject(workerChoiceStrategyOptions)) {
220 throw new TypeError(
221 'Invalid worker choice strategy options: must be a plain object'
222 )
223 }
49be33fe
JB
224 if (
225 workerChoiceStrategyOptions.weights != null &&
6b27d407 226 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
227 ) {
228 throw new Error(
229 'Invalid worker choice strategy options: must have a weight for each worker node'
230 )
231 }
f0d7f803
JB
232 if (
233 workerChoiceStrategyOptions.measurement != null &&
234 !Object.values(Measurements).includes(
235 workerChoiceStrategyOptions.measurement
236 )
237 ) {
238 throw new Error(
239 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
240 )
241 }
0d80593b
JB
242 }
243
a20f0ba5
JB
244 private checkValidTasksQueueOptions (
245 tasksQueueOptions: TasksQueueOptions
246 ): void {
0d80593b
JB
247 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
248 throw new TypeError('Invalid tasks queue options: must be a plain object')
249 }
f0d7f803
JB
250 if (
251 tasksQueueOptions?.concurrency != null &&
252 !Number.isSafeInteger(tasksQueueOptions.concurrency)
253 ) {
254 throw new TypeError(
255 'Invalid worker tasks concurrency: must be an integer'
256 )
257 }
258 if (
259 tasksQueueOptions?.concurrency != null &&
260 tasksQueueOptions.concurrency <= 0
261 ) {
a20f0ba5 262 throw new Error(
f0d7f803 263 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
264 )
265 }
266 }
267
08f3f44c 268 /** @inheritDoc */
6b27d407
JB
269 public get info (): PoolInfo {
270 return {
23ccf9d7 271 version,
6b27d407 272 type: this.type,
184855e6 273 worker: this.worker,
2431bdb4
JB
274 ready: this.ready,
275 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
276 minSize: this.minSize,
277 maxSize: this.maxSize,
c05f0d50
JB
278 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
279 .runTime.aggregate &&
1305e9a8
JB
280 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
281 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
282 workerNodes: this.workerNodes.length,
283 idleWorkerNodes: this.workerNodes.reduce(
284 (accumulator, workerNode) =>
f59e1027 285 workerNode.usage.tasks.executing === 0
a4e07f72
JB
286 ? accumulator + 1
287 : accumulator,
6b27d407
JB
288 0
289 ),
290 busyWorkerNodes: this.workerNodes.reduce(
291 (accumulator, workerNode) =>
f59e1027 292 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
293 0
294 ),
a4e07f72 295 executedTasks: this.workerNodes.reduce(
6b27d407 296 (accumulator, workerNode) =>
f59e1027 297 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
298 0
299 ),
300 executingTasks: this.workerNodes.reduce(
301 (accumulator, workerNode) =>
f59e1027 302 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
303 0
304 ),
305 queuedTasks: this.workerNodes.reduce(
df593701 306 (accumulator, workerNode) =>
f59e1027 307 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
308 0
309 ),
310 maxQueuedTasks: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
b25a42cd 312 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 313 0
a4e07f72
JB
314 ),
315 failedTasks: this.workerNodes.reduce(
316 (accumulator, workerNode) =>
f59e1027 317 accumulator + workerNode.usage.tasks.failed,
a4e07f72 318 0
1dcf8b7b
JB
319 ),
320 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
321 .runTime.aggregate && {
322 runTime: {
98e72cda
JB
323 minimum: round(
324 Math.min(
325 ...this.workerNodes.map(
326 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
327 )
1dcf8b7b
JB
328 )
329 ),
98e72cda
JB
330 maximum: round(
331 Math.max(
332 ...this.workerNodes.map(
333 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
334 )
1dcf8b7b 335 )
98e72cda
JB
336 ),
337 average: round(
338 this.workerNodes.reduce(
339 (accumulator, workerNode) =>
340 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
341 0
342 ) /
343 this.workerNodes.reduce(
344 (accumulator, workerNode) =>
345 accumulator + (workerNode.usage.tasks?.executed ?? 0),
346 0
347 )
348 ),
349 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
350 .runTime.median && {
351 median: round(
352 median(
353 this.workerNodes.map(
354 workerNode => workerNode.usage.runTime?.median ?? 0
355 )
356 )
357 )
358 })
1dcf8b7b
JB
359 }
360 }),
361 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
362 .waitTime.aggregate && {
363 waitTime: {
98e72cda
JB
364 minimum: round(
365 Math.min(
366 ...this.workerNodes.map(
367 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
368 )
1dcf8b7b
JB
369 )
370 ),
98e72cda
JB
371 maximum: round(
372 Math.max(
373 ...this.workerNodes.map(
374 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
375 )
1dcf8b7b 376 )
98e72cda
JB
377 ),
378 average: round(
379 this.workerNodes.reduce(
380 (accumulator, workerNode) =>
381 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
382 0
383 ) /
384 this.workerNodes.reduce(
385 (accumulator, workerNode) =>
386 accumulator + (workerNode.usage.tasks?.executed ?? 0),
387 0
388 )
389 ),
390 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
391 .waitTime.median && {
392 median: round(
393 median(
394 this.workerNodes.map(
395 workerNode => workerNode.usage.waitTime?.median ?? 0
396 )
397 )
398 )
399 })
1dcf8b7b
JB
400 }
401 })
6b27d407
JB
402 }
403 }
08f3f44c 404
2431bdb4
JB
405 private get starting (): boolean {
406 return (
d5024c00
JB
407 this.workerNodes.length < this.minSize ||
408 (this.workerNodes.length >= this.minSize &&
409 this.workerNodes.some(workerNode => !workerNode.info.ready))
2431bdb4
JB
410 )
411 }
412
413 private get ready (): boolean {
414 return (
d5024c00
JB
415 this.workerNodes.length >= this.minSize &&
416 this.workerNodes.every(workerNode => workerNode.info.ready)
2431bdb4
JB
417 )
418 }
419
afe0d5bf
JB
420 /**
421 * Gets the approximate pool utilization.
422 *
423 * @returns The pool utilization.
424 */
425 private get utilization (): number {
fe7d90db
JB
426 const poolRunTimeCapacity =
427 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
428 const totalTasksRunTime = this.workerNodes.reduce(
429 (accumulator, workerNode) =>
71514351 430 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
431 0
432 )
433 const totalTasksWaitTime = this.workerNodes.reduce(
434 (accumulator, workerNode) =>
71514351 435 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
436 0
437 )
438 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
439 }
440
8881ae32
JB
441 /**
442 * Pool type.
443 *
444 * If it is `'dynamic'`, it provides the `max` property.
445 */
446 protected abstract get type (): PoolType
447
184855e6
JB
448 /**
449 * Gets the worker type.
450 */
451 protected abstract get worker (): WorkerType
452
c2ade475 453 /**
6b27d407 454 * Pool minimum size.
c2ade475 455 */
6b27d407 456 protected abstract get minSize (): number
ff733df7
JB
457
458 /**
6b27d407 459 * Pool maximum size.
ff733df7 460 */
6b27d407 461 protected abstract get maxSize (): number
a35560ba 462
f59e1027
JB
463 /**
464 * Get the worker given its id.
465 *
466 * @param workerId - The worker id.
467 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
468 */
469 private getWorkerById (workerId: number): Worker | undefined {
470 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
471 ?.worker
472 }
473
21f710aa
JB
474 private checkMessageWorkerId (message: MessageValue<Response>): void {
475 if (
476 message.workerId != null &&
477 this.getWorkerById(message.workerId) == null
478 ) {
479 throw new Error(
480 `Worker message received from unknown worker '${message.workerId}'`
481 )
482 }
483 }
484
ffcbbad8 485 /**
f06e48d8 486 * Gets the given worker its worker node key.
ffcbbad8
JB
487 *
488 * @param worker - The worker.
f59e1027 489 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 490 */
f06e48d8
JB
491 private getWorkerNodeKey (worker: Worker): number {
492 return this.workerNodes.findIndex(
493 workerNode => workerNode.worker === worker
494 )
bf9549ae
JB
495 }
496
afc003b2 497 /** @inheritDoc */
a35560ba 498 public setWorkerChoiceStrategy (
59219cbb
JB
499 workerChoiceStrategy: WorkerChoiceStrategy,
500 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 501 ): void {
aee46736 502 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 503 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
504 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
505 this.opts.workerChoiceStrategy
506 )
507 if (workerChoiceStrategyOptions != null) {
508 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
509 }
8a1260a3 510 for (const workerNode of this.workerNodes) {
4b628b48 511 workerNode.resetUsage()
b6b32453 512 this.setWorkerStatistics(workerNode.worker)
59219cbb 513 }
a20f0ba5
JB
514 }
515
516 /** @inheritDoc */
517 public setWorkerChoiceStrategyOptions (
518 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
519 ): void {
0d80593b 520 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
521 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
522 this.workerChoiceStrategyContext.setOptions(
523 this.opts.workerChoiceStrategyOptions
a35560ba
S
524 )
525 }
526
a20f0ba5 527 /** @inheritDoc */
8f52842f
JB
528 public enableTasksQueue (
529 enable: boolean,
530 tasksQueueOptions?: TasksQueueOptions
531 ): void {
a20f0ba5 532 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 533 this.flushTasksQueues()
a20f0ba5
JB
534 }
535 this.opts.enableTasksQueue = enable
8f52842f 536 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
537 }
538
539 /** @inheritDoc */
8f52842f 540 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 541 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
542 this.checkValidTasksQueueOptions(tasksQueueOptions)
543 this.opts.tasksQueueOptions =
544 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 545 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
546 delete this.opts.tasksQueueOptions
547 }
548 }
549
550 private buildTasksQueueOptions (
551 tasksQueueOptions: TasksQueueOptions
552 ): TasksQueueOptions {
553 return {
554 concurrency: tasksQueueOptions?.concurrency ?? 1
555 }
556 }
557
c319c66b
JB
558 /**
559 * Whether the pool is full or not.
560 *
561 * The pool filling boolean status.
562 */
dea903a8
JB
563 protected get full (): boolean {
564 return this.workerNodes.length >= this.maxSize
565 }
c2ade475 566
c319c66b
JB
567 /**
568 * Whether the pool is busy or not.
569 *
570 * The pool busyness boolean status.
571 */
572 protected abstract get busy (): boolean
7c0ba920 573
6c6afb84
JB
574 /**
575 * Whether worker nodes are executing at least one task.
576 *
577 * @returns Worker nodes busyness boolean status.
578 */
c2ade475 579 protected internalBusy (): boolean {
e0ae6100
JB
580 return (
581 this.workerNodes.findIndex(workerNode => {
f59e1027 582 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
583 }) === -1
584 )
cb70b19d
JB
585 }
586
afc003b2 587 /** @inheritDoc */
a86b6df1 588 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 589 const timestamp = performance.now()
20dcad1a 590 const workerNodeKey = this.chooseWorkerNode()
adc3c320 591 const submittedTask: Task<Data> = {
ff128cc9 592 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
593 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
594 data: data ?? ({} as Data),
b6b32453 595 timestamp,
21f710aa 596 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 597 id: randomUUID()
adc3c320 598 }
2e81254d 599 const res = new Promise<Response>((resolve, reject) => {
02706357 600 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
601 resolve,
602 reject,
20dcad1a 603 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
604 })
605 })
ff733df7
JB
606 if (
607 this.opts.enableTasksQueue === true &&
7171d33f 608 (this.busy ||
f59e1027 609 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 610 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 611 .concurrency as number))
ff733df7 612 ) {
26a929d7
JB
613 this.enqueueTask(workerNodeKey, submittedTask)
614 } else {
2e81254d 615 this.executeTask(workerNodeKey, submittedTask)
adc3c320 616 }
ff733df7 617 this.checkAndEmitEvents()
78cea37e 618 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
619 return res
620 }
c97c7edb 621
afc003b2 622 /** @inheritDoc */
c97c7edb 623 public async destroy (): Promise<void> {
1fbcaa7c 624 await Promise.all(
875a7c37
JB
625 this.workerNodes.map(async (workerNode, workerNodeKey) => {
626 this.flushTasksQueue(workerNodeKey)
47aacbaa 627 // FIXME: wait for tasks to be finished
920278a2
JB
628 const workerExitPromise = new Promise<void>(resolve => {
629 workerNode.worker.on('exit', () => {
630 resolve()
631 })
632 })
f06e48d8 633 await this.destroyWorker(workerNode.worker)
920278a2 634 await workerExitPromise
1fbcaa7c
JB
635 })
636 )
c97c7edb
S
637 }
638
4a6952ff 639 /**
6c6afb84 640 * Terminates the given worker.
4a6952ff 641 *
f06e48d8 642 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
643 */
644 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 645
729c563d 646 /**
6677a3d3
JB
647 * Setup hook to execute code before worker nodes are created in the abstract constructor.
648 * Can be overridden.
afc003b2
JB
649 *
650 * @virtual
729c563d 651 */
280c2a77 652 protected setupHook (): void {
d99ba5a8 653 // Intentionally empty
280c2a77 654 }
c97c7edb 655
729c563d 656 /**
280c2a77
S
657 * Should return whether the worker is the main worker or not.
658 */
659 protected abstract isMain (): boolean
660
661 /**
2e81254d 662 * Hook executed before the worker task execution.
bf9549ae 663 * Can be overridden.
729c563d 664 *
f06e48d8 665 * @param workerNodeKey - The worker node key.
1c6fe997 666 * @param task - The task to execute.
729c563d 667 */
1c6fe997
JB
668 protected beforeTaskExecutionHook (
669 workerNodeKey: number,
670 task: Task<Data>
671 ): void {
f59e1027 672 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
673 ++workerUsage.tasks.executing
674 this.updateWaitTimeWorkerUsage(workerUsage, task)
ce1b31be
JB
675 const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
676 task.name as string
677 ) as WorkerUsage
ff128cc9
JB
678 ++tasksWorkerUsage.tasks.executing
679 this.updateWaitTimeWorkerUsage(tasksWorkerUsage, task)
c97c7edb
S
680 }
681
c01733f1 682 /**
2e81254d 683 * Hook executed after the worker task execution.
bf9549ae 684 * Can be overridden.
c01733f1 685 *
c923ce56 686 * @param worker - The worker.
38e795c1 687 * @param message - The received message.
c01733f1 688 */
2e81254d 689 protected afterTaskExecutionHook (
c923ce56 690 worker: Worker,
2740a743 691 message: MessageValue<Response>
bf9549ae 692 ): void {
ff128cc9
JB
693 const workerNodeKey = this.getWorkerNodeKey(worker)
694 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
695 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
696 this.updateRunTimeWorkerUsage(workerUsage, message)
697 this.updateEluWorkerUsage(workerUsage, message)
ce1b31be
JB
698 const tasksWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
699 message.name as string
700 ) as WorkerUsage
ff128cc9
JB
701 this.updateTaskStatisticsWorkerUsage(tasksWorkerUsage, message)
702 this.updateRunTimeWorkerUsage(tasksWorkerUsage, message)
703 this.updateEluWorkerUsage(tasksWorkerUsage, message)
f1c06930
JB
704 }
705
706 private updateTaskStatisticsWorkerUsage (
707 workerUsage: WorkerUsage,
708 message: MessageValue<Response>
709 ): void {
a4e07f72
JB
710 const workerTaskStatistics = workerUsage.tasks
711 --workerTaskStatistics.executing
98e72cda
JB
712 if (message.taskError == null) {
713 ++workerTaskStatistics.executed
714 } else {
a4e07f72 715 ++workerTaskStatistics.failed
2740a743 716 }
f8eb0a2a
JB
717 }
718
a4e07f72
JB
719 private updateRunTimeWorkerUsage (
720 workerUsage: WorkerUsage,
f8eb0a2a
JB
721 message: MessageValue<Response>
722 ): void {
87de9ff5
JB
723 if (
724 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 725 .aggregate
87de9ff5 726 ) {
f7510105 727 const taskRunTime = message.taskPerformance?.runTime ?? 0
71514351
JB
728 workerUsage.runTime.aggregate =
729 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
f7510105
JB
730 workerUsage.runTime.minimum = Math.min(
731 taskRunTime,
71514351 732 workerUsage.runTime?.minimum ?? Infinity
f7510105
JB
733 )
734 workerUsage.runTime.maximum = Math.max(
735 taskRunTime,
71514351 736 workerUsage.runTime?.maximum ?? -Infinity
f7510105 737 )
c6bd2650 738 if (
932fc8be
JB
739 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
740 .average &&
a4e07f72 741 workerUsage.tasks.executed !== 0
c6bd2650 742 ) {
a4e07f72 743 workerUsage.runTime.average =
98e72cda 744 workerUsage.runTime.aggregate / workerUsage.tasks.executed
3032893a 745 }
3fa4cdd2 746 if (
932fc8be
JB
747 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
748 .median &&
d715b7bc 749 message.taskPerformance?.runTime != null
3fa4cdd2 750 ) {
a4e07f72
JB
751 workerUsage.runTime.history.push(message.taskPerformance.runTime)
752 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 753 }
3032893a 754 }
f8eb0a2a
JB
755 }
756
a4e07f72
JB
757 private updateWaitTimeWorkerUsage (
758 workerUsage: WorkerUsage,
1c6fe997 759 task: Task<Data>
f8eb0a2a 760 ): void {
1c6fe997
JB
761 const timestamp = performance.now()
762 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
763 if (
764 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 765 .aggregate
87de9ff5 766 ) {
71514351
JB
767 workerUsage.waitTime.aggregate =
768 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
f7510105
JB
769 workerUsage.waitTime.minimum = Math.min(
770 taskWaitTime,
71514351 771 workerUsage.waitTime?.minimum ?? Infinity
f7510105
JB
772 )
773 workerUsage.waitTime.maximum = Math.max(
774 taskWaitTime,
71514351 775 workerUsage.waitTime?.maximum ?? -Infinity
f7510105 776 )
09a6305f 777 if (
87de9ff5 778 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 779 .waitTime.average &&
a4e07f72 780 workerUsage.tasks.executed !== 0
09a6305f 781 ) {
a4e07f72 782 workerUsage.waitTime.average =
98e72cda 783 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
09a6305f
JB
784 }
785 if (
87de9ff5 786 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
71514351 787 .waitTime.median
09a6305f 788 ) {
1c6fe997 789 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 790 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 791 }
0567595a 792 }
c01733f1 793 }
794
a4e07f72 795 private updateEluWorkerUsage (
5df69fab 796 workerUsage: WorkerUsage,
62c15a68
JB
797 message: MessageValue<Response>
798 ): void {
5df69fab
JB
799 if (
800 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
801 .aggregate
802 ) {
f7510105 803 if (message.taskPerformance?.elu != null) {
71514351
JB
804 workerUsage.elu.idle.aggregate =
805 (workerUsage.elu.idle?.aggregate ?? 0) +
806 message.taskPerformance.elu.idle
807 workerUsage.elu.active.aggregate =
808 (workerUsage.elu.active?.aggregate ?? 0) +
809 message.taskPerformance.elu.active
f7510105
JB
810 if (workerUsage.elu.utilization != null) {
811 workerUsage.elu.utilization =
812 (workerUsage.elu.utilization +
813 message.taskPerformance.elu.utilization) /
814 2
815 } else {
816 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
817 }
818 workerUsage.elu.idle.minimum = Math.min(
819 message.taskPerformance.elu.idle,
71514351 820 workerUsage.elu.idle?.minimum ?? Infinity
f7510105
JB
821 )
822 workerUsage.elu.idle.maximum = Math.max(
823 message.taskPerformance.elu.idle,
71514351 824 workerUsage.elu.idle?.maximum ?? -Infinity
f7510105
JB
825 )
826 workerUsage.elu.active.minimum = Math.min(
827 message.taskPerformance.elu.active,
71514351 828 workerUsage.elu.active?.minimum ?? Infinity
f7510105
JB
829 )
830 workerUsage.elu.active.maximum = Math.max(
831 message.taskPerformance.elu.active,
71514351 832 workerUsage.elu.active?.maximum ?? -Infinity
f7510105 833 )
71514351
JB
834 if (
835 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
836 .average &&
837 workerUsage.tasks.executed !== 0
838 ) {
71514351 839 workerUsage.elu.idle.average =
98e72cda 840 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
71514351 841 workerUsage.elu.active.average =
98e72cda 842 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
71514351
JB
843 }
844 if (
845 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
61e4a75e 846 .median
71514351
JB
847 ) {
848 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
849 workerUsage.elu.active.history.push(
850 message.taskPerformance.elu.active
851 )
852 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
853 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
854 }
62c15a68
JB
855 }
856 }
857 }
858
280c2a77 859 /**
f06e48d8 860 * Chooses a worker node for the next task.
280c2a77 861 *
6c6afb84 862 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 863 *
20dcad1a 864 * @returns The worker node key
280c2a77 865 */
6c6afb84 866 private chooseWorkerNode (): number {
930dcf12 867 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
868 const worker = this.createAndSetupDynamicWorker()
869 if (
870 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
871 ) {
872 return this.getWorkerNodeKey(worker)
873 }
17393ac8 874 }
930dcf12
JB
875 return this.workerChoiceStrategyContext.execute()
876 }
877
6c6afb84
JB
878 /**
879 * Conditions for dynamic worker creation.
880 *
881 * @returns Whether to create a dynamic worker or not.
882 */
883 private shallCreateDynamicWorker (): boolean {
930dcf12 884 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
885 }
886
280c2a77 887 /**
675bb809 888 * Sends a message to the given worker.
280c2a77 889 *
38e795c1
JB
890 * @param worker - The worker which should receive the message.
891 * @param message - The message.
280c2a77
S
892 */
893 protected abstract sendToWorker (
894 worker: Worker,
895 message: MessageValue<Data>
896 ): void
897
4a6952ff 898 /**
f06e48d8 899 * Registers a listener callback on the given worker.
4a6952ff 900 *
38e795c1
JB
901 * @param worker - The worker which should register a listener.
902 * @param listener - The message listener callback.
4a6952ff 903 */
e102732c
JB
904 private registerWorkerMessageListener<Message extends Data | Response>(
905 worker: Worker,
906 listener: (message: MessageValue<Message>) => void
907 ): void {
908 worker.on('message', listener as MessageHandler<Worker>)
909 }
c97c7edb 910
729c563d 911 /**
41344292 912 * Creates a new worker.
6c6afb84
JB
913 *
914 * @returns Newly created worker.
729c563d 915 */
280c2a77 916 protected abstract createWorker (): Worker
c97c7edb 917
729c563d 918 /**
f06e48d8 919 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 920 * Can be overridden.
729c563d 921 *
38e795c1 922 * @param worker - The newly created worker.
729c563d 923 */
6677a3d3 924 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
925 // Listen to worker messages.
926 this.registerWorkerMessageListener(worker, this.workerListener())
2431bdb4
JB
927 // Send startup message to worker.
928 this.sendToWorker(worker, {
929 ready: false,
21f710aa 930 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
2431bdb4
JB
931 })
932 // Setup worker task statistics computation.
933 this.setWorkerStatistics(worker)
e102732c 934 }
c97c7edb 935
4a6952ff 936 /**
f06e48d8 937 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
938 *
939 * @returns New, completely set up worker.
940 */
941 protected createAndSetupWorker (): Worker {
bdacc2d2 942 const worker = this.createWorker()
280c2a77 943
35cf1c03 944 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 945 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
946 worker.on('error', error => {
947 if (this.emitter != null) {
948 this.emitter.emit(PoolEvents.error, error)
949 }
5bc91f3e 950 if (this.opts.enableTasksQueue === true) {
7d2e3f59 951 this.redistributeQueuedTasks(worker)
5bc91f3e 952 }
2431bdb4 953 if (this.opts.restartWorkerOnError === true && !this.starting) {
8a1260a3
JB
954 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
955 this.createAndSetupDynamicWorker()
956 } else {
957 this.createAndSetupWorker()
958 }
5baee0d7
JB
959 }
960 })
a35560ba
S
961 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
962 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 963 worker.once('exit', () => {
f06e48d8 964 this.removeWorkerNode(worker)
a974afa6 965 })
280c2a77 966
f06e48d8 967 this.pushWorkerNode(worker)
280c2a77
S
968
969 this.afterWorkerSetup(worker)
970
c97c7edb
S
971 return worker
972 }
be0676b3 973
7d2e3f59
JB
974 private redistributeQueuedTasks (worker: Worker): void {
975 const workerNodeKey = this.getWorkerNodeKey(worker)
976 while (this.tasksQueueSize(workerNodeKey) > 0) {
977 let targetWorkerNodeKey: number = workerNodeKey
978 let minQueuedTasks = Infinity
979 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
980 if (
981 workerNodeId !== workerNodeKey &&
982 workerNode.usage.tasks.queued === 0
983 ) {
984 targetWorkerNodeKey = workerNodeId
985 break
986 }
987 if (
988 workerNodeId !== workerNodeKey &&
989 workerNode.usage.tasks.queued < minQueuedTasks
990 ) {
991 minQueuedTasks = workerNode.usage.tasks.queued
992 targetWorkerNodeKey = workerNodeId
993 }
994 }
995 this.enqueueTask(
996 targetWorkerNodeKey,
997 this.dequeueTask(workerNodeKey) as Task<Data>
998 )
999 }
1000 }
1001
930dcf12
JB
1002 /**
1003 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
1004 *
1005 * @returns New, completely set up dynamic worker.
1006 */
1007 protected createAndSetupDynamicWorker (): Worker {
1008 const worker = this.createAndSetupWorker()
1009 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 1010 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
1011 if (
1012 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
1013 (message.kill != null &&
1014 ((this.opts.enableTasksQueue === false &&
f59e1027 1015 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 1016 (this.opts.enableTasksQueue === true &&
f59e1027 1017 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 1018 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
1019 ) {
1020 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
1021 void (this.destroyWorker(worker) as Promise<void>)
1022 }
1023 })
21f710aa
JB
1024 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
1025 workerInfo.dynamic = true
1026 this.sendToWorker(worker, {
1027 checkAlive: true,
1028 workerId: workerInfo.id as number
1029 })
930dcf12
JB
1030 return worker
1031 }
1032
be0676b3 1033 /**
ff733df7 1034 * This function is the listener registered for each worker message.
be0676b3 1035 *
bdacc2d2 1036 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1037 */
1038 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1039 return message => {
21f710aa 1040 this.checkMessageWorkerId(message)
2431bdb4 1041 if (message.ready != null && message.workerId != null) {
7c8381eb
JB
1042 // Worker ready message received
1043 this.handleWorkerReadyMessage(message)
f59e1027 1044 } else if (message.id != null) {
a3445496 1045 // Task execution response received
6b272951
JB
1046 this.handleTaskExecutionResponse(message)
1047 }
1048 }
1049 }
1050
7c8381eb 1051 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
21f710aa
JB
1052 const worker = this.getWorkerById(message.workerId)
1053 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1054 message.ready as boolean
2431bdb4
JB
1055 if (this.emitter != null && this.ready) {
1056 this.emitter.emit(PoolEvents.ready, this.info)
1057 }
6b272951
JB
1058 }
1059
1060 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1061 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1062 if (promiseResponse != null) {
1063 if (message.taskError != null) {
1064 if (this.emitter != null) {
1065 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 1066 }
6b272951
JB
1067 promiseResponse.reject(message.taskError.message)
1068 } else {
1069 promiseResponse.resolve(message.data as Response)
1070 }
1071 this.afterTaskExecutionHook(promiseResponse.worker, message)
1072 this.promiseResponseMap.delete(message.id as string)
1073 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1074 if (
1075 this.opts.enableTasksQueue === true &&
1076 this.tasksQueueSize(workerNodeKey) > 0
1077 ) {
1078 this.executeTask(
1079 workerNodeKey,
1080 this.dequeueTask(workerNodeKey) as Task<Data>
1081 )
be0676b3 1082 }
6b272951 1083 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1084 }
be0676b3 1085 }
7c0ba920 1086
ff733df7 1087 private checkAndEmitEvents (): void {
1f68cede 1088 if (this.emitter != null) {
ff733df7 1089 if (this.busy) {
2845f2a5 1090 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1091 }
6b27d407 1092 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1093 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1094 }
164d950a
JB
1095 }
1096 }
1097
8a1260a3
JB
1098 /**
1099 * Gets the worker information.
1100 *
1101 * @param workerNodeKey - The worker node key.
1102 */
1103 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1104 return this.workerNodes[workerNodeKey].info
1105 }
1106
a05c10de 1107 /**
f06e48d8 1108 * Pushes the given worker in the pool worker nodes.
ea7a90d3 1109 *
38e795c1 1110 * @param worker - The worker.
f06e48d8 1111 * @returns The worker nodes length.
ea7a90d3 1112 */
f06e48d8 1113 private pushWorkerNode (worker: Worker): number {
4b628b48 1114 return this.workerNodes.push(new WorkerNode(worker, this.worker))
ea7a90d3 1115 }
c923ce56 1116
51fe3d3c 1117 /**
f06e48d8 1118 * Removes the given worker from the pool worker nodes.
51fe3d3c 1119 *
f06e48d8 1120 * @param worker - The worker.
51fe3d3c 1121 */
416fd65c 1122 private removeWorkerNode (worker: Worker): void {
f06e48d8 1123 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1124 if (workerNodeKey !== -1) {
1125 this.workerNodes.splice(workerNodeKey, 1)
1126 this.workerChoiceStrategyContext.remove(workerNodeKey)
1127 }
51fe3d3c 1128 }
adc3c320 1129
2e81254d 1130 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1131 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1132 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1133 }
1134
f9f00b5f 1135 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1136 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1137 }
1138
416fd65c 1139 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1140 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1141 }
1142
416fd65c 1143 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1144 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1145 }
1146
416fd65c 1147 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1148 while (this.tasksQueueSize(workerNodeKey) > 0) {
1149 this.executeTask(
1150 workerNodeKey,
1151 this.dequeueTask(workerNodeKey) as Task<Data>
1152 )
ff733df7 1153 }
4b628b48 1154 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1155 }
1156
ef41a6e6
JB
1157 private flushTasksQueues (): void {
1158 for (const [workerNodeKey] of this.workerNodes.entries()) {
1159 this.flushTasksQueue(workerNodeKey)
1160 }
1161 }
b6b32453
JB
1162
1163 private setWorkerStatistics (worker: Worker): void {
1164 this.sendToWorker(worker, {
1165 statistics: {
87de9ff5
JB
1166 runTime:
1167 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1168 .runTime.aggregate,
87de9ff5 1169 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1170 .elu.aggregate
21f710aa
JB
1171 },
1172 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
b6b32453
JB
1173 })
1174 }
c97c7edb 1175}