Merge pull request #786 from poolifier/worker-info
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
afe0d5bf
JB
8 median,
9 round
bbeadd16 10} from '../utils'
34a0cfab 11import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 12import { CircularArray } from '../circular-array'
29ee7e9a 13import { Queue } from '../queue'
c4855468 14import {
65d7a1c9 15 type IPool,
7c5a1080 16 PoolEmitter,
c4855468 17 PoolEvents,
6b27d407 18 type PoolInfo,
c4855468 19 type PoolOptions,
6b27d407
JB
20 type PoolType,
21 PoolTypes,
184855e6 22 type TasksQueueOptions,
83fa0a36
JB
23 type WorkerType,
24 WorkerTypes
c4855468 25} from './pool'
e102732c
JB
26import type {
27 IWorker,
28 MessageHandler,
29 Task,
30 WorkerNode,
31 WorkerUsage
32} from './worker'
a35560ba 33import {
f0d7f803 34 Measurements,
a35560ba 35 WorkerChoiceStrategies,
a20f0ba5
JB
36 type WorkerChoiceStrategy,
37 type WorkerChoiceStrategyOptions
bdaf31cd
JB
38} from './selection-strategies/selection-strategies-types'
39import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 40
729c563d 41/**
ea7a90d3 42 * Base class that implements some shared logic for all poolifier pools.
729c563d 43 *
38e795c1 44 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
45 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
46 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 47 */
c97c7edb 48export abstract class AbstractPool<
f06e48d8 49 Worker extends IWorker,
d3c8a1a8
S
50 Data = unknown,
51 Response = unknown
c4855468 52> implements IPool<Worker, Data, Response> {
afc003b2 53 /** @inheritDoc */
f06e48d8 54 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 55
afc003b2 56 /** @inheritDoc */
7c0ba920
JB
57 public readonly emitter?: PoolEmitter
58
be0676b3 59 /**
a3445496 60 * The execution response promise map.
be0676b3 61 *
2740a743 62 * - `key`: The message id of each submitted task.
a3445496 63 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 64 *
a3445496 65 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 66 */
c923ce56
JB
67 protected promiseResponseMap: Map<
68 string,
69 PromiseResponseWrapper<Worker, Response>
70 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 71
a35560ba 72 /**
51fe3d3c 73 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
74 */
75 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
76 Worker,
77 Data,
78 Response
a35560ba
S
79 >
80
afe0d5bf
JB
81 /**
82 * The start timestamp of the pool.
83 */
84 private readonly startTimestamp
85
729c563d
S
86 /**
87 * Constructs a new poolifier pool.
88 *
38e795c1 89 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 90 * @param filePath - Path to the worker file.
38e795c1 91 * @param opts - Options for the pool.
729c563d 92 */
c97c7edb 93 public constructor (
b4213b7f
JB
94 protected readonly numberOfWorkers: number,
95 protected readonly filePath: string,
96 protected readonly opts: PoolOptions<Worker>
c97c7edb 97 ) {
78cea37e 98 if (!this.isMain()) {
c97c7edb
S
99 throw new Error('Cannot start a pool from a worker!')
100 }
8d3782fa 101 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 102 this.checkFilePath(this.filePath)
7c0ba920 103 this.checkPoolOptions(this.opts)
1086026a 104
7254e419
JB
105 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
106 this.executeTask = this.executeTask.bind(this)
107 this.enqueueTask = this.enqueueTask.bind(this)
108 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 109
6bd72cd0 110 if (this.opts.enableEvents === true) {
7c0ba920
JB
111 this.emitter = new PoolEmitter()
112 }
d59df138
JB
113 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
114 Worker,
115 Data,
116 Response
da309861
JB
117 >(
118 this,
119 this.opts.workerChoiceStrategy,
120 this.opts.workerChoiceStrategyOptions
121 )
b6b32453
JB
122
123 this.setupHook()
124
afe0d5bf 125 while (this.workerNodes.length < this.numberOfWorkers) {
b6b32453
JB
126 this.createAndSetupWorker()
127 }
afe0d5bf
JB
128
129 this.startTimestamp = performance.now()
c97c7edb
S
130 }
131
a35560ba 132 private checkFilePath (filePath: string): void {
ffcbbad8
JB
133 if (
134 filePath == null ||
135 (typeof filePath === 'string' && filePath.trim().length === 0)
136 ) {
c510fea7
APA
137 throw new Error('Please specify a file with a worker implementation')
138 }
139 }
140
8d3782fa
JB
141 private checkNumberOfWorkers (numberOfWorkers: number): void {
142 if (numberOfWorkers == null) {
143 throw new Error(
144 'Cannot instantiate a pool without specifying the number of workers'
145 )
78cea37e 146 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 147 throw new TypeError(
0d80593b 148 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
149 )
150 } else if (numberOfWorkers < 0) {
473c717a 151 throw new RangeError(
8d3782fa
JB
152 'Cannot instantiate a pool with a negative number of workers'
153 )
6b27d407 154 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
155 throw new Error('Cannot instantiate a fixed pool with no worker')
156 }
157 }
158
7c0ba920 159 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
160 if (isPlainObject(opts)) {
161 this.opts.workerChoiceStrategy =
162 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
163 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
164 this.opts.workerChoiceStrategyOptions =
165 opts.workerChoiceStrategyOptions ??
166 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
167 this.checkValidWorkerChoiceStrategyOptions(
168 this.opts.workerChoiceStrategyOptions
169 )
1f68cede 170 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
171 this.opts.enableEvents = opts.enableEvents ?? true
172 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
173 if (this.opts.enableTasksQueue) {
174 this.checkValidTasksQueueOptions(
175 opts.tasksQueueOptions as TasksQueueOptions
176 )
177 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
178 opts.tasksQueueOptions as TasksQueueOptions
179 )
180 }
181 } else {
182 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 183 }
aee46736
JB
184 }
185
186 private checkValidWorkerChoiceStrategy (
187 workerChoiceStrategy: WorkerChoiceStrategy
188 ): void {
189 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 190 throw new Error(
aee46736 191 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
192 )
193 }
7c0ba920
JB
194 }
195
0d80593b
JB
196 private checkValidWorkerChoiceStrategyOptions (
197 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
198 ): void {
199 if (!isPlainObject(workerChoiceStrategyOptions)) {
200 throw new TypeError(
201 'Invalid worker choice strategy options: must be a plain object'
202 )
203 }
49be33fe
JB
204 if (
205 workerChoiceStrategyOptions.weights != null &&
6b27d407 206 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
207 ) {
208 throw new Error(
209 'Invalid worker choice strategy options: must have a weight for each worker node'
210 )
211 }
f0d7f803
JB
212 if (
213 workerChoiceStrategyOptions.measurement != null &&
214 !Object.values(Measurements).includes(
215 workerChoiceStrategyOptions.measurement
216 )
217 ) {
218 throw new Error(
219 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
220 )
221 }
0d80593b
JB
222 }
223
a20f0ba5
JB
224 private checkValidTasksQueueOptions (
225 tasksQueueOptions: TasksQueueOptions
226 ): void {
0d80593b
JB
227 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
228 throw new TypeError('Invalid tasks queue options: must be a plain object')
229 }
f0d7f803
JB
230 if (
231 tasksQueueOptions?.concurrency != null &&
232 !Number.isSafeInteger(tasksQueueOptions.concurrency)
233 ) {
234 throw new TypeError(
235 'Invalid worker tasks concurrency: must be an integer'
236 )
237 }
238 if (
239 tasksQueueOptions?.concurrency != null &&
240 tasksQueueOptions.concurrency <= 0
241 ) {
a20f0ba5 242 throw new Error(
f0d7f803 243 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
244 )
245 }
246 }
247
08f3f44c 248 /** @inheritDoc */
6b27d407
JB
249 public get info (): PoolInfo {
250 return {
251 type: this.type,
184855e6 252 worker: this.worker,
6b27d407
JB
253 minSize: this.minSize,
254 maxSize: this.maxSize,
afe0d5bf 255 utilization: round(this.utilization),
6b27d407
JB
256 workerNodes: this.workerNodes.length,
257 idleWorkerNodes: this.workerNodes.reduce(
258 (accumulator, workerNode) =>
f59e1027 259 workerNode.usage.tasks.executing === 0
a4e07f72
JB
260 ? accumulator + 1
261 : accumulator,
6b27d407
JB
262 0
263 ),
264 busyWorkerNodes: this.workerNodes.reduce(
265 (accumulator, workerNode) =>
f59e1027 266 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
267 0
268 ),
a4e07f72 269 executedTasks: this.workerNodes.reduce(
6b27d407 270 (accumulator, workerNode) =>
f59e1027 271 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
272 0
273 ),
274 executingTasks: this.workerNodes.reduce(
275 (accumulator, workerNode) =>
f59e1027 276 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
277 0
278 ),
279 queuedTasks: this.workerNodes.reduce(
df593701 280 (accumulator, workerNode) =>
f59e1027 281 accumulator + workerNode.usage.tasks.queued,
6b27d407
JB
282 0
283 ),
284 maxQueuedTasks: this.workerNodes.reduce(
285 (accumulator, workerNode) =>
f59e1027 286 accumulator + workerNode.usage.tasks.maxQueued,
6b27d407 287 0
a4e07f72
JB
288 ),
289 failedTasks: this.workerNodes.reduce(
290 (accumulator, workerNode) =>
f59e1027 291 accumulator + workerNode.usage.tasks.failed,
a4e07f72 292 0
6b27d407
JB
293 )
294 }
295 }
08f3f44c 296
afe0d5bf
JB
297 /**
298 * Gets the pool run time.
299 *
300 * @returns The pool run time in milliseconds.
301 */
302 private get runTime (): number {
303 return performance.now() - this.startTimestamp
304 }
305
306 /**
307 * Gets the approximate pool utilization.
308 *
309 * @returns The pool utilization.
310 */
311 private get utilization (): number {
312 const poolRunTimeCapacity = this.runTime * this.maxSize
313 const totalTasksRunTime = this.workerNodes.reduce(
314 (accumulator, workerNode) =>
315 accumulator + workerNode.usage.runTime.aggregate,
316 0
317 )
318 const totalTasksWaitTime = this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 accumulator + workerNode.usage.waitTime.aggregate,
321 0
322 )
323 return (totalTasksRunTime + totalTasksWaitTime) / poolRunTimeCapacity
324 }
325
8881ae32
JB
326 /**
327 * Pool type.
328 *
329 * If it is `'dynamic'`, it provides the `max` property.
330 */
331 protected abstract get type (): PoolType
332
184855e6
JB
333 /**
334 * Gets the worker type.
335 */
336 protected abstract get worker (): WorkerType
337
c2ade475 338 /**
6b27d407 339 * Pool minimum size.
c2ade475 340 */
6b27d407 341 protected abstract get minSize (): number
ff733df7
JB
342
343 /**
6b27d407 344 * Pool maximum size.
ff733df7 345 */
6b27d407 346 protected abstract get maxSize (): number
a35560ba 347
f59e1027
JB
348 /**
349 * Get the worker given its id.
350 *
351 * @param workerId - The worker id.
352 * @returns The worker if found in the pool worker nodes, `undefined` otherwise.
353 */
354 private getWorkerById (workerId: number): Worker | undefined {
355 return this.workerNodes.find(workerNode => workerNode.info.id === workerId)
356 ?.worker
357 }
358
ffcbbad8 359 /**
f06e48d8 360 * Gets the given worker its worker node key.
ffcbbad8
JB
361 *
362 * @param worker - The worker.
f59e1027 363 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 364 */
f06e48d8
JB
365 private getWorkerNodeKey (worker: Worker): number {
366 return this.workerNodes.findIndex(
367 workerNode => workerNode.worker === worker
368 )
bf9549ae
JB
369 }
370
afc003b2 371 /** @inheritDoc */
a35560ba 372 public setWorkerChoiceStrategy (
59219cbb
JB
373 workerChoiceStrategy: WorkerChoiceStrategy,
374 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 375 ): void {
aee46736 376 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 377 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
378 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
379 this.opts.workerChoiceStrategy
380 )
381 if (workerChoiceStrategyOptions != null) {
382 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
383 }
9c16fb4b 384 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
385 this.setWorkerNodeTasksUsage(
386 workerNode,
9c16fb4b 387 this.getWorkerUsage(workerNodeKey)
8604aaab 388 )
b6b32453 389 this.setWorkerStatistics(workerNode.worker)
59219cbb 390 }
a20f0ba5
JB
391 }
392
393 /** @inheritDoc */
394 public setWorkerChoiceStrategyOptions (
395 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
396 ): void {
0d80593b 397 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
398 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
399 this.workerChoiceStrategyContext.setOptions(
400 this.opts.workerChoiceStrategyOptions
a35560ba
S
401 )
402 }
403
a20f0ba5 404 /** @inheritDoc */
8f52842f
JB
405 public enableTasksQueue (
406 enable: boolean,
407 tasksQueueOptions?: TasksQueueOptions
408 ): void {
a20f0ba5 409 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 410 this.flushTasksQueues()
a20f0ba5
JB
411 }
412 this.opts.enableTasksQueue = enable
8f52842f 413 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
414 }
415
416 /** @inheritDoc */
8f52842f 417 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 418 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
419 this.checkValidTasksQueueOptions(tasksQueueOptions)
420 this.opts.tasksQueueOptions =
421 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 422 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
423 delete this.opts.tasksQueueOptions
424 }
425 }
426
427 private buildTasksQueueOptions (
428 tasksQueueOptions: TasksQueueOptions
429 ): TasksQueueOptions {
430 return {
431 concurrency: tasksQueueOptions?.concurrency ?? 1
432 }
433 }
434
c319c66b
JB
435 /**
436 * Whether the pool is full or not.
437 *
438 * The pool filling boolean status.
439 */
dea903a8
JB
440 protected get full (): boolean {
441 return this.workerNodes.length >= this.maxSize
442 }
c2ade475 443
c319c66b
JB
444 /**
445 * Whether the pool is busy or not.
446 *
447 * The pool busyness boolean status.
448 */
449 protected abstract get busy (): boolean
7c0ba920 450
6c6afb84
JB
451 /**
452 * Whether worker nodes are executing at least one task.
453 *
454 * @returns Worker nodes busyness boolean status.
455 */
c2ade475 456 protected internalBusy (): boolean {
e0ae6100
JB
457 return (
458 this.workerNodes.findIndex(workerNode => {
f59e1027 459 return workerNode.usage.tasks.executing === 0
e0ae6100
JB
460 }) === -1
461 )
cb70b19d
JB
462 }
463
afc003b2 464 /** @inheritDoc */
a86b6df1 465 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 466 const timestamp = performance.now()
20dcad1a 467 const workerNodeKey = this.chooseWorkerNode()
adc3c320 468 const submittedTask: Task<Data> = {
a86b6df1 469 name,
e5a5c0fc
JB
470 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
471 data: data ?? ({} as Data),
b6b32453 472 timestamp,
adc3c320
JB
473 id: crypto.randomUUID()
474 }
2e81254d 475 const res = new Promise<Response>((resolve, reject) => {
02706357 476 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
477 resolve,
478 reject,
20dcad1a 479 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
480 })
481 })
ff733df7
JB
482 if (
483 this.opts.enableTasksQueue === true &&
7171d33f 484 (this.busy ||
f59e1027 485 this.workerNodes[workerNodeKey].usage.tasks.executing >=
7171d33f 486 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 487 .concurrency as number))
ff733df7 488 ) {
26a929d7
JB
489 this.enqueueTask(workerNodeKey, submittedTask)
490 } else {
2e81254d 491 this.executeTask(workerNodeKey, submittedTask)
adc3c320 492 }
ff733df7 493 this.checkAndEmitEvents()
78cea37e 494 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
495 return res
496 }
c97c7edb 497
afc003b2 498 /** @inheritDoc */
c97c7edb 499 public async destroy (): Promise<void> {
1fbcaa7c 500 await Promise.all(
875a7c37
JB
501 this.workerNodes.map(async (workerNode, workerNodeKey) => {
502 this.flushTasksQueue(workerNodeKey)
47aacbaa 503 // FIXME: wait for tasks to be finished
f06e48d8 504 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
505 })
506 )
c97c7edb
S
507 }
508
4a6952ff 509 /**
6c6afb84 510 * Terminates the given worker.
4a6952ff 511 *
f06e48d8 512 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
513 */
514 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 515
729c563d 516 /**
6677a3d3
JB
517 * Setup hook to execute code before worker nodes are created in the abstract constructor.
518 * Can be overridden.
afc003b2
JB
519 *
520 * @virtual
729c563d 521 */
280c2a77 522 protected setupHook (): void {
d99ba5a8 523 // Intentionally empty
280c2a77 524 }
c97c7edb 525
729c563d 526 /**
280c2a77
S
527 * Should return whether the worker is the main worker or not.
528 */
529 protected abstract isMain (): boolean
530
531 /**
2e81254d 532 * Hook executed before the worker task execution.
bf9549ae 533 * Can be overridden.
729c563d 534 *
f06e48d8 535 * @param workerNodeKey - The worker node key.
1c6fe997 536 * @param task - The task to execute.
729c563d 537 */
1c6fe997
JB
538 protected beforeTaskExecutionHook (
539 workerNodeKey: number,
540 task: Task<Data>
541 ): void {
f59e1027 542 const workerUsage = this.workerNodes[workerNodeKey].usage
1c6fe997
JB
543 ++workerUsage.tasks.executing
544 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
545 }
546
c01733f1 547 /**
2e81254d 548 * Hook executed after the worker task execution.
bf9549ae 549 * Can be overridden.
c01733f1 550 *
c923ce56 551 * @param worker - The worker.
38e795c1 552 * @param message - The received message.
c01733f1 553 */
2e81254d 554 protected afterTaskExecutionHook (
c923ce56 555 worker: Worker,
2740a743 556 message: MessageValue<Response>
bf9549ae 557 ): void {
f59e1027 558 const workerUsage = this.workerNodes[this.getWorkerNodeKey(worker)].usage
f1c06930
JB
559 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
560 this.updateRunTimeWorkerUsage(workerUsage, message)
561 this.updateEluWorkerUsage(workerUsage, message)
562 }
563
564 private updateTaskStatisticsWorkerUsage (
565 workerUsage: WorkerUsage,
566 message: MessageValue<Response>
567 ): void {
a4e07f72
JB
568 const workerTaskStatistics = workerUsage.tasks
569 --workerTaskStatistics.executing
570 ++workerTaskStatistics.executed
82f36766 571 if (message.taskError != null) {
a4e07f72 572 ++workerTaskStatistics.failed
2740a743 573 }
f8eb0a2a
JB
574 }
575
a4e07f72
JB
576 private updateRunTimeWorkerUsage (
577 workerUsage: WorkerUsage,
f8eb0a2a
JB
578 message: MessageValue<Response>
579 ): void {
87de9ff5
JB
580 if (
581 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 582 .aggregate
87de9ff5 583 ) {
932fc8be 584 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 585 if (
932fc8be
JB
586 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
587 .average &&
a4e07f72 588 workerUsage.tasks.executed !== 0
c6bd2650 589 ) {
a4e07f72 590 workerUsage.runTime.average =
f1c06930
JB
591 workerUsage.runTime.aggregate /
592 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 593 }
3fa4cdd2 594 if (
932fc8be
JB
595 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
596 .median &&
d715b7bc 597 message.taskPerformance?.runTime != null
3fa4cdd2 598 ) {
a4e07f72
JB
599 workerUsage.runTime.history.push(message.taskPerformance.runTime)
600 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 601 }
3032893a 602 }
f8eb0a2a
JB
603 }
604
a4e07f72
JB
605 private updateWaitTimeWorkerUsage (
606 workerUsage: WorkerUsage,
1c6fe997 607 task: Task<Data>
f8eb0a2a 608 ): void {
1c6fe997
JB
609 const timestamp = performance.now()
610 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
611 if (
612 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 613 .aggregate
87de9ff5 614 ) {
932fc8be 615 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 616 if (
87de9ff5 617 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 618 .waitTime.average &&
a4e07f72 619 workerUsage.tasks.executed !== 0
09a6305f 620 ) {
a4e07f72 621 workerUsage.waitTime.average =
f1c06930
JB
622 workerUsage.waitTime.aggregate /
623 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
624 }
625 if (
87de9ff5 626 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 627 .waitTime.median &&
1c6fe997 628 taskWaitTime != null
09a6305f 629 ) {
1c6fe997 630 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 631 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 632 }
0567595a 633 }
c01733f1 634 }
635
a4e07f72 636 private updateEluWorkerUsage (
5df69fab 637 workerUsage: WorkerUsage,
62c15a68
JB
638 message: MessageValue<Response>
639 ): void {
5df69fab
JB
640 if (
641 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
642 .aggregate
643 ) {
644 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
645 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
646 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
647 workerUsage.elu.utilization =
648 (workerUsage.elu.utilization +
649 message.taskPerformance.elu.utilization) /
650 2
651 } else if (message.taskPerformance?.elu != null) {
652 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
653 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
654 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
655 }
d715b7bc 656 if (
5df69fab
JB
657 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
658 .average &&
659 workerUsage.tasks.executed !== 0
660 ) {
f1c06930
JB
661 const executedTasks =
662 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 663 workerUsage.elu.idle.average =
f1c06930 664 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 665 workerUsage.elu.active.average =
f1c06930 666 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
667 }
668 if (
669 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
670 .median &&
d715b7bc
JB
671 message.taskPerformance?.elu != null
672 ) {
5df69fab
JB
673 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
674 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
675 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
676 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
677 }
678 }
679 }
680
280c2a77 681 /**
f06e48d8 682 * Chooses a worker node for the next task.
280c2a77 683 *
6c6afb84 684 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 685 *
20dcad1a 686 * @returns The worker node key
280c2a77 687 */
6c6afb84 688 private chooseWorkerNode (): number {
930dcf12 689 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
690 const worker = this.createAndSetupDynamicWorker()
691 if (
692 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
693 ) {
694 return this.getWorkerNodeKey(worker)
695 }
17393ac8 696 }
930dcf12
JB
697 return this.workerChoiceStrategyContext.execute()
698 }
699
6c6afb84
JB
700 /**
701 * Conditions for dynamic worker creation.
702 *
703 * @returns Whether to create a dynamic worker or not.
704 */
705 private shallCreateDynamicWorker (): boolean {
930dcf12 706 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
707 }
708
280c2a77 709 /**
675bb809 710 * Sends a message to the given worker.
280c2a77 711 *
38e795c1
JB
712 * @param worker - The worker which should receive the message.
713 * @param message - The message.
280c2a77
S
714 */
715 protected abstract sendToWorker (
716 worker: Worker,
717 message: MessageValue<Data>
718 ): void
719
4a6952ff 720 /**
f06e48d8 721 * Registers a listener callback on the given worker.
4a6952ff 722 *
38e795c1
JB
723 * @param worker - The worker which should register a listener.
724 * @param listener - The message listener callback.
4a6952ff 725 */
e102732c
JB
726 private registerWorkerMessageListener<Message extends Data | Response>(
727 worker: Worker,
728 listener: (message: MessageValue<Message>) => void
729 ): void {
730 worker.on('message', listener as MessageHandler<Worker>)
731 }
c97c7edb 732
729c563d 733 /**
41344292 734 * Creates a new worker.
6c6afb84
JB
735 *
736 * @returns Newly created worker.
729c563d 737 */
280c2a77 738 protected abstract createWorker (): Worker
c97c7edb 739
729c563d 740 /**
f06e48d8 741 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 742 * Can be overridden.
729c563d 743 *
38e795c1 744 * @param worker - The newly created worker.
729c563d 745 */
6677a3d3 746 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
747 // Listen to worker messages.
748 this.registerWorkerMessageListener(worker, this.workerListener())
749 }
c97c7edb 750
4a6952ff 751 /**
f06e48d8 752 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
753 *
754 * @returns New, completely set up worker.
755 */
756 protected createAndSetupWorker (): Worker {
bdacc2d2 757 const worker = this.createWorker()
280c2a77 758
35cf1c03 759 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 760 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
761 worker.on('error', error => {
762 if (this.emitter != null) {
763 this.emitter.emit(PoolEvents.error, error)
764 }
e8a4c3ea 765 if (this.opts.restartWorkerOnError === true) {
1f68cede 766 this.createAndSetupWorker()
5baee0d7
JB
767 }
768 })
a35560ba
S
769 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
770 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 771 worker.once('exit', () => {
f06e48d8 772 this.removeWorkerNode(worker)
a974afa6 773 })
280c2a77 774
f06e48d8 775 this.pushWorkerNode(worker)
280c2a77 776
b6b32453
JB
777 this.setWorkerStatistics(worker)
778
280c2a77
S
779 this.afterWorkerSetup(worker)
780
c97c7edb
S
781 return worker
782 }
be0676b3 783
930dcf12
JB
784 /**
785 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
786 *
787 * @returns New, completely set up dynamic worker.
788 */
789 protected createAndSetupDynamicWorker (): Worker {
790 const worker = this.createAndSetupWorker()
791 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 792 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
793 if (
794 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
795 (message.kill != null &&
796 ((this.opts.enableTasksQueue === false &&
f59e1027 797 this.workerNodes[workerNodeKey].usage.tasks.executing === 0) ||
7b56f532 798 (this.opts.enableTasksQueue === true &&
f59e1027 799 this.workerNodes[workerNodeKey].usage.tasks.executing === 0 &&
e8b3a5ab 800 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
801 ) {
802 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
803 void (this.destroyWorker(worker) as Promise<void>)
804 }
805 })
806 return worker
807 }
808
be0676b3 809 /**
ff733df7 810 * This function is the listener registered for each worker message.
be0676b3 811 *
bdacc2d2 812 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
813 */
814 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 815 return message => {
f59e1027
JB
816 if (message.workerId != null && message.started != null) {
817 // Worker started message received
83fa0a36
JB
818 const worker = this.getWorkerById(message.workerId)
819 if (worker != null) {
820 this.workerNodes[this.getWorkerNodeKey(worker)].info.started =
821 message.started
822 } else {
7ae6fb74
JB
823 throw new Error(
824 `Worker started message received from unknown worker '${message.workerId}'`
825 )
83fa0a36 826 }
f59e1027 827 } else if (message.id != null) {
a3445496 828 // Task execution response received
2740a743 829 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 830 if (promiseResponse != null) {
82f36766 831 if (message.taskError != null) {
91ee39ed 832 if (this.emitter != null) {
82f36766 833 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 834 }
985d0e79 835 promiseResponse.reject(message.taskError.message)
a05c10de 836 } else {
2740a743 837 promiseResponse.resolve(message.data as Response)
a05c10de 838 }
2e81254d 839 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 840 this.promiseResponseMap.delete(message.id)
ff733df7
JB
841 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
842 if (
843 this.opts.enableTasksQueue === true &&
416fd65c 844 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 845 ) {
2e81254d
JB
846 this.executeTask(
847 workerNodeKey,
ff733df7
JB
848 this.dequeueTask(workerNodeKey) as Task<Data>
849 )
850 }
e5536a06 851 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
852 }
853 }
854 }
be0676b3 855 }
7c0ba920 856
ff733df7 857 private checkAndEmitEvents (): void {
1f68cede 858 if (this.emitter != null) {
ff733df7 859 if (this.busy) {
6b27d407 860 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 861 }
6b27d407
JB
862 if (this.type === PoolTypes.dynamic && this.full) {
863 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 864 }
164d950a
JB
865 }
866 }
867
0ebe2a9f
JB
868 /**
869 * Sets the given worker node its tasks usage in the pool.
870 *
871 * @param workerNode - The worker node.
a4e07f72 872 * @param workerUsage - The worker usage.
0ebe2a9f
JB
873 */
874 private setWorkerNodeTasksUsage (
875 workerNode: WorkerNode<Worker, Data>,
a4e07f72 876 workerUsage: WorkerUsage
0ebe2a9f 877 ): void {
f59e1027 878 workerNode.usage = workerUsage
0ebe2a9f
JB
879 }
880
/**
 * Appends a new worker node wrapping the given worker to the pool worker nodes.
 *
 * The node is first pushed with a usage object created without a node key;
 * once the node key can be resolved, the usage is replaced by one created
 * with that key.
 *
 * @param worker - The worker.
 * @returns The worker nodes length after insertion.
 */
