refactor: factor out measurement statistics requirements default
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
e102732c
JB
24import type {
25 IWorker,
26 MessageHandler,
27 Task,
28 WorkerNode,
29 WorkerUsage
30} from './worker'
a35560ba 31import {
f0d7f803 32 Measurements,
a35560ba 33 WorkerChoiceStrategies,
a20f0ba5
JB
34 type WorkerChoiceStrategy,
35 type WorkerChoiceStrategyOptions
bdaf31cd
JB
36} from './selection-strategies/selection-strategies-types'
37import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 38
729c563d 39/**
ea7a90d3 40 * Base class that implements some shared logic for all poolifier pools.
729c563d 41 *
38e795c1 42 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
43 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
44 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 45 */
c97c7edb 46export abstract class AbstractPool<
f06e48d8 47 Worker extends IWorker,
d3c8a1a8
S
48 Data = unknown,
49 Response = unknown
c4855468 50> implements IPool<Worker, Data, Response> {
afc003b2 51 /** @inheritDoc */
f06e48d8 52 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 53
afc003b2 54 /** @inheritDoc */
7c0ba920
JB
55 public readonly emitter?: PoolEmitter
56
be0676b3 57 /**
a3445496 58 * The execution response promise map.
be0676b3 59 *
2740a743 60 * - `key`: The message id of each submitted task.
a3445496 61 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 62 *
a3445496 63 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 64 */
c923ce56
JB
65 protected promiseResponseMap: Map<
66 string,
67 PromiseResponseWrapper<Worker, Response>
68 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 69
a35560ba 70 /**
51fe3d3c 71 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
72 */
73 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
74 Worker,
75 Data,
76 Response
a35560ba
S
77 >
78
729c563d
S
79 /**
80 * Constructs a new poolifier pool.
81 *
38e795c1 82 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 83 * @param filePath - Path to the worker file.
38e795c1 84 * @param opts - Options for the pool.
729c563d 85 */
c97c7edb 86 public constructor (
b4213b7f
JB
87 protected readonly numberOfWorkers: number,
88 protected readonly filePath: string,
89 protected readonly opts: PoolOptions<Worker>
c97c7edb 90 ) {
78cea37e 91 if (!this.isMain()) {
c97c7edb
S
92 throw new Error('Cannot start a pool from a worker!')
93 }
8d3782fa 94 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 95 this.checkFilePath(this.filePath)
7c0ba920 96 this.checkPoolOptions(this.opts)
1086026a 97
7254e419
JB
98 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
99 this.executeTask = this.executeTask.bind(this)
100 this.enqueueTask = this.enqueueTask.bind(this)
101 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 102
6bd72cd0 103 if (this.opts.enableEvents === true) {
7c0ba920
JB
104 this.emitter = new PoolEmitter()
105 }
d59df138
JB
106 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
107 Worker,
108 Data,
109 Response
da309861
JB
110 >(
111 this,
112 this.opts.workerChoiceStrategy,
113 this.opts.workerChoiceStrategyOptions
114 )
b6b32453
JB
115
116 this.setupHook()
117
118 for (let i = 1; i <= this.numberOfWorkers; i++) {
119 this.createAndSetupWorker()
120 }
c97c7edb
S
121 }
122
a35560ba 123 private checkFilePath (filePath: string): void {
ffcbbad8
JB
124 if (
125 filePath == null ||
126 (typeof filePath === 'string' && filePath.trim().length === 0)
127 ) {
c510fea7
APA
128 throw new Error('Please specify a file with a worker implementation')
129 }
130 }
131
8d3782fa
JB
132 private checkNumberOfWorkers (numberOfWorkers: number): void {
133 if (numberOfWorkers == null) {
134 throw new Error(
135 'Cannot instantiate a pool without specifying the number of workers'
136 )
78cea37e 137 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 138 throw new TypeError(
0d80593b 139 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
140 )
141 } else if (numberOfWorkers < 0) {
473c717a 142 throw new RangeError(
8d3782fa
JB
143 'Cannot instantiate a pool with a negative number of workers'
144 )
6b27d407 145 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
146 throw new Error('Cannot instantiate a fixed pool with no worker')
147 }
148 }
149
7c0ba920 150 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
151 if (isPlainObject(opts)) {
152 this.opts.workerChoiceStrategy =
153 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
154 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
155 this.opts.workerChoiceStrategyOptions =
156 opts.workerChoiceStrategyOptions ??
157 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
158 this.checkValidWorkerChoiceStrategyOptions(
159 this.opts.workerChoiceStrategyOptions
160 )
1f68cede 161 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
162 this.opts.enableEvents = opts.enableEvents ?? true
163 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
164 if (this.opts.enableTasksQueue) {
165 this.checkValidTasksQueueOptions(
166 opts.tasksQueueOptions as TasksQueueOptions
167 )
168 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
169 opts.tasksQueueOptions as TasksQueueOptions
170 )
171 }
172 } else {
173 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 174 }
aee46736
JB
175 }
176
177 private checkValidWorkerChoiceStrategy (
178 workerChoiceStrategy: WorkerChoiceStrategy
179 ): void {
180 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 181 throw new Error(
aee46736 182 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
183 )
184 }
7c0ba920
JB
185 }
186
0d80593b
JB
187 private checkValidWorkerChoiceStrategyOptions (
188 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
189 ): void {
190 if (!isPlainObject(workerChoiceStrategyOptions)) {
191 throw new TypeError(
192 'Invalid worker choice strategy options: must be a plain object'
193 )
194 }
49be33fe
JB
195 if (
196 workerChoiceStrategyOptions.weights != null &&
6b27d407 197 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
198 ) {
199 throw new Error(
200 'Invalid worker choice strategy options: must have a weight for each worker node'
201 )
202 }
f0d7f803
JB
203 if (
204 workerChoiceStrategyOptions.measurement != null &&
205 !Object.values(Measurements).includes(
206 workerChoiceStrategyOptions.measurement
207 )
208 ) {
209 throw new Error(
210 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
211 )
212 }
0d80593b
JB
213 }
214
a20f0ba5
JB
215 private checkValidTasksQueueOptions (
216 tasksQueueOptions: TasksQueueOptions
217 ): void {
0d80593b
JB
218 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
219 throw new TypeError('Invalid tasks queue options: must be a plain object')
220 }
f0d7f803
JB
221 if (
222 tasksQueueOptions?.concurrency != null &&
223 !Number.isSafeInteger(tasksQueueOptions.concurrency)
224 ) {
225 throw new TypeError(
226 'Invalid worker tasks concurrency: must be an integer'
227 )
228 }
229 if (
230 tasksQueueOptions?.concurrency != null &&
231 tasksQueueOptions.concurrency <= 0
232 ) {
a20f0ba5 233 throw new Error(
f0d7f803 234 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
235 )
236 }
237 }
238
08f3f44c 239 /** @inheritDoc */
6b27d407
JB
240 public get info (): PoolInfo {
241 return {
242 type: this.type,
184855e6 243 worker: this.worker,
6b27d407
JB
244 minSize: this.minSize,
245 maxSize: this.maxSize,
246 workerNodes: this.workerNodes.length,
247 idleWorkerNodes: this.workerNodes.reduce(
248 (accumulator, workerNode) =>
a4e07f72
JB
249 workerNode.workerUsage.tasks.executing === 0
250 ? accumulator + 1
251 : accumulator,
6b27d407
JB
252 0
253 ),
254 busyWorkerNodes: this.workerNodes.reduce(
255 (accumulator, workerNode) =>
a4e07f72
JB
256 workerNode.workerUsage.tasks.executing > 0
257 ? accumulator + 1
258 : accumulator,
6b27d407
JB
259 0
260 ),
a4e07f72 261 executedTasks: this.workerNodes.reduce(
6b27d407 262 (accumulator, workerNode) =>
a4e07f72
JB
263 accumulator + workerNode.workerUsage.tasks.executed,
264 0
265 ),
266 executingTasks: this.workerNodes.reduce(
267 (accumulator, workerNode) =>
268 accumulator + workerNode.workerUsage.tasks.executing,
6b27d407
JB
269 0
270 ),
271 queuedTasks: this.workerNodes.reduce(
df593701
JB
272 (accumulator, workerNode) =>
273 accumulator + workerNode.workerUsage.tasks.queued,
6b27d407
JB
274 0
275 ),
276 maxQueuedTasks: this.workerNodes.reduce(
277 (accumulator, workerNode) =>
df593701 278 accumulator + workerNode.workerUsage.tasks.maxQueued,
6b27d407 279 0
a4e07f72
JB
280 ),
281 failedTasks: this.workerNodes.reduce(
282 (accumulator, workerNode) =>
283 accumulator + workerNode.workerUsage.tasks.failed,
284 0
6b27d407
JB
285 )
286 }
287 }
08f3f44c 288
8881ae32
JB
289 /**
290 * Pool type.
291 *
292 * If it is `'dynamic'`, it provides the `max` property.
293 */
294 protected abstract get type (): PoolType
295
184855e6
JB
296 /**
297 * Gets the worker type.
298 */
299 protected abstract get worker (): WorkerType
300
c2ade475 301 /**
6b27d407 302 * Pool minimum size.
c2ade475 303 */
6b27d407 304 protected abstract get minSize (): number
ff733df7
JB
305
306 /**
6b27d407 307 * Pool maximum size.
ff733df7 308 */
6b27d407 309 protected abstract get maxSize (): number
a35560ba 310
ffcbbad8 311 /**
f06e48d8 312 * Gets the given worker its worker node key.
ffcbbad8
JB
313 *
314 * @param worker - The worker.
f06e48d8 315 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 316 */
f06e48d8
JB
317 private getWorkerNodeKey (worker: Worker): number {
318 return this.workerNodes.findIndex(
319 workerNode => workerNode.worker === worker
320 )
bf9549ae
JB
321 }
322
afc003b2 323 /** @inheritDoc */
a35560ba 324 public setWorkerChoiceStrategy (
59219cbb
JB
325 workerChoiceStrategy: WorkerChoiceStrategy,
326 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 327 ): void {
aee46736 328 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 329 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
330 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
331 this.opts.workerChoiceStrategy
332 )
333 if (workerChoiceStrategyOptions != null) {
334 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
335 }
9c16fb4b 336 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
337 this.setWorkerNodeTasksUsage(
338 workerNode,
9c16fb4b 339 this.getWorkerUsage(workerNodeKey)
8604aaab 340 )
b6b32453 341 this.setWorkerStatistics(workerNode.worker)
59219cbb 342 }
a20f0ba5
JB
343 }
344
345 /** @inheritDoc */
346 public setWorkerChoiceStrategyOptions (
347 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
348 ): void {
0d80593b 349 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
350 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
351 this.workerChoiceStrategyContext.setOptions(
352 this.opts.workerChoiceStrategyOptions
a35560ba
S
353 )
354 }
355
a20f0ba5 356 /** @inheritDoc */
8f52842f
JB
357 public enableTasksQueue (
358 enable: boolean,
359 tasksQueueOptions?: TasksQueueOptions
360 ): void {
a20f0ba5 361 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 362 this.flushTasksQueues()
a20f0ba5
JB
363 }
364 this.opts.enableTasksQueue = enable
8f52842f 365 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
366 }
367
368 /** @inheritDoc */
8f52842f 369 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 370 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
371 this.checkValidTasksQueueOptions(tasksQueueOptions)
372 this.opts.tasksQueueOptions =
373 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 374 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
375 delete this.opts.tasksQueueOptions
376 }
377 }
378
379 private buildTasksQueueOptions (
380 tasksQueueOptions: TasksQueueOptions
381 ): TasksQueueOptions {
382 return {
383 concurrency: tasksQueueOptions?.concurrency ?? 1
384 }
385 }
386
c319c66b
JB
387 /**
388 * Whether the pool is full or not.
389 *
390 * The pool filling boolean status.
391 */
dea903a8
JB
392 protected get full (): boolean {
393 return this.workerNodes.length >= this.maxSize
394 }
c2ade475 395
c319c66b
JB
396 /**
397 * Whether the pool is busy or not.
398 *
399 * The pool busyness boolean status.
400 */
401 protected abstract get busy (): boolean
7c0ba920 402
6c6afb84
JB
403 /**
404 * Whether worker nodes are executing at least one task.
405 *
406 * @returns Worker nodes busyness boolean status.
407 */
c2ade475 408 protected internalBusy (): boolean {
e0ae6100
JB
409 return (
410 this.workerNodes.findIndex(workerNode => {
a4e07f72 411 return workerNode.workerUsage.tasks.executing === 0
e0ae6100
JB
412 }) === -1
413 )
cb70b19d
JB
414 }
415
afc003b2 416 /** @inheritDoc */
a86b6df1 417 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 418 const timestamp = performance.now()
20dcad1a 419 const workerNodeKey = this.chooseWorkerNode()
adc3c320 420 const submittedTask: Task<Data> = {
a86b6df1 421 name,
e5a5c0fc
JB
422 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
423 data: data ?? ({} as Data),
b6b32453 424 timestamp,
adc3c320
JB
425 id: crypto.randomUUID()
426 }
2e81254d 427 const res = new Promise<Response>((resolve, reject) => {
02706357 428 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
429 resolve,
430 reject,
20dcad1a 431 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
432 })
433 })
ff733df7
JB
434 if (
435 this.opts.enableTasksQueue === true &&
7171d33f 436 (this.busy ||
a4e07f72 437 this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
7171d33f 438 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 439 .concurrency as number))
ff733df7 440 ) {
26a929d7
JB
441 this.enqueueTask(workerNodeKey, submittedTask)
442 } else {
2e81254d 443 this.executeTask(workerNodeKey, submittedTask)
adc3c320 444 }
ff733df7 445 this.checkAndEmitEvents()
78cea37e 446 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
447 return res
448 }
c97c7edb 449
afc003b2 450 /** @inheritDoc */
c97c7edb 451 public async destroy (): Promise<void> {
1fbcaa7c 452 await Promise.all(
875a7c37
JB
453 this.workerNodes.map(async (workerNode, workerNodeKey) => {
454 this.flushTasksQueue(workerNodeKey)
47aacbaa 455 // FIXME: wait for tasks to be finished
f06e48d8 456 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
457 })
458 )
c97c7edb
S
459 }
460
4a6952ff 461 /**
6c6afb84 462 * Terminates the given worker.
4a6952ff 463 *
f06e48d8 464 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
465 */
466 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 467
729c563d 468 /**
6677a3d3
JB
469 * Setup hook to execute code before worker nodes are created in the abstract constructor.
470 * Can be overridden.
afc003b2
JB
471 *
472 * @virtual
729c563d 473 */
280c2a77 474 protected setupHook (): void {
d99ba5a8 475 // Intentionally empty
280c2a77 476 }
c97c7edb 477
729c563d 478 /**
280c2a77
S
479 * Should return whether the worker is the main worker or not.
480 */
481 protected abstract isMain (): boolean
482
483 /**
2e81254d 484 * Hook executed before the worker task execution.
bf9549ae 485 * Can be overridden.
729c563d 486 *
f06e48d8 487 * @param workerNodeKey - The worker node key.
1c6fe997 488 * @param task - The task to execute.
729c563d 489 */
1c6fe997
JB
490 protected beforeTaskExecutionHook (
491 workerNodeKey: number,
492 task: Task<Data>
493 ): void {
494 const workerUsage = this.workerNodes[workerNodeKey].workerUsage
495 ++workerUsage.tasks.executing
496 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
497 }
498
c01733f1 499 /**
2e81254d 500 * Hook executed after the worker task execution.
bf9549ae 501 * Can be overridden.
c01733f1 502 *
c923ce56 503 * @param worker - The worker.
38e795c1 504 * @param message - The received message.
c01733f1 505 */
2e81254d 506 protected afterTaskExecutionHook (
c923ce56 507 worker: Worker,
2740a743 508 message: MessageValue<Response>
bf9549ae 509 ): void {
a4e07f72
JB
510 const workerUsage =
511 this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
f1c06930
JB
512 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
513 this.updateRunTimeWorkerUsage(workerUsage, message)
514 this.updateEluWorkerUsage(workerUsage, message)
515 }
516
517 private updateTaskStatisticsWorkerUsage (
518 workerUsage: WorkerUsage,
519 message: MessageValue<Response>
520 ): void {
a4e07f72
JB
521 const workerTaskStatistics = workerUsage.tasks
522 --workerTaskStatistics.executing
523 ++workerTaskStatistics.executed
82f36766 524 if (message.taskError != null) {
a4e07f72 525 ++workerTaskStatistics.failed
2740a743 526 }
f8eb0a2a
JB
527 }
528
a4e07f72
JB
529 private updateRunTimeWorkerUsage (
530 workerUsage: WorkerUsage,
f8eb0a2a
JB
531 message: MessageValue<Response>
532 ): void {
87de9ff5
JB
533 if (
534 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 535 .aggregate
87de9ff5 536 ) {
932fc8be 537 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 538 if (
932fc8be
JB
539 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
540 .average &&
a4e07f72 541 workerUsage.tasks.executed !== 0
c6bd2650 542 ) {
a4e07f72 543 workerUsage.runTime.average =
f1c06930
JB
544 workerUsage.runTime.aggregate /
545 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 546 }
3fa4cdd2 547 if (
932fc8be
JB
548 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
549 .median &&
d715b7bc 550 message.taskPerformance?.runTime != null
3fa4cdd2 551 ) {
a4e07f72
JB
552 workerUsage.runTime.history.push(message.taskPerformance.runTime)
553 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 554 }
3032893a 555 }
f8eb0a2a
JB
556 }
557
a4e07f72
JB
558 private updateWaitTimeWorkerUsage (
559 workerUsage: WorkerUsage,
1c6fe997 560 task: Task<Data>
f8eb0a2a 561 ): void {
1c6fe997
JB
562 const timestamp = performance.now()
563 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
564 if (
565 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 566 .aggregate
87de9ff5 567 ) {
932fc8be 568 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 569 if (
87de9ff5 570 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 571 .waitTime.average &&
a4e07f72 572 workerUsage.tasks.executed !== 0
09a6305f 573 ) {
a4e07f72 574 workerUsage.waitTime.average =
f1c06930
JB
575 workerUsage.waitTime.aggregate /
576 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
577 }
578 if (
87de9ff5 579 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 580 .waitTime.median &&
1c6fe997 581 taskWaitTime != null
09a6305f 582 ) {
1c6fe997 583 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 584 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 585 }
0567595a 586 }
c01733f1 587 }
588
a4e07f72 589 private updateEluWorkerUsage (
5df69fab 590 workerUsage: WorkerUsage,
62c15a68
JB
591 message: MessageValue<Response>
592 ): void {
5df69fab
JB
593 if (
594 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
595 .aggregate
596 ) {
597 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
598 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
599 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
600 workerUsage.elu.utilization =
601 (workerUsage.elu.utilization +
602 message.taskPerformance.elu.utilization) /
603 2
604 } else if (message.taskPerformance?.elu != null) {
605 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
606 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
607 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
608 }
d715b7bc 609 if (
5df69fab
JB
610 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
611 .average &&
612 workerUsage.tasks.executed !== 0
613 ) {
f1c06930
JB
614 const executedTasks =
615 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 616 workerUsage.elu.idle.average =
f1c06930 617 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 618 workerUsage.elu.active.average =
f1c06930 619 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
620 }
621 if (
622 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
623 .median &&
d715b7bc
JB
624 message.taskPerformance?.elu != null
625 ) {
5df69fab
JB
626 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
627 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
628 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
629 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
630 }
631 }
632 }
633
280c2a77 634 /**
f06e48d8 635 * Chooses a worker node for the next task.
280c2a77 636 *
6c6afb84 637 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 638 *
20dcad1a 639 * @returns The worker node key
280c2a77 640 */
6c6afb84 641 private chooseWorkerNode (): number {
930dcf12 642 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
643 const worker = this.createAndSetupDynamicWorker()
644 if (
645 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
646 ) {
647 return this.getWorkerNodeKey(worker)
648 }
17393ac8 649 }
930dcf12
JB
650 return this.workerChoiceStrategyContext.execute()
651 }
652
6c6afb84
JB
653 /**
654 * Conditions for dynamic worker creation.
655 *
656 * @returns Whether to create a dynamic worker or not.
657 */
658 private shallCreateDynamicWorker (): boolean {
930dcf12 659 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
660 }
661
280c2a77 662 /**
675bb809 663 * Sends a message to the given worker.
280c2a77 664 *
38e795c1
JB
665 * @param worker - The worker which should receive the message.
666 * @param message - The message.
280c2a77
S
667 */
668 protected abstract sendToWorker (
669 worker: Worker,
670 message: MessageValue<Data>
671 ): void
672
4a6952ff 673 /**
f06e48d8 674 * Registers a listener callback on the given worker.
4a6952ff 675 *
38e795c1
JB
676 * @param worker - The worker which should register a listener.
677 * @param listener - The message listener callback.
4a6952ff 678 */
e102732c
JB
679 private registerWorkerMessageListener<Message extends Data | Response>(
680 worker: Worker,
681 listener: (message: MessageValue<Message>) => void
682 ): void {
683 worker.on('message', listener as MessageHandler<Worker>)
684 }
c97c7edb 685
729c563d 686 /**
41344292 687 * Creates a new worker.
6c6afb84
JB
688 *
689 * @returns Newly created worker.
729c563d 690 */
280c2a77 691 protected abstract createWorker (): Worker
c97c7edb 692
729c563d 693 /**
f06e48d8 694 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
6677a3d3 695 * Can be overridden.
729c563d 696 *
38e795c1 697 * @param worker - The newly created worker.
729c563d 698 */
6677a3d3 699 protected afterWorkerSetup (worker: Worker): void {
e102732c
JB
700 // Listen to worker messages.
701 this.registerWorkerMessageListener(worker, this.workerListener())
702 }
c97c7edb 703
4a6952ff 704 /**
f06e48d8 705 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
706 *
707 * @returns New, completely set up worker.
708 */
709 protected createAndSetupWorker (): Worker {
bdacc2d2 710 const worker = this.createWorker()
280c2a77 711
35cf1c03 712 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 713 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
714 worker.on('error', error => {
715 if (this.emitter != null) {
716 this.emitter.emit(PoolEvents.error, error)
717 }
5baee0d7 718 if (this.opts.restartWorkerOnError === true) {
1f68cede 719 this.createAndSetupWorker()
5baee0d7
JB
720 }
721 })
a35560ba
S
722 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
723 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 724 worker.once('exit', () => {
f06e48d8 725 this.removeWorkerNode(worker)
a974afa6 726 })
280c2a77 727
f06e48d8 728 this.pushWorkerNode(worker)
280c2a77 729
b6b32453
JB
730 this.setWorkerStatistics(worker)
731
280c2a77
S
732 this.afterWorkerSetup(worker)
733
c97c7edb
S
734 return worker
735 }
be0676b3 736
930dcf12
JB
737 /**
738 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
739 *
740 * @returns New, completely set up dynamic worker.
741 */
742 protected createAndSetupDynamicWorker (): Worker {
743 const worker = this.createAndSetupWorker()
744 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 745 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
746 if (
747 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
748 (message.kill != null &&
749 ((this.opts.enableTasksQueue === false &&
e8b3a5ab
JB
750 this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
751 0) ||
7b56f532 752 (this.opts.enableTasksQueue === true &&
e8b3a5ab
JB
753 this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
754 0 &&
755 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
756 ) {
757 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
758 void (this.destroyWorker(worker) as Promise<void>)
759 }
760 })
761 return worker
762 }
763
be0676b3 764 /**
ff733df7 765 * This function is the listener registered for each worker message.
be0676b3 766 *
bdacc2d2 767 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
768 */
769 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 770 return message => {
b1989cfd 771 if (message.id != null) {
a3445496 772 // Task execution response received
2740a743 773 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 774 if (promiseResponse != null) {
82f36766 775 if (message.taskError != null) {
91ee39ed 776 if (this.emitter != null) {
82f36766 777 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 778 }
cd9580e7 779 promiseResponse.reject(message.taskError.message)
a05c10de 780 } else {
2740a743 781 promiseResponse.resolve(message.data as Response)
a05c10de 782 }
2e81254d 783 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 784 this.promiseResponseMap.delete(message.id)
ff733df7
JB
785 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
786 if (
787 this.opts.enableTasksQueue === true &&
416fd65c 788 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 789 ) {
2e81254d
JB
790 this.executeTask(
791 workerNodeKey,
ff733df7
JB
792 this.dequeueTask(workerNodeKey) as Task<Data>
793 )
794 }
e5536a06 795 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
796 }
797 }
798 }
be0676b3 799 }
7c0ba920 800
ff733df7 801 private checkAndEmitEvents (): void {
1f68cede 802 if (this.emitter != null) {
ff733df7 803 if (this.busy) {
6b27d407 804 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 805 }
6b27d407
JB
806 if (this.type === PoolTypes.dynamic && this.full) {
807 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 808 }
164d950a
JB
809 }
810 }
811
0ebe2a9f
JB
812 /**
813 * Sets the given worker node its tasks usage in the pool.
814 *
815 * @param workerNode - The worker node.
a4e07f72 816 * @param workerUsage - The worker usage.
0ebe2a9f
JB
817 */
818 private setWorkerNodeTasksUsage (
819 workerNode: WorkerNode<Worker, Data>,
a4e07f72 820 workerUsage: WorkerUsage
0ebe2a9f 821 ): void {
a4e07f72 822 workerNode.workerUsage = workerUsage
0ebe2a9f
JB
823 }
824
a05c10de 825 /**
f06e48d8 826 * Pushes the given worker in the pool worker nodes.
ea7a90d3 827 *
38e795c1 828 * @param worker - The worker.
f06e48d8 829 * @returns The worker nodes length.
ea7a90d3 830 */
f06e48d8 831 private pushWorkerNode (worker: Worker): number {
9c16fb4b 832 this.workerNodes.push({
ffcbbad8 833 worker,
9c16fb4b 834 workerUsage: this.getWorkerUsage(),
29ee7e9a 835 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 836 })
9c16fb4b
JB
837 const workerNodeKey = this.getWorkerNodeKey(worker)
838 this.setWorkerNodeTasksUsage(
839 this.workerNodes[workerNodeKey],
840 this.getWorkerUsage(workerNodeKey)
841 )
842 return this.workerNodes.length
ea7a90d3 843 }
c923ce56 844
8604aaab
JB
845 // /**
846 // * Sets the given worker in the pool worker nodes.
847 // *
848 // * @param workerNodeKey - The worker node key.
849 // * @param worker - The worker.
850 // * @param workerUsage - The worker usage.
851 // * @param tasksQueue - The worker task queue.
852 // */
853 // private setWorkerNode (
854 // workerNodeKey: number,
855 // worker: Worker,
856 // workerUsage: WorkerUsage,
857 // tasksQueue: Queue<Task<Data>>
858 // ): void {
859 // this.workerNodes[workerNodeKey] = {
860 // worker,
861 // workerUsage,
862 // tasksQueue
863 // }
864 // }
51fe3d3c
JB
865
866 /**
f06e48d8 867 * Removes the given worker from the pool worker nodes.
51fe3d3c 868 *
f06e48d8 869 * @param worker - The worker.
51fe3d3c 870 */
416fd65c 871 private removeWorkerNode (worker: Worker): void {
f06e48d8 872 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
873 if (workerNodeKey !== -1) {
874 this.workerNodes.splice(workerNodeKey, 1)
875 this.workerChoiceStrategyContext.remove(workerNodeKey)
876 }
51fe3d3c 877 }
adc3c320 878
2e81254d 879 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 880 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
881 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
882 }
883
f9f00b5f 884 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 885 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
886 }
887
416fd65c 888 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 889 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
890 }
891
416fd65c 892 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 893 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 894 }
ff733df7 895
df593701
JB
896 private tasksMaxQueueSize (workerNodeKey: number): number {
897 return this.workerNodes[workerNodeKey].tasksQueue.maxSize
898 }
899
416fd65c
JB
900 private flushTasksQueue (workerNodeKey: number): void {
901 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
902 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
903 this.executeTask(
904 workerNodeKey,
905 this.dequeueTask(workerNodeKey) as Task<Data>
906 )
ff733df7 907 }
ff733df7 908 }
df593701 909 this.workerNodes[workerNodeKey].tasksQueue.clear()
ff733df7
JB
910 }
911
ef41a6e6
JB
912 private flushTasksQueues (): void {
913 for (const [workerNodeKey] of this.workerNodes.entries()) {
914 this.flushTasksQueue(workerNodeKey)
915 }
916 }
b6b32453
JB
917
918 private setWorkerStatistics (worker: Worker): void {
919 this.sendToWorker(worker, {
920 statistics: {
87de9ff5
JB
921 runTime:
922 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 923 .runTime.aggregate,
87de9ff5 924 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 925 .elu.aggregate
b6b32453
JB
926 }
927 })
928 }
8604aaab 929
9c16fb4b 930 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
e3347a5c
JB
931 const getTasksQueueSize = (workerNodeKey?: number): number => {
932 return workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
9c16fb4b 933 }
df593701
JB
934 const getTasksMaxQueueSize = (workerNodeKey?: number): number => {
935 return workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0
936 }
8604aaab 937 return {
9c16fb4b
JB
938 tasks: {
939 executed: 0,
940 executing: 0,
941 get queued (): number {
e3347a5c 942 return getTasksQueueSize(workerNodeKey)
9c16fb4b 943 },
df593701
JB
944 get maxQueued (): number {
945 return getTasksMaxQueueSize(workerNodeKey)
946 },
9c16fb4b
JB
947 failed: 0
948 },
8604aaab 949 runTime: {
932fc8be 950 aggregate: 0,
8604aaab
JB
951 average: 0,
952 median: 0,
953 history: new CircularArray()
954 },
955 waitTime: {
932fc8be 956 aggregate: 0,
8604aaab
JB
957 average: 0,
958 median: 0,
959 history: new CircularArray()
960 },
5df69fab
JB
961 elu: {
962 idle: {
963 aggregate: 0,
964 average: 0,
965 median: 0,
966 history: new CircularArray()
967 },
968 active: {
969 aggregate: 0,
970 average: 0,
971 median: 0,
972 history: new CircularArray()
973 },
974 utilization: 0
975 }
8604aaab
JB
976 }
977 }
c97c7edb 978}