feat: add statistics accounting to ELU fields
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
8604aaab
JB
24import type {
25 IWorker,
26 Task,
27 TaskStatistics,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba
S
31import {
32 WorkerChoiceStrategies,
a20f0ba5
JB
33 type WorkerChoiceStrategy,
34 type WorkerChoiceStrategyOptions
bdaf31cd
JB
35} from './selection-strategies/selection-strategies-types'
36import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 37
729c563d 38/**
ea7a90d3 39 * Base class that implements some shared logic for all poolifier pools.
729c563d 40 *
38e795c1
JB
41 * @typeParam Worker - Type of worker which manages this pool.
42 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 43 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 44 */
c97c7edb 45export abstract class AbstractPool<
f06e48d8 46 Worker extends IWorker,
d3c8a1a8
S
47 Data = unknown,
48 Response = unknown
c4855468 49> implements IPool<Worker, Data, Response> {
afc003b2 50 /** @inheritDoc */
f06e48d8 51 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 52
afc003b2 53 /** @inheritDoc */
7c0ba920
JB
54 public readonly emitter?: PoolEmitter
55
be0676b3 56 /**
a3445496 57 * The execution response promise map.
be0676b3 58 *
2740a743 59 * - `key`: The message id of each submitted task.
a3445496 60 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 61 *
a3445496 62 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 63 */
c923ce56
JB
64 protected promiseResponseMap: Map<
65 string,
66 PromiseResponseWrapper<Worker, Response>
67 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 68
a35560ba 69 /**
51fe3d3c 70 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 71 *
51fe3d3c 72 * Default to a round robin algorithm.
a35560ba
S
73 */
74 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
75 Worker,
76 Data,
77 Response
a35560ba
S
78 >
79
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the internal task helpers, optionally
 * creates the event emitter, instantiates the worker choice strategy
 * context, runs the setup hook and finally creates the initial workers.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  // A pool can only be started from the main context.
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; checkPoolOptions() also fills in option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the helpers so they keep the pool as receiver wherever they are passed.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const eventsEnabled = this.opts.enableEvents === true
  if (eventsEnabled) {
    this.emitter = new PoolEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  // Create the initial set of workers.
  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
123
/**
 * Ensures that a worker file path has been given.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if `filePath` is `null`, `undefined` or a blank string.
 */
private checkFilePath (filePath: string): void {
  const isMissing = filePath == null
  const isBlank = typeof filePath === 'string' && filePath.trim().length === 0
  if (isMissing || isBlank) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
132
8d3782fa
JB
/**
 * Validates the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is `null` or `undefined`, or if it is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isEmptyFixedPool =
    this.type === PoolTypes.fixed && numberOfWorkers === 0
  if (isEmptyFixedPool) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
150
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options given by the caller.
 * @throws TypeError if `opts` is not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless specified otherwise.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only validated and built when queuing is enabled.
  if (this.opts.enableTasksQueue) {
    const givenTasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(givenTasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      givenTasksQueueOptions
    )
  }
}
177
/**
 * Ensures the given worker choice strategy is one of `WorkerChoiceStrategies`.
 *
 * @param workerChoiceStrategy - The worker choice strategy to validate.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
187
0d80593b
JB
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if weights are given but their count differs from the pool maximum size.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights } = workerChoiceStrategyOptions
  if (weights == null) {
    return
  }
  // One weight is expected per worker node the pool may hold.
  if (Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
}
205
a20f0ba5
JB
/**
 * Validates the tasks queue options.
 *
 * `null` or `undefined` options are accepted.
 *
 * @param tasksQueueOptions - The tasks queue options to validate.
 * @throws TypeError if the options are given but are not a plain object.
 * @throws Error if the given concurrency is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  const isWrongShape =
    tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)
  if (isWrongShape) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency as number
  // An undefined concurrency never satisfies the comparison, so it passes.
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
220
/** @inheritDoc */
public get info (): PoolInfo {
  const type = this.type
  const worker = this.worker
  const minSize = this.minSize
  const maxSize = this.maxSize
  const workerNodes = this.workerNodes.length

  // Counters aggregated over the worker nodes in a single pass.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0

  for (const { workerUsage, tasksQueue } of this.workerNodes) {
    const { tasks } = workerUsage
    // A node is idle when it executes no task, busy when it executes at least one.
    if (tasks.executing === 0) {
      ++idleWorkerNodes
    }
    if (tasks.executing > 0) {
      ++busyWorkerNodes
    }
    executedTasks += tasks.executed
    executingTasks += tasks.executing
    queuedTasks += tasksQueue.size
    maxQueuedTasks += tasksQueue.maxSize
    failedTasks += tasks.failed
  }

  return {
    type,
    worker,
    minSize,
    maxSize,
    workerNodes,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
08f3f44c 269
8881ae32
JB
270 /**
271 * Pool type.
272 *
273 * If it is `'dynamic'`, it provides the `max` property.
274 */
275 protected abstract get type (): PoolType
276
184855e6
JB
277 /**
278 * Gets the worker type.
279 */
280 protected abstract get worker (): WorkerType
281
c2ade475 282 /**
6b27d407 283 * Pool minimum size.
c2ade475 284 */
6b27d407 285 protected abstract get minSize (): number
ff733df7
JB
286
287 /**
6b27d407 288 * Pool maximum size.
ff733df7 289 */
6b27d407 290 protected abstract get maxSize (): number
a35560ba 291
/**
 * Looks up the key (index in `workerNodes`) of the node holding the given worker.
 *
 * @param worker - The worker.
 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKey (worker: Worker): number {
  const holdsWorker = ({ worker: candidate }: WorkerNode<Worker, Data>): boolean =>
    candidate === worker
  return this.workerNodes.findIndex(holdsWorker)
}
303
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every node usage and resend the statistics requirements to its worker.
  for (const workerNode of this.workerNodes) {
    const { worker } = workerNode
    const freshUsage = this.getWorkerUsage(worker)
    this.setWorkerNodeTasksUsage(workerNode, freshUsage)
    this.setWorkerStatistics(worker)
  }
}
325
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options nor the context.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const appliedOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext.setOptions(appliedOptions)
}
336
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // When switching the queue off, push the already queued tasks to the workers first.
  const isSwitchingOff = this.opts.enableTasksQueue === true && !enable
  if (isSwitchingOff) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
348
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue disabled: drop any tasks queue options left in the pool options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
359
/**
 * Builds the effective tasks queue options, defaulting the concurrency to `1`.
 *
 * @param tasksQueueOptions - The tasks queue options given by the caller, possibly nullish.
 * @returns The tasks queue options with a defined concurrency.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
367
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 376
c319c66b
JB
377 /**
378 * Whether the pool is busy or not.
379 *
380 * The pool busyness boolean status.
381 */
382 protected abstract get busy (): boolean
7c0ba920 383
/**
 * Tells whether no worker node is currently free of executing tasks.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    ({ workerUsage }) => workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
391
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Submission time, used later to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: crypto.randomUUID()
  }
  // Register the settle callbacks under the task id before the task is dispatched.
  const response = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(submittedTask.id as string, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // Queue the task when queuing is enabled and either the pool is busy or
  // the chosen node already executes as many tasks as the concurrency allows.
  const mustQueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustQueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.workerChoiceStrategyContext.update(workerNodeKey)
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return response
}
c97c7edb 426
/** @inheritDoc */
public async destroy (): Promise<void> {
  const destructions = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      // Hand the queued tasks over to the worker before shutting it down.
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(destructions)
}
437
4a6952ff 438 /**
f06e48d8 439 * Shutdowns the given worker.
4a6952ff 440 *
f06e48d8 441 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
442 */
443 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 444
/**
 * Hook called by the abstract constructor right before the worker nodes are created.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 454
729c563d 455 /**
280c2a77
S
456 * Should return whether the worker is the main worker or not.
457 */
458 protected abstract isMain (): boolean
459
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter of the worker node and accounts
 * the task wait time.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing += 1
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
475
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task counters of the worker node, then its run time and
 * ELU statistics.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  const taskCounters = workerUsage.tasks
  taskCounters.executing -= 1
  taskCounters.executed += 1
  // A task that ended in error counts as executed and as failed.
  if (message.taskError != null) {
    taskCounters.failed += 1
  }
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
498
a4e07f72
JB
/**
 * Updates the run time statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * run time aggregate; average and median are only updated when required too.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
  if (!required.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  const runTimeUsage = workerUsage.runTime
  runTimeUsage.aggregate += taskRunTime ?? 0
  if (required.average && workerUsage.tasks.executed !== 0) {
    runTimeUsage.average = runTimeUsage.aggregate / workerUsage.tasks.executed
  }
  // The median is computed over the history of reported run times.
  if (required.median && taskRunTime != null) {
    runTimeUsage.history.push(taskRunTime)
    runTimeUsage.median = median(runTimeUsage.history)
  }
}
526
a4e07f72
JB
/**
 * Updates the wait time statistics of a worker usage for a task about to be executed.
 *
 * The wait time is the time elapsed since the task timestamp; a task
 * without timestamp has a wait time of zero. Nothing is updated unless the
 * current worker choice strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const now = performance.now()
  const taskWaitTime = now - (task.timestamp ?? now)
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
  if (!required.aggregate) {
    return
  }
  const waitTimeUsage = workerUsage.waitTime
  waitTimeUsage.aggregate += taskWaitTime
  if (required.average && workerUsage.tasks.executed !== 0) {
    waitTimeUsage.average =
      waitTimeUsage.aggregate / workerUsage.tasks.executed
  }
  if (required.median) {
    waitTimeUsage.history.push(taskWaitTime)
    waitTimeUsage.median = median(waitTimeUsage.history)
  }
}
556
/**
 * Updates the event loop utilization (ELU) statistics of a worker usage from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * ELU aggregate; averages and medians are only updated when required too.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The message received from the worker.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const required =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
  if (!required.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The utilization is the mean of the previous value and the reported one.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  if (required.average && workerUsage.tasks.executed !== 0) {
    const executedTasks = workerUsage.tasks.executed
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / executedTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / executedTasks
  }
  if (required.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
601
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic pool that is not full and whose worker nodes are all executing
 * tasks gets a newly created worker; otherwise the worker choice strategy
 * picks the node.
 *
 * @returns The worker node key
 */
protected chooseWorkerNode (): number {
  const mustCreateWorker =
    this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
  if (!mustCreateWorker) {
    return this.workerChoiceStrategyContext.execute()
  }
  const workerCreated = this.createAndSetupWorker()
  this.registerWorkerMessageListener(workerCreated, message => {
    const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
    const isHardKill = isKillBehavior(KillBehaviors.HARD, message.kill)
    if (
      isHardKill ||
      (message.kill != null &&
        this.workerNodes[currentWorkerNodeKey].workerUsage.tasks.executing ===
          0)
    ) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      this.flushTasksQueue(currentWorkerNodeKey)
      // FIXME: wait for tasks to be finished
      void (this.destroyWorker(workerCreated) as Promise<void>)
    }
  })
  return this.getWorkerNodeKey(workerCreated)
}
633
280c2a77 634 /**
675bb809 635 * Sends a message to the given worker.
280c2a77 636 *
38e795c1
JB
637 * @param worker - The worker which should receive the message.
638 * @param message - The message.
280c2a77
S
639 */
640 protected abstract sendToWorker (
641 worker: Worker,
642 message: MessageValue<Data>
643 ): void
644
4a6952ff 645 /**
f06e48d8 646 * Registers a listener callback on the given worker.
4a6952ff 647 *
38e795c1
JB
648 * @param worker - The worker which should register a listener.
649 * @param listener - The message listener callback.
4a6952ff
JB
650 */
651 protected abstract registerWorkerMessageListener<
4f7fa42a 652 Message extends Data | Response
78cea37e 653 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 654
729c563d
S
655 /**
656 * Returns a newly created worker.
657 */
280c2a77 658 protected abstract createWorker (): Worker
c97c7edb 659
729c563d 660 /**
f06e48d8 661 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 662 *
38e795c1 663 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 664 *
38e795c1 665 * @param worker - The newly created worker.
729c563d 666 */
280c2a77 667 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 668
/**
 * Creates a new worker and sets it up completely in the pool worker nodes.
 *
 * Registers the user handlers and the pool internal handlers on the worker,
 * pushes it in the worker nodes, sends it the statistics requirements and
 * calls `afterWorkerSetup()`.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()

  worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
  // Forward worker errors as pool events when events are enabled.
  worker.on('error', error => {
    this.emitter?.emit(PoolEvents.error, error)
  })
  // Create a new worker on error if the pool is configured to do so.
  worker.on('error', () => {
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
  // An exited worker no longer belongs to the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
be0676b3
APA
703
/**
 * This function is the listener registered for each worker message.
 *
 * Messages without id, or whose id has no registered promise, are ignored.
 * Otherwise the task promise is settled, the after task execution hook is
 * run and, when the tasks queue is enabled, the next queued task of the
 * worker node is executed.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      return
    }
    // Task execution response received
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError != null) {
      promiseResponse.reject(message.taskError.message)
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      const nextTask = this.dequeueTask(workerNodeKey) as Task<Data>
      this.executeTask(workerNodeKey, nextTask)
    }
  }
}
7c0ba920 739
/**
 * Emits the `busy` and `full` pool events when the matching condition holds.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const emitter = this.emitter
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The full event only applies to dynamic pools.
  const isDynamicAndFull = this.type === PoolTypes.dynamic && this.full
  if (isDynamicAndFull) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
750
0ebe2a9f
JB
/**
 * Replaces the usage object attached to the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to attach to the node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  // Plain reassignment: the previous usage object is discarded, not merged.
  workerNode.workerUsage = workerUsage
}
763
/**
 * Appends a new worker node for the given worker to the pool worker nodes.
 *
 * The node starts with a fresh usage and an empty tasks queue.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNode: WorkerNode<Worker, Data> = {
    worker,
    workerUsage: this.getWorkerUsage(worker),
    tasksQueue: new Queue<Task<Data>>()
  }
  return this.workerNodes.push(workerNode)
}
c923ce56 777
8604aaab
JB
778 // /**
779 // * Sets the given worker in the pool worker nodes.
780 // *
781 // * @param workerNodeKey - The worker node key.
782 // * @param worker - The worker.
783 // * @param workerUsage - The worker usage.
784 // * @param tasksQueue - The worker task queue.
785 // */
786 // private setWorkerNode (
787 // workerNodeKey: number,
788 // worker: Worker,
789 // workerUsage: WorkerUsage,
790 // tasksQueue: Queue<Task<Data>>
791 // ): void {
792 // this.workerNodes[workerNodeKey] = {
793 // worker,
794 // workerUsage,
795 // tasksQueue
796 // }
797 // }
51fe3d3c
JB
798
/**
 * Removes the given worker from the pool worker nodes and from the worker
 * choice strategy context. Does nothing if the worker is not in the pool.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 811
/**
 * Runs the before task execution hook, then sends the task to the worker of the given node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
816
/**
 * Adds a task to the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to queue.
 * @returns The value returned by the queue `enqueue()` method.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}
820
/**
 * Takes the next task out of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the queue `dequeue()` method, possibly `undefined`.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}
824
/**
 * Gets the size of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}
ff733df7 828
416fd65c
JB
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed on the worker of that node.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue is empty. A counting loop bounded by the live queue
  // size stops half-way, because the size shrinks on each dequeue while the
  // counter grows.
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
}
839
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node of the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
845
/**
 * Sends to the given worker a message carrying the run time and ELU
 * aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const { runTime, elu } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  this.sendToWorker(worker, {
    statistics: {
      runTime: runTime.aggregate,
      elu: elu.aggregate
    }
  })
}
8604aaab
JB
857
/**
 * Builds a fresh worker usage for the given worker: every counter and
 * statistic is zero and every history is empty.
 *
 * @param worker - The worker.
 * @returns The fresh worker usage.
 */
private getWorkerUsage (worker: Worker): WorkerUsage {
  const tasks = this.getTaskStatistics(worker)
  const workerUsage: WorkerUsage = {
    tasks,
    runTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    waitTime: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    // Event loop utilization: idle and active times plus the utilization.
    elu: {
      idle: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      active: {
        aggregate: 0,
        average: 0,
        median: 0,
        history: new CircularArray()
      },
      utilization: 0
    }
  }
  return workerUsage
}
890
1c6fe997
JB
/**
 * Builds fresh task statistics for the given worker.
 *
 * The counters start at zero; `queued` is a getter reporting the current
 * size of the tasks queue of the node holding the worker.
 *
 * @param worker - The worker.
 * @returns The fresh task statistics.
 */
private getTaskStatistics (worker: Worker): TaskStatistics {
  // Resolved on every read of `queued`. Reading the size once here would
  // freeze it at creation time, when the worker may not be in the worker
  // nodes yet. An arrow function is used so that `this` stays the pool
  // inside the object getter.
  const currentQueueSize = (): number | undefined =>
    this.workerNodes[this.getWorkerNodeKey(worker)]?.tasksQueue?.size
  return {
    executed: 0,
    executing: 0,
    get queued (): number {
      // No node found for the worker: report an empty queue.
      return currentQueueSize() ?? 0
    },
    failed: 0
  }
}
c97c7edb 903}