feat: internal messaging strict worker id checking
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
4b628b48 21 type TasksQueueOptions
c4855468 22} from './pool'
e102732c
JB
23import type {
24 IWorker,
4b628b48 25 IWorkerNode,
e102732c
JB
26 MessageHandler,
27 Task,
8a1260a3 28 WorkerInfo,
4b628b48 29 WorkerType,
e102732c
JB
30 WorkerUsage
31} from './worker'
a35560ba 32import {
f0d7f803 33 Measurements,
a35560ba 34 WorkerChoiceStrategies,
a20f0ba5
JB
35 type WorkerChoiceStrategy,
36 type WorkerChoiceStrategyOptions
bdaf31cd
JB
37} from './selection-strategies/selection-strategies-types'
38import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 39import { version } from './version'
4b628b48 40import { WorkerNode } from './worker-node'
23ccf9d7 41
729c563d 42/**
ea7a90d3 43 * Base class that implements some shared logic for all poolifier pools.
729c563d 44 *
38e795c1 45 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
46 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
47 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 48 */
c97c7edb 49export abstract class AbstractPool<
f06e48d8 50 Worker extends IWorker,
d3c8a1a8
S
51 Data = unknown,
52 Response = unknown
c4855468 53> implements IPool<Worker, Data, Response> {
afc003b2 54 /** @inheritDoc */
4b628b48 55 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 56
afc003b2 57 /** @inheritDoc */
7c0ba920
JB
58 public readonly emitter?: PoolEmitter
59
be0676b3 60 /**
a3445496 61 * The execution response promise map.
be0676b3 62 *
2740a743 63 * - `key`: The message id of each submitted task.
a3445496 64 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 65 *
a3445496 66 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 67 */
c923ce56
JB
68 protected promiseResponseMap: Map<
69 string,
70 PromiseResponseWrapper<Worker, Response>
71 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 72
a35560ba 73 /**
51fe3d3c 74 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
75 */
76 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
77 Worker,
78 Data,
79 Response
a35560ba
S
80 >
81
afe0d5bf
JB
82 /**
83 * The start timestamp of the pool.
84 */
85 private readonly startTimestamp
86
729c563d
S
/**
 * Builds a new poolifier pool: validates the arguments, creates the worker
 * choice strategy context, runs the setup hook and creates the worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be created from the main worker.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Validate the arguments before anything is created.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when pool events are enabled.
  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  this.setupHook()

  // Create worker nodes until the requested number is reached.
  while (this.workerNodes.length < numberOfWorkers) {
    this.createAndSetupWorker()
  }

  // The start timestamp is taken once the worker nodes have been created.
  this.startTimestamp = performance.now()
}
132
/**
 * Validates the path to the worker file.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is missing, is not a string or is blank.
 */
private checkFilePath (filePath: string): void {
  // The parameter is typed as a string, but untyped callers can pass any
  // value: a non string value is rejected the same way as a missing one.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
141
8d3782fa
JB
/**
 * Validates the number of workers given to the pool.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number of workers is not specified.
 * @throws TypeError if the number of workers is not a safe integer.
 * @throws RangeError if the number of workers is negative, or is zero for a fixed pool.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero worker is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
159
/**
 * Validates the size boundaries of a dynamic pool.
 * Does nothing if the pool type is not dynamic.
 *
 * @param min - The minimum pool size.
 * @param max - The maximum pool size.
 * @throws RangeError if the boundaries are inverted, both zero or equal.
 */
protected checkDynamicPoolSize (min: number, max: number): void {
  if (this.type !== PoolTypes.dynamic) {
    return
  }
  if (min > max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
    )
  }
  // Checked before the equality case so that zero sizes get their own message.
  if (min === 0 && max === 0) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size and a maximum pool size equal to zero'
    )
  }
  if (min === max) {
    throw new RangeError(
      'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
    )
  }
}
175
/**
 * Validates the pool options and fills the pool options with default values
 * for the settings that are not specified.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin by default.
  const workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)

  const workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)

  // Boolean settings defaults.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
202
/**
 * Validates that the given worker choice strategy is one of the
 * `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the worker choice strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownWorkerChoiceStrategies = Object.values(WorkerChoiceStrategies)
  if (knownWorkerChoiceStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
212
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if the number of weights differs from the pool maximum size
 * or if the measurement is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, there must be exactly one per worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (measurement != null && !Object.values(Measurements).includes(measurement)) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
240
a20f0ba5
JB
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are not a plain object or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is not strictly positive.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  // The concurrency is optional: nothing else to validate without it.
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError('Invalid worker tasks concurrency: must be an integer')
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
264
/** @inheritDoc */
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Sums a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + select(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Builds the pool wide statistics of a time measurement from the worker
  // nodes usage.
  const buildMeasurementInfo = (
    measurement: 'runTime' | 'waitTime'
  ): {
    minimum: number
    maximum: number
    average: number
    median?: number
  } => {
    const aggregate = sumOverWorkerNodes(
      workerNode => workerNode.usage[measurement]?.aggregate ?? 0
    )
    return {
      minimum: round(
        Math.min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        Math.max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement]?.maximum ?? -Infinity
          )
        )
      ),
      // Without any executed task the division would be 0 / 0 = NaN:
      // report a zero average instead.
      average: round(executedTasks > 0 ? aggregate / executedTasks : 0),
      // The median is only reported when the strategy requires it.
      ...(taskStatisticsRequirements[measurement].median && {
        median: round(
          median(
            this.workerNodes.map(
              workerNode => workerNode.usage[measurement]?.median ?? 0
            )
          )
        )
      })
    }
  }
  return {
    version,
    type: this.type,
    worker: this.worker,
    ready: this.ready,
    strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // The utilization needs both the run time and the wait time aggregates.
    ...(taskStatisticsRequirements.runTime.aggregate &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements.runTime.aggregate && {
      runTime: buildMeasurementInfo('runTime')
    }),
    ...(taskStatisticsRequirements.waitTime.aggregate && {
      waitTime: buildMeasurementInfo('waitTime')
    })
  }
}
08f3f44c 401
2431bdb4
JB
/**
 * Whether the pool is starting or not: the pool is not full yet, or at
 * least one of its worker nodes is not ready.
 */
