fix: fix tasks queued count computation
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
9c16fb4b 24import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
a35560ba 25import {
f0d7f803 26 Measurements,
a35560ba 27 WorkerChoiceStrategies,
a20f0ba5
JB
28 type WorkerChoiceStrategy,
29 type WorkerChoiceStrategyOptions
bdaf31cd
JB
30} from './selection-strategies/selection-strategies-types'
31import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 32
729c563d 33/**
ea7a90d3 34 * Base class that implements some shared logic for all poolifier pools.
729c563d 35 *
38e795c1
JB
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 38 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 39 */
c97c7edb 40export abstract class AbstractPool<
f06e48d8 41 Worker extends IWorker,
d3c8a1a8
S
42 Data = unknown,
43 Response = unknown
c4855468 44> implements IPool<Worker, Data, Response> {
afc003b2 45 /** @inheritDoc */
f06e48d8 46 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 47
afc003b2 48 /** @inheritDoc */
7c0ba920
JB
49 public readonly emitter?: PoolEmitter
50
be0676b3 51 /**
a3445496 52 * The execution response promise map.
be0676b3 53 *
2740a743 54 * - `key`: The message id of each submitted task.
a3445496 55 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 56 *
a3445496 57 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 58 */
c923ce56
JB
59 protected promiseResponseMap: Map<
60 string,
61 PromiseResponseWrapper<Worker, Response>
62 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 63
a35560ba 64 /**
51fe3d3c 65 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
68 Worker,
69 Data,
70 Response
a35560ba
S
71 >
72
729c563d
S
73 /**
74 * Constructs a new poolifier pool.
75 *
38e795c1 76 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 77 * @param filePath - Path to the worker file.
38e795c1 78 * @param opts - Options for the pool.
729c563d 79 */
c97c7edb 80 public constructor (
b4213b7f
JB
81 protected readonly numberOfWorkers: number,
82 protected readonly filePath: string,
83 protected readonly opts: PoolOptions<Worker>
c97c7edb 84 ) {
78cea37e 85 if (!this.isMain()) {
c97c7edb
S
86 throw new Error('Cannot start a pool from a worker!')
87 }
8d3782fa 88 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 89 this.checkFilePath(this.filePath)
7c0ba920 90 this.checkPoolOptions(this.opts)
1086026a 91
7254e419
JB
92 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
93 this.executeTask = this.executeTask.bind(this)
94 this.enqueueTask = this.enqueueTask.bind(this)
95 this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)
1086026a 96
6bd72cd0 97 if (this.opts.enableEvents === true) {
7c0ba920
JB
98 this.emitter = new PoolEmitter()
99 }
d59df138
JB
100 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
101 Worker,
102 Data,
103 Response
da309861
JB
104 >(
105 this,
106 this.opts.workerChoiceStrategy,
107 this.opts.workerChoiceStrategyOptions
108 )
b6b32453
JB
109
110 this.setupHook()
111
112 for (let i = 1; i <= this.numberOfWorkers; i++) {
113 this.createAndSetupWorker()
114 }
c97c7edb
S
115 }
116
a35560ba 117 private checkFilePath (filePath: string): void {
ffcbbad8
JB
118 if (
119 filePath == null ||
120 (typeof filePath === 'string' && filePath.trim().length === 0)
121 ) {
c510fea7
APA
122 throw new Error('Please specify a file with a worker implementation')
123 }
124 }
125
8d3782fa
JB
126 private checkNumberOfWorkers (numberOfWorkers: number): void {
127 if (numberOfWorkers == null) {
128 throw new Error(
129 'Cannot instantiate a pool without specifying the number of workers'
130 )
78cea37e 131 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 132 throw new TypeError(
0d80593b 133 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
134 )
135 } else if (numberOfWorkers < 0) {
473c717a 136 throw new RangeError(
8d3782fa
JB
137 'Cannot instantiate a pool with a negative number of workers'
138 )
6b27d407 139 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
8d3782fa
JB
140 throw new Error('Cannot instantiate a fixed pool with no worker')
141 }
142 }
143
7c0ba920 144 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
145 if (isPlainObject(opts)) {
146 this.opts.workerChoiceStrategy =
147 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
148 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
149 this.opts.workerChoiceStrategyOptions =
150 opts.workerChoiceStrategyOptions ??
151 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
49be33fe
JB
152 this.checkValidWorkerChoiceStrategyOptions(
153 this.opts.workerChoiceStrategyOptions
154 )
1f68cede 155 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
156 this.opts.enableEvents = opts.enableEvents ?? true
157 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
158 if (this.opts.enableTasksQueue) {
159 this.checkValidTasksQueueOptions(
160 opts.tasksQueueOptions as TasksQueueOptions
161 )
162 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
163 opts.tasksQueueOptions as TasksQueueOptions
164 )
165 }
166 } else {
167 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 168 }
aee46736
JB
169 }
170
171 private checkValidWorkerChoiceStrategy (
172 workerChoiceStrategy: WorkerChoiceStrategy
173 ): void {
174 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 175 throw new Error(
aee46736 176 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
177 )
178 }
7c0ba920
JB
179 }
180
0d80593b
JB
181 private checkValidWorkerChoiceStrategyOptions (
182 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
183 ): void {
184 if (!isPlainObject(workerChoiceStrategyOptions)) {
185 throw new TypeError(
186 'Invalid worker choice strategy options: must be a plain object'
187 )
188 }
49be33fe
JB
189 if (
190 workerChoiceStrategyOptions.weights != null &&
6b27d407 191 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
192 ) {
193 throw new Error(
194 'Invalid worker choice strategy options: must have a weight for each worker node'
195 )
196 }
f0d7f803
JB
197 if (
198 workerChoiceStrategyOptions.measurement != null &&
199 !Object.values(Measurements).includes(
200 workerChoiceStrategyOptions.measurement
201 )
202 ) {
203 throw new Error(
204 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
205 )
206 }
0d80593b
JB
207 }
208
a20f0ba5
JB
209 private checkValidTasksQueueOptions (
210 tasksQueueOptions: TasksQueueOptions
211 ): void {
0d80593b
JB
212 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
213 throw new TypeError('Invalid tasks queue options: must be a plain object')
214 }
f0d7f803
JB
215 if (
216 tasksQueueOptions?.concurrency != null &&
217 !Number.isSafeInteger(tasksQueueOptions.concurrency)
218 ) {
219 throw new TypeError(
220 'Invalid worker tasks concurrency: must be an integer'
221 )
222 }
223 if (
224 tasksQueueOptions?.concurrency != null &&
225 tasksQueueOptions.concurrency <= 0
226 ) {
a20f0ba5 227 throw new Error(
f0d7f803 228 `Invalid worker tasks concurrency '${tasksQueueOptions.concurrency}'`
a20f0ba5
JB
229 )
230 }
231 }
232
08f3f44c 233 /** @inheritDoc */
6b27d407
JB
234 public get info (): PoolInfo {
235 return {
236 type: this.type,
184855e6 237 worker: this.worker,
6b27d407
JB
238 minSize: this.minSize,
239 maxSize: this.maxSize,
240 workerNodes: this.workerNodes.length,
241 idleWorkerNodes: this.workerNodes.reduce(
242 (accumulator, workerNode) =>
a4e07f72
JB
243 workerNode.workerUsage.tasks.executing === 0
244 ? accumulator + 1
245 : accumulator,
6b27d407
JB
246 0
247 ),
248 busyWorkerNodes: this.workerNodes.reduce(
249 (accumulator, workerNode) =>
a4e07f72
JB
250 workerNode.workerUsage.tasks.executing > 0
251 ? accumulator + 1
252 : accumulator,
6b27d407
JB
253 0
254 ),
a4e07f72 255 executedTasks: this.workerNodes.reduce(
6b27d407 256 (accumulator, workerNode) =>
a4e07f72
JB
257 accumulator + workerNode.workerUsage.tasks.executed,
258 0
259 ),
260 executingTasks: this.workerNodes.reduce(
261 (accumulator, workerNode) =>
262 accumulator + workerNode.workerUsage.tasks.executing,
6b27d407
JB
263 0
264 ),
265 queuedTasks: this.workerNodes.reduce(
266 (accumulator, workerNode) => accumulator + workerNode.tasksQueue.size,
267 0
268 ),
269 maxQueuedTasks: this.workerNodes.reduce(
270 (accumulator, workerNode) =>
271 accumulator + workerNode.tasksQueue.maxSize,
272 0
a4e07f72
JB
273 ),
274 failedTasks: this.workerNodes.reduce(
275 (accumulator, workerNode) =>
276 accumulator + workerNode.workerUsage.tasks.failed,
277 0
6b27d407
JB
278 )
279 }
280 }
08f3f44c 281
8881ae32
JB
282 /**
283 * Pool type.
284 *
285 * If it is `'dynamic'`, it provides the `max` property.
286 */
287 protected abstract get type (): PoolType
288
184855e6
JB
289 /**
290 * Gets the worker type.
291 */
292 protected abstract get worker (): WorkerType
293
c2ade475 294 /**
6b27d407 295 * Pool minimum size.
c2ade475 296 */
6b27d407 297 protected abstract get minSize (): number
ff733df7
JB
298
299 /**
6b27d407 300 * Pool maximum size.
ff733df7 301 */
6b27d407 302 protected abstract get maxSize (): number
a35560ba 303
ffcbbad8 304 /**
f06e48d8 305 * Gets the given worker its worker node key.
ffcbbad8
JB
306 *
307 * @param worker - The worker.
f06e48d8 308 * @returns The worker node key if the worker is found in the pool worker nodes, `-1` otherwise.
ffcbbad8 309 */
f06e48d8
JB
310 private getWorkerNodeKey (worker: Worker): number {
311 return this.workerNodes.findIndex(
312 workerNode => workerNode.worker === worker
313 )
bf9549ae
JB
314 }
315
afc003b2 316 /** @inheritDoc */
a35560ba 317 public setWorkerChoiceStrategy (
59219cbb
JB
318 workerChoiceStrategy: WorkerChoiceStrategy,
319 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 320 ): void {
aee46736 321 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 322 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
323 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
324 this.opts.workerChoiceStrategy
325 )
326 if (workerChoiceStrategyOptions != null) {
327 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
328 }
9c16fb4b 329 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
8604aaab
JB
330 this.setWorkerNodeTasksUsage(
331 workerNode,
9c16fb4b 332 this.getWorkerUsage(workerNodeKey)
8604aaab 333 )
b6b32453 334 this.setWorkerStatistics(workerNode.worker)
59219cbb 335 }
a20f0ba5
JB
336 }
337
338 /** @inheritDoc */
339 public setWorkerChoiceStrategyOptions (
340 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
341 ): void {
0d80593b 342 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
a20f0ba5
JB
343 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
344 this.workerChoiceStrategyContext.setOptions(
345 this.opts.workerChoiceStrategyOptions
a35560ba
S
346 )
347 }
348
a20f0ba5 349 /** @inheritDoc */
8f52842f
JB
350 public enableTasksQueue (
351 enable: boolean,
352 tasksQueueOptions?: TasksQueueOptions
353 ): void {
a20f0ba5 354 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 355 this.flushTasksQueues()
a20f0ba5
JB
356 }
357 this.opts.enableTasksQueue = enable
8f52842f 358 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
359 }
360
361 /** @inheritDoc */
8f52842f 362 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 363 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
364 this.checkValidTasksQueueOptions(tasksQueueOptions)
365 this.opts.tasksQueueOptions =
366 this.buildTasksQueueOptions(tasksQueueOptions)
5baee0d7 367 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
368 delete this.opts.tasksQueueOptions
369 }
370 }
371
372 private buildTasksQueueOptions (
373 tasksQueueOptions: TasksQueueOptions
374 ): TasksQueueOptions {
375 return {
376 concurrency: tasksQueueOptions?.concurrency ?? 1
377 }
378 }
379
c319c66b
JB
380 /**
381 * Whether the pool is full or not.
382 *
383 * The pool filling boolean status.
384 */
dea903a8
JB
385 protected get full (): boolean {
386 return this.workerNodes.length >= this.maxSize
387 }
c2ade475 388
c319c66b
JB
389 /**
390 * Whether the pool is busy or not.
391 *
392 * The pool busyness boolean status.
393 */
394 protected abstract get busy (): boolean
7c0ba920 395
6c6afb84
JB
396 /**
397 * Whether worker nodes are executing at least one task.
398 *
399 * @returns Worker nodes busyness boolean status.
400 */
c2ade475 401 protected internalBusy (): boolean {
e0ae6100
JB
402 return (
403 this.workerNodes.findIndex(workerNode => {
a4e07f72 404 return workerNode.workerUsage.tasks.executing === 0
e0ae6100
JB
405 }) === -1
406 )
cb70b19d
JB
407 }
408
afc003b2 409 /** @inheritDoc */
a86b6df1 410 public async execute (data?: Data, name?: string): Promise<Response> {
b6b32453 411 const timestamp = performance.now()
20dcad1a 412 const workerNodeKey = this.chooseWorkerNode()
adc3c320 413 const submittedTask: Task<Data> = {
a86b6df1 414 name,
e5a5c0fc
JB
415 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
416 data: data ?? ({} as Data),
b6b32453 417 timestamp,
adc3c320
JB
418 id: crypto.randomUUID()
419 }
2e81254d 420 const res = new Promise<Response>((resolve, reject) => {
02706357 421 this.promiseResponseMap.set(submittedTask.id as string, {
2e81254d
JB
422 resolve,
423 reject,
20dcad1a 424 worker: this.workerNodes[workerNodeKey].worker
2e81254d
JB
425 })
426 })
ff733df7
JB
427 if (
428 this.opts.enableTasksQueue === true &&
7171d33f 429 (this.busy ||
a4e07f72 430 this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
7171d33f 431 ((this.opts.tasksQueueOptions as TasksQueueOptions)
3528c992 432 .concurrency as number))
ff733df7 433 ) {
26a929d7
JB
434 this.enqueueTask(workerNodeKey, submittedTask)
435 } else {
2e81254d 436 this.executeTask(workerNodeKey, submittedTask)
adc3c320 437 }
ff733df7 438 this.checkAndEmitEvents()
78cea37e 439 // eslint-disable-next-line @typescript-eslint/return-await
280c2a77
S
440 return res
441 }
c97c7edb 442
afc003b2 443 /** @inheritDoc */
c97c7edb 444 public async destroy (): Promise<void> {
1fbcaa7c 445 await Promise.all(
875a7c37
JB
446 this.workerNodes.map(async (workerNode, workerNodeKey) => {
447 this.flushTasksQueue(workerNodeKey)
47aacbaa 448 // FIXME: wait for tasks to be finished
f06e48d8 449 await this.destroyWorker(workerNode.worker)
1fbcaa7c
JB
450 })
451 )
c97c7edb
S
452 }
453
4a6952ff 454 /**
6c6afb84 455 * Terminates the given worker.
4a6952ff 456 *
f06e48d8 457 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
458 */
459 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 460
729c563d 461 /**
2e81254d 462 * Setup hook to execute code before worker node are created in the abstract constructor.
d99ba5a8 463 * Can be overridden
afc003b2
JB
464 *
465 * @virtual
729c563d 466 */
280c2a77 467 protected setupHook (): void {
d99ba5a8 468 // Intentionally empty
280c2a77 469 }
c97c7edb 470
729c563d 471 /**
280c2a77
S
472 * Should return whether the worker is the main worker or not.
473 */
474 protected abstract isMain (): boolean
475
476 /**
2e81254d 477 * Hook executed before the worker task execution.
bf9549ae 478 * Can be overridden.
729c563d 479 *
f06e48d8 480 * @param workerNodeKey - The worker node key.
1c6fe997 481 * @param task - The task to execute.
729c563d 482 */
1c6fe997
JB
483 protected beforeTaskExecutionHook (
484 workerNodeKey: number,
485 task: Task<Data>
486 ): void {
487 const workerUsage = this.workerNodes[workerNodeKey].workerUsage
488 ++workerUsage.tasks.executing
489 this.updateWaitTimeWorkerUsage(workerUsage, task)
c97c7edb
S
490 }
491
c01733f1 492 /**
2e81254d 493 * Hook executed after the worker task execution.
bf9549ae 494 * Can be overridden.
c01733f1 495 *
c923ce56 496 * @param worker - The worker.
38e795c1 497 * @param message - The received message.
c01733f1 498 */
2e81254d 499 protected afterTaskExecutionHook (
c923ce56 500 worker: Worker,
2740a743 501 message: MessageValue<Response>
bf9549ae 502 ): void {
a4e07f72
JB
503 const workerUsage =
504 this.workerNodes[this.getWorkerNodeKey(worker)].workerUsage
f1c06930
JB
505 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
506 this.updateRunTimeWorkerUsage(workerUsage, message)
507 this.updateEluWorkerUsage(workerUsage, message)
508 }
509
510 private updateTaskStatisticsWorkerUsage (
511 workerUsage: WorkerUsage,
512 message: MessageValue<Response>
513 ): void {
a4e07f72
JB
514 const workerTaskStatistics = workerUsage.tasks
515 --workerTaskStatistics.executing
516 ++workerTaskStatistics.executed
82f36766 517 if (message.taskError != null) {
a4e07f72 518 ++workerTaskStatistics.failed
2740a743 519 }
f8eb0a2a
JB
520 }
521
a4e07f72
JB
522 private updateRunTimeWorkerUsage (
523 workerUsage: WorkerUsage,
f8eb0a2a
JB
524 message: MessageValue<Response>
525 ): void {
87de9ff5
JB
526 if (
527 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
932fc8be 528 .aggregate
87de9ff5 529 ) {
932fc8be 530 workerUsage.runTime.aggregate += message.taskPerformance?.runTime ?? 0
c6bd2650 531 if (
932fc8be
JB
532 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
533 .average &&
a4e07f72 534 workerUsage.tasks.executed !== 0
c6bd2650 535 ) {
a4e07f72 536 workerUsage.runTime.average =
f1c06930
JB
537 workerUsage.runTime.aggregate /
538 (workerUsage.tasks.executed - workerUsage.tasks.failed)
3032893a 539 }
3fa4cdd2 540 if (
932fc8be
JB
541 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
542 .median &&
d715b7bc 543 message.taskPerformance?.runTime != null
3fa4cdd2 544 ) {
a4e07f72
JB
545 workerUsage.runTime.history.push(message.taskPerformance.runTime)
546 workerUsage.runTime.median = median(workerUsage.runTime.history)
78099a15 547 }
3032893a 548 }
f8eb0a2a
JB
549 }
550
a4e07f72
JB
551 private updateWaitTimeWorkerUsage (
552 workerUsage: WorkerUsage,
1c6fe997 553 task: Task<Data>
f8eb0a2a 554 ): void {
1c6fe997
JB
555 const timestamp = performance.now()
556 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
87de9ff5
JB
557 if (
558 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime
932fc8be 559 .aggregate
87de9ff5 560 ) {
932fc8be 561 workerUsage.waitTime.aggregate += taskWaitTime ?? 0
09a6305f 562 if (
87de9ff5 563 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 564 .waitTime.average &&
a4e07f72 565 workerUsage.tasks.executed !== 0
09a6305f 566 ) {
a4e07f72 567 workerUsage.waitTime.average =
f1c06930
JB
568 workerUsage.waitTime.aggregate /
569 (workerUsage.tasks.executed - workerUsage.tasks.failed)
09a6305f
JB
570 }
571 if (
87de9ff5 572 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 573 .waitTime.median &&
1c6fe997 574 taskWaitTime != null
09a6305f 575 ) {
1c6fe997 576 workerUsage.waitTime.history.push(taskWaitTime)
a4e07f72 577 workerUsage.waitTime.median = median(workerUsage.waitTime.history)
09a6305f 578 }
0567595a 579 }
c01733f1 580 }
581
a4e07f72 582 private updateEluWorkerUsage (
5df69fab 583 workerUsage: WorkerUsage,
62c15a68
JB
584 message: MessageValue<Response>
585 ): void {
5df69fab
JB
586 if (
587 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
588 .aggregate
589 ) {
590 if (workerUsage.elu != null && message.taskPerformance?.elu != null) {
9adcefab
JB
591 workerUsage.elu.idle.aggregate += message.taskPerformance.elu.idle
592 workerUsage.elu.active.aggregate += message.taskPerformance.elu.active
5df69fab
JB
593 workerUsage.elu.utilization =
594 (workerUsage.elu.utilization +
595 message.taskPerformance.elu.utilization) /
596 2
597 } else if (message.taskPerformance?.elu != null) {
598 workerUsage.elu.idle.aggregate = message.taskPerformance.elu.idle
599 workerUsage.elu.active.aggregate = message.taskPerformance.elu.active
600 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
601 }
d715b7bc 602 if (
5df69fab
JB
603 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
604 .average &&
605 workerUsage.tasks.executed !== 0
606 ) {
f1c06930
JB
607 const executedTasks =
608 workerUsage.tasks.executed - workerUsage.tasks.failed
5df69fab 609 workerUsage.elu.idle.average =
f1c06930 610 workerUsage.elu.idle.aggregate / executedTasks
5df69fab 611 workerUsage.elu.active.average =
f1c06930 612 workerUsage.elu.active.aggregate / executedTasks
5df69fab
JB
613 }
614 if (
615 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
616 .median &&
d715b7bc
JB
617 message.taskPerformance?.elu != null
618 ) {
5df69fab
JB
619 workerUsage.elu.idle.history.push(message.taskPerformance.elu.idle)
620 workerUsage.elu.active.history.push(message.taskPerformance.elu.active)
621 workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
622 workerUsage.elu.active.median = median(workerUsage.elu.active.history)
62c15a68
JB
623 }
624 }
625 }
626
280c2a77 627 /**
f06e48d8 628 * Chooses a worker node for the next task.
280c2a77 629 *
6c6afb84 630 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 631 *
20dcad1a 632 * @returns The worker node key
280c2a77 633 */
6c6afb84 634 private chooseWorkerNode (): number {
930dcf12 635 if (this.shallCreateDynamicWorker()) {
6c6afb84
JB
636 const worker = this.createAndSetupDynamicWorker()
637 if (
638 this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
639 ) {
640 return this.getWorkerNodeKey(worker)
641 }
17393ac8 642 }
930dcf12
JB
643 return this.workerChoiceStrategyContext.execute()
644 }
645
6c6afb84
JB
646 /**
647 * Conditions for dynamic worker creation.
648 *
649 * @returns Whether to create a dynamic worker or not.
650 */
651 private shallCreateDynamicWorker (): boolean {
930dcf12 652 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
653 }
654
280c2a77 655 /**
675bb809 656 * Sends a message to the given worker.
280c2a77 657 *
38e795c1
JB
658 * @param worker - The worker which should receive the message.
659 * @param message - The message.
280c2a77
S
660 */
661 protected abstract sendToWorker (
662 worker: Worker,
663 message: MessageValue<Data>
664 ): void
665
4a6952ff 666 /**
f06e48d8 667 * Registers a listener callback on the given worker.
4a6952ff 668 *
38e795c1
JB
669 * @param worker - The worker which should register a listener.
670 * @param listener - The message listener callback.
4a6952ff
JB
671 */
672 protected abstract registerWorkerMessageListener<
4f7fa42a 673 Message extends Data | Response
78cea37e 674 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 675
729c563d 676 /**
41344292 677 * Creates a new worker.
6c6afb84
JB
678 *
679 * @returns Newly created worker.
729c563d 680 */
280c2a77 681 protected abstract createWorker (): Worker
c97c7edb 682
729c563d 683 /**
f06e48d8 684 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 685 *
38e795c1 686 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 687 *
38e795c1 688 * @param worker - The newly created worker.
729c563d 689 */
280c2a77 690 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 691
4a6952ff 692 /**
f06e48d8 693 * Creates a new worker and sets it up completely in the pool worker nodes.
4a6952ff
JB
694 *
695 * @returns New, completely set up worker.
696 */
697 protected createAndSetupWorker (): Worker {
bdacc2d2 698 const worker = this.createWorker()
280c2a77 699
35cf1c03 700 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 701 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
1f68cede
JB
702 worker.on('error', error => {
703 if (this.emitter != null) {
704 this.emitter.emit(PoolEvents.error, error)
705 }
5baee0d7 706 if (this.opts.restartWorkerOnError === true) {
1f68cede 707 this.createAndSetupWorker()
5baee0d7
JB
708 }
709 })
a35560ba
S
710 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
711 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 712 worker.once('exit', () => {
f06e48d8 713 this.removeWorkerNode(worker)
a974afa6 714 })
280c2a77 715
f06e48d8 716 this.pushWorkerNode(worker)
280c2a77 717
b6b32453
JB
718 this.setWorkerStatistics(worker)
719
280c2a77
S
720 this.afterWorkerSetup(worker)
721
c97c7edb
S
722 return worker
723 }
be0676b3 724
930dcf12
JB
725 /**
726 * Creates a new dynamic worker and sets it up completely in the pool worker nodes.
727 *
728 * @returns New, completely set up dynamic worker.
729 */
730 protected createAndSetupDynamicWorker (): Worker {
731 const worker = this.createAndSetupWorker()
732 this.registerWorkerMessageListener(worker, message => {
e8b3a5ab 733 const workerNodeKey = this.getWorkerNodeKey(worker)
930dcf12
JB
734 if (
735 isKillBehavior(KillBehaviors.HARD, message.kill) ||
7b56f532
JB
736 (message.kill != null &&
737 ((this.opts.enableTasksQueue === false &&
e8b3a5ab
JB
738 this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
739 0) ||
7b56f532 740 (this.opts.enableTasksQueue === true &&
e8b3a5ab
JB
741 this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
742 0 &&
743 this.tasksQueueSize(workerNodeKey) === 0)))
930dcf12
JB
744 ) {
745 // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
930dcf12
JB
746 void (this.destroyWorker(worker) as Promise<void>)
747 }
748 })
749 return worker
750 }
751
be0676b3 752 /**
ff733df7 753 * This function is the listener registered for each worker message.
be0676b3 754 *
bdacc2d2 755 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
756 */
757 protected workerListener (): (message: MessageValue<Response>) => void {
4a6952ff 758 return message => {
b1989cfd 759 if (message.id != null) {
a3445496 760 // Task execution response received
2740a743 761 const promiseResponse = this.promiseResponseMap.get(message.id)
b1989cfd 762 if (promiseResponse != null) {
82f36766 763 if (message.taskError != null) {
91ee39ed 764 if (this.emitter != null) {
82f36766 765 this.emitter.emit(PoolEvents.taskError, message.taskError)
91ee39ed 766 }
cd9580e7 767 promiseResponse.reject(message.taskError.message)
a05c10de 768 } else {
2740a743 769 promiseResponse.resolve(message.data as Response)
a05c10de 770 }
2e81254d 771 this.afterTaskExecutionHook(promiseResponse.worker, message)
2740a743 772 this.promiseResponseMap.delete(message.id)
ff733df7
JB
773 const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
774 if (
775 this.opts.enableTasksQueue === true &&
416fd65c 776 this.tasksQueueSize(workerNodeKey) > 0
ff733df7 777 ) {
2e81254d
JB
778 this.executeTask(
779 workerNodeKey,
ff733df7
JB
780 this.dequeueTask(workerNodeKey) as Task<Data>
781 )
782 }
e5536a06 783 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3
APA
784 }
785 }
786 }
be0676b3 787 }
7c0ba920 788
ff733df7 789 private checkAndEmitEvents (): void {
1f68cede 790 if (this.emitter != null) {
ff733df7 791 if (this.busy) {
6b27d407 792 this.emitter?.emit(PoolEvents.busy, this.info)
ff733df7 793 }
6b27d407
JB
794 if (this.type === PoolTypes.dynamic && this.full) {
795 this.emitter?.emit(PoolEvents.full, this.info)
ff733df7 796 }
164d950a
JB
797 }
798 }
799
0ebe2a9f
JB
800 /**
801 * Sets the given worker node its tasks usage in the pool.
802 *
803 * @param workerNode - The worker node.
a4e07f72 804 * @param workerUsage - The worker usage.
0ebe2a9f
JB
805 */
806 private setWorkerNodeTasksUsage (
807 workerNode: WorkerNode<Worker, Data>,
a4e07f72 808 workerUsage: WorkerUsage
0ebe2a9f 809 ): void {
a4e07f72 810 workerNode.workerUsage = workerUsage
0ebe2a9f
JB
811 }
812
a05c10de 813 /**
f06e48d8 814 * Pushes the given worker in the pool worker nodes.
ea7a90d3 815 *
38e795c1 816 * @param worker - The worker.
f06e48d8 817 * @returns The worker nodes length.
ea7a90d3 818 */
f06e48d8 819 private pushWorkerNode (worker: Worker): number {
9c16fb4b 820 this.workerNodes.push({
ffcbbad8 821 worker,
9c16fb4b 822 workerUsage: this.getWorkerUsage(),
29ee7e9a 823 tasksQueue: new Queue<Task<Data>>()
ea7a90d3 824 })
9c16fb4b
JB
825 const workerNodeKey = this.getWorkerNodeKey(worker)
826 this.setWorkerNodeTasksUsage(
827 this.workerNodes[workerNodeKey],
828 this.getWorkerUsage(workerNodeKey)
829 )
830 return this.workerNodes.length
ea7a90d3 831 }
c923ce56 832
8604aaab
JB
833 // /**
834 // * Sets the given worker in the pool worker nodes.
835 // *
836 // * @param workerNodeKey - The worker node key.
837 // * @param worker - The worker.
838 // * @param workerUsage - The worker usage.
839 // * @param tasksQueue - The worker task queue.
840 // */
841 // private setWorkerNode (
842 // workerNodeKey: number,
843 // worker: Worker,
844 // workerUsage: WorkerUsage,
845 // tasksQueue: Queue<Task<Data>>
846 // ): void {
847 // this.workerNodes[workerNodeKey] = {
848 // worker,
849 // workerUsage,
850 // tasksQueue
851 // }
852 // }
51fe3d3c
JB
853
854 /**
f06e48d8 855 * Removes the given worker from the pool worker nodes.
51fe3d3c 856 *
f06e48d8 857 * @param worker - The worker.
51fe3d3c 858 */
416fd65c 859 private removeWorkerNode (worker: Worker): void {
f06e48d8 860 const workerNodeKey = this.getWorkerNodeKey(worker)
1f68cede
JB
861 if (workerNodeKey !== -1) {
862 this.workerNodes.splice(workerNodeKey, 1)
863 this.workerChoiceStrategyContext.remove(workerNodeKey)
864 }
51fe3d3c 865 }
adc3c320 866
2e81254d 867 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 868 this.beforeTaskExecutionHook(workerNodeKey, task)
2e81254d
JB
869 this.sendToWorker(this.workerNodes[workerNodeKey].worker, task)
870 }
871
f9f00b5f 872 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
29ee7e9a 873 return this.workerNodes[workerNodeKey].tasksQueue.enqueue(task)
adc3c320
JB
874 }
875
416fd65c 876 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
29ee7e9a 877 return this.workerNodes[workerNodeKey].tasksQueue.dequeue()
adc3c320
JB
878 }
879
416fd65c 880 private tasksQueueSize (workerNodeKey: number): number {
4d8bf9e4 881 return this.workerNodes[workerNodeKey].tasksQueue.size
adc3c320 882 }
ff733df7 883
416fd65c
JB
884 private flushTasksQueue (workerNodeKey: number): void {
885 if (this.tasksQueueSize(workerNodeKey) > 0) {
29ee7e9a
JB
886 for (let i = 0; i < this.tasksQueueSize(workerNodeKey); i++) {
887 this.executeTask(
888 workerNodeKey,
889 this.dequeueTask(workerNodeKey) as Task<Data>
890 )
ff733df7 891 }
ff733df7
JB
892 }
893 }
894
ef41a6e6
JB
895 private flushTasksQueues (): void {
896 for (const [workerNodeKey] of this.workerNodes.entries()) {
897 this.flushTasksQueue(workerNodeKey)
898 }
899 }
b6b32453
JB
900
901 private setWorkerStatistics (worker: Worker): void {
902 this.sendToWorker(worker, {
903 statistics: {
87de9ff5
JB
904 runTime:
905 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
932fc8be 906 .runTime.aggregate,
87de9ff5 907 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
5df69fab 908 .elu.aggregate
b6b32453
JB
909 }
910 })
911 }
8604aaab 912
9c16fb4b
JB
913 private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
914 const getQueueSize = (workerNodeKey: number): number => {
915 return this.tasksQueueSize(workerNodeKey)
916 }
8604aaab 917 return {
9c16fb4b
JB
918 tasks: {
919 executed: 0,
920 executing: 0,
921 get queued (): number {
922 return workerNodeKey == null ? 0 : getQueueSize(workerNodeKey)
923 },
924 failed: 0
925 },
8604aaab 926 runTime: {
932fc8be 927 aggregate: 0,
8604aaab
JB
928 average: 0,
929 median: 0,
930 history: new CircularArray()
931 },
932 waitTime: {
932fc8be 933 aggregate: 0,
8604aaab
JB
934 average: 0,
935 median: 0,
936 history: new CircularArray()
937 },
5df69fab
JB
938 elu: {
939 idle: {
940 aggregate: 0,
941 average: 0,
942 median: 0,
943 history: new CircularArray()
944 },
945 active: {
946 aggregate: 0,
947 average: 0,
948 median: 0,
949 history: new CircularArray()
950 },
951 utilization: 0
952 }
8604aaab
JB
953 }
954 }
c97c7edb 955}