refactor: add task function name to task performance
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
4b628b48 21 type TasksQueueOptions
c4855468 22} from './pool'
e102732c
JB
23import type {
24 IWorker,
4b628b48 25 IWorkerNode,
e102732c
JB
26 MessageHandler,
27 Task,
8a1260a3 28 WorkerInfo,
4b628b48 29 WorkerType,
e102732c
JB
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 39import { version } from './version'
4b628b48 40import { WorkerNode } from './worker-node'
23ccf9d7 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
4b628b48 55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs
 * the setup hook and then creates worker nodes until `numberOfWorkers` of
 * them exist. The pool start timestamp is taken once the workers are created.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if the pool is not instantiated from the main context.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods below so they keep their `this` wherever they are called.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Each call adds one worker node, so loop until the requested count is met.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
132
/**
 * Validates the worker file path given to the pool.
 *
 * Any value that is not a string is rejected, not only nullish ones, so a
 * number or an object passed from untyped code cannot slip through.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is nullish, not a string, or blank.
 */
private checkFilePath (filePath: string): void {
  // `typeof` also covers `null` and `undefined`.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
8d3782fa
JB
/**
 * Validates the number of workers requested for the pool.
 *
 * @param numberOfWorkers - Number of workers to validate.
 * @throws Error if the number is nullish.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative, or zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
159
/**
 * Validates the size bounds of a dynamic pool. Does nothing for other pool types.
 *
 * @param min - Minimum pool size.
 * @param max - Maximum pool size.
 * @throws RangeError if `min` is greater than `max`, if both are zero, or if they are equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Checked before the generic equality case so that 0/0 gets its own message.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
177
/**
 * Validates the pool options and writes the resolved defaults into `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  const strategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = strategy
  this.checkValidWorkerChoiceStrategy(strategy)

  const strategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = strategyOptions
  this.checkValidWorkerChoiceStrategyOptions(strategyOptions)

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }

  // Tasks queue options are only validated and defaulted when the queue is on.
  const queueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(queueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(queueOptions)
}
204
/**
 * Checks that the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
214
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
242
a20f0ba5
JB
/**
 * Validates the tasks queue options. Nullish options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object, or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
266
/** @inheritDoc */
public get info (): PoolInfo {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  const nodes = this.workerNodes
  // Sums one numeric value per worker node over the whole pool.
  const sum = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    nodes.reduce((total, workerNode) => total + pick(workerNode), 0)

  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both run time and wait time aggregates.
    ...(requirements.runTime.aggregate &&
      requirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: nodes.length,
    idleWorkerNodes: nodes.filter(
      workerNode => workerNode.usage.tasks.executing === 0
    ).length,
    busyWorkerNodes: nodes.filter(
      workerNode => workerNode.usage.tasks.executing > 0
    ).length,
    executedTasks: sum(workerNode => workerNode.usage.tasks.executed),
    executingTasks: sum(workerNode => workerNode.usage.tasks.executing),
    queuedTasks: sum(workerNode => workerNode.usage.tasks.queued),
    maxQueuedTasks: sum(workerNode => workerNode.usage.tasks.maxQueued),
    failedTasks: sum(workerNode => workerNode.usage.tasks.failed),
    ...(requirements.runTime.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...nodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...nodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE(review): divides by zero (NaN) while no task has been executed - confirm this is intended.
        average: round(
          sum(workerNode => workerNode.usage.runTime?.aggregate ?? 0) /
            sum(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.runTime.median && {
          median: round(
            median(
              nodes.map(workerNode => workerNode.usage.runTime?.median ?? 0)
            )
          )
        })
      }
    }),
    ...(requirements.waitTime.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...nodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...nodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        // NOTE(review): same 0/0 caveat as the run time average above.
        average: round(
          sum(workerNode => workerNode.usage.waitTime?.aggregate ?? 0) /
            sum(workerNode => workerNode.usage.tasks?.executed ?? 0)
        ),
        ...(requirements.waitTime.median && {
          median: round(
            median(
              nodes.map(workerNode => workerNode.usage.waitTime?.median ?? 0)
            )
          )
        })
      }
    })
  }
}
08f3f44c 403
2431bdb4
JB
/**
 * Whether the pool is still starting: it has fewer worker nodes than its
 * minimum size, or at least one worker node is not ready yet.
 */
