docs: refine FIXME comment
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
59317253 7 isKillBehavior,
0d80593b 8 isPlainObject,
afe0d5bf
JB
9 median,
10 round
bbeadd16 11} from '../utils'
59317253 12import { KillBehaviors } from '../worker/worker-options'
65d7a1c9 13import { CircularArray } from '../circular-array'
29ee7e9a 14import { Queue } from '../queue'
c4855468 15import {
65d7a1c9 16 type IPool,
7c5a1080 17 PoolEmitter,
c4855468 18 PoolEvents,
6b27d407 19 type PoolInfo,
c4855468 20 type PoolOptions,
6b27d407
JB
21 type PoolType,
22 PoolTypes,
184855e6 23 type TasksQueueOptions,
83fa0a36
JB
24 type WorkerType,
25 WorkerTypes
c4855468 26} from './pool'
e102732c
JB
27import type {
28 IWorker,
29 MessageHandler,
30 Task,
8a1260a3 31 WorkerInfo,
e102732c
JB
32 WorkerNode,
33 WorkerUsage
34} from './worker'
a35560ba 35import {
f0d7f803 36 Measurements,
a35560ba 37 WorkerChoiceStrategies,
a20f0ba5
JB
38 type WorkerChoiceStrategy,
39 type WorkerChoiceStrategyOptions
bdaf31cd
JB
40} from './selection-strategies/selection-strategies-types'
41import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 42import { version } from './version'
23ccf9d7 43
729c563d 44/**
ea7a90d3 45 * Base class that implements some shared logic for all poolifier pools.
729c563d 46 *
38e795c1 47 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
48 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
49 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 50 */
c97c7edb 51export abstract class AbstractPool<
f06e48d8 52 Worker extends IWorker,
d3c8a1a8
S
53 Data = unknown,
54 Response = unknown
c4855468 55> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []

/** @inheritDoc */
public readonly emitter?: PoolEmitter

/**
 * Pending task executions, indexed by task message id.
 *
 * - `key`: The message id of a submitted task.
 * - `value`: The worker the task was submitted to, together with the `resolve` and `reject` callbacks of the promise handed back to the caller.
 *
 * When a worker answers, the entry matching the message id is looked up to settle the corresponding promise.
 */
protected promiseResponseMap: Map<
string,
PromiseResponseWrapper<Worker, Response>
> = new Map()

/**
 * Context holding the worker choice strategy used to pick the worker node for the next task.
 */
protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
Worker,
Data,
Response
>
83
/**
 * The start timestamp of the pool.
 *
 * Taken with `performance.now()` at the end of the constructor, once the initial worker nodes are created.
 * It is the time origin used to compute the pool utilization.
 */
private readonly startTimestamp: number
88
/**
 * Builds a new poolifier pool: validates the arguments, wires the worker choice
 * strategy, runs the setup hook and spawns the initial worker nodes.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool may only be started from the main side, never from a worker.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Arguments validation; the options check also fills in the defaults on `this.opts`.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are passed around so they keep the pool as `this`.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  // The emitter only exists when events are enabled.
  if (this.opts.enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  // Subclass hook, run before any worker node is created.
  this.setupHook()

  // Spawn workers until the requested number of worker nodes is reached.
  while (this.workerNodes.length < this.numberOfWorkers) {
    this.createAndSetupWorker()
  }

  // Time origin for the pool utilization computation.
  this.startTimestamp = performance.now()
}
134
/**
 * Validates the worker file path given to the pool constructor.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`, `undefined`, not a string, or an empty or whitespace-only string.
 */
