feat: worker node readiness aware worker choice strategies
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
5c4d16da
JB
3import type {
4 MessageValue,
5 PromiseResponseWrapper,
6 Task
7} from '../utility-types'
bbeadd16 8import {
ff128cc9 9 DEFAULT_TASK_NAME,
bbeadd16
JB
10 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
11 EMPTY_FUNCTION,
59317253 12 isKillBehavior,
0d80593b 13 isPlainObject,
afe0d5bf
JB
14 median,
15 round
bbeadd16 16} from '../utils'
59317253 17import { KillBehaviors } from '../worker/worker-options'
c4855468 18import {
65d7a1c9 19 type IPool,
7c5a1080 20 PoolEmitter,
c4855468 21 PoolEvents,
6b27d407 22 type PoolInfo,
c4855468 23 type PoolOptions,
6b27d407
JB
24 type PoolType,
25 PoolTypes,
4b628b48 26 type TasksQueueOptions
c4855468 27} from './pool'
e102732c
JB
28import type {
29 IWorker,
4b628b48 30 IWorkerNode,
e102732c 31 MessageHandler,
8a1260a3 32 WorkerInfo,
4b628b48 33 WorkerType,
e102732c
JB
34 WorkerUsage
35} from './worker'
a35560ba 36import {
f0d7f803 37 Measurements,
a35560ba 38 WorkerChoiceStrategies,
a20f0ba5
JB
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
bdaf31cd
JB
41} from './selection-strategies/selection-strategies-types'
42import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 43import { version } from './version'
4b628b48 44import { WorkerNode } from './worker-node'
23ccf9d7 45
729c563d 46/**
ea7a90d3 47 * Base class that implements some shared logic for all poolifier pools.
729c563d 48 *
38e795c1 49 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
50 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
51 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 52 */
c97c7edb 53export abstract class AbstractPool<
f06e48d8 54 Worker extends IWorker,
d3c8a1a8
S
55 Data = unknown,
56 Response = unknown
c4855468 57> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Map of the pending execution response promises.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was submitted to, together with the `resolve` and `reject` callbacks of the execution response promise.
 *
 * A message received from a worker is matched to its map entry through the message id.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Worker choice strategy context, referencing the worker choice algorithm implementation in use.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>

/**
 * The start timestamp of the pool, taken with `performance.now()` at the end of the constructor.
 */
private readonly startTimestamp: number
90
729c563d
S
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, then the worker choice strategy context
 * is created, the setup hook is run and the worker nodes are created.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if the pool is not started from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments validation, in declaration order
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind the methods so they keep the pool as `this` when passed around
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Fill the pool up to the requested number of worker nodes
  while (this.numberOfWorkers > this.workerNodes.length) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
136
/**
 * Checks that a worker file path has been given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`, `undefined`, not a string, or a string that is empty once trimmed.
 */