private get starting (): boolean {
  if (!this.full) {
    return true
  }
  return this.workerNodes.some(workerNode => !workerNode.info.ready)
}
408
/**
 * Whether the pool is ready or not: the pool is full and all its worker
 * nodes are ready.
 */
private get ready (): boolean {
  if (!this.full) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.info.ready)
}
414
afe0d5bf
JB
/**
 * Gets the approximate pool utilization: the tasks run time plus wait time
 * of all the worker nodes, divided by the time elapsed since the pool start
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
435
8881ae32
JB
/**
 * The pool type.
 *
 * It is compared to `PoolTypes.fixed` and `PoolTypes.dynamic` to select the
 * validations and behaviors specific to each kind of pool.
 */
protected abstract get type (): PoolType
442
184855e6
JB
/**
 * The type of worker used by the pool, as reported in the pool information.
 */
protected abstract get worker (): WorkerType
447
/**
 * The pool minimum size, as reported in the pool information.
 */
protected abstract get minSize (): number
ff733df7
JB
452
/**
 * The pool maximum size: the pool is full once it holds that many worker nodes.
 */
protected abstract get maxSize (): number
a35560ba 457
f59e1027
JB
/**
 * Looks up a worker by its id in the pool worker nodes.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node with that id, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
468
21f710aa
JB
/**
 * Checks that the worker id carried by a message belongs to a worker of the pool.
 * Messages without a worker id are accepted.
 *
 * @param message - The received message.
 * @throws Error if the worker id is not the id of a worker of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Response>): void {
  const { workerId } = message
  if (workerId == null) {
    return
  }
  if (this.getWorkerById(workerId) == null) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
479
/**
 * Gets the worker node key of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding the worker, `-1` if there is none.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
491
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  // The options are only updated when new ones are given.
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of each worker node and set up its statistics again.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    const workerNode = this.workerNodes[workerNodeKey]
    workerNode.resetUsage()
    this.setWorkerStatistics(workerNode.worker)
  }
}
510
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first: invalid options are neither stored nor forwarded.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
521
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the tasks queue is being disabled.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
533
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop the stored tasks queue options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
544
/**
 * Builds the tasks queue options, filling in the default values.
 *
 * @param tasksQueueOptions - The tasks queue options given by the user, possibly missing.
 * @returns The tasks queue options with a concurrency defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
552
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the pool
 * maximum size.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 561
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * When the tasks queue is enabled, tasks submitted to a busy pool are
 * enqueued instead of being executed right away.
 */