private checkFilePath (filePath: string): void {
  // Untyped (JavaScript) callers can pass anything: a non-string value is as unusable as a missing one.
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
143
/**
 * Validates the number of workers given to the pool constructor.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @throws Error if the number is missing, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Only fixed pools are rejected when sized to zero.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
161
/**
 * Validates the pool options and stores them, with defaults applied, on `this.opts`.
 *
 * @param opts - Options for the pool.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  // Worker choice strategy: defaults to round robin.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ??
    DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )
  // Boolean switches: restart on error and events default to on, tasks queue to off.
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  // Tasks queue options are only validated and normalized when the queue is enabled.
  this.checkValidTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions as TasksQueueOptions
  )
}
188
/**
 * Ensures the given worker choice strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is unknown.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(
    `Invalid worker choice strategy '${workerChoiceStrategy}'`
  )
}
198
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // When provided, there must be exactly one weight per possible worker node.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
226
/**
 * Validates the tasks queue options. Missing options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are given but are not a plain object, or if the concurrency is not a safe integer.
 * @throws Error if the concurrency is zero or negative.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(
      `Invalid worker tasks concurrency '${concurrency}'`
    )
  }
}
250
/** @inheritDoc */
public get info (): PoolInfo {
  // Small local helpers folding over the worker nodes.
  const countWorkerNodes = (
    predicate: (workerNode: WorkerNode<Worker, Data>) => boolean
  ): number => this.workerNodes.filter(predicate).length
  const sumOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  const minOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number => Math.min(...this.workerNodes.map(pick))
  const maxOverWorkerNodes = (
    pick: (workerNode: WorkerNode<Worker, Data>) => number
  ): number => Math.max(...this.workerNodes.map(pick))

  return {
    version,
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate &&
      this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
        .waitTime.aggregate && { utilization: round(this.utilization) }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    busyWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing > 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    queuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.queued
    ),
    maxQueuedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.maxQueued
    ),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    // Run time bounds across the worker nodes, when run time is aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .runTime.aggregate && {
      runTime: {
        minimum: minOverWorkerNodes(
          workerNode => workerNode.usage.runTime?.minimum ?? Infinity
        ),
        maximum: maxOverWorkerNodes(
          workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
        )
      }
    }),
    // Wait time bounds across the worker nodes, when wait time is aggregated.
    ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
      .waitTime.aggregate && {
      waitTime: {
        minimum: minOverWorkerNodes(
          workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
        ),
        maximum: maxOverWorkerNodes(
          workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
        )
      }
    })
  }
}
08f3f44c 333
/**
 * Approximate pool utilization: the aggregated tasks run time plus wait time of
 * all worker nodes, divided by the time elapsed since the pool start multiplied
 * by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolRunTimeCapacity = elapsedTime * this.maxSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    // Missing aggregates count as zero.
    totalTasksRunTime += usage.runTime?.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime?.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
}
354
/**
 * The type of this pool.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 */
protected abstract get type (): PoolType

/**
 * The type of worker used by this pool.
 */
protected abstract get worker (): WorkerType

/**
 * The minimum size of this pool.
 */
protected abstract get minSize (): number

/**
 * The maximum size of this pool.
 */
protected abstract get maxSize (): number
a35560ba 376
/**
 * Looks up a worker by the id stored in its worker node info.
 *
 * @param workerId - The worker id.
 * @returns The worker of the first worker node whose info id matches, `undefined` if there is none.
 */
private getWorkerById (workerId: number): Worker | undefined {
  for (const workerNode of this.workerNodes) {
    if (workerNode.info.id === workerId) {
      return workerNode.worker
    }
  }
  return undefined
}
387
/**
 * Finds the key (index in `workerNodes`) of the worker node owning the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    if (this.workerNodes[workerNodeKey].worker === worker) {
      return workerNodeKey
    }
  }
  return -1
}
399
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset the usage of every worker node and push the statistics settings to its worker.
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    const initialWorkerUsage = this.getInitialWorkerUsage(worker)
    this.setWorkerNodeTasksUsage(workerNode, initialWorkerUsage)
    this.setWorkerStatistics(worker)
  }
}
421
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options or the strategy context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
432
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  // Switching the queue off: flush the tasks queues first.
  if (wasEnabled && !enable) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
444
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue === true) {
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
    return
  }
  // Queue disabled: drop any previously stored tasks queue options.
  if (this.opts.tasksQueueOptions != null) {
    delete this.opts.tasksQueueOptions
  }
}
455
/**
 * Builds normalized tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly missing.
 * @returns Tasks queue options whose `concurrency` defaults to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
463
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const workerNodesCount = this.workerNodes.length
  return workerNodesCount >= this.maxSize
}
c2ade475 472
/**
 * Whether the pool is busy or not.
 *
 * The busyness criterion is defined by each pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 479
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  return this.workerNodes.every(
    workerNode => workerNode.usage.tasks.executing !== 0
  )
}
492
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: randomUUID()
  }
  // Register the promise callbacks under the task id so the worker response can settle it.
  const res = new Promise<Response>((resolve, reject) => {
    const worker = this.workerNodes[workerNodeKey].worker
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker
    })
  })
  // Queue the task when queueing is enabled and either the pool is busy or
  // the chosen worker node already runs as many tasks as the configured concurrency.
  const shallEnqueueTask =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].usage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (shallEnqueueTask) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
c97c7edb 526
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Flushes the tasks queue of a worker node, destroys its worker and waits for the worker 'exit' event.
  const destroyWorkerNode = async (
    workerNode: WorkerNode<Worker, Data>,
    workerNodeKey: number
  ): Promise<void> => {
    this.flushTasksQueue(workerNodeKey)
    // FIXME: wait for tasks to be finished
    const workerExitPromise = new Promise<void>(resolve => {
      workerNode.worker.on('exit', () => {
        resolve()
      })
    })
    await this.destroyWorker(workerNode.worker)
    await workerExitPromise
  }
  const pendingDestructions = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      await destroyWorkerNode(workerNode, workerNodeKey)
    }
  )
  await Promise.all(pendingDestructions)
}
543
/**
 * Terminates the given worker. The termination may be synchronous or asynchronous.
 *
 * @param worker - A worker within `workerNodes`.
 */
protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 550
/**
 * Hook called by the abstract constructor before the worker nodes are created.
 * Does nothing by default. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Intentionally empty
}
c97c7edb 560
/**
 * Should return whether the worker is the main worker or not.
 * The constructor refuses to start a pool when this returns `false`.
 */
protected abstract isMain (): boolean
565
/**
 * Hook executed before the worker task execution: counts the task as executing
 * on the worker node and updates the worker node wait time usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { usage } = this.workerNodes[workerNodeKey]
  usage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(usage, task)
}
581
/**
 * Hook executed after the worker task execution: updates the tasks counters,
 * the run time and the ELU usage of the worker node owning the given worker.
 * Can be overridden.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { usage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(usage, message)
  this.updateRunTimeWorkerUsage(usage, message)
  this.updateEluWorkerUsage(usage, message)
}
598
/**
 * Updates the tasks counters of a worker usage once a task response is received:
 * one less executing task, one more executed task and, if the message carries a
 * task error, one more failed task.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  tasks.executing -= 1
  tasks.executed += 1
  if (message.taskError != null) {
    tasks.failed += 1
  }
}
610
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the run
 * time aggregate. The average and the median are only updated when required too.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message; a missing task run time counts as `0`.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const runTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime ?? 0
  workerUsage.runTime.aggregate =
    (workerUsage.runTime.aggregate ?? 0) + taskRunTime
  workerUsage.runTime.minimum = Math.min(
    taskRunTime,
    workerUsage.runTime.minimum ?? Infinity
  )
  workerUsage.runTime.maximum = Math.max(
    taskRunTime,
    workerUsage.runTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Guard on the divisor itself:
  // when every executed task failed it is zero and the division would yield Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  // The median is computed over the history of the reported run times only.
  if (
    runTimeRequirements.median &&
    message.taskPerformance?.runTime != null
  ) {
    workerUsage.runTime.history.push(message.taskPerformance.runTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
649
/**
 * Updates the wait time statistics of a worker usage when a task is about to be executed.
 *
 * The task wait time is the time elapsed since the task timestamp, `0` if the task
 * has no timestamp. Nothing is updated unless the current worker choice strategy
 * requires the wait time aggregate. The average and the median are only updated
 * when required too.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const waitTimeRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate =
    (workerUsage.waitTime.aggregate ?? 0) + taskWaitTime
  workerUsage.waitTime.minimum = Math.min(
    taskWaitTime,
    workerUsage.waitTime.minimum ?? Infinity
  )
  workerUsage.waitTime.maximum = Math.max(
    taskWaitTime,
    workerUsage.waitTime.maximum ?? -Infinity
  )
  // The average is taken over the tasks that did not fail. Guard on the divisor itself:
  // when every executed task failed it is zero and the division would yield Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
688
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the ELU
 * aggregate and the message carries ELU data. The averages and the medians are
 * only updated when required too.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const eluRequirements =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  const taskElu = message.taskPerformance?.elu
  if (!eluRequirements.aggregate || taskElu == null) {
    return
  }
  const { elu } = workerUsage
  elu.idle.aggregate = (elu.idle.aggregate ?? 0) + taskElu.idle
  elu.active.aggregate = (elu.active.aggregate ?? 0) + taskElu.active
  // The utilization is the mean of the previous value and the one just reported.
  if (elu.utilization != null) {
    elu.utilization = (elu.utilization + taskElu.utilization) / 2
  } else {
    elu.utilization = taskElu.utilization
  }
  elu.idle.minimum = Math.min(taskElu.idle, elu.idle.minimum ?? Infinity)
  elu.idle.maximum = Math.max(taskElu.idle, elu.idle.maximum ?? -Infinity)
  elu.active.minimum = Math.min(
    taskElu.active,
    elu.active.minimum ?? Infinity
  )
  elu.active.maximum = Math.max(
    taskElu.active,
    elu.active.maximum ?? -Infinity
  )
  // The averages are taken over the tasks that did not fail. Guard on the divisor itself:
  // when every executed task failed it is zero and the division would yield Infinity or NaN.
  const successfulTasks =
    workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    elu.idle.average = elu.idle.aggregate / successfulTasks
    elu.active.average = elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median) {
    elu.idle.history.push(taskElu.idle)
    elu.active.history.push(taskElu.active)
    elu.idle.median = median(elu.idle.history)
    elu.active.median = median(elu.active.history)
  }
}
755
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * worker node is chosen if the strategy policy says to use dynamic workers.
 * In every other case the choice is delegated to the worker choice strategy.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
774
/**
 * Conditions for dynamic worker creation: the pool is dynamic, is not full and
 * every worker node is executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
783
/**
 * Sends a message to the given worker. How the message is transported is up to each pool implementation.
 *
 * @param worker - The worker which should receive the message.
 * @param message - The message.
 */