private checkFilePath (filePath: string): void {
  // The static type is `string`, but untyped callers can pass anything:
  // `typeof` also rejects `null` and `undefined`.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
145
8d3782fa
JB
/**
 * Checks the number of workers given at pool construction.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if `numberOfWorkers` is `null` or `undefined`.
 * @throws TypeError if `numberOfWorkers` is not a safe integer.
 * @throws RangeError if `numberOfWorkers` is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
163
/**
 * Checks the minimum and maximum sizes of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws TypeError if `max` is `null`, `undefined` or not a safe integer.
 * @throws RangeError if `min` is greater than `max`, if both are zero, or if they are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  // Without these two checks an `undefined`, `NaN` or fractional maximum
  // makes every comparison below false and is silently accepted.
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Tested before the generic equality so that 0/0 gets its own message
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
181
/**
 * Checks the pool options and fills in the defaults of the unset ones.
 * The resulting values are written to `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Worker choice strategy options
  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Boolean switches
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  const tasksQueueEnabled = opts.enableTasksQueue ?? false
  this.opts.enableTasksQueue = tasksQueueEnabled
  // Tasks queue options are only validated and built when the queue is enabled
  if (tasksQueueEnabled) {
    this.checkValidTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions as TasksQueueOptions
    )
  }
}
208
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
218
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold as many keys as the pool maximum size, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node of a full pool
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
246
a20f0ba5
JB
/**
 * Checks the tasks queue options. `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are defined but not a plain object, or if `concurrency` is defined but not a safe integer.
 * @throws Error if `concurrency` is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
270
/** @inheritDoc */
public get info (): PoolInfo {
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node number over the worker nodes, starting from zero
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  // Collects a per worker node number into an array
  const collectFromWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number[] => this.workerNodes.map(workerNode => pick(workerNode))
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...collectFromWorkerNodes(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...collectFromWorkerNodes(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE(review): the quotient is 0 / 0 (NaN) while no task has been executed
        average: round(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              workerNode => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              collectFromWorkerNodes(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...collectFromWorkerNodes(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...collectFromWorkerNodes(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE(review): the quotient is 0 / 0 (NaN) while no task has been executed
        average: round(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          ) /
            sumOverWorkerNodes(
              workerNode => workerNode.usage.tasks?.executed ?? 0
            )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              collectFromWorkerNodes(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 407
2431bdb4
JB
/**
 * Whether the pool is starting or not.
 *
 * The pool is starting while it holds fewer worker nodes than its minimum
 * size, or while at least one of its worker nodes is not ready.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
415
/**
 * Whether the pool is ready or not.
 *
 * The pool is ready once it holds at least its minimum size of worker nodes
 * and every worker node is ready.
 */
private get ready (): boolean {
  const hasMinimumSize = this.workerNodes.length >= this.minSize
  if (!hasMinimumSize) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
422
afe0d5bf
JB
/**
 * Gets the approximate pool utilization.
 *
 * It is the aggregated run time plus the aggregated wait time of all the
 * worker nodes, divided by the time elapsed since the pool start multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
443
8881ae32
JB
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The type of the workers of the pool.
 */
protected abstract get worker (): WorkerType

/**
 * The pool minimum size.
 */
protected abstract get minSize (): number

/**
 * The pool maximum size.
 */
protected abstract get maxSize (): number
a35560ba 465
f59e1027
JB
/**
 * Looks up a worker by its id in the pool worker nodes.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info holds that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
476
21f710aa
JB
/**
 * Checks that the worker id carried by a message, if any, belongs to a worker of the pool.
 *
 * @param message - The received message.
 * @throws Error if the message carries the id of a worker unknown to the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // No worker id to check
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
487
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The index of the worker node holding the worker in the pool worker nodes, `-1` if there is none.
 */
private getWorkerNodeKey (worker: Worker): number {
  const isNodeOfWorker = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(isNodeOfWorker)
}
499
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and set up its statistics again
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
518
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options are never stored
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
529
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Flush the queues when an enabled tasks queue is being disabled
  const isDisabling = this.opts.enableTasksQueue === true && !enable
  if (isDisabling) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
541
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
552
/**
 * Builds the tasks queue options with the defaults of the unset ones.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly `null` or `undefined`.
 * @returns The tasks queue options, with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
560
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least its maximum size of worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 569
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status, to be defined by each pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 576
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing task, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
589
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the promise callbacks under the task id before the task is submitted
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // The task is queued if the tasks queue is enabled and either the pool is
  // busy or the chosen worker node already runs its allowed number of tasks
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const shallEnqueue =
    tasksQueueEnabled &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 624
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the tasks queue of a worker node, destroys its worker and waits for the worker exit
  const destroyWorkerNode = async (
    workerNode: IWorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The exit listener is registered before the worker destruction is requested
    const workerExitPromise = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExitPromise
  }
  await Promise.all(
    this.workerNodes.map(
      async (workerNode, workerNodeKey) =>
        await destroyWorkerNode(workerNode, workerNodeKey)
    )
  )
}
641
/**
 * Terminates the given worker.
 *
 * @param worker - A worker within `workerNodes`.
 * @returns Nothing, or a promise when the termination is asynchronous.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 648
/**
 * Hook run by the abstract constructor before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 658
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when it returns `false`.
 */
protected abstract isMain (): boolean
663
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * It increments the executing tasks counter and updates the wait time
 * statistics, for the worker node usage and then for its task usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Worker node wide usage
  const workerUsage = workerNode.usage
  ++workerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(workerUsage, task)
  // Usage of the worker node for that task name
  const taskWorkerUsage = workerNode.getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  ++taskWorkerUsage.tasks.executing
  this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
}
684
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * It updates the tasks, run time and ELU statistics, for the worker node
 * usage and then for its task usage.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  // Applies the three statistics updates to one usage object
  const updateUsage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  updateUsage(workerNode.usage)
  // The task name falls back to the default one when the message carries none
  updateUsage(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
708
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  --tasks.executing
  // A message carrying a task error counts as a failure
  if (message.taskError != null) {
    ++tasks.failed
  } else {
    ++tasks.executed
  }
}
721
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage.
 * Does nothing unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const measuredRunTime = message.taskPerformance?.runTime
  // A message without run time measurement counts as zero
  const taskRunTime = measuredRunTime ?? 0
  const { runTime, tasks } = workerUsage
  runTime.aggregate = (runTime.aggregate ?? 0) + taskRunTime
  runTime.minimum = Math.min(taskRunTime, runTime.minimum ?? Infinity)
  runTime.maximum = Math.max(taskRunTime, runTime.maximum ?? -Infinity)
  // The average is left untouched while no task has been executed
  if (runTimeRequirements.average && tasks.executed !== 0) {
    runTime.average = runTime.aggregate / tasks.executed
  }
  // The history only receives measured run times
  if (runTimeRequirements.median && measuredRunTime != null) {
    runTime.history.push(measuredRunTime)
    runTime.median = median(runTime.history)
  }
}
759
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage.
 * Does nothing unless the worker choice strategy requires the wait time aggregate.
 *
 * The task wait time is the time elapsed since the task timestamp, zero if the task has none.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const { waitTime, tasks } = workerUsage
  waitTime.aggregate = (waitTime.aggregate ?? 0) + taskWaitTime
  waitTime.minimum = Math.min(taskWaitTime, waitTime.minimum ?? Infinity)
  waitTime.maximum = Math.max(taskWaitTime, waitTime.maximum ?? -Infinity)
  // The average is left untouched while no task has been executed
  if (waitTimeRequirements.average && tasks.executed !== 0) {
    waitTime.average = waitTime.aggregate / tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTime.history.push(taskWaitTime)
    waitTime.median = median(waitTime.history)
  }
}
797
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage.
 * Does nothing unless the worker choice strategy requires the ELU aggregate
 * and the message carries an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const { elu, tasks } = workerUsage
  elu.idle.aggregate = (elu.idle.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of its previous value and the task one
  elu.utilization =
    elu.utilization != null
      ? (elu.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active.maximum ?? -Infinity
  )
  // The averages are left untouched while no task has been executed
  if (eluRequirements.average && tasks.executed !== 0) {
    elu.idle.average = elu.idle.aggregate / tasks.executed
    elu.active.average = elu.active.aggregate / tasks.executed
  }
  if (eluRequirements.median) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
861
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met. Its
 * worker node is chosen if the strategy policy uses dynamic workers,
 * otherwise the choice is left to the worker choice strategy.
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
880
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
889
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
900
/**
 * Registers a listener callback for the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 913
/**
 * Creates a new worker.
 *
 * @returns The newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 920
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * Can be overridden.
 *
 * It registers the pool message listener on the worker, sends the startup
 * message to the worker, then sets up the worker statistics.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(worker, this.workerListener())
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
c97c7edb 938
/**
 * Creates a worker, wires its event handlers and adds it to the pool worker nodes.
 *
 * @returns The new, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User provided handlers, defaulting to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool internal reaction to a worker error.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    const restartWorker =
      this.opts.restartWorkerOnError === true && !this.starting
    if (restartWorker) {
      // Replace the worker by one of the same kind (dynamic or not).
      const { dynamic } = this.getWorkerInfo(this.getWorkerNodeKey(worker))
      if (dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // Drop the worker node on the first worker exit.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 976
/**
 * Moves the tasks queued on the given worker's node to the queues of the other worker nodes.
 *
 * Each task is re-enqueued on the first other worker node with no queued task or,
 * failing that, on the other worker node with the fewest queued tasks.
 * Nothing is done if the worker is no longer in the pool worker nodes, and the tasks
 * are left in place if there is no other worker node to receive them.
 *
 * @param worker - The worker whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // The worker is not (or no longer) in the pool worker nodes: there is no queue to drain.
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other worker node can take the task: re-enqueueing it on the same
      // node would never shrink the queue and would loop forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1004
/**
 * Creates a dynamic worker, sets it up completely in the pool worker nodes and flags it as dynamic.
 *
 * @returns The new, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A non hard kill is only honoured once the worker node has no task
    // executing and, when the tasks queue is enabled, no task queued.
    const canBeKilled = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
        case true:
          return (
            this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(workerNodeKey) === 0
          )
        default:
          return false
      }
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && canBeKilled())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  const workerId = dynamicWorkerInfo.id as number
  this.sendToWorker(worker, { checkAlive: true, workerId })
  return worker
}
1035
/**
 * Builds the listener registered for each worker message.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    this.checkMessageWorkerId(message)
    const isWorkerReadyMessage =
      message.ready != null && message.workerId != null
    if (isWorkerReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1053
/**
 * Stores the readiness reported by a worker and emits the pool `ready` event once the pool is ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1062
/**
 * Settles the promise bound to a task execution response, then feeds the worker
 * its next queued task (if any) and updates the worker choice strategy.
 * Responses whose task id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1089
/**
 * Emits the pool `busy` and `full` events when the matching conditions hold.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
1100
/**
 * Gets the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1109
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
c923ce56 1119
/**
 * Deletes the worker node of the given worker from the pool worker nodes
 * and from the worker choice strategy context.
 * Does nothing when the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1132
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1137
/**
 * Enqueues a task on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the worker node `enqueueTask()` (presumably the tasks queue size - to be confirmed against `WorkerNode`).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1141
/**
 * Dequeues a task from the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1145
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1149
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.clearTasksQueue()
}
1159
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1165
/**
 * Sends to the given worker the task statistics settings taken from the
 * worker choice strategy task statistics requirements.
 *
 * @param worker - The worker that shall receive the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
c97c7edb 1178}