private get starting (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
411
/**
 * Whether the pool is ready: it has at least its minimum number of worker
 * nodes and every one of them is ready.
 */
private get ready (): boolean {
  if (this.workerNodes.length < this.minSize) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
418
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the tasks run time plus wait time
 * aggregated over all worker nodes, divided by the time elapsed since the
 * pool start multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const poolRunTimeCapacity =
    (performance.now() - this.startTimestamp) * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
439
8881ae32
JB
/**
 * Pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * Worker type, reported in the pool information.
 */
protected abstract get worker (): WorkerType

/**
 * Pool minimum size: the number of worker nodes below which the pool is
 * neither ready nor done starting.
 */
protected abstract get minSize (): number

/**
 * Pool maximum size: the number of worker nodes at which the pool is full.
 */
protected abstract get maxSize (): number
a35560ba 461
f59e1027
JB
/**
 * Looks up a worker by its id.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info carries that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
472
21f710aa
JB
/**
 * Checks that a message carrying a worker id comes from a worker of this pool.
 * Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws Error if the message worker id matches no worker node.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) != null) {
    return
  }
  throw new Error(
    `Worker message received from unknown worker '${workerId}'`
  )
}
483
/**
 * Gets the worker node key of the given worker, i.e. its index in `workerNodes`.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
495
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Every worker node restarts with a reset usage and re-applied statistics settings.
  this.workerNodes.forEach(workerNode => {
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  })
}
514
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Store the validated options, then hand the same object to the strategy context.
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
525
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
537
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Queue disabled: drop any previously stored tasks queue options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
548
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish or partial.
 * @returns Tasks queue options with a defined concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
556
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once its number of worker nodes reaches its maximum size.
 */
protected get full (): boolean {
  return this.maxSize <= this.workerNodes.length
}
c2ade475 565
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, a busy pool enqueues submitted tasks
 * instead of executing them right away.
 */
protected abstract get busy (): boolean
7c0ba920 572
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
585
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const submissionTimestamp = performance.now()
  const targetWorkerNodeKey = this.chooseWorkerNode()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    workerId: this.getWorkerInfo(targetWorkerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the settle callbacks under the task id before the task is handed over.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(task.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[targetWorkerNodeKey].worker
    })
  })
  // Queue the task when the pool is busy or the chosen worker node is at its concurrency limit.
  const shallEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[targetWorkerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueue) {
    this.enqueueTask(targetWorkerNodeKey, task)
  } else {
    this.executeTask(targetWorkerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 620
/** @inheritDoc */
public async destroy (): Promise<void> {
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey): Promise<void> => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // Subscribe to 'exit' before destroying the worker so the event cannot be missed.
      const workerExited = new Promise<void>(resolve => {
        workerNode.worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(workerNode.worker)
      await workerExited
    }
  )
  await Promise.all(terminations)
}
637
/**
 * Terminates the given worker. The result is awaited when the pool is destroyed.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 644
/**
 * Setup hook called by the abstract constructor right before the worker
 * nodes are created. Does nothing by default.
 * Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 654
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to build a pool when this returns `false`.
 */
protected abstract isMain (): boolean
659
/**
 * Hook executed before the worker task execution: counts the task as
 * executing and records its wait time.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
675
/**
 * Hook executed after the worker task execution: updates the task counters,
 * then the run time and ELU statistics of the worker usage.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const { usage } = this.workerNodes[this.getWorkerNodeKey(worker)]
  // Task counters go first: the averages computed below divide by `tasks.executed`.
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
692
/**
 * Updates the task counters of a worker usage once a task has finished.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a non-nullish `taskError` marks the task as failed.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
705
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage from a task message.
 * Does nothing unless the worker choice strategy requires the run time aggregate.
 *
 * A message without a run time measurement leaves the minimum, maximum and
 * median untouched, so a missing measurement is never recorded as a 0 ms run.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  const taskRunTime = message.taskPerformance?.runTime
  // A missing measurement adds nothing, but the aggregate still gets initialized.
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + (taskRunTime ?? 0)
  if (taskRunTime != null) {
    runTimeUsage.minimum = Math.min(
      taskRunTime,
      runTimeUsage.minimum ?? Infinity
    )
    runTimeUsage.maximum = Math.max(
      taskRunTime,
      runTimeUsage.maximum ?? -Infinity
    )
  }
  if (requirements.average && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (requirements.median && taskRunTime != null) {
    runTimeUsage.history.push(taskRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
743
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 * The wait time is the time elapsed since the task timestamp, or 0 if the task has none.
 * Does nothing unless the worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task to execute.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage.aggregate ?? 0) + taskWaitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  if (requirements.average && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average =
      waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (requirements.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
781
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task message.
 * Does nothing unless the worker choice strategy requires the ELU aggregate
 * and the message carries an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  const { idle, active } = eluUsage

  idle.aggregate = (idle.aggregate ?? 0) + taskElu.idle
  active.aggregate = (active.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of the previous value and the new sample.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization

  idle.minimum = Math.min(taskElu.idle, idle.minimum ?? Infinity)
  idle.maximum = Math.max(taskElu.idle, idle.maximum ?? -Infinity)
  active.minimum = Math.min(taskElu.active, active.minimum ?? Infinity)
  active.maximum = Math.max(taskElu.active, active.maximum ?? -Infinity)

  if (requirements.average && workerUsage.tasks.executed !== 0) {
    idle.average = idle.aggregate / workerUsage.tasks.executed
    active.average = active.aggregate / workerUsage.tasks.executed
  }
  if (requirements.median) {
    idle.history.push(taskElu.idle)
    active.history.push(taskElu.active)
    idle.median = median(idle.history)
    active.median = median(active.history)
  }
}
845
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * node is chosen directly if the strategy policy says to use dynamic workers.
 * Otherwise the worker choice strategy picks the node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
864
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, not full, and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
873
/**
 * Sends a message to the given worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message to send.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
884
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 897
/**
 * Creates a new worker.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 904
/**
 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
 * By default it registers the pool message listener, sends the startup
 * message to the worker and sets up its statistics.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(worker, this.workerListener())
  // Send startup message to worker.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerId = this.getWorkerInfo(workerNodeKey).id as number
  this.sendToWorker(worker, { ready: false, workerId })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
c97c7edb 922
/**
 * Creates a new worker and sets it up completely in the pool worker nodes:
 * event handlers are attached, the worker is pushed to the worker nodes and
 * the after-setup hook is run.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  // User supplied handlers are registered ahead of the pool's own ones.
  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    if (this.opts.restartWorkerOnError !== true || this.starting) {
      return
    }
    // Replace the worker in error by one of the same kind.
    if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
      this.createAndSetupDynamicWorker()
    } else {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 960
/**
 * Moves the tasks queued on the given worker to the queues of the other worker nodes.
 *
 * Each task is re-enqueued on the first other worker node found with an empty queue or,
 * failing that, on the other worker node with the fewest queued tasks.
 * Nothing is moved when the worker is no longer part of the pool worker nodes or when
 * it is the only worker node: in both cases there is no valid target.
 *
 * @param worker - The worker whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    // The worker is not registered in the pool worker nodes: there is no queue to drain.
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = -1
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === -1) {
      // No other worker node is available: re-enqueueing the task on the source
      // worker node would never shrink its queue and the loop would never end.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
988
/**
 * Creates a new worker through `createAndSetupWorker()`, flags it as dynamic and
 * registers the listener that destroys it when it sends a kill message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // A non hard kill request is only honored once the worker has nothing left to do.
    const isIdleKillRequest = (): boolean => {
      if (message.kill == null) {
        return false
      }
      const tasksQueueEnabled = this.opts.enableTasksQueue
      if (tasksQueueEnabled !== true && tasksQueueEnabled !== false) {
        return false
      }
      if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
        return false
      }
      return (
        tasksQueueEnabled === false || this.tasksQueueSize(workerNodeKey) === 0
      )
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isIdleKillRequest()
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  workerInfo.dynamic = true
  const workerId = workerInfo.id as number
  this.sendToWorker(worker, { checkAlive: true, workerId })
  return worker
}
1019
/**
 * Builds the listener registered for each worker message.
 *
 * The listener checks the message worker id, then dispatches the message either to
 * the worker ready handler or to the task execution response handler.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    const isReadyMessage = message.ready != null && message.workerId != null
    if (isReadyMessage) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1037
/**
 * Records the ready flag carried by a worker ready message and emits the pool
 * `ready` event when an emitter exists and the pool reports itself as ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId) as Worker
  const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  workerInfo.ready = message.ready as boolean
  const emitter = this.emitter
  if (emitter != null && this.ready) {
    emitter.emit(PoolEvents.ready, this.info)
  }
}
1046
/**
 * Settles the promise associated with a task execution response, runs the after task
 * execution hook and, when the tasks queue is enabled, starts the next queued task
 * of the worker that answered.
 *
 * Messages whose id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    promiseResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  ) {
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1073
/**
 * Emits the `busy` and/or `full` pool events, with the pool information as payload,
 * when the corresponding pool state is reached. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const dynamicPoolFull = this.type === PoolTypes.dynamic && this.full
  if (dynamicPoolFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1084
/**
 * Gets the information object of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information (the stored object itself, not a copy).
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1093
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after the insertion.
 */
private pushWorkerNode (worker: Worker): number {
  const nodes = this.workerNodes
  return nodes.push(new WorkerNode(worker, this.worker))
}
c923ce56 1103
/**
 * Removes the given worker from the pool worker nodes and from the worker choice
 * strategy context. Does nothing when the worker is not found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1116
/**
 * Runs the before task execution hook, then sends the task to the worker of the
 * given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1121
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` (presumably the tasks queue size — confirm against the worker node implementation).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1125
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1129
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1133
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let pendingTasks = this.tasksQueueSize(workerNodeKey)
  while (pendingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1143
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  // The length is re-read on each iteration, as the array iterator would do.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1149
/**
 * Sends to the given worker which task statistics it has to compute, according to
 * the task statistics requirements of the worker choice strategy context.
 *
 * @param worker - The worker receiving the statistics configuration.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
c97c7edb 1162}