protected abstract sendToWorker (
  worker: Worker,
  message: MessageValue<Data>
): void
794
/**
 * Registers a callback on the `'message'` event of the given worker.
 *
 * @param worker - The worker which should register a listener.
 * @param listener - The message listener callback.
 */
private registerWorkerMessageListener<Message extends Data | Response>(
  worker: Worker,
  listener: (message: MessageValue<Message>) => void
): void {
  const messageHandler = listener as MessageHandler<Worker>
  worker.on('message', messageHandler)
}
c97c7edb 807
/**
 * Creates a new worker. The worker is not yet part of the pool worker nodes.
 *
 * @returns Newly created worker.
 */
protected abstract createWorker (): Worker
c97c7edb 814
/**
 * Hook called once a worker has been newly created and moved to the pool worker nodes.
 * By default it registers the pool message listener on the worker.
 * Can be overridden.
 *
 * @param worker - The newly created worker.
 */
protected afterWorkerSetup (worker: Worker): void {
  // Listen to worker messages.
  const listener = this.workerListener()
  this.registerWorkerMessageListener(worker, listener)
}
c97c7edb 825
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * The user supplied handlers (`messageHandler`, `errorHandler`, `onlineHandler`,
 * `exitHandler`) are registered on the worker, with a no-op fallback. On a worker
 * error, the pool additionally emits an `error` pool event (if events are
 * enabled), moves the tasks queued on the worker node to other worker nodes (if
 * the tasks queue is enabled) and creates a replacement worker (if
 * `restartWorkerOnError` is set). On worker exit, its worker node is removed.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    if (this.emitter != null) {
      this.emitter.emit(PoolEvents.error, error)
    }
    if (this.opts.enableTasksQueue === true) {
      const workerNodeKey = this.getWorkerNodeKey(worker)
      // Redistribution needs a known source worker node and at least one other
      // worker node. Without another node the target would be the source itself
      // and each dequeued task would be enqueued right back: the loop would never end.
      if (workerNodeKey !== -1 && this.workerNodes.length > 1) {
        while (this.tasksQueueSize(workerNodeKey) > 0) {
          let targetWorkerNodeKey: number = workerNodeKey
          let minQueuedTasks = Infinity
          // Pick the first other worker node with an empty queue, or else the one with the fewest queued tasks.
          for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
            if (workerNodeId === workerNodeKey) {
              continue
            }
            if (workerNode.usage.tasks.queued === 0) {
              targetWorkerNodeKey = workerNodeId
              break
            }
            if (workerNode.usage.tasks.queued < minQueuedTasks) {
              minQueuedTasks = workerNode.usage.tasks.queued
              targetWorkerNodeKey = workerNodeId
            }
          }
          this.enqueueTask(
            targetWorkerNodeKey,
            this.dequeueTask(workerNodeKey) as Task<Data>
          )
        }
      }
    }
    if (this.opts.restartWorkerOnError === true) {
      // Replace the worker by one of the same kind (dynamic or not).
      if (this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic) {
        this.createAndSetupDynamicWorker()
      } else {
        this.createAndSetupWorker()
      }
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)

  this.setWorkerStatistics(worker)

  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 889
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
 *
 * The worker is flagged as dynamic in its worker node info and gets an extra
 * message listener that destroys it on a kill message: always for a hard kill
 * behavior, otherwise only when it has no executing task and, if the tasks
 * queue is enabled, no queued task either.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.getWorkerInfo(this.getWorkerNodeKey(worker)).dynamic = true
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Whether the worker node has nothing left to do, depending on the tasks queue setting.
    const isWorkerNodeIdle = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return this.workerNodes[workerNodeKey].usage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
          this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeIdle())
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
915
/**
 * Builds the listener registered for each worker message.
 *
 * A message carrying both `workerId` and `started` is handled as a worker started
 * message; otherwise a message carrying an `id` is handled as a task execution
 * response. Any other message is ignored by this listener.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return (message: MessageValue<Response>): void => {
    const isWorkerStartedMessage =
      message.workerId != null && message.started != null
    if (isWorkerStartedMessage) {
      // Worker started message received
      this.handleWorkerStartedMessage(message)
      return
    }
    if (message.id != null) {
      // Task execution response received
      this.handleTaskExecutionResponse(message)
    }
  }
}
932
/**
 * Handles a worker started message by recording the started flag in the
 * worker info of the matching worker node.
 *
 * @param message - The worker started message.
 * @throws Error if no worker matches the message worker id.
 */