private pushWorkerNode (worker: Worker): number {
  const createdNode: WorkerNode<Worker, Data> = {
    worker,
    info: { id: this.getWorkerId(worker), started: true },
    usage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  }
  this.workerNodes.push(createdNode)
  // The key is only resolvable once the node sits in the array.
  const nodeKey = this.getWorkerNodeKey(worker)
  this.setWorkerNodeTasksUsage(
    this.workerNodes[nodeKey],
    this.getWorkerUsage(nodeKey)
  )
  return this.workerNodes.length
}
c923ce56 901
83fa0a36
JB
/**
 * Resolves the identifier of the given worker according to the pool worker type.
 *
 * @param worker - The worker.
 * @returns `worker.threadId` for a thread worker type, `worker.id` for a
 * cluster worker type, `undefined` for any other worker type.
 */
private getWorkerId (worker: Worker): number | undefined {
  switch (this.worker) {
    case WorkerTypes.thread:
      return worker.threadId
    case WorkerTypes.cluster:
      return worker.id
    default:
      return undefined
  }
}
915
8604aaab
JB
916 // /**
917 // * Sets the given worker in the pool worker nodes.
918 // *
919 // * @param workerNodeKey - The worker node key.
920 // * @param worker - The worker.
f59e1027 921 // * @param workerInfo - The worker info.
8604aaab
JB
922 // * @param workerUsage - The worker usage.
923 // * @param tasksQueue - The worker task queue.
924 // */
925 // private setWorkerNode (
926 // workerNodeKey: number,
927 // worker: Worker,
f59e1027 928 // workerInfo: WorkerInfo,
8604aaab
JB
929 // workerUsage: WorkerUsage,
930 // tasksQueue: Queue<Task<Data>>
931 // ): void {
932 // this.workerNodes[workerNodeKey] = {
933 // worker,
f59e1027
JB
934 // info: workerInfo,
935 // usage: workerUsage,
8604aaab
JB
936 // tasksQueue
937 // }
938 // }
51fe3d3c
JB
939
/**
 * Deletes the worker node of the given worker from the pool worker nodes
 * and notifies the worker choice strategy context of the removed key.
 * Does nothing when the worker has no node in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const nodeKey = this.getWorkerNodeKey(worker)
  if (nodeKey === -1) {
    // Unknown worker: nothing to remove.
    return
  }
  this.workerNodes.splice(nodeKey, 1)
  this.workerChoiceStrategyContext.remove(nodeKey)
}
adc3c320 952
/**
 * Runs the before-task-execution hook, then sends the task to the worker
 * of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
957
/**
 * Adds the task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
961
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the queue yields none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
965
/**
 * Reads the current size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `size`.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 969
df593701
JB
/**
 * Reads the `maxSize` value of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize`.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
973
416fd65c
JB
/**
 * Executes every task still queued on the given worker node, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until empty. A counting loop bounded by the live queue size would
  // stop halfway: the size shrinks on each dequeue while the counter grows,
  // and the remaining tasks would then be discarded by clear() below.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
985
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
991
/**
 * Sends to the given worker a message carrying the `statistics` flags,
 * taken from the `runTime.aggregate` and `elu.aggregate` task statistics
 * requirements of the worker choice strategy context.
 *
 * @param worker - The worker receiving the statistics message.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
8604aaab 1003
/**
 * Builds a fresh worker usage object with zeroed counters and empty
 * measurement histories.
 *
 * The `tasks.queued` and `tasks.maxQueued` properties are getters: they read
 * the tasks queue of the given worker node on each access, or return 0 when
 * no worker node key is given.
 *
 * @param workerNodeKey - The worker node key the queue getters are bound to.
 * @returns The initialized worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  // Inside the getters below `this` is the `tasks` object, so the pool is
  // reached through these arrow closures instead.
  const readQueuedTasks = (): number => {
    if (workerNodeKey != null) {
      return this.tasksQueueSize(workerNodeKey)
    }
    return 0
  }
  const readMaxQueuedTasks = (): number => {
    if (workerNodeKey != null) {
      return this.tasksMaxQueueSize(workerNodeKey)
    }
    return 0
  }
  return {
    tasks: {
      executed: 0,
      executing: 0,
      get queued (): number {
        return readQueuedTasks()
      },
      get maxQueued (): number {
        return readMaxQueuedTasks()
      },
      failed: 0
    },
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
}
c97c7edb 1052}