feat: make the remaining worker choice strategies worker readiness aware
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
5c4d16da
JB
4import type {
5 MessageValue,
6 PromiseResponseWrapper,
7 Task
8} from '../utility-types'
bbeadd16 9import {
ff128cc9 10 DEFAULT_TASK_NAME,
bbeadd16
JB
11 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
12 EMPTY_FUNCTION,
59317253 13 isKillBehavior,
0d80593b 14 isPlainObject,
afe0d5bf
JB
15 median,
16 round
bbeadd16 17} from '../utils'
59317253 18import { KillBehaviors } from '../worker/worker-options'
c4855468 19import {
65d7a1c9 20 type IPool,
7c5a1080 21 PoolEmitter,
c4855468 22 PoolEvents,
6b27d407 23 type PoolInfo,
c4855468 24 type PoolOptions,
6b27d407
JB
25 type PoolType,
26 PoolTypes,
4b628b48 27 type TasksQueueOptions
c4855468 28} from './pool'
e102732c
JB
29import type {
30 IWorker,
4b628b48 31 IWorkerNode,
e102732c 32 MessageHandler,
8a1260a3 33 WorkerInfo,
4b628b48 34 WorkerType,
e102732c
JB
35 WorkerUsage
36} from './worker'
a35560ba 37import {
f0d7f803 38 Measurements,
a35560ba 39 WorkerChoiceStrategies,
a20f0ba5
JB
40 type WorkerChoiceStrategy,
41 type WorkerChoiceStrategyOptions
bdaf31cd
JB
42} from './selection-strategies/selection-strategies-types'
43import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 44import { version } from './version'
4b628b48 45import { WorkerNode } from './worker-node'
23ccf9d7 46
729c563d 47/**
ea7a90d3 48 * Base class that implements some shared logic for all poolifier pools.
729c563d 49 *
38e795c1 50 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
51 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
52 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 53 */
c97c7edb 54export abstract class AbstractPool<
f06e48d8 55 Worker extends IWorker,
d3c8a1a8
S
56 Data = unknown,
57 Response = unknown
c4855468 58> implements IPool<Worker, Data, Response> {
afc003b2 59 /** @inheritDoc */
4b628b48 60 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 61
afc003b2 62 /** @inheritDoc */
7c0ba920
JB
63 public readonly emitter?: PoolEmitter
64
be0676b3 65 /**
a3445496 66 * The execution response promise map.
be0676b3 67 *
2740a743 68 * - `key`: The message id of each submitted task.
a3445496 69 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 70 *
a3445496 71 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 72 */
c923ce56
JB
73 protected promiseResponseMap: Map<
74 string,
75 PromiseResponseWrapper<Worker, Response>
76 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 77
a35560ba 78 /**
51fe3d3c 79 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
80 */
81 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
82 Worker,
83 Data,
84 Response
a35560ba
S
85 >
86
afe0d5bf
JB
87 /**
88 * The start timestamp of the pool.
89 */
90 private readonly startTimestamp
91
729c563d
S
92 /**
93 * Constructs a new poolifier pool.
94 *
38e795c1 95 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 96 * @param filePath - Path to the worker file.
38e795c1 97 * @param opts - Options for the pool.
729c563d 98 */
c97c7edb 99 public constructor (
b4213b7f
JB
100 protected readonly numberOfWorkers: number,
101 protected readonly filePath: string,
102 protected readonly opts: PoolOptions<Worker>
c97c7edb 103 ) {
78cea37e 104 if (!this.isMain()) {
c97c7edb
S
105 throw new Error('Cannot start a pool from a worker!')
106 }
8d3782fa 107 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 108 this.checkFilePath(this.filePath)
7c0ba920 109 this.checkPoolOptions(this.opts)
1086026a 110
7254e419
JB
111 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
112 this.executeTask = this.executeTask.bind(this)
113 this.enqueueTask = this.enqueueTask.bind(this)
114 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 115
6bd72cd0 116 if (this.opts.enableEvents === true) {
7c0ba920
JB
117 this.emitter = new PoolEmitter()
118 }
d59df138
JB
119 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
120 Worker,
121 Data,
122 Response
da309861
JB
123 >(
124 this,
125 this.opts.workerChoiceStrategy,
126 this.opts.workerChoiceStrategyOptions
127 )
b6b32453
JB
128
129 this.setupHook()
130
afe0d5bf 131 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
132 this.createAndSetupWorker()
133 }
afe0d5bf
JB
134
135 this.startTimestamp = performance.now()
c97c7edb
S
136 }
137
a35560ba 138 private checkFilePath (filePath: string): void {
ffcbbad8
JB
139 if (
140 filePath == null ||
3d6dd312 141 typeof filePath !== 'string' ||
ffcbbad8
JB
142 (typeof filePath === 'string' && filePath.trim().length === 0)
143 ) {
c510fea7
APA
144 throw new Error('Please specify a file with a worker implementation')
145 }
3d6dd312
JB
146 if (!existsSync(filePath)) {
147 throw new Error(`Cannot find the worker file '${filePath}'`)
148 }
c510fea7
APA
149 }
150
8d3782fa
JB
151 private checkNumberOfWorkers (numberOfWorkers: number): void {
152 if (numberOfWorkers == null) {
153 throw new Error(
154 'Cannot instantiate a pool without specifying the number of workers'
155 )
78cea37e 156 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 157 throw new TypeError(
0d80593b 158 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
159 )
160 } else if (numberOfWorkers < 0) {
473c717a 161 throw new RangeError(
8d3782fa
JB
162 'Cannot instantiate a pool with a negative number of workers'
163 )
6b27d407 164 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
165 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
166 }
167 }
168
169 protected checkDynamicPoolSize (min: number, max: number): void {
079de991
JB
170 if (this.type === PoolTypes.dynamic) {
171 if (min > max) {
172 throw new RangeError(
173 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
174 )
175 } else if (min === 0 && max === 0) {
176 throw new RangeError(
177 'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
178 )
179 } else if (min === max) {
180 throw new RangeError(
181 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
182 )
183 }
8d3782fa
JB
184 }
185 }
186
7c0ba920 187 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
188 if (isPlainObject(opts)) {
189 this.opts.workerChoiceStrategy =
190 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
191 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
192 this.opts.workerChoiceStrategyOptions =
193 opts.workerChoiceStrategyOptions ??
194 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
195 this.checkValidWorkerChoiceStrategyOptions(
196 this.opts.workerChoiceStrategyOptions
197 )
1f68cede 198 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
199 this.opts.enableEvents = opts.enableEvents ?? true
200 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
201 if (this.opts.enableTasksQueue) {
202 this.checkValidTasksQueueOptions(
203 opts.tasksQueueOptions as TasksQueueOptions
204 )
205 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
206 opts.tasksQueueOptions as TasksQueueOptions
207 )
208 }
209 } else {
210 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 211 }
aee46736
JB
212 }
213
214 private checkValidWorkerChoiceStrategy (
215 workerChoiceStrategy: WorkerChoiceStrategy
216 ): void {
217 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 218 throw new Error(
aee46736 219 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
220 )
221 }
7c0ba920
JB
222 }
223
0d80593b
JB
224 private checkValidWorkerChoiceStrategyOptions (
225 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
226 ): void {
227 if (!isPlainObject(workerChoiceStrategyOptions)) {
228 throw new TypeError(
229 'Invalid worker choice strategy options: must be a plain object'
230 )
231 }
49be33fe
JB
232 if (
233 workerChoiceStrategyOptions.weights != null &&
6b27d407 234 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
235 ) {
236 throw new Error(
237 'Invalid worker choice strategy options: must have a weight for each worker node'
238 )
239 }
f0d7f803
JB
240 if (
241 workerChoiceStrategyOptions.measurement != null &&
242 !Object.values(Measurements).includes(
243 workerChoiceStrategyOptions.measurement
244 )
245 ) {
246 throw new Error(
247 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
248 )
249 }
0d80593b
JB
250 }
251
a20f0ba5
JB
252 private checkValidTasksQueueOptions (
253 tasksQueueOptions: TasksQueueOptions
254 ): void {
0d80593b
JB
255 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
256 throw new TypeError('Invalid tasks queue options: must be a plain object')
257 }
f0d7f803
JB
258 if (
259 tasksQueueOptions?.concurrency != null &&
260 !Number.isSafeInteger(tasksQueueOptions.concurrency)
261 ) {
262 throw new TypeError(
263 'Invalid worker tasks concurrency: must be an integer'
264 )
265 }
266 if (
267 tasksQueueOptions?.concurrency != null &&
268 tasksQueueOptions.concurrency <= 0
269 ) {
a20f0ba5 270 throw new Error(
f0d7f803 271 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
272 )
273 }
274 }
275
08f3f44c 276 /** @inheritDoc */
6b27d407
JB
277 public get info (): PoolInfo {
278 return {
23ccf9d7 279 version,
6b27d407 280 type: this.type,
184855e6 281 worker: this.worker,
2431bdb4
JB
282 ready: this.ready,
283 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
284 minSize: this.minSize,
285 maxSize: this.maxSize,
c05f0d50
JB
286 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
287 .runTime.aggregate &&
1305e9a8
JB
288 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
289 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
290 workerNodes: this.workerNodes.length,
291 idleWorkerNodes: this.workerNodes.reduce(
292 (accumulator, workerNode) =>
f59e1027 293 workerNode.usage.tasks.executing === 0
a4e07f72
JB
294 ? accumulator + 1
295 : accumulator,
6b27d407
JB
296 0
297 ),
298 busyWorkerNodes: this.workerNodes.reduce(
299 (accumulator, workerNode) =>
f59e1027 300 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
301 0
302 ),
a4e07f72 303 executedTasks: this.workerNodes.reduce(
6b27d407 304 (accumulator, workerNode) =>
f59e1027 305 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
306 0
307 ),
308 executingTasks: this.workerNodes.reduce(
309 (accumulator, workerNode) =>
f59e1027 310 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
311 0
312 ),
313 queuedTasks: this.workerNodes.reduce(
df593701 314 (accumulator, workerNode) =>
f59e1027 315 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
316 0
317 ),
318 maxQueuedTasks: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
b25a42cd 320 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
6b27d407 321 0
a4e07f72
JB
322 ),
323 failedTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
f59e1027 325 accumulator + workerNode.usage.tasks.failed,
a4e07f72 326 0
1dcf8b7b
JB
327 ),
328 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
329 .runTime.aggregate && {
330 runTime: {
98e72cda
JB
331 minimum: round(
332 Math.min(
333 ...this.workerNodes.map(
334 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
335 )
1dcf8b7b
JB
336 )
337 ),
98e72cda
JB
338 maximum: round(
339 Math.max(
340 ...this.workerNodes.map(
341 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
342 )
1dcf8b7b 343 )
98e72cda
JB
344 ),
345 average: round(
346 this.workerNodes.reduce(
347 (accumulator, workerNode) =>
348 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
349 0
350 ) /
351 this.workerNodes.reduce(
352 (accumulator, workerNode) =>
353 accumulator + (workerNode.usage.tasks?.executed ?? 0),
354 0
355 )
356 ),
357 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
358 .runTime.median && {
359 median: round(
360 median(
361 this.workerNodes.map(
362 workerNode => workerNode.usage.runTime?.median ?? 0
363 )
364 )
365 )
366 })
1dcf8b7b
JB
367 }
368 }),
369 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
370 .waitTime.aggregate && {
371 waitTime: {
98e72cda
JB
372 minimum: round(
373 Math.min(
374 ...this.workerNodes.map(
375 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
376 )
1dcf8b7b
JB
377 )
378 ),
98e72cda
JB
379 maximum: round(
380 Math.max(
381 ...this.workerNodes.map(
382 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
383 )
1dcf8b7b 384 )
98e72cda
JB
385 ),
386 average: round(
387 this.workerNodes.reduce(
388 (accumulator, workerNode) =>
389 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
390 0
391 ) /
392 this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + (workerNode.usage.tasks?.executed ?? 0),
395 0
396 )
397 ),
398 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
399 .waitTime.median && {
400 median: round(
401 median(
402 this.workerNodes.map(
403 workerNode => workerNode.usage.waitTime?.median ?? 0
404 )
405 )
406 )
407 })
1dcf8b7b
JB
408 }
409 })
6b27d407
JB
410 }
411 }
08f3f44c 412
2431bdb4 413 private get starting (): boolean {
d2c73f82 414 return this.workerNodes.length < this.minSize
2431bdb4
JB
415 }
416
417 private get ready (): boolean {
418 return (
d5024c00 419 this.workerNodes.length >= this.minSize &&
d2c73f82
JB
420 this.workerNodes.every(
421 (workerNode, workerNodeKey) =>
422 workerNodeKey < this.minSize && workerNode.info.ready
423 )
2431bdb4
JB
424 )
425 }
426
afe0d5bf
JB
427 /**
428 * Gets the approximate pool utilization.
429 *
430 * @returns The pool utilization.
431 */
432 private get utilization (): number {
8e5ca040 433 const poolTimeCapacity =
fe7d90db 434 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
435 const totalTasksRunTime = this.workerNodes.reduce(
436 (accumulator, workerNode) =>
71514351 437 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
438 0
439 )
440 const totalTasksWaitTime = this.workerNodes.reduce(
441 (accumulator, workerNode) =>
71514351 442 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
443 0
444 )
8e5ca040 445 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
446 }
447
8881ae32
JB
448 /**
449 * Pool type.
450 *
451 * If it is `'dynamic'`, it provides the `max` property.
452 */
453 protected abstract get type (): PoolType
454
184855e6
JB
455 /**
456 * Gets the worker type.
457 */
458 protected abstract get worker (): WorkerType
459
c2ade475 460 /**
6b27d407 461 * Pool minimum size.
c2ade475 462 */
6b27d407 463 protected abstract get minSize (): number
ff733df7
JB
464
465 /**
6b27d407 466 * Pool maximum size.
ff733df7 467 */
6b27d407 468 protected abstract get maxSize (): number
a35560ba 469
f59e1027
JB
470 /**
471 * Get the worker given its id.
472 *
473 * @param workerId - The worker id.
474 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
475 */
476 private getWorkerById (workerId: number): Worker | undefined {
477 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
478 ?.worker
479 }
480
6b813701
JB
481 /**
482 * Checks if the worker id sent in the received message from a worker is valid.
483 *
484 * @param message - The received message.
485 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
486 */
21f710aa
JB
487 private checkMessageWorkerId (message: MessageValue<Response>): void {
488 if (
489 message.workerId != null &&
490 this.getWorkerById(message.workerId) == null
491 ) {
492 throw new Error(
493 `Worker message received from unknown worker '${message.workerId}'`
494 )
495 }
496 }
497
ffcbbad8 498 /**
f06e48d8 499 * Gets the given worker its worker node key.
ffcbbad8
JB
500 *
501 * @param worker - The worker.
f59e1027 502 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 503 */
f06e48d8
JB
504 private getWorkerNodeKey (worker: Worker): number {
505 return this.workerNodes.findIndex(
506 workerNode => workerNode.worker === worker
507 )
bf9549ae
JB
508 }
509
afc003b2 510 /** @inheritDoc */
a35560ba 511 public setWorkerChoiceStrategy (
59219cbb
JB
512 workerChoiceStrategy: WorkerChoiceStrategy,
513 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 514 ): void {
aee46736 515 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 516 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
517 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
518 this.opts.workerChoiceStrategy
519 )
520 if (workerChoiceStrategyOptions != null) {
521 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
522 }
8a1260a3 523 for (const workerNode of this.workerNodes) {
4b628b48 524 workerNode.resetUsage()
b6b32453 525 this.setWorkerStatistics(workerNode.worker)
59219cbb 526 }
a20f0ba5
JB
527 }
528
529 /** @inheritDoc */
530 public setWorkerChoiceStrategyOptions (
531 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
532 ): void {
0d80593b 533 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
534 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
535 this.workerChoiceStrategyContext.setOptions(
536 this.opts.workerChoiceStrategyOptions
a35560ba
S
537 )
538 }
539
a20f0ba5 540 /** @inheritDoc */
8f52842f
JB
541 public enableTasksQueue (
542 enable: boolean,
543 tasksQueueOptions?: TasksQueueOptions
544 ): void {
a20f0ba5 545 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 546 this.flushTasksQueues()
a20f0ba5
JB
547 }
548 this.opts.enableTasksQueue = enable
8f52842f 549 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
550 }
551
552 /** @inheritDoc */
8f52842f 553 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 554 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
555 this.checkValidTasksQueueOptions(tasksQueueOptions)
556 this.opts.tasksQueueOptions =
557 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 558 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
559 delete this.opts.tasksQueueOptions
560 }
561 }
562
563 private buildTasksQueueOptions (
564 tasksQueueOptions: TasksQueueOptions
565 ): TasksQueueOptions {
566 return {
567 concurrency: tasksQueueOptions?.concurrency ?? 1
568 }
569 }
570
c319c66b
JB
571 /**
572 * Whether the pool is full or not.
573 *
574 * The pool filling boolean status.
575 */
dea903a8
JB
576 protected get full (): boolean {
577 return this.workerNodes.length >= this.maxSize
578 }
c2ade475 579
c319c66b
JB
580 /**
581 * Whether the pool is busy or not.
582 *
583 * The pool busyness boolean status.
584 */
585 protected abstract get busy (): boolean
7c0ba920 586
6c6afb84
JB
587 /**
588 * Whether worker nodes are executing at least one task.
589 *
590 * @returns Worker nodes busyness boolean status.
591 */
c2ade475 592 protected internalBusy (): boolean {
e0ae6100
JB
593 return (
594 this.workerNodes.findIndex(workerNode => {
f59e1027 595 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
596 }) === -1
597 )
cb70b19d
JB
598 }
599
afc003b2 600 /** @inheritDoc */
a86b6df1 601 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 602 const timestamp = performance.now()
20dcad1a 603 const workerNodeKey = this.chooseWorkerNode()
adc3c320 604 const submittedTask: Task<Data> = {
ff128cc9 605 name: name ?? DEFAULT_TASK_NAME,
e5a5c0fc
JB
606 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
607 data: data ?? ({} as Data),
b6b32453 608 timestamp,
21f710aa 609 workerId: this.getWorkerInfo(workerNodeKey).id as number,
2845f2a5 610 id: randomUUID()
adc3c320 611 }
2e81254d 612 const res = new Promise<Response>((resolve, reject) => {
02706357 613 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
614 resolve,
615 reject,
20dcad1a 616 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
617 })
618 })
ff733df7
JB
619 if (
620 this.opts.enableTasksQueue === true &&
7171d33f 621 (this.busy ||
f59e1027 622 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 623 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 624 .concurrency as number))
ff733df7 625 ) {
26a929d7
JB
626 this.enqueueTask(workerNodeKey, submittedTask)
627 } else {
2e81254d 628 this.executeTask(workerNodeKey, submittedTask)
adc3c320 629 }
ff733df7 630 this.checkAndEmitEvents()
78cea37e 631 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
632 return res
633 }
c97c7edb 634
afc003b2 635 /** @inheritDoc */
c97c7edb 636 public async destroy (): Promise<void> {
1fbcaa7c 637 await Promise.all(
875a7c37
JB
638 this.workerNodes.map(async (workerNode, workerNodeKey) => {
639 this.flushTasksQueue(workerNodeKey)
47aacbaa 640 // FIXME: wait for tasks to be finished
920278a2
JB
641 const workerExitPromise = new Promise<void>(resolve => {
642 workerNode.worker.on('exit', () => {
643 resolve()
644 })
645 })
f06e48d8 646 await this.destroyWorker(workerNode.worker)
920278a2 647 await workerExitPromise
1fbcaa7c
JB
648 })
649 )
c97c7edb
S
650 }
651
4a6952ff 652 /**
6c6afb84 653 * Terminates the given worker.
4a6952ff 654 *
f06e48d8 655 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
656 */
657 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 658
729c563d 659 /**
6677a3d3
JB
660 * Setup hook to execute code before worker nodes are created in the abstract constructor.
661 * Can be overridden.
afc003b2
JB
662 *
663 * @virtual
729c563d 664 */
280c2a77 665 protected setupHook (): void {
d99ba5a8 666 // Intentionally empty
280c2a77 667 }
c97c7edb 668
729c563d 669 /**
280c2a77
S
670 * Should return whether the worker is the main worker or not.
671 */
672 protected abstract isMain (): boolean
673
674 /**
2e81254d 675 * Hook executed before the worker task execution.
bf9549ae 676 * Can be overridden.
729c563d 677 *
f06e48d8 678 * @param workerNodeKey - The worker node key.
1c6fe997 679 * @param task - The task to execute.
729c563d 680 */
1c6fe997
JB
681 protected beforeTaskExecutionHook (
682 workerNodeKey: number,
683 task: Task<Data>
684 ): void {
f59e1027 685 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
686 ++workerUsage.tasks.executing
687 this.updateWaitTimeWorkerUsage(workerUsage, task)
eb8afc8a 688 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
ce1b31be
JB
689 task.name as string
690 ) as WorkerUsage
eb8afc8a
JB
691 ++taskWorkerUsage.tasks.executing
692 this.updateWaitTimeWorkerUsage(taskWorkerUsage, task)
c97c7edb
S
693 }
694
c01733f1 695 /**
2e81254d 696 * Hook executed after the worker task execution.
bf9549ae 697 * Can be overridden.
c01733f1 698 *
c923ce56 699 * @param worker - The worker.
38e795c1 700 * @param message - The received message.
c01733f1 701 */
2e81254d 702 protected afterTaskExecutionHook (
c923ce56 703 worker: Worker,
2740a743 704 message: MessageValue<Response>
bf9549ae 705 ): void {
ff128cc9
JB
706 const workerNodeKey = this.getWorkerNodeKey(worker)
707 const workerUsage = this.workerNodes[workerNodeKey].usage
f1c06930
JB
708 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
709 this.updateRunTimeWorkerUsage(workerUsage, message)
710 this.updateEluWorkerUsage(workerUsage, message)
eb8afc8a 711 const taskWorkerUsage = this.workerNodes[workerNodeKey].getTaskWorkerUsage(
87e44747 712 message.taskPerformance?.name ?? DEFAULT_TASK_NAME
ce1b31be 713 ) as WorkerUsage
eb8afc8a
JB
714 this.updateTaskStatisticsWorkerUsage(taskWorkerUsage, message)
715 this.updateRunTimeWorkerUsage(taskWorkerUsage, message)
716 this.updateEluWorkerUsage(taskWorkerUsage, message)
f1c06930
JB
717 }
718
719 private updateTaskStatisticsWorkerUsage (
720 workerUsage: WorkerUsage,
721 message: MessageValue<Response>
722 ): void {
a4e07f72
JB
723 const workerTaskStatistics = workerUsage.tasks
724 --workerTaskStatistics.executing
98e72cda
JB
725 if (message.taskError == null) {
726 ++workerTaskStatistics.executed
727 } else {
a4e07f72 728 ++workerTaskStatistics.failed
2740a743 729 }
f8eb0a2a
JB
730 }
731
a4e07f72
JB
732 private updateRunTimeWorkerUsage (
733 workerUsage: WorkerUsage,
f8eb0a2a
JB
734 message: MessageValue<Response>
735 ): void {
87de9ff5
JB
736 if (
737 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 738 .aggregate
87de9ff5 739 ) {
f7510105 740 const taskRunTime = message.taskPerformance?.runTime ?? 0
71514351
JB
741 workerUsage.runTime.aggregate =
742 (workerUsage.runTime.aggregate ?? 0) + taskRunTime
f7510105
JB
743 workerUsage.runTime.minimum = Math.min(
744 taskRunTime,
71514351 745 workerUsage.runTime?.minimum ?? Infinity
f7510105
JB
746 )
747 workerUsage.runTime.maximum = Math.max(
748 taskRunTime,
71514351 749 workerUsage.runTime?.maximum ?? -Infinity
f7510105 750 )
c6bd2650 751 if (
932fc8be
JB
752 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
753 .average &&
a4e07f72 754 workerUsage.tasks.executed !== 0
c6bd2650 755 ) {
a4e07f72 756 workerUsage.runTime.average =
98e72cda 757 workerUsage.runTime.aggregate / workerUsage.tasks.executed
3032893a 758 }
3fa4cdd2 759 if (
932fc8be
JB
760 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
761 .median &&
d715b7bc 762 message.taskPerformance?.runTime != null
3fa4cdd2 763 ) {
a4e07f72
JB
764 workerUsage.runTime.history.push(message.taskPerformance.runTime)
765 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 766 }
3032893a 767 }
f8eb0a2a
JB
768 }
769
a4e07f72
JB
770 private updateWaitTimeWorkerUsage (
771 workerUsage: WorkerUsage,
1c6fe997 772 task: Task<Data>
f8eb0a2a 773 ): void {
1c6fe997
JB
774 const timestamp = performance.now()
775 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
776 if (
777 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 778 .aggregate
87de9ff5 779 ) {
71514351
JB
780 workerUsage.waitTime.aggregate =
781 (workerUsage.waitTime?.aggregate ?? 0) + taskWaitTime
f7510105
JB
782 workerUsage.waitTime.minimum = Math.min(
783 taskWaitTime,
71514351 784 workerUsage.waitTime?.minimum ?? Infinity
f7510105
JB
785 )
786 workerUsage.waitTime.maximum = Math.max(
787 taskWaitTime,
71514351 788 workerUsage.waitTime?.maximum ?? -Infinity
f7510105 789 )
09a6305f 790 if (
87de9ff5 791 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 792 .waitTime.average &&
a4e07f72 793 workerUsage.tasks.executed !== 0
09a6305f 794 ) {
a4e07f72 795 workerUsage.waitTime.average =
98e72cda 796 workerUsage.waitTime.aggregate / workerUsage.tasks.executed
09a6305f
JB
797 }
798 if (
87de9ff5 799 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
71514351 800 .waitTime.median
09a6305f 801 ) {
1c6fe997 802 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 803 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 804 }
0567595a 805 }
c01733f1 806 }
807
a4e07f72 808 private updateEluWorkerUsage (
5df69fab 809 workerUsage: WorkerUsage,
62c15a68
JB
810 message: MessageValue<Response>
811 ): void {
5df69fab
JB
812 if (
813 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
814 .aggregate
815 ) {
f7510105 816 if (message.taskPerformance?.elu != null) {
71514351
JB
817 workerUsage.elu.idle.aggregate =
818 (workerUsage.elu.idle?.aggregate ?? 0) +
819 message.taskPerformance.elu.idle
820 workerUsage.elu.active.aggregate =
821 (workerUsage.elu.active?.aggregate ?? 0) +
822 message.taskPerformance.elu.active
f7510105
JB
823 if (workerUsage.elu.utilization != null) {
824 workerUsage.elu.utilization =
825 (workerUsage.elu.utilization +
826 message.taskPerformance.elu.utilization) /
827 2
828 } else {
829 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
830 }
831 workerUsage.elu.idle.minimum = Math.min(
832 message.taskPerformance.elu.idle,
71514351 833 workerUsage.elu.idle?.minimum ?? Infinity
f7510105
JB
834 )
835 workerUsage.elu.idle.maximum = Math.max(
836 message.taskPerformance.elu.idle,
71514351 837 workerUsage.elu.idle?.maximum ?? -Infinity
f7510105
JB
838 )
839 workerUsage.elu.active.minimum = Math.min(
840 message.taskPerformance.elu.active,
71514351 841 workerUsage.elu.active?.minimum ?? Infinity
f7510105
JB
842 )
843 workerUsage.elu.active.maximum = Math.max(
844 message.taskPerformance.elu.active,
71514351 845 workerUsage.elu.active?.maximum ?? -Infinity
f7510105 846 )
71514351
JB
847 if (
848 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
849 .average &&
850 workerUsage.tasks.executed !== 0
851 ) {
71514351 852 workerUsage.elu.idle.average =
98e72cda 853 workerUsage.elu.idle.aggregate / workerUsage.tasks.executed
71514351 854 workerUsage.elu.active.average =
98e72cda 855 workerUsage.elu.active.aggregate / workerUsage.tasks.executed
71514351
JB
856 }
857 if (
858 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
61e4a75e 859 .median
71514351
JB
860 ) {
861 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
862 workerUsage.elu.active.history.push(
863 message.taskPerformance.elu.active
864 )
865 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
866 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
867 }
62c15a68
JB
868 }
869 }
870 }
871
280c2a77 872 /**
f06e48d8 873 * Chooses a worker node for the next task.
280c2a77 874 *
6c6afb84 875 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 876 *
20dcad1a 877 * @returns The worker node key
280c2a77 878 */
6c6afb84 879 private chooseWorkerNode (): number {
930dcf12 880 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
881 const worker = this.createAndSetupDynamicWorker()
882 if (
883 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
884 ) {
885 return this.getWorkerNodeKey(worker)
886 }
17393ac8 887 }
930dcf12
JB
888 return this.workerChoiceStrategyContext.execute()
889 }
890
6c6afb84
JB
891 /**
892 * Conditions for dynamic worker creation.
893 *
894 * @returns Whether to create a dynamic worker or not.
895 */
896 private shallCreateDynamicWorker (): boolean {
930dcf12 897 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
898 }
899
280c2a77 900 /**
675bb809 901 * Sends a message to the given worker.
280c2a77 902 *
38e795c1
JB
903 * @param worker - The worker which should receive the message.
904 * @param message - The message.
280c2a77
S
905 */
906 protected abstract sendToWorker (
907 worker: Worker,
908 message: MessageValue<Data>
909 ): void
910
729c563d 911 /**
41344292 912 * Creates a new worker.
6c6afb84
JB
913 *
914 * @returns Newly created worker.
729c563d 915 */
280c2a77 916 protected abstract createWorker (): Worker
c97c7edb 917
4a6952ff 918 /**
f06e48d8 919 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
920 *
921 * @returns New, completely set up worker.
922 */
923 protected createAndSetupWorker (): Worker {
bdacc2d2 924 const worker = this.createWorker()
280c2a77 925
35cf1c03 926 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 927 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede 928 worker.on('error', error => {
9b106837
JB
929 const workerNodeKey = this.getWorkerNodeKey(worker)
930 const workerInfo = this.getWorkerInfo(workerNodeKey)
931 workerInfo.ready = false
1f68cede
JB
932 if (this.emitter != null) {
933 this.emitter.emit(PoolEvents.error, error)
934 }
2431bdb4 935 if (this.opts.restartWorkerOnError === true && !this.starting) {
9b106837 936 if (workerInfo.dynamic) {
8a1260a3
JB
937 this.createAndSetupDynamicWorker()
938 } else {
939 this.createAndSetupWorker()
940 }
5baee0d7 941 }
19dbc45b 942 if (this.opts.enableTasksQueue === true) {
9b106837 943 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 944 }
5baee0d7 945 })
a35560ba
S
946 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
947 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 948 worker.once('exit', () => {
f06e48d8 949 this.removeWorkerNode(worker)
a974afa6 950 })
280c2a77 951
f06e48d8 952 this.pushWorkerNode(worker)
280c2a77
S
953
954 this.afterWorkerSetup(worker)
955
c97c7edb
S
956 return worker
957 }
be0676b3 958
930dcf12
JB
959 /**
960 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
961 *
962 * @returns New, completely set up dynamic worker.
963 */
964 protected createAndSetupDynamicWorker (): Worker {
965 const worker = this.createAndSetupWorker()
966 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 967 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
968 if (
969 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
970 (message.kill != null &&
971 ((this.opts.enableTasksQueue === false &&
f59e1027 972 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 973 (this.opts.enableTasksQueue === true &&
f59e1027 974 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 975 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
976 ) {
977 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
978 void (this.destroyWorker(worker) as Promise<void>)
979 }
980 })
21f710aa 981 const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
d2c73f82 982 workerInfo.ready = true
21f710aa
JB
983 workerInfo.dynamic = true
984 this.sendToWorker(worker, {
985 checkAlive: true,
986 workerId: workerInfo.id as number
987 })
930dcf12
JB
988 return worker
989 }
990
a2ed5053
JB
991 /**
992 * Registers a listener callback on the given worker.
993 *
994 * @param worker - The worker which should register a listener.
995 * @param listener - The message listener callback.
996 */
997 private registerWorkerMessageListener<Message extends Data | Response>(
998 worker: Worker,
999 listener: (message: MessageValue<Message>) => void
1000 ): void {
1001 worker.on('message', listener as MessageHandler<Worker>)
1002 }
1003
1004 /**
1005 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
1006 * Can be overridden.
1007 *
1008 * @param worker - The newly created worker.
1009 */
1010 protected afterWorkerSetup (worker: Worker): void {
1011 // Listen to worker messages.
1012 this.registerWorkerMessageListener(worker, this.workerListener())
1013 // Send startup message to worker.
d2c73f82
JB
1014 this.sendWorkerStartupMessage(worker)
1015 // Setup worker task statistics computation.
1016 this.setWorkerStatistics(worker)
1017 }
1018
1019 private sendWorkerStartupMessage (worker: Worker): void {
a2ed5053
JB
1020 this.sendToWorker(worker, {
1021 ready: false,
1022 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
1023 })
a2ed5053
JB
1024 }
1025
1026 private redistributeQueuedTasks (workerNodeKey: number): void {
1027 while (this.tasksQueueSize(workerNodeKey) > 0) {
1028 let targetWorkerNodeKey: number = workerNodeKey
1029 let minQueuedTasks = Infinity
1030 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
1031 const workerInfo = this.getWorkerInfo(workerNodeId)
1032 if (
1033 workerNodeId !== workerNodeKey &&
1034 workerInfo.ready &&
1035 workerNode.usage.tasks.queued === 0
1036 ) {
1037 targetWorkerNodeKey = workerNodeId
1038 break
1039 }
1040 if (
1041 workerNodeId !== workerNodeKey &&
1042 workerInfo.ready &&
1043 workerNode.usage.tasks.queued < minQueuedTasks
1044 ) {
1045 minQueuedTasks = workerNode.usage.tasks.queued
1046 targetWorkerNodeKey = workerNodeId
1047 }
1048 }
1049 this.enqueueTask(
1050 targetWorkerNodeKey,
1051 this.dequeueTask(workerNodeKey) as Task<Data>
1052 )
1053 }
1054 }
1055
be0676b3 1056 /**
ff733df7 1057 * This function is the listener registered for each worker message.
be0676b3 1058 *
bdacc2d2 1059 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1060 */
1061 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 1062 return message => {
21f710aa 1063 this.checkMessageWorkerId(message)
d2c73f82 1064 if (message.ready != null) {
7c8381eb
JB
1065 // Worker ready message received
1066 this.handleWorkerReadyMessage(message)
f59e1027 1067 } else if (message.id != null) {
a3445496 1068 // Task execution response received
6b272951
JB
1069 this.handleTaskExecutionResponse(message)
1070 }
1071 }
1072 }
1073
7c8381eb 1074 private handleWorkerReadyMessage (message: MessageValue<Response>): void {
21f710aa
JB
1075 const worker = this.getWorkerById(message.workerId)
1076 this.getWorkerInfo(this.getWorkerNodeKey(worker as Worker)).ready =
1077 message.ready as boolean
2431bdb4
JB
1078 if (this.emitter != null && this.ready) {
1079 this.emitter.emit(PoolEvents.ready, this.info)
1080 }
6b272951
JB
1081 }
1082
1083 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
1084 const promiseResponse = this.promiseResponseMap.get(message.id as string)
1085 if (promiseResponse != null) {
1086 if (message.taskError != null) {
1087 if (this.emitter != null) {
1088 this.emitter.emit(PoolEvents.taskError, message.taskError)
be0676b3 1089 }
6b272951
JB
1090 promiseResponse.reject(message.taskError.message)
1091 } else {
1092 promiseResponse.resolve(message.data as Response)
1093 }
1094 this.afterTaskExecutionHook(promiseResponse.worker, message)
1095 this.promiseResponseMap.delete(message.id as string)
1096 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
1097 if (
1098 this.opts.enableTasksQueue === true &&
1099 this.tasksQueueSize(workerNodeKey) > 0
1100 ) {
1101 this.executeTask(
1102 workerNodeKey,
1103 this.dequeueTask(workerNodeKey) as Task<Data>
1104 )
be0676b3 1105 }
6b272951 1106 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1107 }
be0676b3 1108 }
7c0ba920 1109
ff733df7 1110 private checkAndEmitEvents (): void {
1f68cede 1111 if (this.emitter != null) {
ff733df7 1112 if (this.busy) {
2845f2a5 1113 this.emitter.emit(PoolEvents.busy, this.info)
ff733df7 1114 }
6b27d407 1115 if (this.type === PoolTypes.dynamic && this.full) {
2845f2a5 1116 this.emitter.emit(PoolEvents.full, this.info)
ff733df7 1117 }
164d950a
JB
1118 }
1119 }
1120
8a1260a3
JB
1121 /**
1122 * Gets the worker information.
1123 *
1124 * @param workerNodeKey - The worker node key.
1125 */
1126 private getWorkerInfo (workerNodeKey: number): WorkerInfo {
1127 return this.workerNodes[workerNodeKey].info
1128 }
1129
a05c10de 1130 /**
f06e48d8 1131 * Pushes the given worker in the pool worker nodes.
ea7a90d3 1132 *
38e795c1 1133 * @param worker - The worker.
f06e48d8 1134 * @returns The worker nodes length.
ea7a90d3 1135 */
f06e48d8 1136 private pushWorkerNode (worker: Worker): number {
cc3ab78b 1137 const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
949194eb 1138 // Flag the worker as ready at pool startup.
d2c73f82
JB
1139 if (this.starting) {
1140 workerNode.info.ready = true
1141 }
cc3ab78b 1142 return this.workerNodes.push(workerNode)
ea7a90d3 1143 }
c923ce56 1144
51fe3d3c 1145 /**
f06e48d8 1146 * Removes the given worker from the pool worker nodes.
51fe3d3c 1147 *
f06e48d8 1148 * @param worker - The worker.
51fe3d3c 1149 */
416fd65c 1150 private removeWorkerNode (worker: Worker): void {
f06e48d8 1151 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
1152 if (workerNodeKey !== -1) {
1153 this.workerNodes.splice(workerNodeKey, 1)
1154 this.workerChoiceStrategyContext.remove(workerNodeKey)
1155 }
51fe3d3c 1156 }
adc3c320 1157
2e81254d 1158 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1159 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
1160 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
1161 }
1162
f9f00b5f 1163 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
4b628b48 1164 return this.workerNodes[workerNodeKey].enqueueTask(task)
adc3c320
JB
1165 }
1166
416fd65c 1167 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1168 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1169 }
1170
416fd65c 1171 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1172 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1173 }
1174
416fd65c 1175 private flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1176 while (this.tasksQueueSize(workerNodeKey) > 0) {
1177 this.executeTask(
1178 workerNodeKey,
1179 this.dequeueTask(workerNodeKey) as Task<Data>
1180 )
ff733df7 1181 }
4b628b48 1182 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1183 }
1184
ef41a6e6
JB
1185 private flushTasksQueues (): void {
1186 for (const [workerNodeKey] of this.workerNodes.entries()) {
1187 this.flushTasksQueue(workerNodeKey)
1188 }
1189 }
b6b32453
JB
1190
1191 private setWorkerStatistics (worker: Worker): void {
1192 this.sendToWorker(worker, {
1193 statistics: {
87de9ff5
JB
1194 runTime:
1195 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 1196 .runTime.aggregate,
87de9ff5 1197 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 1198 .elu.aggregate
21f710aa
JB
1199 },
1200 workerId: this.getWorkerInfo(this.getWorkerNodeKey(worker)).id as number
b6b32453
JB
1201 })
1202 }
c97c7edb 1203}