private handleWorkerStartedMessage (message: MessageValue<Response>): void {
  const startedWorker = this.getWorkerById(message.workerId as number)
  if (startedWorker == null) {
    throw new Error(
      `Worker started message received from unknown worker '${
        message.workerId as number
      }'`
    )
  }
  const workerInfo = this.workerNodes[this.getWorkerNodeKey(startedWorker)].info
  workerInfo.started = message.started as boolean
}
947
/**
 * Handles a task execution response: settles the pending promise registered for
 * the task id, runs the after task execution hook, then executes the next queued
 * task of the worker (when the tasks queue is enabled and not empty) and updates
 * the worker choice strategy context.
 *
 * Messages whose id has no registered promise response are ignored.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const taskId = message.id as string
  const pendingResponse = this.promiseResponseMap.get(taskId)
  if (pendingResponse == null) {
    return
  }
  const { taskError } = message
  if (taskError == null) {
    pendingResponse.resolve(message.data as Response)
  } else {
    this.emitter?.emit(PoolEvents.taskError, taskError)
    // The promise is rejected with the error message only.
    pendingResponse.reject(taskError.message)
  }
  this.afterTaskExecutionHook(pendingResponse.worker, message)
  this.promiseResponseMap.delete(taskId)
  const nodeKey = this.getWorkerNodeKey(pendingResponse.worker)
  const hasQueuedTask =
    this.opts.enableTasksQueue === true && this.tasksQueueSize(nodeKey) > 0
  if (hasQueuedTask) {
    const nextTask = this.dequeueTask(nodeKey) as Task<Data>
    this.executeTask(nodeKey, nextTask)
  }
  this.workerChoiceStrategyContext.update(nodeKey)
}
7c0ba920 974
/**
 * Emits the pool `busy` and `full` events, with the pool info as payload, when
 * the corresponding conditions hold. Does nothing if the pool has no emitter.
 * The `full` event is only emitted for dynamic pools.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  const isDynamicPool = this.type === PoolTypes.dynamic
  if (isDynamicPool && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
985
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node whose usage is replaced.
 * @param workerUsage - The worker usage to assign.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.usage = workerUsage
}
998
/**
 * Gets the worker information of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information stored in the worker node.
 */