protected abstract get busy (): boolean
7c0ba920 568
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node is free of executing tasks (including when there is no worker node), `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
581
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The timestamp is taken before the worker node is chosen.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const chosenWorkerNode = this.workerNodes[workerNodeKey]
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    workerId: this.getWorkerInfo(workerNodeKey).id as number,
    id: randomUUID()
  }
  // Register the promise callbacks under the task id, so that they can be
  // retrieved when the response message is received.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: chosenWorkerNode.worker
    })
  })
  // With the tasks queue enabled, the task is enqueued when the pool is busy
  // or when the worker node has reached the tasks concurrency.
  const shallEnqueueTask =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      chosenWorkerNode.usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueueTask) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 616
/** @inheritDoc */
public async destroy (): Promise<void> {
  const workerNodeDestructions = this.workerNodes.map(
    async ({ worker }, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      // The exit listener is registered before the worker is destroyed.
      const workerExitPromise = new Promise<void>(resolve => {
        worker.on('exit', () => {
          resolve()
        })
      })
      await this.destroyWorker(worker)
      await workerExitPromise
    }
  )
  await Promise.all(workerNodeDestructions)
}
633
/**
 * Terminates the given worker.
 *
 * The returned value is awaited when the pool is destroyed.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 640
/**
 * Hook called by the abstract constructor once the worker choice strategy
 * context is created and before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 650
/**
 * Should return whether the worker is the main worker or not.
 *
 * The pool constructor throws when it is not.
 */
protected abstract isMain (): boolean
655
/**
 * Hook executed before the worker task execution: counts the task as
 * executing and updates the wait time usage of the worker node.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing++
  this.updateWaitTimeWorkerUsage(usage, task)
}
671
/**
 * Hook executed after the worker task execution: updates the tasks, run time
 * and ELU usage of the worker node holding the worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
688
/**
 * Updates the tasks counters of a worker usage once a task is done.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing--
  // A task is counted as failed when the message carries a task error.
  if (message.taskError != null) {
    tasks.failed++
  } else {
    tasks.executed++
  }
}
701
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage with the run time
 * carried by the message, if the worker choice strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  // Nothing is tracked when the aggregate is not required.
  if (!runTimeRequirements.aggregate) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + taskRunTime
  runTimeUsage.minimum = Math.min(
    taskRunTime,
    runTimeUsage.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    taskRunTime,
    runTimeUsage.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (runTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  // The history only records run times actually carried by the message.
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    runTimeUsage.history.push(message.taskPerformance.runTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
739
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp, if the worker choice strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp gets a zero wait time.
  const taskWaitTime = now - (task.timestamp ?? now)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  // Nothing is tracked when the aggregate is not required.
  if (!waitTimeRequirements.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage?.aggregate ?? 0) + taskWaitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  // The average is left untouched while no task has been executed.
  if (waitTimeRequirements.average && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average =
      waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (waitTimeRequirements.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
777
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage with
 * the ELU carried by the message, if the worker choice strategy requires them.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  // Nothing is tracked when the aggregate is not required.
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  // Nothing to update when the message carries no ELU.
  if (taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  eluUsage.idle.aggregate = (eluUsage.idle?.aggregate ?? 0) + taskElu.idle
  eluUsage.active.aggregate =
    (eluUsage.active?.aggregate ?? 0) + taskElu.active
  // The stored utilization is the mean of its previous value and the task one.
  if (eluUsage.utilization != null) {
    eluUsage.utilization = (eluUsage.utilization + taskElu.utilization) / 2
  } else {
    eluUsage.utilization = taskElu.utilization
  }
  eluUsage.idle.minimum = Math.min(
    taskElu.idle,
    eluUsage.idle.minimum ?? Infinity
  )
  eluUsage.idle.maximum = Math.max(
    taskElu.idle,
    eluUsage.idle.maximum ?? -Infinity
  )
  eluUsage.active.minimum = Math.min(
    taskElu.active,
    eluUsage.active.minimum ?? Infinity
  )
  eluUsage.active.maximum = Math.max(
    taskElu.active,
    eluUsage.active.maximum ?? -Infinity
  )
  // The averages are left untouched while no task has been executed.
  if (eluRequirements.average && workerUsage.tasks.executed !== 0) {
    eluUsage.idle.average =
      eluUsage.idle.aggregate / workerUsage.tasks.executed
    eluUsage.active.average =
      eluUsage.active.aggregate / workerUsage.tasks.executed
  }
  if (eluRequirements.median) {
    eluUsage.idle.history.push(taskElu.idle)
    eluUsage.active.history.push(taskElu.active)
    eluUsage.idle.median = median(eluUsage.idle.history)
    eluUsage.active.median = median(eluUsage.active.history)
  }
}
841
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions are met. Its worker
 * node is chosen if the strategy policy uses dynamic workers, otherwise the
 * choice is delegated to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  const { useDynamicWorker } =
    this.workerChoiceStrategyContext.getStrategyPolicy()
  return useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
860
6c6afb84
JB
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
869
/**
 * Sends a message to the given worker.
 *
 * It is used, among others, to send the startup message to a newly set up
 * worker.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
880
/**
 * Registers a listener callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 893
/**
 * Creates a new worker.
 *
 * The pool registers its event handlers on the returned worker and adds it
 * to the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 900
/**
 * Hook executed once a newly created worker has been added to the pool
 * worker nodes: registers the pool message listener, sends the startup
 * message to the worker and sets up its task statistics computation.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
  // Send startup message to worker, carrying the id of its worker node.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.sendToWorker(worker, {
    ready: false,
    workerId: this.getWorkerInfo(workerNodeKey).id as number
  })
  // Setup worker task statistics computation.
  this.setWorkerStatistics(worker)
}
c97c7edb 918
/**
 * Creates a new worker, registers the event handlers on it and adds it to
 * the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const {
    messageHandler = EMPTY_FUNCTION,
    errorHandler = EMPTY_FUNCTION,
    onlineHandler = EMPTY_FUNCTION,
    exitHandler = EMPTY_FUNCTION
  } = {
    messageHandler: this.opts.messageHandler ?? undefined,
    errorHandler: this.opts.errorHandler ?? undefined,
    onlineHandler: this.opts.onlineHandler ?? undefined,
    exitHandler: this.opts.exitHandler ?? undefined
  }

  // User handlers are registered before the pool internal ones.
  worker.on('message', messageHandler)
  worker.on('error', errorHandler)
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    // Move the tasks queued on the failed worker node to other worker nodes.
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    // The failed worker is replaced by a worker of the same kind, unless the
    // pool is still starting.
    if (this.opts.restartWorkerOnError === true && !this.starting) {
      const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
      if (workerInfo.dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', onlineHandler)
  worker.on('exit', exitHandler)
  // The worker node is removed from the pool when its worker exits.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 956
7d2e3f59
JB
/**
 * Moves every task queued on the given worker to the tasks queues of the other worker nodes.
 * Each task goes to the first other worker node without queued tasks or, failing that,
 * to the other worker node with the fewest queued tasks.
 *
 * Does nothing if the worker is not in the pool worker nodes or if it is the only worker node:
 * with no other worker node to receive the tasks, re-enqueueing them on the same
 * worker node would never empty its queue.
 *
 * @param worker - The worker whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey = workerNodeKey
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // A worker node without queued tasks is the best possible target.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    if (targetWorkerNodeKey === workerNodeKey) {
      // No other worker node could be selected: stop rather than loop forever.
      break
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
984
930dcf12
JB
/**
 * Creates a new worker, sets it up completely in the pool worker nodes and flags it as dynamic.
 * The worker is destroyed when it sends a kill message, under the conditions checked below.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Evaluated lazily, only for a non hard kill message: the worker node must have no
    // executing task and, when the tasks queue is enabled, no queued task either.
    const hasNoPendingTask = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && hasNoPendingTask())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  // Flag the worker as dynamic and send it a message with `checkAlive` set.
  const dynamicWorkerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
  dynamicWorkerInfo.dynamic = true
  const workerId = dynamicWorkerInfo.id as number
  this.sendToWorker(worker, { checkAlive: true, workerId })
  return worker
}
1015
/**
 * Builds the listener registered for each worker message.
 * The listener checks the message worker id, then dispatches worker ready messages
 * and task execution responses to their handlers; other messages are ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    this.checkMessageWorkerId(message)
    if (message.ready != null && message.workerId != null) {
      // Worker ready message received
      this.handleWorkerReadyMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
1033
/**
 * Stores the ready status carried by the message in the information of the worker
 * that sent it, then emits the pool `ready` event if the pool is ready.
 *
 * @param message - The worker ready message.
 */
private handleWorkerReadyMessage (message: MessageValue<Response>): void {
  const worker = this.getWorkerById(message.workerId)
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKey(worker as Worker)
  )
  workerInfo.ready = message.ready as boolean
  if (this.emitter == null || !this.ready) {
    return
  }
  this.emitter.emit(PoolEvents.ready, this.info)
}
1042
/**
 * Settles the promise registered for the task the message responds to, runs the
 * after task execution hook, then executes the next queued task of the worker, if any.
 * Messages whose task id has no registered promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const promiseResponse = this.promiseResponseMap.get(taskId)
  if (promiseResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError != null) {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    promiseResponse.reject(taskError.message)
  } else {
    promiseResponse.resolve(message.data as Response)
  }
  const { worker } = promiseResponse
  this.afterTaskExecutionHook(worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker is done with a task: hand it the next one of its queue.
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1069
/**
 * Emits the pool `busy` event if the pool is busy and the pool `full` event if
 * the pool is dynamic and full. Does nothing without an emitter.
 */
private checkAndEmitEvents (): void {
  if (this.emitter == null) {
    return
  }
  if (this.busy) {
    this.emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    this.emitter.emit(PoolEvents.full, this.info)
  }
}
1080
8a1260a3
JB
/**
 * Gets the information of the worker held by the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1089
/**
 * Wraps the given worker in a new worker node and appends it to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = new WorkerNode<Worker, Data>(worker, this.worker)
  return this.workerNodes.push(workerNode)
}
c923ce56 1099
/**
 * Removes the worker node of the given worker from the pool worker nodes and from
 * the worker choice strategy context. Does nothing if the worker has no worker node.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1112
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1117
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The number returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.enqueueTask(task)
}
1121
/**
 * Takes a task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1125
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1129
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let remainingTasks = this.tasksQueueSize(workerNodeKey)
  while (remainingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    remainingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
}
1139
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1145
/**
 * Sends to the given worker the task statistics it has to compute, as required
 * by the worker choice strategy context (`runTime` and `elu` aggregate flags).
 *
 * @param worker - The worker which receives the statistics settings.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  const workerId = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    .id as number
  this.sendToWorker(worker, {
    statistics: { runTime, elu },
    workerId
  })
}
c97c7edb 1158}