build: bump volta pnpm version
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
5c4d16da
JB
3import type {
4 MessageValue,
5 PromiseResponseWrapper,
6 Task
7} from '../utility-types'
bbeadd16 8import {
ff128cc9 9 DEFAULT_TASK_NAME,
bbeadd16
JB
10 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
11 EMPTY_FUNCTION,
59317253 12 isKillBehavior,
0d80593b 13 isPlainObject,
afe0d5bf
JB
14 median,
15 round
bbeadd16 16} from '../utils'
59317253 17import { KillBehaviors } from '../worker/worker-options'
c4855468 18import {
65d7a1c9 19 type IPool,
7c5a1080 20 PoolEmitter,
c4855468 21 PoolEvents,
6b27d407 22 type PoolInfo,
c4855468 23 type PoolOptions,
6b27d407
JB
24 type PoolType,
25 PoolTypes,
4b628b48 26 type TasksQueueOptions
c4855468 27} from './pool'
e102732c
JB
28import type {
29 IWorker,
4b628b48 30 IWorkerNode,
e102732c 31 MessageHandler,
8a1260a3 32 WorkerInfo,
4b628b48 33 WorkerType,
e102732c
JB
34 WorkerUsage
35} from './worker'
a35560ba 36import {
f0d7f803 37 Measurements,
a35560ba 38 WorkerChoiceStrategies,
a20f0ba5
JB
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
bdaf31cd
JB
41} from './selection-strategies/selection-strategies-types'
42import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 43import { version } from './version'
4b628b48 44import { WorkerNode } from './worker-node'
23ccf9d7 45
729c563d 46/**
ea7a90d3 47 * Base class that implements some shared logic for all poolifier pools.
729c563d 48 *
38e795c1 49 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
50 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
51 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 52 */
c97c7edb 53export abstract class AbstractPool<
f06e48d8 54 Worker extends IWorker,
d3c8a1a8
S
55 Data = unknown,
56 Response = unknown
c4855468 57> implements IPool<Worker, Data, Response> {
afc003b2 58 /** @inheritDoc */
4b628b48 59 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 60
afc003b2 61 /** @inheritDoc */
7c0ba920
JB
62 public readonly emitter?: PoolEmitter
63
be0676b3 64 /**
a3445496 65 * The execution response promise map.
be0676b3 66 *
2740a743 67 * - `key`: The message id of each submitted task.
a3445496 68 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 69 *
a3445496 70 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 71 */
c923ce56
JB
72 protected promiseResponseMap: Map<
73 string,
74 PromiseResponseWrapper<Worker, Response>
75 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 76
a35560ba 77 /**
51fe3d3c 78 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
79 */
80 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
81 Worker,
82 Data,
83 Response
a35560ba
S
84 >
85
afe0d5bf
JB
86 /**
87 * The start timestamp of the pool.
88 */
89 private readonly startTimestamp
90
729c563d
S
91 /**
92 * Constructs a new poolifier pool.
93 *
38e795c1 94 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 95 * @param filePath - Path to the worker file.
38e795c1 96 * @param opts - Options for the pool.
729c563d 97 */
c97c7edb 98 public constructor (
b4213b7f
JB
99 protected readonly numberOfWorkers: number,
100 protected readonly filePath: string,
101 protected readonly opts: PoolOptions<Worker>
c97c7edb 102 ) {
78cea37e 103 if (!this.isMain()) {
c97c7edb
S
104 throw new Error('Cannot start a pool from a worker!')
105 }
8d3782fa 106 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 107 this.checkFilePath(this.filePath)
7c0ba920 108 this.checkPoolOptions(this.opts)
1086026a 109
7254e419
JB
110 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
111 this.executeTask = this.executeTask.bind(this)
112 this.enqueueTask = this.enqueueTask.bind(this)
113 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 114
6bd72cd0 115 if (this.opts.enableEvents === true) {
7c0ba920
JB
116 this.emitter = new PoolEmitter()
117 }
d59df138
JB
118 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
119 Worker,
120 Data,
121 Response
da309861
JB
122 >(
123 this,
124 this.opts.workerChoiceStrategy,
125 this.opts.workerChoiceStrategyOptions
126 )
b6b32453
JB
127
128 this.setupHook()
129
afe0d5bf 130 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
131 this.createAndSetupWorker()
132 }
afe0d5bf
JB
133
134 this.startTimestamp = performance.now()
c97c7edb
S
135 }
136
a35560ba 137 private checkFilePath (filePath: string): void {
ffcbbad8
JB
138 if (
139 filePath == null ||
140 (typeof filePath === 'string' && filePath.trim().length === 0)
141 ) {
c510fea7
APA
142 throw new Error('Please specify a file with a worker implementation')
143 }
144 }
145
8d3782fa
JB
146 private checkNumberOfWorkers (numberOfWorkers: number): void {
147 if (numberOfWorkers == null) {
148 throw new Error(
149 'Cannot instantiate a pool without specifying the number of workers'
150 )
78cea37e 151 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 152 throw new TypeError(
0d80593b 153 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
154 )
155 } else if (numberOfWorkers < 0) {
473c717a 156 throw new RangeError(
8d3782fa
JB
157 'Cannot instantiate a pool with a negative number of workers'
158 )
6b27d407 159 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
160 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
161 }
162 }
163
164 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
165 if (this.type === PoolTypes.dynamic) {
166 if (min > max) {
167 throw new RangeError(
168 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
169 )
170 } else if (min === 0 && max === 0) {
171 throw new RangeError(
172 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
173 )
174 } else if (min === max) {
175 throw new RangeError(
176 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
177 )
178 }
8d3782fa
JB
179 }
180 }
181
7c0ba920 182 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
183 if (isPlainObject(opts)) {
184 this.opts.workerChoiceStrategy =
185 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
186 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
187 this.opts.workerChoiceStrategyOptions =
188 opts.workerChoiceStrategyOptions ??
189 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
190 this.checkValidWorkerChoiceStrategyOptions(
191 this.opts.workerChoiceStrategyOptions
192 )
1f68cede 193 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
194 this.opts.enableEvents = opts.enableEvents ?? true
195 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
196 if (this.opts.enableTasksQueue) {
197 this.checkValidTasksQueueOptions(
198 opts.tasksQueueOptions as TasksQueueOptions
199 )
200 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
201 opts.tasksQueueOptions as TasksQueueOptions
202 )
203 }
204 } else {
205 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 206 }
aee46736
JB
207 }
208
209 private checkValidWorkerChoiceStrategy (
210 workerChoiceStrategy: WorkerChoiceStrategy
211 ): void {
212 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 213 throw new Error(
aee46736 214 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
215 )
216 }
7c0ba920
JB
217 }
218
0d80593b
JB
219 private checkValidWorkerChoiceStrategyOptions (
220 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
221 ): void {
222 if (!isPlainObject(workerChoiceStrategyOptions)) {
223 throw new TypeError(
224 'Invalid worker choice strategy options: must be a plain object'
225 )
226 }
49be33fe
JB
227 if (
228 workerChoiceStrategyOptions.weights != null &&
6b27d407 229 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
230 ) {
231 throw new Error(
232 'Invalid worker choice strategy options: must have a weight for each worker node'
233 )
234 }
f0d7f803
JB
235 if (
236 workerChoiceStrategyOptions.measurement != null &&
237 !Object.values(Measurements).includes(
238 workerChoiceStrategyOptions.measurement
239 )
240 ) {
241 throw new Error(
242 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
243 )
244 }
0d80593b
JB
245 }
246
a20f0ba5
JB
247 private checkValidTasksQueueOptions (
248 tasksQueueOptions: TasksQueueOptions
249 ): void {
0d80593b
JB
250 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
251 throw new TypeError('Invalid tasks queue options: must be a plain object')
252 }
f0d7f803
JB
253 if (
254 tasksQueueOptions?.concurrency != null &&
255 !Number.isSafeInteger(tasksQueueOptions.concurrency)
256 ) {
257 throw new TypeError(
258 'Invalid worker tasks concurrency: must be an integer'
259 )
260 }
261 if (
262 tasksQueueOptions?.concurrency != null &&
263 tasksQueueOptions.concurrency <= 0
264 ) {
a20f0ba5 265 throw new Error(
f0d7f803 266 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
267 )
268 }
269 }
270
08f3f44c 271 /** @inheritDoc */
6b27d407
JB
272 public get info (): PoolInfo {
273 return {
23ccf9d7 274 version,
6b27d407 275 type: this.type,
184855e6 276 worker: this.worker,
2431bdb4
JB
277 ready: this.ready,
278 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
279 minSize: this.minSize,
280 maxSize: this.maxSize,
c05f0d50
JB
281 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
282 .runTime.aggregate &&
1305e9a8
JB
283 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
284 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
285 workerNodes: this.workerNodes.length,
286 idleWorkerNodes: this.workerNodes.reduce(
287 (accumulator, workerNode) =>
f59e1027 288 workerNode.usage.tasks.executing === 0
a4e07f72
JB
289 ? accumulator + 1
290 : accumulator,
6b27d407
JB
291 0
292 ),
293 busyWorkerNodes: this.workerNodes.reduce(
294 (accumulator, workerNode) =>
f59e1027 295 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
296 0
297 ),
a4e07f72 298 executedTasks: this.workerNodes.reduce(
6b27d407 299 (accumulator, workerNode) =>
f59e1027 300 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
301 0
302 ),
303 executingTasks: this.workerNodes.reduce(
304 (accumulator, workerNode) =>
f59e1027 305 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
306 0
307 ),
308 queuedTasks: this.workerNodes.reduce(
df593701 309 (accumulator, workerNode) =>
f59e1027 310 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
311 0
312 ),
313 maxQueuedTasks: this.workerNodes.reduce(
314 (accumulator, workerNode) =>
b25a42cd 315 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 316 0
a4e07f72
JB
317 ),
318 failedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
f59e1027 320 accumulator + workerNode.usage.tasks.failed,
a4e07f72 321 0
1dcf8b7b
JB
322 ),
323 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
324 .runTime.aggregate && {
325 runTime: {
98e72cda
JB
326 minimum: round(
327 Math.min(
328 ...this.workerNodes.map(
329 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
330 )
1dcf8b7b
JB
331 )
332 ),
98e72cda
JB
333 maximum: round(
334 Math.max(
335 ...this.workerNodes.map(
336 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
337 )
1dcf8b7b 338 )
98e72cda
JB
339 ),
340 average: round(
341 this.workerNodes.reduce(
342 (accumulator, workerNode) =>
343 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
344 0
345 ) /
346 this.workerNodes.reduce(
347 (accumulator, workerNode) =>
348 accumulator + (workerNode.usage.tasks?.executed ?? 0),
349 0
350 )
351 ),
352 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
353 .runTime.median && {
354 median: round(
355 median(
356 this.workerNodes.map(
357 workerNode => workerNode.usage.runTime?.median ?? 0
358 )
359 )
360 )
361 })
1dcf8b7b
JB
362 }
363 }),
364 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .waitTime.aggregate && {
366 waitTime: {
98e72cda
JB
367 minimum: round(
368 Math.min(
369 ...this.workerNodes.map(
370 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
371 )
1dcf8b7b
JB
372 )
373 ),
98e72cda
JB
374 maximum: round(
375 Math.max(
376 ...this.workerNodes.map(
377 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
378 )
1dcf8b7b 379 )
98e72cda
JB
380 ),
381 average: round(
382 this.workerNodes.reduce(
383 (accumulator, workerNode) =>
384 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
385 0
386 ) /
387 this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + (workerNode.usage.tasks?.executed ?? 0),
390 0
391 )
392 ),
393 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
394 .waitTime.median && {
395 median: round(
396 median(
397 this.workerNodes.map(
398 workerNode => workerNode.usage.waitTime?.median ?? 0
399 )
400 )
401 )
402 })
1dcf8b7b
JB
403 }
404 })
6b27d407
JB
405 }
406 }
08f3f44c 407
2431bdb4
JB
408 private get starting (): boolean {
409 return (
d5024c00
JB
410 this.workerNodes.length < this.minSize ||
411 (this.workerNodes.length >= this.minSize &&
412 this.workerNodes.some(workerNode => !workerNode.info.ready))
2431bdb4
JB
413 )
414 }
415
416 private get ready (): boolean {
417 return (
d5024c00
JB
418 this.workerNodes.length >= this.minSize &&
419 this.workerNodes.every(workerNode => workerNode.info.ready)
2431bdb4
JB
420 )
421 }
422
afe0d5bf
JB
423 /**
424 * Gets the approximate pool utilization.
425 *
426 * @returns The pool utilization.
427 */
428 private get utilization (): number {
fe7d90db
JB
429 const poolRunTimeCapacity =
430 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
431 const totalTasksRunTime = this.workerNodes.reduce(
432 (accumulator, workerNode) =>
71514351 433 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
434 0
435 )
436 const totalTasksWaitTime = this.workerNodes.reduce(
437 (accumulator, workerNode) =>
71514351 438 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
439 0
440 )
441 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
442 }
443
8881ae32
JB
444 /**
445 * Pool type.
446 *
447 * If it is `'dynamic'`, it provides the `max` property.
448 */
449 protected abstract get type (): PoolType
450
184855e6
JB
451 /**
452 * Gets the worker type.
453 */
454 protected abstract get worker (): WorkerType
455
c2ade475 456 /**
6b27d407 457 * Pool minimum size.
c2ade475 458 */
6b27d407 459 protected abstract get minSize (): number
ff733df7
JB
460
461 /**
6b27d407 462 * Pool maximum size.
ff733df7 463 */
6b27d407 464 protected abstract get maxSize (): number
a35560ba 465
f59e1027
JB
466 /**
467 * Get the worker given its id.
468 *
469 * @param workerId - The worker id.
470 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
471 */
472 private getWorkerById (workerId: number): Worker | undefined {
473 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
474 ?.worker
475 }
476
6b813701
JB
477 /**
478 * Checks if the worker id sent in the received message from a worker is valid.
479 *
480 * @param message - The received message.
481 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
482 */
21f710aa
JB
483 private checkMessageWorkerId (message: MessageValue<Response>): void {
484 if (
485 message.workerId != null &&
486 this.getWorkerById(message.workerId) == null
487 ) {
488 throw new Error(
489 `Worker message received from unknown worker '${message.workerId}'`
490 )
491 }
492 }
493
ffcbbad8 494 /**
f06e48d8 495 * Gets the given worker its worker node key.
ffcbbad8
JB
496 *
497 * @param worker - The worker.
f59e1027 498 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 499 */
f06e48d8
JB
500 private getWorkerNodeKey (worker: Worker): number {
501 return this.workerNodes.findIndex(
502 workerNode => workerNode.worker === worker
503 )
bf9549ae
JB
504 }
505
afc003b2 506 /** @inheritDoc */
a35560ba 507 public setWorkerChoiceStrategy (
59219cbb
JB
508 workerChoiceStrategy: WorkerChoiceStrategy,
509 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 510 ): void {
aee46736 511 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 512 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
513 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
514 this.opts.workerChoiceStrategy
515 )
516 if (workerChoiceStrategyOptions != null) {
517 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
518 }
8a1260a3 519 for (const workerNode of this.workerNodes) {
4b628b48 520 workerNode.resetUsage()
b6b32453 521 this.setWorkerStatistics(workerNode.worker)
59219cbb 522 }
a20f0ba5
JB
523 }
524
525 /** @inheritDoc */
526 public setWorkerChoiceStrategyOptions (
527 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
528 ): void {
0d80593b 529 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
530 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
531 this.workerChoiceStrategyContext.setOptions(
532 this.opts.workerChoiceStrategyOptions
a35560ba
S
533 )
534 }
535
a20f0ba5 536 /** @inheritDoc */
8f52842f
JB
537 public enableTasksQueue (
538 enable: boolean,
539 tasksQueueOptions?: TasksQueueOptions
540 ): void {
a20f0ba5 541 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 542 this.flushTasksQueues()
a20f0ba5
JB
543 }
544 this.opts.enableTasksQueue = enable
8f52842f 545 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
546 }
547
548 /** @inheritDoc */
8f52842f 549 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 550 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
551 this.checkValidTasksQueueOptions(tasksQueueOptions)
552 this.opts.tasksQueueOptions =
553 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 554 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
555 delete this.opts.tasksQueueOptions
556 }
557 }
558
559 private buildTasksQueueOptions (
560 tasksQueueOptions: TasksQueueOptions
561 ): TasksQueueOptions {
562 return {
563 concurrency: tasksQueueOptions?.concurrency ?? 1
564 }
565 }
566
c319c66b
JB
567 /**
568 * Whether the pool is full or not.
569 *
570 * The pool filling boolean status.
571 */
dea903a8
JB
572 protected get full (): boolean {
573 return this.workerNodes.length >= this.maxSize
574 }
c2ade475 575
c319c66b
JB
576 /**
577 * Whether the pool is busy or not.
578 *
579 * The pool busyness boolean status.
580 */
581 protected abstract get busy (): boolean
7c0ba920 582
6c6afb84
JB
583 /**
584 * Whether worker nodes are executing at least one task.
585 *
586 * @returns Worker nodes busyness boolean status.
587 */
c2ade475 588 protected internalBusy (): boolean {
e0ae6100
JB
589 return (
590 this.workerNodes.findIndex(workerNode => {
f59e1027 591 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
592 }) === -1
593 )
cb70b19d
JB
594 }
595
afc003b2 596 /** @inheritDoc */
a86b6df1 597 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 598 const timestamp = performance.now()
20dcad1a 599 const workerNodeKey = this.chooseWorkerNode()
adc3c320 600 const submittedTask: Task<Data> = {
ff128cc9 601 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
602 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
603 data: data ?? ({} as Data),
b6b32453 604 timestamp,
21f710aa 605 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 606 id: randomUUID()
adc3c320 607 }
2e81254d 608 const res = new Promise<Response>((resolve, reject) => {
02706357 609 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
610 resolve,
611 reject,
20dcad1a 612 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
613 })
614 })
ff733df7
JB
615 if (
616 this.opts.enableTasksQueue === true &&
7171d33f 617 (this.busy ||
f59e1027 618 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 619 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 620 .concurrency as number))
ff733df7 621 ) {
26a929d7
JB
622 this.enqueueTask(workerNodeKey, submittedTask)
623 } else {
2e81254d 624 this.executeTask(workerNodeKey, submittedTask)
adc3c320 625 }
ff733df7 626 this.checkAndEmitEvents()
78cea37e 627 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
628 return res
629 }
c97c7edb 630
afc003b2 631 /** @inheritDoc */
c97c7edb 632 public async destroy (): Promise<void> {
1fbcaa7c 633 await Promise.all(
875a7c37
JB
634 this.workerNodes.map(async (workerNode, workerNodeKey) => {
635 this.flushTasksQueue(workerNodeKey)
47aacbaa 636 // FIXME: wait for tasks to be finished
920278a2
JB
637 const workerExitPromise = new Promise<void>(resolve => {
638 workerNode.worker.on('exit', () => {
639 resolve()
640 })
641 })
f06e48d8 642 await this.destroyWorker(workerNode.worker)
920278a2 643 await workerExitPromise
1fbcaa7c
JB
644 })
645 )
c97c7edb
S
646 }
647
4a6952ff 648 /**
6c6afb84 649 * Terminates the given worker.
4a6952ff 650 *
f06e48d8 651 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
652 */
653 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 654
729c563d 655 /**
6677a3d3
JB
656 * Setup hook to execute code before worker nodes are created in the abstract constructor.
657 * Can be overridden.
afc003b2
JB
658 *
659 * @virtual
729c563d 660 */
280c2a77 661 protected setupHook (): void {
d99ba5a8 662 // Intentionally empty
280c2a77 663 }
c97c7edb 664
729c563d 665 /**
280c2a77
S
666 * Should return whether the worker is the main worker or not.
667 */
668 protected abstract isMain (): boolean
669
670 /**
2e81254d 671 * Hook executed before the worker task execution.
bf9549ae 672 * Can be overridden.
729c563d 673 *
f06e48d8 674 * @param workerNodeKey - The worker node key.
1c6fe997 675 * @param task - The task to execute.
729c563d 676 */
1c6fe997
JB
677 protected beforeTaskExecutionHook (
678 workerNodeKey: number,
679 task: Task<Data>
680 ): void {
f59e1027 681 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
682 ++workerUsage.tasks.executing
683 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 684 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
685 task.name as string
686 ) as WorkerUsage
eb8afc8a
JB
687 ++taskWorkerUsage.tasks.executing
688 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
689 }
690
c01733f1 691 /**
2e81254d 692 * Hook executed after the worker task execution.
bf9549ae 693 * Can be overridden.
c01733f1 694 *
c923ce56 695 * @param worker - The worker.
38e795c1 696 * @param message - The received message.
c01733f1 697 */
2e81254d 698 protected afterTaskExecutionHook (
c923ce56 699 worker: Worker,
2740a743 700 message: MessageValue<Response>
bf9549ae 701 ): void {
ff128cc9
JB
702 const workerNodeKey = this.getWorkerNodeKey(worker)
703 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
704 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
705 this.updateRunTimeWorkerUsage(workerUsage, message)
706 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 707 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 708 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 709 ) as WorkerUsage
eb8afc8a
JB
710 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
711 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
712 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
713 }
714
715 private updateTaskStatisticsWorkerUsage (
716 workerUsage: WorkerUsage,
717 message: MessageValue<Response>
718 ): void {
a4e07f72
JB
719 const workerTaskStatistics = workerUsage.tasks
720 --workerTaskStatistics.executing
98e72cda
JB
721 if (message.taskError == null) {
722 ++workerTaskStatistics.executed
723 } else {
a4e07f72 724 ++workerTaskStatistics.failed
2740a743 725 }
f8eb0a2a
JB
726 }
727
a4e07f72
JB
728 private updateRunTimeWorkerUsage (
729 workerUsage: WorkerUsage,
f8eb0a2a
JB
730 message: MessageValue<Response>
731 ): void {
87de9ff5
JB
732 if (
733 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 734 .aggregate
87de9ff5 735 ) {
f7510105 736 const taskRunTime = message.taskPerformance?.runTime ?? 0
71514351
JB
737 workerUsage.runTime.aggregate =
738 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
f7510105
JB
739 workerUsage.runTime.minimum = Math.min(
740 taskRunTime,
71514351 741 workerUsage.runTime?.minimum ?? Infinity
f7510105
JB
742 )
743 workerUsage.runTime.maximum = Math.max(
744 taskRunTime,
71514351 745 workerUsage.runTime?.maximum ?? -Infinity
f7510105 746 )
c6bd2650 747 if (
932fc8be
JB
748 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
749 .average &&
a4e07f72 750 workerUsage.tasks.executed !== 0
c6bd2650 751 ) {
a4e07f72 752 workerUsage.runTime.average =
98e72cda 753 workerUsage.runTime.aggregate / workerUsage.tasks.executed
3032893a 754 }
3fa4cdd2 755 if (
932fc8be
JB
756 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
757 .median &&
d715b7bc 758 message.taskPerformance?.runTime != null
3fa4cdd2 759 ) {
a4e07f72
JB
760 workerUsage.runTime.history.push(message.taskPerformance.runTime)
761 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 762 }
3032893a 763 }
f8eb0a2a
JB
764 }
765
a4e07f72
JB
766 private updateWaitTimeWorkerUsage (
767 workerUsage: WorkerUsage,
1c6fe997 768 task: Task<Data>
f8eb0a2a 769 ): void {
1c6fe997
JB
770 const timestamp = performance.now()
771 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
772 if (
773 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 774 .aggregate
87de9ff5 775 ) {
71514351
JB
776 workerUsage.waitTime.aggregate =
777 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
f7510105
JB
778 workerUsage.waitTime.minimum = Math.min(
779 taskWaitTime,
71514351 780 workerUsage.waitTime?.minimum ?? Infinity
f7510105
JB
781 )
782 workerUsage.waitTime.maximum = Math.max(
783 taskWaitTime,
71514351 784 workerUsage.waitTime?.maximum ?? -Infinity
f7510105 785 )
09a6305f 786 if (
87de9ff5 787 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 788 .waitTime.average &&
a4e07f72 789 workerUsage.tasks.executed !== 0
09a6305f 790 ) {
a4e07f72 791 workerUsage.waitTime.average =
98e72cda 792 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
09a6305f
JB
793 }
794 if (
87de9ff5 795 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
71514351 796 .waitTime.median
09a6305f 797 ) {
1c6fe997 798 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 799 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 800 }
0567595a 801 }
c01733f1 802 }
803
a4e07f72 804 private updateEluWorkerUsage (
5df69fab 805 workerUsage: WorkerUsage,
62c15a68
JB
806 message: MessageValue<Response>
807 ): void {
5df69fab
JB
808 if (
809 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
810 .aggregate
811 ) {
f7510105 812 if (message.taskPerformance?.elu != null) {
71514351
JB
813 workerUsage.elu.idle.aggregate =
814 (workerUsage.elu.idle?.aggregate ?? 0) +
815 message.taskPerformance.elu.idle
816 workerUsage.elu.active.aggregate =
817 (workerUsage.elu.active?.aggregate ?? 0) +
818 message.taskPerformance.elu.active
f7510105
JB
819 if (workerUsage.elu.utilization != null) {
820 workerUsage.elu.utilization =
821 (workerUsage.elu.utilization +
822 message.taskPerformance.elu.utilization) /
823 2
824 } else {
825 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
826 }
827 workerUsage.elu.idle.minimum = Math.min(
828 message.taskPerformance.elu.idle,
71514351 829 workerUsage.elu.idle?.minimum ?? Infinity
f7510105
JB
830 )
831 workerUsage.elu.idle.maximum = Math.max(
832 message.taskPerformance.elu.idle,
71514351 833 workerUsage.elu.idle?.maximum ?? -Infinity
f7510105
JB
834 )
835 workerUsage.elu.active.minimum = Math.min(
836 message.taskPerformance.elu.active,
71514351 837 workerUsage.elu.active?.minimum ?? Infinity
f7510105
JB
838 )
839 workerUsage.elu.active.maximum = Math.max(
840 message.taskPerformance.elu.active,
71514351 841 workerUsage.elu.active?.maximum ?? -Infinity
f7510105 842 )
71514351
JB
843 if (
844 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
845 .average &&
846 workerUsage.tasks.executed !== 0
847 ) {
71514351 848 workerUsage.elu.idle.average =
98e72cda 849 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
71514351 850 workerUsage.elu.active.average =
98e72cda 851 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
71514351
JB
852 }
853 if (
854 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
61e4a75e 855 .median
71514351
JB
856 ) {
857 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
858 workerUsage.elu.active.history.push(
859 message.taskPerformance.elu.active
860 )
861 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
862 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
863 }
62c15a68
JB
864 }
865 }
866 }
867
280c2a77 868 /**
f06e48d8 869 * Chooses a worker node for the next task.
280c2a77 870 *
6c6afb84 871 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 872 *
20dcad1a 873 * @returns The worker node key
280c2a77 874 */
6c6afb84 875 private chooseWorkerNode (): number {
930dcf12 876 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
877 const worker = this.createAndSetupDynamicWorker()
878 if (
879 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
880 ) {
881 return this.getWorkerNodeKey(worker)
882 }
17393ac8 883 }
930dcf12
JB
884 return this.workerChoiceStrategyContext.execute()
885 }
886
6c6afb84
JB
887 /**
888 * Conditions for dynamic worker creation.
889 *
890 * @returns Whether to create a dynamic worker or not.
891 */
892 private shallCreateDynamicWorker (): boolean {
930dcf12 893 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
894 }
895
280c2a77 896 /**
675bb809 897 * Sends a message to the given worker.
280c2a77 898 *
38e795c1
JB
899 * @param worker - The worker which should receive the message.
900 * @param message - The message.
280c2a77
S
901 */
902 protected abstract sendToWorker (
903 worker: Worker,
904 message: MessageValue<Data>
905 ): void
906
4a6952ff 907 /**
f06e48d8 908 * Registers a listener callback on the given worker.
4a6952ff 909 *
38e795c1
JB
910 * @param worker - The worker which should register a listener.
911 * @param listener - The message listener callback.
4a6952ff 912 */
e102732c
JB
913 private registerWorkerMessageListener<Message extends Data | Response>(
914 worker: Worker,
915 listener: (message: MessageValue<Message>) => void
916 ): void {
917 worker.on('message', listener as MessageHandler<Worker>)
918 }
c97c7edb 919
729c563d 920 /**
41344292 921 * Creates a new worker.
6c6afb84
JB
922 *
923 * @returns Newly created worker.
729c563d 924 */
280c2a77 925 protected abstract createWorker (): Worker
c97c7edb 926
729c563d 927 /**
f06e48d8 928 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 929 * Can be overridden.
729c563d 930 *
38e795c1 931 * @param worker - The newly created worker.
729c563d 932 */
6677a3d3 933 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
934 // Listen to worker messages.
935 this.registerWorkerMessageListener(worker, this.workerListener())
2431bdb4
JB
936 // Send startup message to worker.
937 this.sendToWorker(worker, {
938 ready: false,
21f710aa 939 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
2431bdb4
JB
940 })
941 // Setup worker task statistics computation.
942 this.setWorkerStatistics(worker)
e102732c 943 }
c97c7edb 944
4a6952ff 945 /**
f06e48d8 946 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
947 *
948 * @returns New, completely set up worker.
949 */
950 protected createAndSetupWorker (): Worker {
bdacc2d2 951 const worker = this.createWorker()
280c2a77 952
35cf1c03 953 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 954 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
955 worker.on('error', error => {
956 if (this.emitter != null) {
957 this.emitter.emit(PoolEvents.error, error)
958 }
2431bdb4 959 if (this.opts.restartWorkerOnError === true && !this.starting) {
8a1260a3
JB
960 if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
961 this.createAndSetupDynamicWorker()
962 } else {
963 this.createAndSetupWorker()
964 }
5baee0d7 965 }
19dbc45b
JB
966 if (this.opts.enableTasksQueue === true) {
967 this.redistributeQueuedTasks(worker)
968 }
5baee0d7 969 })
a35560ba
S
970 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
971 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 972 worker.once('exit', () => {
f06e48d8 973 this.removeWorkerNode(worker)
a974afa6 974 })
280c2a77 975
f06e48d8 976 this.pushWorkerNode(worker)
280c2a77
S
977
978 this.afterWorkerSetup(worker)
979
c97c7edb
S
980 return worker
981 }
be0676b3 982
7d2e3f59
JB
983 private redistributeQueuedTasks (worker: Worker): void {
984 const workerNodeKey = this.getWorkerNodeKey(worker)
985 while (this.tasksQueueSize(workerNodeKey) > 0) {
986 let targetWorkerNodeKey: number = workerNodeKey
987 let minQueuedTasks = Infinity
988 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
989 if (
990 workerNodeId !== workerNodeKey &&
991 workerNode.usage.tasks.queued === 0
992 ) {
993 targetWorkerNodeKey = workerNodeId
994 break
995 }
996 if (
997 workerNodeId !== workerNodeKey &&
998 workerNode.usage.tasks.queued < minQueuedTasks
999 ) {
1000 minQueuedTasks = workerNode.usage.tasks.queued
1001 targetWorkerNodeKey = workerNodeId
1002 }
1003 }
1004 this.enqueueTask(
1005 targetWorkerNodeKey,
1006 this.dequeueTask(workerNodeKey) as Task<Data>
1007 )
1008 }
1009 }
1010
930dcf12
JB
1011 /**
1012 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
1013 *
1014 * @returns New, completely set up dynamic worker.
1015 */
1016 protected createAndSetupDynamicWorker (): Worker {
1017 const worker = this.createAndSetupWorker()
1018 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 1019 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
1020 if (
1021 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
1022 (message.kill != null &&
1023 ((this.opts.enableTasksQueue === false &&
f59e1027 1024 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 1025 (this.opts.enableTasksQueue === true &&
f59e1027 1026 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 1027 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
1028 ) {
1029 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
1030 void (this.destroyWorker(worker) as Promise<void>)
1031 }
1032 })
21f710aa
JB
1033 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
1034 workerInfo.dynamic = true
1035 this.sendToWorker(worker, {
1036 checkAlive: true,
1037 workerId: workerInfo.id as number
1038 })
930dcf12
JB
1039 return worker
1040 }
1041
be0676b3 1042 /**
ff733df7 1043 * This function is the listener registered for each worker message.
be0676b3 1044 *
bdacc2d2 1045 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1046 */
1047 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1048 return message => {
21f710aa 1049 this.checkMessageWorkerId(message)
2431bdb4 1050 if (message.ready != null && message.workerId != null) {
7c8381eb
JB
1051 // Worker ready message received
1052 this.handleWorkerReadyMessage(message)
f59e1027 1053 } else if (message.id != null) {
a3445496 1054 // Task execution response received
6b272951
JB
1055 this.handleTaskExecutionResponse(message)
1056 }
1057 }
1058 }
1059
7c8381eb 1060 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
21f710aa
JB
1061 const worker = this.getWorkerById(message.workerId)
1062 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1063 message.ready as boolean
2431bdb4
JB
1064 if (this.emitter != null && this.ready) {
1065 this.emitter.emit(PoolEvents.ready, this.info)
1066 }
6b272951
JB
1067 }
1068
1069 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1070 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1071 if (promiseResponse != null) {
1072 if (message.taskError != null) {
1073 if (this.emitter != null) {
1074 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 1075 }
6b272951
JB
1076 promiseResponse.reject(message.taskError.message)
1077 } else {
1078 promiseResponse.resolve(message.data as Response)
1079 }
1080 this.afterTaskExecutionHook(promiseResponse.worker, message)
1081 this.promiseResponseMap.delete(message.id as string)
1082 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1083 if (
1084 this.opts.enableTasksQueue === true &&
1085 this.tasksQueueSize(workerNodeKey) > 0
1086 ) {
1087 this.executeTask(
1088 workerNodeKey,
1089 this.dequeueTask(workerNodeKey) as Task<Data>
1090 )
be0676b3 1091 }
6b272951 1092 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1093 }
be0676b3 1094 }
7c0ba920 1095
ff733df7 1096 private checkAndEmitEvents (): void {
1f68cede 1097 if (this.emitter != null) {
ff733df7 1098 if (this.busy) {
2845f2a5 1099 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1100 }
6b27d407 1101 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1102 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1103 }
164d950a
JB
1104 }
1105 }
1106
8a1260a3
JB
1107 /**
1108 * Gets the worker information.
1109 *
1110 * @param workerNodeKey - The worker node key.
1111 */
1112 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1113 return this.workerNodes[workerNodeKey].info
1114 }
1115
a05c10de 1116 /**
f06e48d8 1117 * Pushes the given worker in the pool worker nodes.
ea7a90d3 1118 *
38e795c1 1119 * @param worker - The worker.
f06e48d8 1120 * @returns The worker nodes length.
ea7a90d3 1121 */
f06e48d8 1122 private pushWorkerNode (worker: Worker): number {
4b628b48 1123 return this.workerNodes.push(new WorkerNode(worker, this.worker))
ea7a90d3 1124 }
c923ce56 1125
51fe3d3c 1126 /**
f06e48d8 1127 * Removes the given worker from the pool worker nodes.
51fe3d3c 1128 *
f06e48d8 1129 * @param worker - The worker.
51fe3d3c 1130 */
416fd65c 1131 private removeWorkerNode (worker: Worker): void {
f06e48d8 1132 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1133 if (workerNodeKey !== -1) {
1134 this.workerNodes.splice(workerNodeKey, 1)
1135 this.workerChoiceStrategyContext.remove(workerNodeKey)
1136 }
51fe3d3c 1137 }
adc3c320 1138
2e81254d 1139 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1140 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1141 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1142 }
1143
f9f00b5f 1144 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1145 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1146 }
1147
416fd65c 1148 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1149 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1150 }
1151
416fd65c 1152 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1153 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1154 }
1155
416fd65c 1156 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1157 while (this.tasksQueueSize(workerNodeKey) > 0) {
1158 this.executeTask(
1159 workerNodeKey,
1160 this.dequeueTask(workerNodeKey) as Task<Data>
1161 )
ff733df7 1162 }
4b628b48 1163 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1164 }
1165
ef41a6e6
JB
1166 private flushTasksQueues (): void {
1167 for (const [workerNodeKey] of this.workerNodes.entries()) {
1168 this.flushTasksQueue(workerNodeKey)
1169 }
1170 }
b6b32453
JB
1171
1172 private setWorkerStatistics (worker: Worker): void {
1173 this.sendToWorker(worker, {
1174 statistics: {
87de9ff5
JB
1175 runTime:
1176 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1177 .runTime.aggregate,
87de9ff5 1178 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1179 .elu.aggregate
21f710aa
JB
1180 },
1181 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
b6b32453
JB
1182 })
1183 }
c97c7edb 1184}