private getWorkerInfo (workerNodeKey: number): WorkerInfo {
  const { info } = this.workerNodes[workerNodeKey]
  return info
}
1007
/**
 * Pushes the given worker in the pool worker nodes.
 *
 * The node is first pushed with a usage created without a worker reference, then
 * its usage is replaced by one created with the worker, once the node can be
 * looked up in the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    info: this.getInitialWorkerInfo(worker),
    usage: this.getInitialWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(workerNode)
  const registeredNode = this.workerNodes[this.getWorkerNodeKey(worker)]
  const workerBoundUsage = this.getInitialWorkerUsage(worker)
  this.setWorkerNodeTasksUsage(registeredNode, workerBoundUsage)
  return this.workerNodes.length
}
c923ce56 1027
/**
 * Gets the worker id: the `threadId` for a thread worker type, the `id` for a
 * cluster worker type.
 *
 * @param worker - The worker.
 * @returns The worker id, or `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
1041
8604aaab
JB
1042 // /**
1043 // * Sets the given worker in the pool worker nodes.
1044 // *
1045 // * @param workerNodeKey - The worker node key.
1046 // * @param worker - The worker.
f59e1027 1047 // * @param workerInfo - The worker info.
8604aaab
JB
1048 // * @param workerUsage - The worker usage.
1049 // * @param tasksQueue - The worker task queue.
1050 // */
1051 // private setWorkerNode (
1052 // workerNodeKey: number,
1053 // worker: Worker,
f59e1027 1054 // workerInfo: WorkerInfo,
8604aaab
JB
1055 // workerUsage: WorkerUsage,
1056 // tasksQueue: Queue<Task<Data>>
1057 // ): void {
1058 // this.workerNodes[workerNodeKey] = {
1059 // worker,
f59e1027
JB
1060 // info: workerInfo,
1061 // usage: workerUsage,
8604aaab
JB
1062 // tasksQueue
1063 // }
1064 // }
51fe3d3c
JB
1065
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker node key is `-1`.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 1078
/**
 * Runs the before task execution hook, then sends the task to the worker of the
 * given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
1083
/**
 * Enqueues the task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call (presumably the new queue size - to confirm against the `Queue` implementation).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
1087
/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
1091
/**
 * Gets the tasks queue size of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 1095
/**
 * Gets the tasks queue `maxSize` of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
1099
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed on that worker node, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  let pendingTasks = this.tasksQueueSize(workerNodeKey)
  while (pendingTasks > 0) {
    const task = this.dequeueTask(workerNodeKey) as Task<Data>
    this.executeTask(workerNodeKey, task)
    // Re-read the size on each pass rather than counting down.
    pendingTasks = this.tasksQueueSize(workerNodeKey)
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
1109
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const nodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(nodeKey)
  }
}
b6b32453
JB
1115
/**
 * Sends to the given worker the statistics it has to compute, derived from the
 * task statistics requirements of the worker choice strategy context.
 *
 * The requirements are fetched once, so that the `runTime` and `elu` flags are
 * read from the same requirements object and the lookup is not duplicated.
 *
 * @param worker - The worker to send the statistics message to.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
8604aaab 1127
/**
 * Builds the initial usage of a worker node: zeroed task counters, empty
 * measurement histories, and `queued` / `maxQueued` getters that read the tasks
 * queue of the worker node at access time.
 *
 * @param worker - The worker the usage is bound to. When omitted, `queued` and `maxQueued` always evaluate to `0`.
 * @returns The initial worker usage.
 */
private getInitialWorkerUsage (worker?: Worker): WorkerUsage {
  // Reads a tasks queue figure of the worker node, falling back to 0 when no
  // worker is bound or when the read throws.
  const readQueueFigure = (read: (workerNodeKey: number) => number): number => {
    if (worker == null) {
      return 0
    }
    // FIXME: Workaround tasks queue initialization race issue.
    try {
      return read(this.getWorkerNodeKey(worker))
    } catch {
      return 0
    }
  }
  // Declared here, outside the object literal, so that `this` is the pool.
  const readQueued = (): number =>
    readQueueFigure(workerNodeKey => this.tasksQueueSize(workerNodeKey))
  const readMaxQueued = (): number =>
    readQueueFigure(workerNodeKey => this.tasksMaxQueueSize(workerNodeKey))
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readQueued()
      },
      get maxQueued (): number {
        return readMaxQueued()
      },
      failed: 0
    },
    runTime: {
      history: new CircularArray()
    },
    waitTime: {
      history: new CircularArray()
    },
    elu: {
      idle: {
        history: new CircularArray()
      },
      active: {
        history: new CircularArray()
      }
    }
  }
}
8a1260a3
JB
1179
/**
 * Builds the initial information of a worker: its id, not dynamic, and flagged
 * as started.
 *
 * @param worker - The worker.
 * @returns The initial worker information.
 */
private getInitialWorkerInfo (worker: Worker): WorkerInfo {
  const id = this.getWorkerId(worker)
  return {
    id,
    dynamic: false,
    started: true
  }
}
c97c7edb 1183}