refactor: abstract out measurement statistics
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
a4e07f72 24import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
a35560ba
S
25import {
26 WorkerChoiceStrategies,
a20f0ba5
JB
27 type WorkerChoiceStrategy,
28 type WorkerChoiceStrategyOptions
bdaf31cd
JB
29} from './selection-strategies/selection-strategies-types'
30import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 31
729c563d 32/**
ea7a90d3 33 * Base class that implements some shared logic for all poolifier pools.
729c563d 34 *
38e795c1
JB
35 * @typeParam Worker - Type of worker which manages this pool.
36 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 37 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 38 */
c97c7edb 39export abstract class AbstractPool<
f06e48d8 40 Worker extends IWorker,
d3c8a1a8
S
41 Data = unknown,
42 Response = unknown
c4855468 43> implements IPool<Worker, Data, Response> {
afc003b2 44 /** @inheritDoc */
f06e48d8 45 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 46
afc003b2 47 /** @inheritDoc */
7c0ba920
JB
48 public readonly emitter?: PoolEmitter
49
be0676b3 50 /**
a3445496 51 * The execution response promise map.
be0676b3 52 *
2740a743 53 * - `key`: The message id of each submitted task.
a3445496 54 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 55 *
a3445496 56 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 57 */
c923ce56
JB
58 protected promiseResponseMap: Map<
59 string,
60 PromiseResponseWrapper<Worker, Response>
61 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 62
a35560ba 63 /**
51fe3d3c 64 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 65 *
51fe3d3c 66 * Default to a round robin algorithm.
a35560ba
S
67 */
68 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
69 Worker,
70 Data,
71 Response
a35560ba
S
72 >
73
729c563d
S
74 /**
75 * Constructs a new poolifier pool.
76 *
38e795c1 77 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 78 * @param filePath - Path to the worker file.
38e795c1 79 * @param opts - Options for the pool.
729c563d 80 */
c97c7edb 81 public constructor (
b4213b7f
JB
82 protected readonly numberOfWorkers: number,
83 protected readonly filePath: string,
84 protected readonly opts: PoolOptions<Worker>
c97c7edb 85 ) {
78cea37e 86 if (!this.isMain()) {
c97c7edb
S
87 throw new Error('Cannot start a pool from a worker!')
88 }
8d3782fa 89 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 90 this.checkFilePath(this.filePath)
7c0ba920 91 this.checkPoolOptions(this.opts)
1086026a 92
7254e419
JB
93 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
94 this.executeTask = this.executeTask.bind(this)
95 this.enqueueTask = this.enqueueTask.bind(this)
96 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 97
6bd72cd0 98 if (this.opts.enableEvents === true) {
7c0ba920
JB
99 this.emitter = new PoolEmitter()
100 }
d59df138
JB
101 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
102 Worker,
103 Data,
104 Response
da309861
JB
105 >(
106 this,
107 this.opts.workerChoiceStrategy,
108 this.opts.workerChoiceStrategyOptions
109 )
b6b32453
JB
110
111 this.setupHook()
112
113 for (let i = 1; i <= this.numberOfWorkers; i++) {
114 this.createAndSetupWorker()
115 }
c97c7edb
S
116 }
117
a35560ba 118 private checkFilePath (filePath: string): void {
ffcbbad8
JB
119 if (
120 filePath == null ||
121 (typeof filePath === 'string' && filePath.trim().length === 0)
122 ) {
c510fea7
APA
123 throw new Error('Please specify a file with a worker implementation')
124 }
125 }
126
8d3782fa
JB
127 private checkNumberOfWorkers (numberOfWorkers: number): void {
128 if (numberOfWorkers == null) {
129 throw new Error(
130 'Cannot instantiate a pool without specifying the number of workers'
131 )
78cea37e 132 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 133 throw new TypeError(
0d80593b 134 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
135 )
136 } else if (numberOfWorkers < 0) {
473c717a 137 throw new RangeError(
8d3782fa
JB
138 'Cannot instantiate a pool with a negative number of workers'
139 )
6b27d407 140 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
141 throw new Error('Cannot instantiate a fixed pool with no worker')
142 }
143 }
144
7c0ba920 145 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
146 if (isPlainObject(opts)) {
147 this.opts.workerChoiceStrategy =
148 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
149 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
150 this.opts.workerChoiceStrategyOptions =
151 opts.workerChoiceStrategyOptions ??
152 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
153 this.checkValidWorkerChoiceStrategyOptions(
154 this.opts.workerChoiceStrategyOptions
155 )
1f68cede 156 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
157 this.opts.enableEvents = opts.enableEvents ?? true
158 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
159 if (this.opts.enableTasksQueue) {
160 this.checkValidTasksQueueOptions(
161 opts.tasksQueueOptions as TasksQueueOptions
162 )
163 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
164 opts.tasksQueueOptions as TasksQueueOptions
165 )
166 }
167 } else {
168 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 169 }
aee46736
JB
170 }
171
172 private checkValidWorkerChoiceStrategy (
173 workerChoiceStrategy: WorkerChoiceStrategy
174 ): void {
175 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 176 throw new Error(
aee46736 177 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
178 )
179 }
7c0ba920
JB
180 }
181
0d80593b
JB
182 private checkValidWorkerChoiceStrategyOptions (
183 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
184 ): void {
185 if (!isPlainObject(workerChoiceStrategyOptions)) {
186 throw new TypeError(
187 'Invalid worker choice strategy options: must be a plain object'
188 )
189 }
49be33fe
JB
190 if (
191 workerChoiceStrategyOptions.weights != null &&
6b27d407 192 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
193 ) {
194 throw new Error(
195 'Invalid worker choice strategy options: must have a weight for each worker node'
196 )
197 }
0d80593b
JB
198 }
199
a20f0ba5
JB
200 private checkValidTasksQueueOptions (
201 tasksQueueOptions: TasksQueueOptions
202 ): void {
0d80593b
JB
203 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
204 throw new TypeError('Invalid tasks queue options: must be a plain object')
205 }
a20f0ba5
JB
206 if ((tasksQueueOptions?.concurrency as number) <= 0) {
207 throw new Error(
208 `Invalid worker tasks concurrency '${
209 tasksQueueOptions.concurrency as number
210 }'`
211 )
212 }
213 }
214
08f3f44c 215 /** @inheritDoc */
6b27d407
JB
216 public get info (): PoolInfo {
217 return {
218 type: this.type,
184855e6 219 worker: this.worker,
6b27d407
JB
220 minSize: this.minSize,
221 maxSize: this.maxSize,
222 workerNodes: this.workerNodes.length,
223 idleWorkerNodes: this.workerNodes.reduce(
224 (accumulator, workerNode) =>
a4e07f72
JB
225 workerNode.workerUsage.tasks.executing === 0
226 ? accumulator + 1
227 : accumulator,
6b27d407
JB
228 0
229 ),
230 busyWorkerNodes: this.workerNodes.reduce(
231 (accumulator, workerNode) =>
a4e07f72
JB
232 workerNode.workerUsage.tasks.executing > 0
233 ? accumulator + 1
234 : accumulator,
6b27d407
JB
235 0
236 ),
a4e07f72 237 executedTasks: this.workerNodes.reduce(
6b27d407 238 (accumulator, workerNode) =>
a4e07f72
JB
239 accumulator + workerNode.workerUsage.tasks.executed,
240 0
241 ),
242 executingTasks: this.workerNodes.reduce(
243 (accumulator, workerNode) =>
244 accumulator + workerNode.workerUsage.tasks.executing,
6b27d407
JB
245 0
246 ),
247 queuedTasks: this.workerNodes.reduce(
248 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
249 0
250 ),
251 maxQueuedTasks: this.workerNodes.reduce(
252 (accumulator, workerNode) =>
253 accumulator + workerNode.tasksQueue.maxSize,
254 0
a4e07f72
JB
255 ),
256 failedTasks: this.workerNodes.reduce(
257 (accumulator, workerNode) =>
258 accumulator + workerNode.workerUsage.tasks.failed,
259 0
6b27d407
JB
260 )
261 }
262 }
08f3f44c 263
8881ae32
JB
264 /**
265 * Pool type.
266 *
267 * If it is `'dynamic'`, it provides the `max` property.
268 */
269 protected abstract get type (): PoolType
270
184855e6
JB
271 /**
272 * Gets the worker type.
273 */
274 protected abstract get worker (): WorkerType
275
c2ade475 276 /**
6b27d407 277 * Pool minimum size.
c2ade475 278 */
6b27d407 279 protected abstract get minSize (): number
ff733df7
JB
280
281 /**
6b27d407 282 * Pool maximum size.
ff733df7 283 */
6b27d407 284 protected abstract get maxSize (): number
a35560ba 285
ffcbbad8 286 /**
f06e48d8 287 * Gets the given worker its worker node key.
ffcbbad8
JB
288 *
289 * @param worker - The worker.
f06e48d8 290 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 291 */
f06e48d8
JB
292 private getWorkerNodeKey (worker: Worker): number {
293 return this.workerNodes.findIndex(
294 workerNode => workerNode.worker === worker
295 )
bf9549ae
JB
296 }
297
afc003b2 298 /** @inheritDoc */
a35560ba 299 public setWorkerChoiceStrategy (
59219cbb
JB
300 workerChoiceStrategy: WorkerChoiceStrategy,
301 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 302 ): void {
aee46736 303 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 304 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
305 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
306 this.opts.workerChoiceStrategy
307 )
308 if (workerChoiceStrategyOptions != null) {
309 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
310 }
0ebe2a9f 311 for (const workerNode of this.workerNodes) {
f82cd357 312 this.setWorkerNodeTasksUsage(workerNode, {
a4e07f72
JB
313 tasks: {
314 executing: 0,
315 executed: 0,
316 queued:
317 this.opts.enableTasksQueue === true
318 ? workerNode.tasksQueue.size
319 : 0,
320 failed: 0
321 },
322 runTime: {
323 aggregation: 0,
324 average: 0,
325 median: 0,
326 history: new CircularArray()
327 },
328 waitTime: {
329 aggregation: 0,
330 average: 0,
331 median: 0,
332 history: new CircularArray()
333 },
62c15a68 334 elu: undefined
f82cd357 335 })
b6b32453 336 this.setWorkerStatistics(workerNode.worker)
59219cbb 337 }
a20f0ba5
JB
338 }
339
340 /** @inheritDoc */
341 public setWorkerChoiceStrategyOptions (
342 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
343 ): void {
0d80593b 344 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
345 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
346 this.workerChoiceStrategyContext.setOptions(
347 this.opts.workerChoiceStrategyOptions
a35560ba
S
348 )
349 }
350
a20f0ba5 351 /** @inheritDoc */
8f52842f
JB
352 public enableTasksQueue (
353 enable: boolean,
354 tasksQueueOptions?: TasksQueueOptions
355 ): void {
a20f0ba5 356 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 357 this.flushTasksQueues()
a20f0ba5
JB
358 }
359 this.opts.enableTasksQueue = enable
8f52842f 360 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
361 }
362
363 /** @inheritDoc */
8f52842f 364 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 365 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
366 this.checkValidTasksQueueOptions(tasksQueueOptions)
367 this.opts.tasksQueueOptions =
368 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 369 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
370 delete this.opts.tasksQueueOptions
371 }
372 }
373
374 private buildTasksQueueOptions (
375 tasksQueueOptions: TasksQueueOptions
376 ): TasksQueueOptions {
377 return {
378 concurrency: tasksQueueOptions?.concurrency ?? 1
379 }
380 }
381
c319c66b
JB
382 /**
383 * Whether the pool is full or not.
384 *
385 * The pool filling boolean status.
386 */
dea903a8
JB
387 protected get full (): boolean {
388 return this.workerNodes.length >= this.maxSize
389 }
c2ade475 390
c319c66b
JB
391 /**
392 * Whether the pool is busy or not.
393 *
394 * The pool busyness boolean status.
395 */
396 protected abstract get busy (): boolean
7c0ba920 397
c2ade475 398 protected internalBusy (): boolean {
e0ae6100
JB
399 return (
400 this.workerNodes.findIndex(workerNode => {
a4e07f72 401 return workerNode.workerUsage.tasks.executing === 0
e0ae6100
JB
402 }) === -1
403 )
cb70b19d
JB
404 }
405
afc003b2 406 /** @inheritDoc */
a86b6df1 407 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 408 const timestamp = performance.now()
20dcad1a 409 const workerNodeKey = this.chooseWorkerNode()
adc3c320 410 const submittedTask: Task<Data> = {
a86b6df1 411 name,
e5a5c0fc
JB
412 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
413 data: data ?? ({} as Data),
b6b32453 414 timestamp,
adc3c320
JB
415 id: crypto.randomUUID()
416 }
2e81254d 417 const res = new Promise<Response>((resolve, reject) => {
02706357 418 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
419 resolve,
420 reject,
20dcad1a 421 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
422 })
423 })
ff733df7
JB
424 if (
425 this.opts.enableTasksQueue === true &&
7171d33f 426 (this.busy ||
a4e07f72 427 this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
7171d33f 428 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 429 .concurrency as number))
ff733df7 430 ) {
26a929d7
JB
431 this.enqueueTask(workerNodeKey, submittedTask)
432 } else {
2e81254d 433 this.executeTask(workerNodeKey, submittedTask)
adc3c320 434 }
b0d6ed8f 435 this.workerChoiceStrategyContext.update(workerNodeKey)
ff733df7 436 this.checkAndEmitEvents()
78cea37e 437 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
438 return res
439 }
c97c7edb 440
afc003b2 441 /** @inheritDoc */
c97c7edb 442 public async destroy (): Promise<void> {
1fbcaa7c 443 await Promise.all(
875a7c37
JB
444 this.workerNodes.map(async (workerNode, workerNodeKey) => {
445 this.flushTasksQueue(workerNodeKey)
47aacbaa 446 // FIXME: wait for tasks to be finished
f06e48d8 447 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
448 })
449 )
c97c7edb
S
450 }
451
4a6952ff 452 /**
f06e48d8 453 * Shutdowns the given worker.
4a6952ff 454 *
f06e48d8 455 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
456 */
457 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 458
729c563d 459 /**
2e81254d 460 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 461 * Can be overridden
afc003b2
JB
462 *
463 * @virtual
729c563d 464 */
280c2a77 465 protected setupHook (): void {
d99ba5a8 466 // Intentionally empty
280c2a77 467 }
c97c7edb 468
729c563d 469 /**
280c2a77
S
470 * Should return whether the worker is the main worker or not.
471 */
472 protected abstract isMain (): boolean
473
474 /**
2e81254d 475 * Hook executed before the worker task execution.
bf9549ae 476 * Can be overridden.
729c563d 477 *
f06e48d8 478 * @param workerNodeKey - The worker node key.
729c563d 479 */
f20f344f 480 protected beforeTaskExecutionHook (workerNodeKey: number): void {
a4e07f72
JB
481 ++this.workerNodes[workerNodeKey].workerUsage.tasks.executing
482 if (this.opts.enableTasksQueue === true) {
483 this.workerNodes[workerNodeKey].workerUsage.tasks.queued =
484 this.tasksQueueSize(workerNodeKey)
485 }
c97c7edb
S
486 }
487
c01733f1 488 /**
2e81254d 489 * Hook executed after the worker task execution.
bf9549ae 490 * Can be overridden.
c01733f1 491 *
c923ce56 492 * @param worker - The worker.
38e795c1 493 * @param message - The received message.
c01733f1 494 */
2e81254d 495 protected afterTaskExecutionHook (
c923ce56 496 worker: Worker,
2740a743 497 message: MessageValue<Response>
bf9549ae 498 ): void {
a4e07f72
JB
499 const workerUsage =
500 this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
501 const workerTaskStatistics = workerUsage.tasks
502 --workerTaskStatistics.executing
503 ++workerTaskStatistics.executed
82f36766 504 if (message.taskError != null) {
a4e07f72 505 ++workerTaskStatistics.failed
2740a743 506 }
a4e07f72
JB
507
508 this.updateRunTimeWorkerUsage(workerUsage, message)
509 this.updateWaitTimeWorkerUsage(workerUsage, message)
510 this.updateEluWorkerUsage(workerUsage, message)
f8eb0a2a
JB
511 }
512
a4e07f72
JB
513 private updateRunTimeWorkerUsage (
514 workerUsage: WorkerUsage,
f8eb0a2a
JB
515 message: MessageValue<Response>
516 ): void {
b6b32453 517 if (this.workerChoiceStrategyContext.getTaskStatistics().runTime) {
a4e07f72 518 workerUsage.runTime.aggregation += message.taskPerformance?.runTime ?? 0
c6bd2650 519 if (
b6b32453 520 this.workerChoiceStrategyContext.getTaskStatistics().avgRunTime &&
a4e07f72 521 workerUsage.tasks.executed !== 0
c6bd2650 522 ) {
a4e07f72
JB
523 workerUsage.runTime.average =
524 workerUsage.runTime.aggregation / workerUsage.tasks.executed
3032893a 525 }
3fa4cdd2 526 if (
b6b32453 527 this.workerChoiceStrategyContext.getTaskStatistics().medRunTime &&
d715b7bc 528 message.taskPerformance?.runTime != null
3fa4cdd2 529 ) {
a4e07f72
JB
530 workerUsage.runTime.history.push(message.taskPerformance.runTime)
531 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 532 }
3032893a 533 }
f8eb0a2a
JB
534 }
535
a4e07f72
JB
536 private updateWaitTimeWorkerUsage (
537 workerUsage: WorkerUsage,
f8eb0a2a
JB
538 message: MessageValue<Response>
539 ): void {
b6b32453 540 if (this.workerChoiceStrategyContext.getTaskStatistics().waitTime) {
a4e07f72 541 workerUsage.waitTime.aggregation += message.taskPerformance?.waitTime ?? 0
09a6305f 542 if (
b6b32453 543 this.workerChoiceStrategyContext.getTaskStatistics().avgWaitTime &&
a4e07f72 544 workerUsage.tasks.executed !== 0
09a6305f 545 ) {
a4e07f72
JB
546 workerUsage.waitTime.average =
547 workerUsage.waitTime.aggregation / workerUsage.tasks.executed
09a6305f
JB
548 }
549 if (
b6b32453 550 this.workerChoiceStrategyContext.getTaskStatistics().medWaitTime &&
d715b7bc 551 message.taskPerformance?.waitTime != null
09a6305f 552 ) {
a4e07f72
JB
553 workerUsage.waitTime.history.push(message.taskPerformance.waitTime)
554 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 555 }
0567595a 556 }
c01733f1 557 }
558
a4e07f72
JB
559 private updateEluWorkerUsage (
560 workerTasksUsage: WorkerUsage,
62c15a68
JB
561 message: MessageValue<Response>
562 ): void {
b6b32453 563 if (this.workerChoiceStrategyContext.getTaskStatistics().elu) {
d715b7bc
JB
564 if (
565 workerTasksUsage.elu != null &&
566 message.taskPerformance?.elu != null
567 ) {
62c15a68 568 workerTasksUsage.elu = {
d715b7bc
JB
569 idle: workerTasksUsage.elu.idle + message.taskPerformance.elu.idle,
570 active:
571 workerTasksUsage.elu.active + message.taskPerformance.elu.active,
62c15a68 572 utilization:
d715b7bc
JB
573 (workerTasksUsage.elu.utilization +
574 message.taskPerformance.elu.utilization) /
575 2
62c15a68 576 }
d715b7bc
JB
577 } else if (message.taskPerformance?.elu != null) {
578 workerTasksUsage.elu = message.taskPerformance.elu
62c15a68
JB
579 }
580 }
581 }
582
280c2a77 583 /**
f06e48d8 584 * Chooses a worker node for the next task.
280c2a77 585 *
20dcad1a 586 * The default worker choice strategy uses a round robin algorithm to distribute the load.
280c2a77 587 *
20dcad1a 588 * @returns The worker node key
280c2a77 589 */
20dcad1a 590 protected chooseWorkerNode (): number {
f06e48d8 591 let workerNodeKey: number
6b27d407 592 if (this.type === PoolTypes.dynamic && !this.full && this.internalBusy()) {
adc3c320
JB
593 const workerCreated = this.createAndSetupWorker()
594 this.registerWorkerMessageListener(workerCreated, message => {
a4958de2 595 const currentWorkerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8
JB
596 if (
597 isKillBehavior(KillBehaviors.HARD, message.kill) ||
d2097c13 598 (message.kill != null &&
a4e07f72
JB
599 this.workerNodes[currentWorkerNodeKey].workerUsage.tasks
600 .executing === 0)
17393ac8 601 ) {
ff733df7 602 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
a4958de2 603 this.flushTasksQueue(currentWorkerNodeKey)
47aacbaa 604 // FIXME: wait for tasks to be finished
7c5a1080 605 void (this.destroyWorker(workerCreated) as Promise<void>)
17393ac8
JB
606 }
607 })
adc3c320 608 workerNodeKey = this.getWorkerNodeKey(workerCreated)
17393ac8 609 } else {
f06e48d8 610 workerNodeKey = this.workerChoiceStrategyContext.execute()
17393ac8 611 }
20dcad1a 612 return workerNodeKey
c97c7edb
S
613 }
614
280c2a77 615 /**
675bb809 616 * Sends a message to the given worker.
280c2a77 617 *
38e795c1
JB
618 * @param worker - The worker which should receive the message.
619 * @param message - The message.
280c2a77
S
620 */
621 protected abstract sendToWorker (
622 worker: Worker,
623 message: MessageValue<Data>
624 ): void
625
4a6952ff 626 /**
f06e48d8 627 * Registers a listener callback on the given worker.
4a6952ff 628 *
38e795c1
JB
629 * @param worker - The worker which should register a listener.
630 * @param listener - The message listener callback.
4a6952ff
JB
631 */
632 protected abstract registerWorkerMessageListener<
4f7fa42a 633 Message extends Data | Response
78cea37e 634 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 635
729c563d
S
636 /**
637 * Returns a newly created worker.
638 */
280c2a77 639 protected abstract createWorker (): Worker
c97c7edb 640
729c563d 641 /**
f06e48d8 642 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 643 *
38e795c1 644 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 645 *
38e795c1 646 * @param worker - The newly created worker.
729c563d 647 */
280c2a77 648 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 649
4a6952ff 650 /**
f06e48d8 651 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
652 *
653 * @returns New, completely set up worker.
654 */
655 protected createAndSetupWorker (): Worker {
bdacc2d2 656 const worker = this.createWorker()
280c2a77 657
35cf1c03 658 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 659 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
660 worker.on('error', error => {
661 if (this.emitter != null) {
662 this.emitter.emit(PoolEvents.error, error)
663 }
664 })
5baee0d7
JB
665 worker.on('error', () => {
666 if (this.opts.restartWorkerOnError === true) {
1f68cede 667 this.createAndSetupWorker()
5baee0d7
JB
668 }
669 })
a35560ba
S
670 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
671 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 672 worker.once('exit', () => {
f06e48d8 673 this.removeWorkerNode(worker)
a974afa6 674 })
280c2a77 675
f06e48d8 676 this.pushWorkerNode(worker)
280c2a77 677
b6b32453
JB
678 this.setWorkerStatistics(worker)
679
280c2a77
S
680 this.afterWorkerSetup(worker)
681
c97c7edb
S
682 return worker
683 }
be0676b3
APA
684
685 /**
ff733df7 686 * This function is the listener registered for each worker message.
be0676b3 687 *
bdacc2d2 688 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
689 */
690 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 691 return message => {
b1989cfd 692 if (message.id != null) {
a3445496 693 // Task execution response received
2740a743 694 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 695 if (promiseResponse != null) {
82f36766
JB
696 if (message.taskError != null) {
697 promiseResponse.reject(message.taskError.message)
91ee39ed 698 if (this.emitter != null) {
82f36766 699 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 700 }
a05c10de 701 } else {
2740a743 702 promiseResponse.resolve(message.data as Response)
a05c10de 703 }
2e81254d 704 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 705 this.promiseResponseMap.delete(message.id)
ff733df7
JB
706 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
707 if (
708 this.opts.enableTasksQueue === true &&
416fd65c 709 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 710 ) {
2e81254d
JB
711 this.executeTask(
712 workerNodeKey,
ff733df7
JB
713 this.dequeueTask(workerNodeKey) as Task<Data>
714 )
715 }
be0676b3
APA
716 }
717 }
718 }
be0676b3 719 }
7c0ba920 720
ff733df7 721 private checkAndEmitEvents (): void {
1f68cede 722 if (this.emitter != null) {
ff733df7 723 if (this.busy) {
6b27d407 724 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 725 }
6b27d407
JB
726 if (this.type === PoolTypes.dynamic && this.full) {
727 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 728 }
164d950a
JB
729 }
730 }
731
0ebe2a9f
JB
732 /**
733 * Sets the given worker node its tasks usage in the pool.
734 *
735 * @param workerNode - The worker node.
a4e07f72 736 * @param workerUsage - The worker usage.
0ebe2a9f
JB
737 */
738 private setWorkerNodeTasksUsage (
739 workerNode: WorkerNode<Worker, Data>,
a4e07f72 740 workerUsage: WorkerUsage
0ebe2a9f 741 ): void {
a4e07f72 742 workerNode.workerUsage = workerUsage
0ebe2a9f
JB
743 }
744
a05c10de 745 /**
f06e48d8 746 * Pushes the given worker in the pool worker nodes.
ea7a90d3 747 *
38e795c1 748 * @param worker - The worker.
f06e48d8 749 * @returns The worker nodes length.
ea7a90d3 750 */
f06e48d8
JB
751 private pushWorkerNode (worker: Worker): number {
752 return this.workerNodes.push({
ffcbbad8 753 worker,
a4e07f72
JB
754 workerUsage: {
755 tasks: {
756 executed: 0,
757 executing: 0,
758 queued: 0,
759 failed: 0
760 },
761 runTime: {
762 aggregation: 0,
763 average: 0,
764 median: 0,
765 history: new CircularArray()
766 },
767
768 waitTime: {
769 aggregation: 0,
770 average: 0,
771 median: 0,
772 history: new CircularArray()
773 },
62c15a68 774 elu: undefined
f82cd357 775 },
29ee7e9a 776 tasksQueue: new Queue<Task<Data>>()
ea7a90d3
JB
777 })
778 }
c923ce56
JB
779
780 /**
f06e48d8 781 * Sets the given worker in the pool worker nodes.
c923ce56 782 *
f06e48d8 783 * @param workerNodeKey - The worker node key.
c923ce56 784 * @param worker - The worker.
a4e07f72 785 * @param workerUsage - The worker usage.
f06e48d8 786 * @param tasksQueue - The worker task queue.
c923ce56 787 */
f06e48d8
JB
788 private setWorkerNode (
789 workerNodeKey: number,
c923ce56 790 worker: Worker,
a4e07f72 791 workerUsage: WorkerUsage,
29ee7e9a 792 tasksQueue: Queue<Task<Data>>
c923ce56 793 ): void {
f06e48d8 794 this.workerNodes[workerNodeKey] = {
c923ce56 795 worker,
a4e07f72 796 workerUsage,
f06e48d8 797 tasksQueue
c923ce56
JB
798 }
799 }
51fe3d3c
JB
800
801 /**
f06e48d8 802 * Removes the given worker from the pool worker nodes.
51fe3d3c 803 *
f06e48d8 804 * @param worker - The worker.
51fe3d3c 805 */
416fd65c 806 private removeWorkerNode (worker: Worker): void {
f06e48d8 807 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
808 if (workerNodeKey !== -1) {
809 this.workerNodes.splice(workerNodeKey, 1)
810 this.workerChoiceStrategyContext.remove(workerNodeKey)
811 }
51fe3d3c 812 }
adc3c320 813
2e81254d 814 private executeTask (workerNodeKey: number, task: Task<Data>): void {
027c2215 815 this.beforeTaskExecutionHook(workerNodeKey)
2e81254d
JB
816 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
817 }
818
f9f00b5f 819 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 820 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
821 }
822
416fd65c 823 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 824 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
825 }
826
416fd65c 827 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 828 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 829 }
ff733df7 830
416fd65c
JB
831 private flushTasksQueue (workerNodeKey: number): void {
832 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
833 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
834 this.executeTask(
835 workerNodeKey,
836 this.dequeueTask(workerNodeKey) as Task<Data>
837 )
ff733df7 838 }
ff733df7
JB
839 }
840 }
841
ef41a6e6
JB
842 private flushTasksQueues (): void {
843 for (const [workerNodeKey] of this.workerNodes.entries()) {
844 this.flushTasksQueue(workerNodeKey)
845 }
846 }
b6b32453
JB
847
848 private setWorkerStatistics (worker: Worker): void {
849 this.sendToWorker(worker, {
850 statistics: {
851 runTime: this.workerChoiceStrategyContext.getTaskStatistics().runTime,
852 waitTime: this.workerChoiceStrategyContext.getTaskStatistics().waitTime,
853 elu: this.workerChoiceStrategyContext.getTaskStatistics().elu
854 }
855 })
856 }
c97c7edb 857}