refactor: factor out code to redistribute queued tasks
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
8a1260a3 31 WorkerInfo,
e102732c
JB
32 WorkerNode,
33 WorkerUsage
34} from './worker'
a35560ba 35import {
f0d7f803 36 Measurements,
a35560ba 37 WorkerChoiceStrategies,
a20f0ba5
JB
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
bdaf31cd
JB
40} from './selection-strategies/selection-strategies-types'
41import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 42import { version } from './version'
23ccf9d7 43
729c563d 44/**
ea7a90d3 45 * Base class that implements some shared logic for all poolifier pools.
729c563d 46 *
38e795c1 47 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 50 */
c97c7edb 51export abstract class AbstractPool<
f06e48d8 52 Worker extends IWorker,
d3c8a1a8
S
53 Data = unknown,
54 Response = unknown
c4855468 55> implements IPool<Worker, Data, Response> {
afc003b2 56 /** @inheritDoc */
f06e48d8 57 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 58
afc003b2 59 /** @inheritDoc */
7c0ba920
JB
60 public readonly emitter?: PoolEmitter
61
be0676b3 62 /**
a3445496 63 * The execution response promise map.
be0676b3 64 *
2740a743 65 * - `key`: The message id of each submitted task.
a3445496 66 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 67 *
a3445496 68 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 69 */
c923ce56
JB
70 protected promiseResponseMap: Map<
71 string,
72 PromiseResponseWrapper<Worker, Response>
73 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 74
a35560ba 75 /**
51fe3d3c 76 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
77 */
78 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
79 Worker,
80 Data,
81 Response
a35560ba
S
82 >
83
afe0d5bf
JB
84 /**
85 * The start timestamp of the pool.
86 */
87 private readonly startTimestamp
88
/**
 * Builds the part of a poolifier pool shared by all implementations:
 * validates the arguments, creates the worker choice strategy context and
 * creates the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @throws Error if `isMain()` reports that the caller is not the main side.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }
  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(numberOfWorkers)
  this.checkFilePath(filePath)
  this.checkPoolOptions(opts)

  // Bind these methods to the pool instance.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, opts.workerChoiceStrategy, opts.workerChoiceStrategyOptions)

  // Hook run before any worker node is created.
  this.setupHook()

  // Create worker nodes until the requested number is reached.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  this.startTimestamp = performance.now()
}
134
/**
 * Checks that a usable worker file path has been provided.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined`, is not a string, or is blank.
 */
private checkFilePath (filePath: string): void {
  // `typeof` also rejects `null`/`undefined` and any non-string value that
  // an untyped caller may have passed.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
143
/**
 * Validates the number of workers the pool is asked to manage.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is `null`/`undefined`, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only a fixed pool is refused when it has no worker at all.
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
161
/**
 * Validates the pool options and stores them, with defaults applied, on
 * `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const poolOptions = this.opts
  poolOptions.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(poolOptions.workerChoiceStrategy)
  poolOptions.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    poolOptions.workerChoiceStrategyOptions
  )
  poolOptions.restartWorkerOnError = opts.restartWorkerOnError ?? true
  poolOptions.enableEvents = opts.enableEvents ?? true
  poolOptions.enableTasksQueue = opts.enableTasksQueue ?? false
  // The tasks queue options are only validated and built when the queue is enabled.
  if (!poolOptions.enableTasksQueue) {
    return
  }
  const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  poolOptions.tasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
}
188
/**
 * Checks that the given worker choice strategy is one of the values of
 * `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
198
/**
 * Validates worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from `maxSize`,
 * or if the measurement is not one of the values of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When weights are given, exactly `maxSize` of them are expected.
  const hasWrongWeightsCount =
    weights != null && Object.keys(weights).length !== this.maxSize
  if (hasWrongWeightsCount) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  const hasUnknownMeasurement =
    measurement != null && !Object.values(Measurements).includes(measurement)
  if (hasUnknownMeasurement) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
226
/**
 * Validates tasks queue options. `null`/`undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if the
 * concurrency is not a safe integer.
 * @throws Error if the concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  // Nothing more to check when no concurrency is given.
  if (concurrency == null) {
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
250
/** @inheritDoc */
public get info (): PoolInfo {
  // The statistics requirements decide which optional sections are reported.
  const { runTime: runTimeRequirements, waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  // Adds up a per worker node number over all the worker nodes.
  const sumOverWorkerNodes = (
    selector: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (accumulator, workerNode) => accumulator + selector(workerNode),
      0
    )
  const executedTasks = sumOverWorkerNodes(
    workerNode => workerNode.usage.tasks.executed
  )
  // Average per executed task: 0 rather than NaN (0 / 0) while no task has
  // been executed yet.
  const averagePerExecutedTask = (aggregate: number): number =>
    executedTasks > 0 ? round(aggregate / executedTasks) : 0
  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization needs both the run time and the wait time aggregates.
    ...(runTimeRequirements.aggregate &&
      waitTimeRequirements.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    busyWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing > 0 ? 1 : 0
    ),
    executedTasks,
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(runTimeRequirements.aggregate && {
      runTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.runTime?.aggregate ?? 0
          )
        ),
        ...(runTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.runTime?.median ?? 0
              )
            )
          )
        })
      }
    }),
    ...(waitTimeRequirements.aggregate && {
      waitTime: {
        minimum: round(
          Math.min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          Math.max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
            )
          )
        ),
        average: averagePerExecutedTask(
          sumOverWorkerNodes(
            workerNode => workerNode.usage.waitTime?.aggregate ?? 0
          )
        ),
        ...(waitTimeRequirements.median && {
          median: round(
            median(
              this.workerNodes.map(
                workerNode => workerNode.usage.waitTime?.median ?? 0
              )
            )
          )
        })
      }
    })
  }
}
08f3f44c 385
/**
 * Approximate pool utilization: the cumulated run time and wait time of the
 * worker nodes, divided by the time elapsed since the pool start timestamp
 * multiplied by `maxSize`.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
406
8881ae32
JB
407 /**
408 * Pool type.
409 *
410 * If it is `'dynamic'`, it provides the `max` property.
411 */
412 protected abstract get type (): PoolType
413
184855e6
JB
414 /**
415 * Gets the worker type.
416 */
417 protected abstract get worker (): WorkerType
418
c2ade475 419 /**
6b27d407 420 * Pool minimum size.
c2ade475 421 */
6b27d407 422 protected abstract get minSize (): number
ff733df7
JB
423
424 /**
6b27d407 425 * Pool maximum size.
ff733df7 426 */
6b27d407 427 protected abstract get maxSize (): number
a35560ba 428
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const { info, worker } of this.workerNodes) {
    if (info.id === workerId) {
      return worker
    }
  }
  return undefined
}
439
/**
 * Finds the key (index in `workerNodes`) of the worker node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
451
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Record the strategy in the pool options and hand it to the context.
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Give every worker node a fresh initial usage and set its statistics again.
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    const initialUsage = this.getInitialWorkerUsage(worker)
    this.setWorkerNodeTasksUsage(workerNode, initialUsage)
    this.setWorkerStatistics(worker)
  }
}
473
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Invalid options throw before anything is stored.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(workerChoiceStrategyOptions)
}
484
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Flush the tasks queues when the queue is being switched off.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
496
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop previously stored options, if any.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
507
/**
 * Builds tasks queue options with defaults applied.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly `null`/`undefined`.
 * @returns Tasks queue options whose concurrency defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
515
/**
 * Whether the pool is full or not, i.e. whether the number of worker nodes
 * has reached `maxSize`.
 *
 * The pool filling boolean status.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 524
c319c66b
JB
525 /**
526 * Whether the pool is busy or not.
527 *
528 * The pool busyness boolean status.
529 */
530 protected abstract get busy (): boolean
7c0ba920 531
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` when no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
544
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // The submission timestamp is taken before a worker node is chosen.
  const submissionTimestamp = performance.now()
  const targetWorkerNodeKey = this.chooseWorkerNode()
  const taskId = randomUUID()
  const task: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp: submissionTimestamp,
    id: taskId
  }
  // Register the resolve/reject callbacks under the task id.
  const responsePromise = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[targetWorkerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy
  // or when the chosen worker node already runs `concurrency` tasks.
  let shallEnqueue = false
  if (this.opts.enableTasksQueue === true) {
    shallEnqueue =
      this.busy ||
      this.workerNodes[targetWorkerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number)
  }
  if (shallEnqueue) {
    this.enqueueTask(targetWorkerNodeKey, task)
  } else {
    this.executeTask(targetWorkerNodeKey, task)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return responsePromise
}
c97c7edb 578
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the worker node tasks queue, destroys its worker and waits for
  // the worker 'exit' event.
  const terminateWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    // The 'exit' listener is registered before the worker is destroyed.
    const workerExited = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExited
  }
  await Promise.all(this.workerNodes.map(terminateWorkerNode))
}
595
4a6952ff 596 /**
6c6afb84 597 * Terminates the given worker.
4a6952ff 598 *
f06e48d8 599 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
600 */
601 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 602
/**
 * Extension point called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 612
729c563d 613 /**
280c2a77
S
614 * Should return whether the worker is the main worker or not.
615 */
616 protected abstract isMain (): boolean
617
/**
 * Hook executed before the worker task execution: counts one more executing
 * task on the worker node and updates its wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
633
/**
 * Hook executed after the worker task execution: updates the tasks
 * statistics, the run time and the ELU usage of the worker's node.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
650
/**
 * Updates the tasks counters of a worker usage once a task has ended.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker for the ended task.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  // A message carrying a task error counts as failed, otherwise as executed.
  if (message.taskError != null) {
    tasks.failed += 1
  } else {
    tasks.executed += 1
  }
}
663
/**
 * Updates the run time statistics of a worker usage with the run time
 * reported in the message. Does nothing unless the run time aggregate is
 * required by the worker choice strategy context.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker for the ended task.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!requirements.aggregate) {
    return
  }
  const runTimeUsage = workerUsage.runTime
  const measuredRunTime = message.taskPerformance?.runTime
  // A missing measurement counts as 0 for aggregate, minimum and maximum.
  const taskRunTime = measuredRunTime ?? 0
  runTimeUsage.aggregate = (runTimeUsage.aggregate ?? 0) + taskRunTime
  runTimeUsage.minimum = Math.min(
    taskRunTime,
    runTimeUsage.minimum ?? Infinity
  )
  runTimeUsage.maximum = Math.max(
    taskRunTime,
    runTimeUsage.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (requirements.average && executedTasks !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / executedTasks
  }
  // The median history only receives run times actually measured.
  if (requirements.median && measuredRunTime != null) {
    runTimeUsage.history.push(measuredRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
701
/**
 * Updates the wait time statistics of a worker usage with the time elapsed
 * since the task timestamp. Does nothing unless the wait time aggregate is
 * required by the worker choice strategy context.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  // A task without timestamp has a wait time of 0.
  const taskWaitTime = now - (task.timestamp ?? now)
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!requirements.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate = (waitTimeUsage.aggregate ?? 0) + taskWaitTime
  waitTimeUsage.minimum = Math.min(
    taskWaitTime,
    waitTimeUsage.minimum ?? Infinity
  )
  waitTimeUsage.maximum = Math.max(
    taskWaitTime,
    waitTimeUsage.maximum ?? -Infinity
  )
  const executedTasks = workerUsage.tasks.executed
  if (requirements.average && executedTasks !== 0) {
    waitTimeUsage.average = waitTimeUsage.aggregate / executedTasks
  }
  if (requirements.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
739
/**
 * Updates the ELU (idle, active, utilization) statistics of a worker usage
 * with the ELU reported in the message. Does nothing unless the ELU aggregate
 * is required by the worker choice strategy context and the message carries
 * an ELU measurement.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker for the ended task.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const requirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!requirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu == null) {
    return
  }
  const eluUsage = workerUsage.elu
  const { idle, active } = eluUsage
  idle.aggregate = (idle.aggregate ?? 0) + taskElu.idle
  active.aggregate = (active.aggregate ?? 0) + taskElu.active
  // Utilization: mean of the stored value and the new one, or the new one
  // when nothing is stored yet.
  eluUsage.utilization =
    eluUsage.utilization != null
      ? (eluUsage.utilization + taskElu.utilization) / 2
      : taskElu.utilization
  idle.minimum = Math.min(taskElu.idle, idle.minimum ?? Infinity)
  idle.maximum = Math.max(taskElu.idle, idle.maximum ?? -Infinity)
  active.minimum = Math.min(taskElu.active, active.minimum ?? Infinity)
  active.maximum = Math.max(taskElu.active, active.maximum ?? -Infinity)
  const executedTasks = workerUsage.tasks.executed
  if (requirements.average && executedTasks !== 0) {
    idle.average = idle.aggregate / executedTasks
    active.average = active.aggregate / executedTasks
  }
  if (requirements.median) {
    idle.history.push(taskElu.idle)
    active.history.push(taskElu.active)
    idle.median = median(idle.history)
    active.median = median(active.history)
  }
}
803
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when `shallCreateDynamicWorker()` says so;
 * it is chosen directly when the strategy policy asks to use the dynamic worker.
 * In every other case the worker choice strategy context makes the choice.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
822
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full
 * and all its worker nodes are executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
831
280c2a77 832 /**
675bb809 833 * Sends a message to the given worker.
280c2a77 834 *
38e795c1
JB
835 * @param worker - The worker which should receive the message.
836 * @param message - The message.
280c2a77
S
837 */
838 protected abstract sendToWorker (
839 worker: Worker,
840 message: MessageValue<Data>
841 ): void
842
/**
 * Registers a listener callback for the 'message' event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 855
729c563d 856 /**
41344292 857 * Creates a new worker.
6c6afb84
JB
858 *
859 * @returns Newly created worker.
729c563d 860 */
280c2a77 861 protected abstract createWorker (): Worker
c97c7edb 862
/**
 * Function called once a newly created worker has been added to the pool worker nodes.
 * Registers the pool worker listener on the worker messages.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  const workerMessageListener = this.workerListener()
  this.registerWorkerMessageListener(worker, workerMessageListener)
}
c97c7edb 873
/**
 * Creates a new worker, registers its event handlers and adds it to the pool worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  // User supplied handlers, defaulting to a no-op.
  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  // Pool reaction to a worker error: emit the event, move the queued tasks
  // away and, if configured, create a replacement worker of the same kind.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
    if (this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(worker)
    }
    if (this.opts.restartWorkerOnError !== true) {
      return
    }
    const workerInfo = this.getWorkerInfo(this.getWorkerNodeKey(worker))
    if (workerInfo.dynamic) {
      this.createAndSetupDynamicWorker()
    } else {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // The worker node is removed from the pool on the first 'exit' event.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 913
/**
 * Moves the tasks queued on the given worker's node to the other worker nodes.
 *
 * Each task goes to the first other worker node found with no queued task
 * or, if there is none, to the other worker node with the fewest queued tasks.
 * Nothing is done when the worker is not in the pool worker nodes or when
 * no other worker node exists to receive the tasks.
 *
 * @param worker - The worker whose queued tasks shall be redistributed.
 */
private redistributeQueuedTasks (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  // Without another worker node, each task would be dequeued and enqueued
  // on the very same queue and the loop below would never end.
  if (workerNodeKey === -1 || this.workerNodes.length <= 1) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let targetWorkerNodeKey: number = workerNodeKey
    let minQueuedTasks = Infinity
    for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
      if (workerNodeId === workerNodeKey) {
        continue
      }
      const queuedTasks = workerNode.usage.tasks.queued
      if (queuedTasks === 0) {
        // An empty queue cannot be beaten: stop searching.
        targetWorkerNodeKey = workerNodeId
        break
      }
      if (queuedTasks < minQueuedTasks) {
        minQueuedTasks = queuedTasks
        targetWorkerNodeKey = workerNodeId
      }
    }
    // No other worker node could be selected: stop instead of looping forever.
    if (targetWorkerNodeKey === workerNodeKey) {
      return
    }
    this.enqueueTask(
      targetWorkerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
941
930dcf12
JB
/**
 * Creates a worker through {@link createAndSetupWorker}, flags it as dynamic and
 * registers a message listener that destroys it when a kill message allows it.
 *
 * @returns The newly created dynamic worker, once it is fully set up.
 */
protected createAndSetupDynamicWorker (): Worker {
  const dynamicWorker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(dynamicWorker)).dynamic = true
  this.registerWorkerMessageListener(dynamicWorker, message => {
    const workerNodeKey = this.getWorkerNodeKey(dynamicWorker)
    if (!isKillBehavior(KillBehaviors.HARD, message.kill)) {
      // Not a hard kill: the worker is only destroyed if a kill was requested and it is idle.
      if (message.kill == null) {
        return
      }
      const { enableTasksQueue } = this.opts
      if (enableTasksQueue !== true && enableTasksQueue !== false) {
        return
      }
      if (this.workerNodes[workerNodeKey].usage.tasks.executing !== 0) {
        return
      }
      // With tasks queueing enabled, the worker queue must be drained as well.
      if (enableTasksQueue && this.tasksQueueSize(workerNodeKey) !== 0) {
        return
      }
    }
    // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
    void (this.destroyWorker(dynamicWorker) as Promise<void>)
  })
  return dynamicWorker
}
967
/**
 * Builds the listener registered for each worker message.
 *
 * The listener dispatches worker started messages and task execution
 * responses to their dedicated handlers; any other message is ignored.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // A message carrying a task id is a task execution response.
      this.handleTaskExecutionResponse(message)
    }
  }
}
984
/**
 * Records the started status carried by a worker started message on the matching worker node.
 *
 * @param message - The worker started message.
 * @throws Error if no worker in the pool matches the message worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const startedWorker = this.getWorkerById(message.workerId as number)
  if (startedWorker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerNodeKey = this.getWorkerNodeKey(startedWorker)
  this.workerNodes[workerNodeKey].info.started = message.started as boolean
}
999
/**
 * Settles the promise awaiting the given task execution response, then runs
 * the next queued task of the worker, if any.
 *
 * Messages whose id has no pending promise are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  if (message.taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, message.taskError)
    pendingResponse.reject(message.taskError.message)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const workerNodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true &&
    this.tasksQueueSize(workerNodeKey) > 0
  if (hasQueuedTask) {
    // The worker just completed a task: feed it the next one from its queue.
    const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
}
7c0ba920 1026
/**
 * Emits the `busy` and `full` pool events, with the pool information as
 * payload, when the pool is in the matching state.
 *
 * Does nothing if the pool has no emitter. The `full` event is only emitted
 * by pools of dynamic type.
 */
private checkAndEmitEvents (): void {
  const poolEmitter = this.emitter
  if (poolEmitter == null) {
    return
  }
  if (this.busy) {
    poolEmitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    poolEmitter.emit(PoolEvents.full, this.info)
  }
}
1037
0ebe2a9f
JB
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node to update.
 * @param workerUsage - The worker usage to assign to the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // Plain reassignment: the previous usage object is discarded, not merged.
  workerNode.usage = workerUsage
}
1050
8a1260a3
JB
/**
 * Retrieves the information of the worker held by the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information stored on the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.info
}
1059
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode = {
    worker,
    info: this.getInitialWorkerInfo(worker),
    usage: this.getInitialWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  // The worker bound usage resolves the worker node on access: it is
  // installed only once the node is part of the pool worker nodes.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  this.setWorkerNodeTasksUsage(
    this.workerNodes[workerNodeKey],
    this.getInitialWorkerUsage(worker)
  )
  return this.workerNodes.length
}
c923ce56 1079
83fa0a36
JB
/**
 * Reads the identifier of the given worker according to the pool worker type.
 *
 * @param worker - The worker.
 * @returns The worker `threadId` for thread workers, the worker `id` for
 * cluster workers, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
1093
8604aaab
JB
1094 // /**
1095 // * Sets the given worker in the pool worker nodes.
1096 // *
1097 // * @param workerNodeKey - The worker node key.
1098 // * @param worker - The worker.
f59e1027 1099 // * @param workerInfo - The worker info.
8604aaab
JB
1100 // * @param workerUsage - The worker usage.
1101 // * @param tasksQueue - The worker task queue.
1102 // */
1103 // private setWorkerNode (
1104 // workerNodeKey: number,
1105 // worker: Worker,
f59e1027 1106 // workerInfo: WorkerInfo,
8604aaab
JB
1107 // workerUsage: WorkerUsage,
1108 // tasksQueue: Queue<Task<Data>>
1109 // ): void {
1110 // this.workerNodes[workerNodeKey] = {
1111 // worker,
f59e1027
JB
1112 // info: workerInfo,
1113 // usage: workerUsage,
8604aaab
JB
1114 // tasksQueue
1115 // }
1116 // }
51fe3d3c
JB
1117
/**
 * Deletes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context.
 *
 * Does nothing if the worker has no worker node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 1130
/**
 * Runs the before task execution hook, then sends the task to the worker of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker: targetWorker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(targetWorker, task)
}
1135
/**
 * Appends the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueue.enqueue(task)
}
1139
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the tasks queue returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueue.dequeue()
}
1143
/**
 * Reads the current size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueue.size
}
ff733df7 1147
df593701
JB
/**
 * Reads the `maxSize` value of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueue.maxSize
}
1151
/**
 * Executes every task queued on the given worker node, then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const queuedTask = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, queuedTask)
  }
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  tasksQueue.clear()
}
1161
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
1167
/**
 * Sends the given worker a statistics message built from the `aggregate`
 * flags of the run time and ELU task statistics requirements of the worker
 * choice strategy context.
 *
 * @param worker - The worker to send the statistics message to.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTimeAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const eluAggregate =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTimeAggregate,
      elu: eluAggregate
    }
  })
}
8604aaab 1179
8a1260a3
JB
/**
 * Builds a fresh worker usage: zeroed task counters and empty measurement histories.
 *
 * The `queued` and `maxQueued` task counters are computed on access from the
 * tasks queue of the worker node of the given worker; they are always `0`
 * when no worker is given.
 *
 * @param worker - The worker whose tasks queue backs the queue counters.
 * @returns The initial worker usage.
 */
private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
  // Arrow functions so that `this` is the pool, not the object owning the getters.
  const readTasksQueueSize = (): number =>
    worker == null ? 0 : this.tasksQueueSize(this.getWorkerNodeKey(worker))
  const readTasksMaxQueueSize = (): number =>
    worker == null ? 0 : this.tasksMaxQueueSize(this.getWorkerNodeKey(worker))
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readTasksQueueSize()
      },
      get maxQueued (): number {
        return readTasksMaxQueueSize()
      },
      failed: 0
    },
    runTime: { history: new CircularArray() },
    waitTime: { history: new CircularArray() },
    elu: {
      idle: { history: new CircularArray() },
      active: { history: new CircularArray() }
    }
  }
}
8a1260a3
JB
1221
/**
 * Builds the initial information of the given worker: its id, not dynamic and started.
 *
 * @param worker - The worker.
 * @returns The initial worker information.
 */
private getInitialWorkerInfo (worker: Worker): WorkerInfo {
  const id = this.getWorkerId(worker)
  return { id, dynamic: false, started: true }
}
c97c7edb 1225}