build(deps-dev): apply updates
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf
JB
15 median,
16 round
bbeadd16 17} from '../utils'
59317253 18import { KillBehaviors } from '../worker/worker-options'
c4855468 19import {
65d7a1c9 20 type IPool,
7c5a1080 21 PoolEmitter,
c4855468 22 PoolEvents,
6b27d407 23 type PoolInfo,
c4855468 24 type PoolOptions,
6b27d407
JB
25 type PoolType,
26 PoolTypes,
4b628b48 27 type TasksQueueOptions
c4855468 28} from './pool'
e102732c
JB
29import type {
30 IWorker,
4b628b48 31 IWorkerNode,
e102732c 32 MessageHandler,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
f0d7f803 38 Measurements,
a35560ba 39 WorkerChoiceStrategies,
a20f0ba5
JB
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
bdaf31cd
JB
42} from './selection-strategies/selection-strategies-types'
43import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 44import { version } from './version'
4b628b48 45import { WorkerNode } from './worker-node'
23ccf9d7 46
729c563d 47/**
ea7a90d3 48 * Base class that implements some shared logic for all poolifier pools.
729c563d 49 *
38e795c1 50 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 53 */
c97c7edb 54export abstract class AbstractPool<
f06e48d8 55 Worker extends IWorker,
d3c8a1a8
S
56 Data = unknown,
57 Response = unknown
c4855468 58> implements IPool<Worker, Data, Response> {
afc003b2 59 /** @inheritDoc */
4b628b48 60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 61
afc003b2 62 /** @inheritDoc */
7c0ba920
JB
63 public readonly emitter?: PoolEmitter
64
be0676b3 65 /**
a3445496 66 * The execution response promise map.
be0676b3 67 *
2740a743 68 * - `key`: The message id of each submitted task.
a3445496 69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 70 *
a3445496 71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 72 */
c923ce56
JB
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
afe0d5bf
JB
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
729c563d
S
/**
 * Builds the pool: validates the arguments, sets up the optional event emitter and the
 * worker choice strategy context, runs the setup hook and creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not started from the main worker.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Arguments validation; checkPoolOptions() also writes the defaulted values back into the options.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind these methods to the pool instance.
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  // Create worker nodes until the requested number is reached.
  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  // The start timestamp is taken once the initial worker nodes have been created.
  this.startTimestamp = performance.now()
}
137
/**
 * Checks that the worker file path is a non-blank string pointing to an existing file.
 *
 * @param filePath - Path to the worker file.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the path is missing, not a string, blank, or the file does not exist.
 */
private checkFilePath (filePath: string): void {
  const isMissingOrBlank =
    filePath == null ||
    typeof filePath !== 'string' ||
    filePath.trim().length === 0
  if (isMissingOrBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
  const workerFileFound = existsSync(filePath)
  if (!workerFileFound) {
    throw new Error(`Cannot find the worker file '${filePath}'`)
  }
}
150
8d3782fa
JB
/**
 * Checks that the requested number of workers is usable for this pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
168
/**
 * Checks the minimum and maximum sizes of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the maximum pool size is not specified or is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the maximum is inferior to the minimum, if both are zero, or if both are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  // A missing or non integer maximum (undefined, NaN, 1.5, ...) would otherwise pass every
  // comparison below without raising any error.
  if (max == null) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool without specifying the maximum pool size'
    )
  }
  if (!Number.isSafeInteger(max)) {
    throw new TypeError(
      'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
    )
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Checked before the generic equality case so that the zero sized pool gets its own message.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
186
/**
 * Checks the pool options and stores the defaulted values into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  // Worker choice strategy options.
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches and their defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only checked and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(queueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
  }
}
213
/**
 * Checks that the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
223
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, their number must be equal to the pool maximum size.
  const weightsCountMismatch =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (weightsCountMismatch) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const unknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (unknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
251
a20f0ba5
JB
/**
 * Checks the tasks queue options. Missing options or a missing concurrency are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing more to check.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
275
/** @inheritDoc */
public get info (): PoolInfo {
  // Read the task statistics requirements once: they decide which optional sections are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a numeric value picked on each worker node.
  const sum = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + pick(workerNode),
      0
    )
  const executedTasks = sum(workerNode => workerNode.usage.tasks.executed)
  // Average per executed task: 0 instead of NaN (0 / 0) as long as no task has been executed.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? aggregate / executedTasks : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization is only reported when both run time and wait time are aggregated.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sum(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sum(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sum(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sum(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sum(
      workerNode => workerNode.usage.tasks?.maxQueued ?? 0
    ),
    failedTasks: sum(workerNode => workerNode.usage.tasks.failed),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sum(workerNode => workerNode.usage.runTime?.aggregate ?? 0)
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: round(
          averagePerExecutedTask(
            sum(workerNode => workerNode.usage.waitTime?.aggregate ?? 0)
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 412
2431bdb4
JB
/**
 * Whether the pool is starting: fewer worker nodes than the minimum size, or at least one
 * worker node not ready.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
420
/**
 * Whether the pool is ready: at least the minimum number of worker nodes, all of them ready.
 */
private get ready (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
427
afe0d5bf
JB
/**
 * Computes the approximate pool utilization: the aggregated tasks run time and wait time
 * of all worker nodes divided by the elapsed time since the pool start multiplied by the
 * pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime?.aggregate ?? 0
  }
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksWaitTime += workerNode.usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
448
8881ae32
JB
449 /**
450 * Pool type.
451 *
452 * If it is `'dynamic'`, it provides the `max` property.
453 */
454 protected abstract get type (): PoolType
455
184855e6
JB
456 /**
457 * Gets the worker type.
458 */
459 protected abstract get worker (): WorkerType
460
c2ade475 461 /**
6b27d407 462 * Pool minimum size.
c2ade475 463 */
6b27d407 464 protected abstract get minSize (): number
ff733df7
JB
465
466 /**
6b27d407 467 * Pool maximum size.
ff733df7 468 */
6b27d407 469 protected abstract get maxSize (): number
a35560ba 470
f59e1027
JB
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
 */
private getWorkerById (workerId: number): Worker | undefined {
  const matchingWorkerNode = this.workerNodes.find(
    workerNode => workerNode.info.id === workerId
  )
  return matchingWorkerNode?.worker
}
481
6b813701
JB
/**
 * Checks that the worker id carried by a received message, if any, matches a worker of the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    // Messages without worker id are not checked.
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
498
/**
 * Finds the key (index in the pool worker nodes) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (let key = 0; key < this.workerNodes.length; key++) {
    if (this.workerNodes[key].worker === worker) {
      return key
    }
  }
  return -1
}
510
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Store the strategy in the pool options, then hand it over to the strategy context.
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(workerChoiceStrategy)
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node has its usage reset and its worker statistics set again.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
529
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
540
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isBeingDisabled = this.opts.enableTasksQueue === true && !enable
  if (isBeingDisabled) {
    // Flush the tasks queues before the tasks queue gets switched off.
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
552
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the stored tasks queue options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
563
/**
 * Builds the tasks queue options with defaults applied.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly missing.
 * @returns The tasks queue options, with the concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
571
c319c66b
JB
/**
 * Whether the pool is full or not: the number of worker nodes has reached the pool maximum size.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 580
c319c66b
JB
581 /**
582 * Whether the pool is busy or not.
583 *
584 * The pool busyness boolean status.
585 */
586 protected abstract get busy (): boolean
7c0ba920 587
6c6afb84
JB
/**
 * Whether no worker node is left without an executing task.
 *
 * @returns `true` if every worker node is executing at least one task, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
600
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Taken before the worker node is chosen and stored as the task timestamp.
  const submissionTimestamp = performance.now()
  const chosenWorkerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name: name ?? DEFAULT_TASK_NAME,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    workerId: this.getWorkerInfo(chosenWorkerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the resolve/reject callbacks under the task id.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[chosenWorkerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or when the chosen
  // worker node already executes as many tasks as the configured concurrency.
  const mustEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[chosenWorkerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustEnqueue) {
    this.enqueueTask(chosenWorkerNodeKey, task)
  } else {
    this.executeTask(chosenWorkerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 635
/** @inheritDoc */
public async destroy (): Promise<void> {
  await Promise.all(
    this.workerNodes.map(async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The listener is registered before the worker is destroyed so that the 'exit' event
      // cannot be missed. `once` is enough: the listener only has to resolve the promise a
      // single time and is removed afterwards.
      const workerExitPromise = new Promise<void>(resolve => {
        workerNode.worker.once('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExitPromise
    })
  )
}
652
4a6952ff 653 /**
6c6afb84 654 * Terminates the given worker.
4a6952ff 655 *
f06e48d8 656 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
657 */
658 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 659
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 669
729c563d 670 /**
280c2a77
S
671 * Should return whether the worker is the main worker or not.
672 */
673 protected abstract isMain (): boolean
674
/**
 * Hook executed before the worker task execution: increments the executing tasks counter and
 * updates the wait time, for both the worker node usage and its per task name usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  // Worker node wide usage.
  const nodeUsage = this.workerNodes[workerNodeKey].usage
  nodeUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(nodeUsage, task)
  // Usage tracked for the task name.
  const namedTaskUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
    task.name as string
  ) as WorkerUsage
  namedTaskUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(namedTaskUsage, task)
}
695
/**
 * Hook executed after the worker task execution: updates the tasks statistics, the run time
 * and the ELU, for both the worker node usage and its per task name usage.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  // Applies the three usage updates, in order, to the given usage.
  const applyMessage = (usage: WorkerUsage): void => {
    this.updateTaskStatisticsWorkerUsage(usage, message)
    this.updateRunTimeWorkerUsage(usage, message)
    this.updateEluWorkerUsage(usage, message)
  }
  applyMessage(workerNode.usage)
  // The task name falls back to the default task name when the message carries none.
  applyMessage(
    workerNode.getTaskWorkerUsage(
      message.taskPerformance?.name ?? DEFAULT_TASK_NAME
    ) as WorkerUsage
  )
}
719
/**
 * Updates the tasks counters of the given usage once a task is done: one executing task
 * less and one more executed or failed task.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  // A message carrying a task error counts as a failed task.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
732
a4e07f72
JB
/**
 * Updates the run time statistics of the given usage with the task run time carried by the
 * message. Does nothing unless the worker choice strategy requires the run time aggregate.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  ) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  // A message without run time counts as a zero run time.
  const measuredRunTime = message.taskPerformance?.runTime ?? 0
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + measuredRunTime
  runTimeUsage.minimum = Math.min(
    measuredRunTime,
    runTimeUsage.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    measuredRunTime,
    runTimeUsage.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .average &&
    workerUsage.tasks.executed !== 0
  ) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  // The median is only fed with run times actually carried by the message.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .median &&
    message.taskPerformance?.runTime != null
  ) {
    runTimeUsage.history.push(message.taskPerformance.runTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
770
a4e07f72
JB
/**
 * Updates the wait time statistics of the given usage with the time elapsed since the task
 * timestamp. Does nothing unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp counts as a zero wait time.
  const waitedTime = now - (task.timestamp ?? now)
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .aggregate
  ) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage.aggregate ?? 0) + waitedTime
  waitTimeUsage.minimum = Math.min(
    waitedTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    waitedTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .average &&
    workerUsage.tasks.executed !== 0
  ) {
    waitTimeUsage.average =
      waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
      .median
  ) {
    waitTimeUsage.history.push(waitedTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
808
/**
 * Updates the ELU (event loop utilization) statistics of the given usage with the ELU carried
 * by the message. Does nothing unless the worker choice strategy requires the ELU aggregate
 * and the message carries an ELU.
 *
 * @param workerUsage - The usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  if (
    !this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  ) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  eluUsage.idle.aggregate = (eluUsage.idle.aggregate ?? 0) + taskElu.idle
  eluUsage.active.aggregate =
    (eluUsage.active.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of the previous value and the new one, or the new
  // one when there is no previous value.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  eluUsage.idle.minimum = Math.min(
    taskElu.idle,
    eluUsage.idle.minimum ?? Infinity
  )
  eluUsage.idle.maximum = Math.max(
    taskElu.idle,
    eluUsage.idle.maximum ?? -Infinity
  )
  eluUsage.active.minimum = Math.min(
    taskElu.active,
    eluUsage.active.minimum ?? Infinity
  )
  eluUsage.active.maximum = Math.max(
    taskElu.active,
    eluUsage.active.maximum ?? -Infinity
  )
  // The averages are left untouched while no task has been executed.
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .average &&
    workerUsage.tasks.executed !== 0
  ) {
    eluUsage.idle.average =
      eluUsage.idle.aggregate / workerUsage.tasks.executed
    eluUsage.active.average =
      eluUsage.active.aggregate / workerUsage.tasks.executed
  }
  if (
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .median
  ) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
872
/**
 * Chooses the worker node that will receive the next task.
 *
 * When a dynamic worker has to be created, it is created first; it receives the task only if
 * the strategy policy allows using the dynamic worker. In all other cases the choice is
 * delegated to the worker choice strategy context.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
891
6c6afb84
JB
/**
 * Tells whether a dynamic worker shall be created: the pool is dynamic, is not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
900
280c2a77 901 /**
675bb809 902 * Sends a message to the given worker.
280c2a77 903 *
38e795c1
JB
904 * @param worker - The worker which should receive the message.
905 * @param message - The message.
280c2a77
S
906 */
907 protected abstract sendToWorker (
908 worker: Worker,
909 message: MessageValue<Data>
910 ): void
911
729c563d 912 /**
41344292 913 * Creates a new worker.
6c6afb84
JB
914 *
915 * @returns Newly created worker.
729c563d 916 */
280c2a77 917 protected abstract createWorker (): Worker
c97c7edb 918
/**
 * Creates a new worker, registers the pool listeners on it and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const newWorker = this.createWorker()

  // User supplied handlers, defaulting to a no-op, are registered ahead of the pool ones.
  newWorker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  newWorker.on('error', error => {
    const erroredWorkerNodeKey = this.getWorkerNodeKey(newWorker)
    const erroredWorkerInfo = this.getWorkerInfo(erroredWorkerNodeKey)
    // The errored worker is flagged as not ready.
    erroredWorkerInfo.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // No replacement worker is created while the pool is still starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      if (erroredWorkerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(erroredWorkerNodeKey)
    }
  })
  newWorker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  newWorker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // Once exited, the worker is removed from the pool worker nodes.
  newWorker.once('exit', () => {
    this.removeWorkerNode(newWorker)
  })

  this.pushWorkerNode(newWorker)

  this.afterWorkerSetup(newWorker)

  return newWorker
}
be0676b3 959
930dcf12
JB
/**
 * Creates a new worker through {@link createAndSetupWorker}, adds the kill
 * message handling to it and flags it as dynamic.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const nodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily so that the worker node is only read when a non hard
    // kill message has been received.
    const isWorkerNodeIdle = (): boolean => {
      switch (this.opts.enableTasksQueue) {
        case false:
          return this.workerNodes[nodeKey].usage.tasks.executing === 0
        case true:
          return (
            this.workerNodes[nodeKey].usage.tasks.executing === 0 &&
            this.tasksQueueSize(nodeKey) === 0
          )
        default:
          return false
      }
    }
    const hardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (hardKill || (message.kill != null && isWorkerNodeIdle())) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  // Ask the worker to start its aliveness check.
  this.sendToWorker(worker, {
    checkAlive: true,
    workerId: dynamicWorkerInfo.id as number
  })
  return worker
}
990
a2ed5053
JB
/**
 * Attaches a callback to the 'message' event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
1003
/**
 * Hook called once a worker has been newly created and pushed to the pool worker nodes.
 * Registers the pool message listener, sends the startup message and sets up
 * the worker task statistics.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const poolListener = this.workerListener()
  this.registerWorkerMessageListener(worker, poolListener)
  // Send startup message to worker.
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, { ready: false, workerId })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
1021
/**
 * Moves the tasks queued on the given worker node to the other ready worker nodes.
 *
 * For each queued task, the destination is the first other ready worker node with
 * no queued task or, failing that, the other ready worker node with the fewest
 * queued tasks.
 *
 * If no other ready worker node exists, the remaining tasks are left in the
 * queue of the given worker node: re-enqueueing them on that same node would
 * never shrink its queue and the loop would never terminate.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks shall be moved.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number | undefined
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (
        workerNodeId === workerNodeKey ||
        !this.getWorkerInfo(workerNodeId).ready
      ) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // A worker node without queued task is the best possible destination.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey == null) {
      // No eligible destination: stop instead of looping forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
1051
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches the message to the
 * worker ready handler or to the task execution response handler.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const isReadyMessage = message.ready != null && message.workerId != null
    if (isReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1069
/**
 * Stores the ready status carried by the message in the information of the
 * worker identified by the message worker id, then emits the pool ready event
 * if an emitter is set and the pool is ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const readyWorker = this.getWorkerById(message.workerId) as Worker
  const readyWorkerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(readyWorker)
  )
  readyWorkerInfo.ready = message.ready as boolean
  if (this.emitter == null) {
    return
  }
  if (this.ready) {
    this.emitter.emit(PoolEvents.ready, this.info)
  }
}
1078
/**
 * Settles the promise associated with the task id carried by the message, runs
 * the after task execution hook, executes the next queued task of the worker
 * node if the tasks queue is enabled and updates the worker choice strategy context.
 * Messages whose id matches no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  if (message.taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.taskError, message.taskError)
    }
    pendingResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(message.id as string)
  const nodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true && this.tasksQueueSize(nodeKey) > 0
  if (hasQueuedTask) {
    // The worker just finished a task: feed it with its next queued task.
    this.executeTask(nodeKey, this.dequeueTask(nodeKey) as Task<Data>)
  }
  this.workerChoiceStrategyContext.update(nodeKey)
}
7c0ba920 1105
/**
 * Emits the pool busy event when the pool is busy and the pool full event when
 * a dynamic pool is full. Does nothing when no emitter is set.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1116
8a1260a3
JB
/**
 * Reads the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1125
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
c923ce56 1135
/**
 * Deletes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker has no
 * worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 1148
/**
 * Runs the before task execution hook, then sends the task to the worker of
 * the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1153
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1157
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1161
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1165
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1175
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const nodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(nodeKey)
  }
}
b6b32453
JB
1181
/**
 * Sends to the given worker which task statistics it shall compute, according
 * to the task statistics requirements of the worker choice strategy context.
 *
 * @param worker - The worker to configure.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    },
    workerId
  })
}
c97c7edb 1194}