refactor: use named export in benchmarking code
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
fc3e6586 1import crypto from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
2740a743 3import type { MessageValue, PromiseResponseWrapper } from '../utility-types'
bbeadd16
JB
4import {
5 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
6 EMPTY_FUNCTION,
0d80593b 7 isPlainObject,
bbeadd16
JB
8 median
9} from '../utils'
34a0cfab 10import { KillBehaviors, isKillBehavior } from '../worker/worker-options'
65d7a1c9 11import { CircularArray } from '../circular-array'
29ee7e9a 12import { Queue } from '../queue'
c4855468 13import {
65d7a1c9 14 type IPool,
7c5a1080 15 PoolEmitter,
c4855468 16 PoolEvents,
6b27d407 17 type PoolInfo,
c4855468 18 type PoolOptions,
6b27d407
JB
19 type PoolType,
20 PoolTypes,
184855e6
JB
21 type TasksQueueOptions,
22 type WorkerType
c4855468 23} from './pool'
9c16fb4b 24import type { IWorker, Task, WorkerNode, WorkerUsage } from './worker'
a35560ba 25import {
f0d7f803 26 Measurements,
a35560ba 27 WorkerChoiceStrategies,
a20f0ba5
JB
28 type WorkerChoiceStrategy,
29 type WorkerChoiceStrategyOptions
bdaf31cd
JB
30} from './selection-strategies/selection-strategies-types'
31import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
c97c7edb 32
729c563d 33/**
ea7a90d3 34 * Base class that implements some shared logic for all poolifier pools.
729c563d 35 *
38e795c1
JB
36 * @typeParam Worker - Type of worker which manages this pool.
37 * @typeParam Data - Type of data sent to the worker. This can only be serializable data.
02706357 38 * @typeParam Response - Type of execution response. This can only be serializable data.
729c563d 39 */
c97c7edb 40export abstract class AbstractPool<
f06e48d8 41 Worker extends IWorker,
d3c8a1a8
S
42 Data = unknown,
43 Response = unknown
c4855468 44> implements IPool<Worker, Data, Response> {
afc003b2 45 /** @inheritDoc */
f06e48d8 46 public readonly workerNodes: Array<WorkerNode<Worker, Data>> = []
4a6952ff 47
afc003b2 48 /** @inheritDoc */
7c0ba920
JB
49 public readonly emitter?: PoolEmitter
50
be0676b3 51 /**
a3445496 52 * The execution response promise map.
be0676b3 53 *
2740a743 54 * - `key`: The message id of each submitted task.
a3445496 55 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 56 *
a3445496 57 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 58 */
c923ce56
JB
59 protected promiseResponseMap: Map<
60 string,
61 PromiseResponseWrapper<Worker, Response>
62 > = new Map<string, PromiseResponseWrapper<Worker, Response>>()
c97c7edb 63
a35560ba 64 /**
51fe3d3c 65 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
66 */
67 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
68 Worker,
69 Data,
70 Response
a35560ba
S
71 >
72
729c563d
S
/**
 * Builds a new poolifier pool.
 *
 * The arguments are validated first, the internal callbacks are bound to the
 * instance, then the optional event emitter and the worker choice strategy
 * context are created. Finally the setup hook runs and the initial workers
 * are created.
 *
 * @param numberOfWorkers - Number of workers that this pool should manage.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 */
public constructor (
  protected readonly numberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>
) {
  if (!this.isMain()) {
    throw new Error('Cannot start a pool from a worker!')
  }

  // Argument validation; checkPoolOptions() also fills in the option defaults.
  this.checkNumberOfWorkers(this.numberOfWorkers)
  this.checkFilePath(this.filePath)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)
  this.checkAndEmitEvents = this.checkAndEmitEvents.bind(this)

  const { enableEvents, workerChoiceStrategy, workerChoiceStrategyOptions } =
    this.opts
  if (enableEvents === true) {
    this.emitter = new PoolEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
  Worker,
  Data,
  Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  for (let created = 0; created < this.numberOfWorkers; created++) {
    this.createAndSetupWorker()
  }
}
116
/**
 * Checks that a usable worker file path has been provided.
 *
 * @param filePath - Path to the worker file.
 * @throws Error if the path is `null`/`undefined`, is not a string, or is empty or made only of whitespace.
 */
private checkFilePath (filePath: string): void {
  // The static `string` type is erased at runtime: plain JavaScript callers
  // can still pass any value, so anything that is not a string is rejected
  // too (`typeof null` and `typeof undefined` are not 'string' either).
  if (typeof filePath !== 'string' || filePath.trim().length === 0) {
    throw new Error('Please specify a file with a worker implementation')
  }
}
125
8d3782fa
JB
/**
 * Checks the requested number of workers.
 *
 * @param numberOfWorkers - Number of workers requested for the pool.
 * @throws Error if the number is `null`/`undefined`, or is zero for a fixed pool.
 * @throws TypeError if the number is not a safe integer.
 * @throws RangeError if the number is negative.
 */
private checkNumberOfWorkers (numberOfWorkers: number): void {
  if (numberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(numberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (numberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  // Zero workers is only rejected for fixed pools.
  if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
    throw new Error('Cannot instantiate a fixed pool with no worker')
  }
}
143
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws TypeError if the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }

  // Worker choice strategy: round robin unless told otherwise.
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)

  this.opts.workerChoiceStrategyOptions =
    opts.workerChoiceStrategyOptions ?? DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS
  this.checkValidWorkerChoiceStrategyOptions(
    this.opts.workerChoiceStrategyOptions
  )

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false

  // Tasks queue options are only checked and built when the queue is enabled.
  if (this.opts.enableTasksQueue) {
    const tasksQueueOptions = opts.tasksQueueOptions as TasksQueueOptions
    this.checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
170
/**
 * Checks that the given strategy is one of the `WorkerChoiceStrategies` values.
 *
 * @param workerChoiceStrategy - The worker choice strategy to check.
 * @throws Error if the strategy is not a known one.
 */
private checkValidWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy
): void {
  const knownStrategies = Object.values(WorkerChoiceStrategies)
  if (knownStrategies.includes(workerChoiceStrategy)) {
    return
  }
  throw new Error(`Invalid worker choice strategy '${workerChoiceStrategy}'`)
}
180
0d80593b
JB
/**
 * Checks the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws TypeError if the options are not a plain object.
 * @throws Error if `weights` does not hold exactly `maxSize` entries, or if `measurement` is not one of the `Measurements` values.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node the pool can hold.
  if (weights != null && Object.keys(weights).length !== this.maxSize) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
208
a20f0ba5
JB
/**
 * Checks the tasks queue options. Nullish options are accepted as is.
 *
 * @param tasksQueueOptions - The tasks queue options to check.
 * @throws TypeError if the options are not a plain object, or if `concurrency` is not a safe integer.
 * @throws Error if `concurrency` is lower than or equal to zero.
 */
private checkValidTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): void {
  if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
    throw new TypeError('Invalid tasks queue options: must be a plain object')
  }
  const concurrency = tasksQueueOptions?.concurrency
  if (concurrency == null) {
    // Nothing else to validate.
    return
  }
  if (!Number.isSafeInteger(concurrency)) {
    throw new TypeError(
      'Invalid worker tasks concurrency: must be an integer'
    )
  }
  if (concurrency <= 0) {
    throw new Error(`Invalid worker tasks concurrency '${concurrency}'`)
  }
}
232
/** @inheritDoc */
public get info (): PoolInfo {
  // All per-node counters are gathered in a single pass over the worker nodes.
  let idleWorkerNodes = 0
  let busyWorkerNodes = 0
  let executedTasks = 0
  let executingTasks = 0
  let queuedTasks = 0
  let maxQueuedTasks = 0
  let failedTasks = 0
  for (const workerNode of this.workerNodes) {
    const { tasks } = workerNode.workerUsage
    const executing = tasks.executing
    if (executing === 0) {
      // A node without any executing task is idle...
      ++idleWorkerNodes
    } else if (executing > 0) {
      // ...and busy as soon as it executes at least one.
      ++busyWorkerNodes
    }
    executedTasks += tasks.executed
    executingTasks += executing
    queuedTasks += tasks.queued
    maxQueuedTasks += tasks.maxQueued
    failedTasks += tasks.failed
  }
  return {
    type: this.type,
    worker: this.worker,
    minSize: this.minSize,
    maxSize: this.maxSize,
    workerNodes: this.workerNodes.length,
    idleWorkerNodes,
    busyWorkerNodes,
    executedTasks,
    executingTasks,
    queuedTasks,
    maxQueuedTasks,
    failedTasks
  }
}
08f3f44c 282
8881ae32
JB
283 /**
284 * Pool type.
285 *
286 * If it is `'dynamic'`, it provides the `max` property.
287 */
288 protected abstract get type (): PoolType
289
184855e6
JB
290 /**
291 * Gets the worker type.
292 */
293 protected abstract get worker (): WorkerType
294
c2ade475 295 /**
6b27d407 296 * Pool minimum size.
c2ade475 297 */
6b27d407 298 protected abstract get minSize (): number
ff733df7
JB
299
300 /**
6b27d407 301 * Pool maximum size.
ff733df7 302 */
6b27d407 303 protected abstract get maxSize (): number
a35560ba 304
/**
 * Looks up the worker node key (index in `workerNodes`) of the given worker.
 *
 * @param worker - The worker.
 * @returns The key of the first worker node holding that worker, `-1` if there is none.
 */
private getWorkerNodeKey (worker: Worker): number {
  const holdsWorker = (workerNode: WorkerNode<Worker, Data>): boolean =>
    workerNode.worker === worker
  return this.workerNodes.findIndex(holdsWorker)
}
316
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Give every worker node a fresh usage object and send the statistics
  // message for the new strategy to its worker.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    const workerNode = this.workerNodes[workerNodeKey]
    this.setWorkerNodeTasksUsage(
      workerNode,
      this.getWorkerUsage(workerNodeKey)
    )
    this.setWorkerStatistics(workerNode.worker)
  }
}
338
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
): void {
  // Validate first so that invalid options never reach the pool options.
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const { opts, workerChoiceStrategyContext } = this
  opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  workerChoiceStrategyContext.setOptions(opts.workerChoiceStrategyOptions)
}
349
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  // Switching an enabled queue off: flush what is still queued first.
  const turningOff = this.opts.enableTasksQueue === true && !enable
  if (turningOff) {
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
}
361
/** @inheritDoc */
public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
  if (this.opts.enableTasksQueue !== true) {
    // Queue not enabled: drop any previously stored tasks queue options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  this.checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
}
372
/**
 * Builds the effective tasks queue options from the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options, possibly nullish.
 * @returns The tasks queue options with `concurrency` defaulting to `1`.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions
): TasksQueueOptions {
  const concurrency = tasksQueueOptions?.concurrency ?? 1
  return { concurrency }
}
380
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least `maxSize` worker nodes.
 */
protected get full (): boolean {
  const { length: workerNodesCount } = this.workerNodes
  return workerNodesCount >= this.maxSize
}
c2ade475 389
c319c66b
JB
390 /**
391 * Whether the pool is busy or not.
392 *
393 * The pool busyness boolean status.
394 */
395 protected abstract get busy (): boolean
7c0ba920 396
6c6afb84
JB
/**
 * Whether every worker node is executing at least one task.
 *
 * @returns `true` if no worker node has zero executing tasks, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode => workerNode.workerUsage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
409
/** @inheritDoc */
public async execute (data?: Data, name?: string): Promise<Response> {
  // Submission timestamp, used later to compute the task wait time.
  const timestamp = performance.now()
  const workerNodeKey = this.chooseWorkerNode()
  const taskId = crypto.randomUUID()
  const submittedTask: Task<Data> = {
    name,
    // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
    data: data ?? ({} as Data),
    timestamp,
    id: taskId
  }
  // Register the resolve/reject callbacks under the task id so that the
  // worker message listener can settle the promise.
  const res = new Promise<Response>((resolve, reject) => {
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      worker: this.workerNodes[workerNodeKey].worker
    })
  })
  // With the tasks queue enabled, the task is queued when the pool is busy or
  // when the chosen worker node already runs `concurrency` tasks.
  const mustEnqueue =
    this.opts.enableTasksQueue === true &&
    (this.busy ||
      this.workerNodes[workerNodeKey].workerUsage.tasks.executing >=
        ((this.opts.tasksQueueOptions as TasksQueueOptions)
          .concurrency as number))
  if (mustEnqueue) {
    this.enqueueTask(workerNodeKey, submittedTask)
  } else {
    this.executeTask(workerNodeKey, submittedTask)
  }
  this.checkAndEmitEvents()
  // eslint-disable-next-line @typescript-eslint/return-await
  return res
}
c97c7edb 443
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Each worker node has its tasks queue flushed, then its worker destroyed.
  const terminations = this.workerNodes.map(
    async (workerNode, workerNodeKey) => {
      this.flushTasksQueue(workerNodeKey)
      // FIXME: wait for tasks to be finished
      await this.destroyWorker(workerNode.worker)
    }
  )
  await Promise.all(terminations)
}
454
4a6952ff 455 /**
6c6afb84 456 * Terminates the given worker.
4a6952ff 457 *
f06e48d8 458 * @param worker - A worker within `workerNodes`.
4a6952ff
JB
459 */
460 protected abstract destroyWorker (worker: Worker): void | Promise<void>
c97c7edb 461
/**
 * Hook called by the abstract constructor right before the workers are created.
 * Does nothing by default; can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 471
729c563d 472 /**
280c2a77
S
473 * Should return whether the worker is the main worker or not.
474 */
475 protected abstract isMain (): boolean
476
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter of the worker node and updates its
 * wait time usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const { workerUsage } = this.workerNodes[workerNodeKey]
  workerUsage.tasks.executing++
  this.updateWaitTimeWorkerUsage(workerUsage, task)
}
492
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the tasks counters, run time and ELU usage of the worker node
 * holding the given worker.
 *
 * @param worker - The worker.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  worker: Worker,
  message: MessageValue<Response>
): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const { workerUsage } = this.workerNodes[workerNodeKey]
  this.updateTaskStatisticsWorkerUsage(workerUsage, message)
  this.updateRunTimeWorkerUsage(workerUsage, message)
  this.updateEluWorkerUsage(workerUsage, message)
}
510
/**
 * Updates the tasks counters once a task response has been received.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message.
 */
private updateTaskStatisticsWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { tasks } = workerUsage
  // The task is no longer executing and counts as executed, failed or not.
  tasks.executing--
  tasks.executed++
  if (message.taskError != null) {
    tasks.failed++
  }
}
522
a4e07f72
JB
/**
 * Updates the run time usage of a worker node from a task response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * run time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the optional task performance.
 */
private updateRunTimeWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { runTime: runTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!runTimeRequirements.aggregate) {
    return
  }
  const taskRunTime = message.taskPerformance?.runTime
  workerUsage.runTime.aggregate += taskRunTime ?? 0
  // The average is computed over the tasks that did not fail. Guard on that
  // count rather than on `executed`: when every executed task has failed the
  // divisor is zero and the average would become Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (runTimeRequirements.average && successfulTasks > 0) {
    workerUsage.runTime.average =
      workerUsage.runTime.aggregate / successfulTasks
  }
  if (runTimeRequirements.median && taskRunTime != null) {
    workerUsage.runTime.history.push(taskRunTime)
    workerUsage.runTime.median = median(workerUsage.runTime.history)
  }
}
551
a4e07f72
JB
/**
 * Updates the wait time usage of a worker node for a task about to execute.
 *
 * The wait time is the time elapsed since the task timestamp, or zero when the
 * task has no timestamp. Nothing is updated unless the current worker choice
 * strategy requires the wait time aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param task - The task about to be executed.
 */
private updateWaitTimeWorkerUsage (
  workerUsage: WorkerUsage,
  task: Task<Data>
): void {
  const timestamp = performance.now()
  // Always a number: no nullish fallback is needed further down.
  const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
  const { waitTime: waitTimeRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!waitTimeRequirements.aggregate) {
    return
  }
  workerUsage.waitTime.aggregate += taskWaitTime
  // Guard on the actual divisor: when every executed task has failed it is
  // zero and the average would become Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (waitTimeRequirements.average && successfulTasks > 0) {
    workerUsage.waitTime.average =
      workerUsage.waitTime.aggregate / successfulTasks
  }
  if (waitTimeRequirements.median) {
    workerUsage.waitTime.history.push(taskWaitTime)
    workerUsage.waitTime.median = median(workerUsage.waitTime.history)
  }
}
582
/**
 * Updates the event loop utilization (ELU) usage of a worker node from a task
 * response.
 *
 * Nothing is updated unless the current worker choice strategy requires the
 * ELU aggregate.
 *
 * @param workerUsage - The worker usage to update.
 * @param message - The received message, carrying the optional task performance.
 */
private updateEluWorkerUsage (
  workerUsage: WorkerUsage,
  message: MessageValue<Response>
): void {
  const { elu: eluRequirements } =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
  if (!eluRequirements.aggregate) {
    return
  }
  const taskElu = message.taskPerformance?.elu
  if (taskElu != null) {
    // The former fallback branch for a nullish `workerUsage.elu` dereferenced
    // that very object, so it could only throw; it has been dropped.
    workerUsage.elu.idle.aggregate += taskElu.idle
    workerUsage.elu.active.aggregate += taskElu.active
    // The utilization is averaged with the previously stored value.
    workerUsage.elu.utilization =
      (workerUsage.elu.utilization + taskElu.utilization) / 2
  }
  // Guard on the actual divisor: when every executed task has failed it is
  // zero and the averages would become Infinity or NaN.
  const successfulTasks = workerUsage.tasks.executed - workerUsage.tasks.failed
  if (eluRequirements.average && successfulTasks > 0) {
    workerUsage.elu.idle.average =
      workerUsage.elu.idle.aggregate / successfulTasks
    workerUsage.elu.active.average =
      workerUsage.elu.active.aggregate / successfulTasks
  }
  if (eluRequirements.median && taskElu != null) {
    workerUsage.elu.idle.history.push(taskElu.idle)
    workerUsage.elu.active.history.push(taskElu.active)
    workerUsage.elu.idle.median = median(workerUsage.elu.idle.history)
    workerUsage.elu.active.median = median(workerUsage.elu.active.history)
  }
}
627
/**
 * Chooses a worker node for the next task.
 *
 * A dynamic worker is created first when the conditions for it are met; its
 * node is returned directly if the strategy policy asks to use dynamic
 * workers. In every other case the worker choice strategy picks the node.
 *
 * @returns The worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    return this.workerChoiceStrategyContext.execute()
  }
  const dynamicWorker = this.createAndSetupDynamicWorker()
  return this.workerChoiceStrategyContext.getStrategyPolicy().useDynamicWorker
    ? this.getWorkerNodeKey(dynamicWorker)
    : this.workerChoiceStrategyContext.execute()
}
646
6c6afb84
JB
/**
 * Tells whether a dynamic worker shall be created: the pool must be dynamic,
 * not full, and all its worker nodes must be executing at least one task.
 *
 * @returns Whether to create a dynamic worker or not.
 */
private shallCreateDynamicWorker (): boolean {
  if (this.type !== PoolTypes.dynamic) {
    return false
  }
  if (this.full) {
    return false
  }
  return this.internalBusy()
}
655
280c2a77 656 /**
675bb809 657 * Sends a message to the given worker.
280c2a77 658 *
38e795c1
JB
659 * @param worker - The worker which should receive the message.
660 * @param message - The message.
280c2a77
S
661 */
662 protected abstract sendToWorker (
663 worker: Worker,
664 message: MessageValue<Data>
665 ): void
666
4a6952ff 667 /**
f06e48d8 668 * Registers a listener callback on the given worker.
4a6952ff 669 *
38e795c1
JB
670 * @param worker - The worker which should register a listener.
671 * @param listener - The message listener callback.
4a6952ff
JB
672 */
673 protected abstract registerWorkerMessageListener<
4f7fa42a 674 Message extends Data | Response
78cea37e 675 >(worker: Worker, listener: (message: MessageValue<Message>) => void): void
c97c7edb 676
729c563d 677 /**
41344292 678 * Creates a new worker.
6c6afb84
JB
679 *
680 * @returns Newly created worker.
729c563d 681 */
280c2a77 682 protected abstract createWorker (): Worker
c97c7edb 683
729c563d 684 /**
f06e48d8 685 * Function that can be hooked up when a worker has been newly created and moved to the pool worker nodes.
729c563d 686 *
38e795c1 687 * Can be used to update the `maxListeners` or binding the `main-worker`\<-\>`worker` connection if not bind by default.
729c563d 688 *
38e795c1 689 * @param worker - The newly created worker.
729c563d 690 */
280c2a77 691 protected abstract afterWorkerSetup (worker: Worker): void
c97c7edb 692
/**
 * Creates a new worker, registers its event handlers and adds it to the pool
 * worker nodes.
 *
 * @returns New, completely set up worker.
 */
protected createAndSetupWorker (): Worker {
  const worker = this.createWorker()
  const { messageHandler, errorHandler, onlineHandler, exitHandler } =
    this.opts

  worker.on('message', messageHandler ?? EMPTY_FUNCTION)
  worker.on('error', errorHandler ?? EMPTY_FUNCTION)
  worker.on('error', error => {
    // Forward the error on the pool emitter, when there is one.
    this.emitter?.emit(PoolEvents.error, error)
    // Create another worker if the pool is configured to do so.
    if (this.opts.restartWorkerOnError === true) {
      this.createAndSetupWorker()
    }
  })
  worker.on('online', onlineHandler ?? EMPTY_FUNCTION)
  worker.on('exit', exitHandler ?? EMPTY_FUNCTION)
  // An exited worker is removed from the pool worker nodes.
  worker.once('exit', () => {
    this.removeWorkerNode(worker)
  })

  this.pushWorkerNode(worker)
  this.setWorkerStatistics(worker)
  this.afterWorkerSetup(worker)

  return worker
}
be0676b3 725
930dcf12
JB
/**
 * Creates a new dynamic worker and sets it up completely in the pool worker
 * nodes. A message listener is registered on it to destroy the worker when
 * it sends a kill message.
 *
 * @returns New, completely set up dynamic worker.
 */
protected createAndSetupDynamicWorker (): Worker {
  const worker = this.createAndSetupWorker()
  this.registerWorkerMessageListener(worker, message => {
    const workerNodeKey = this.getWorkerNodeKey(worker)
    // Without the tasks queue, the worker node must not be executing a task;
    // with it, its tasks queue must be empty as well.
    const isWorkerNodeInactive = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return (
          this.workerNodes[workerNodeKey].workerUsage.tasks.executing === 0
        )
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          this.workerNodes[workerNodeKey].workerUsage.tasks.executing ===
            0 && this.tasksQueueSize(workerNodeKey) === 0
        )
      }
      return false
    }
    const mustDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (message.kill != null && isWorkerNodeInactive())
    if (mustDestroy) {
      // Kill message received from the worker: no new tasks are submitted to that worker for a while ( > maxInactiveTime)
      void (this.destroyWorker(worker) as Promise<void>)
    }
  })
  return worker
}
752
/**
 * Builds the listener registered for each worker message.
 *
 * The listener settles the promise bound to the message id, updates the
 * worker usage, then runs the next queued task of the worker node, if any.
 *
 * @returns The listener function to execute when a message is received from a worker.
 */
protected workerListener (): (message: MessageValue<Response>) => void {
  return message => {
    if (message.id == null) {
      // Not a task execution response
      return
    }
    const promiseResponse = this.promiseResponseMap.get(message.id)
    if (promiseResponse == null) {
      return
    }
    if (message.taskError != null) {
      this.emitter?.emit(PoolEvents.taskError, message.taskError)
      promiseResponse.reject(message.taskError.message)
    } else {
      promiseResponse.resolve(message.data as Response)
    }
    this.afterTaskExecutionHook(promiseResponse.worker, message)
    this.promiseResponseMap.delete(message.id)
    const workerNodeKey = this.getWorkerNodeKey(promiseResponse.worker)
    const hasQueuedTask =
      this.opts.enableTasksQueue === true &&
      this.tasksQueueSize(workerNodeKey) > 0
    if (hasQueuedTask) {
      this.executeTask(
        workerNodeKey,
        this.dequeueTask(workerNodeKey) as Task<Data>
      )
    }
    this.workerChoiceStrategyContext.update(workerNodeKey)
  }
}
7c0ba920 789
/**
 * Emits the `busy` and `full` pool events when their conditions are met.
 * Does nothing when the pool has no emitter.
 */
private checkAndEmitEvents (): void {
  const { emitter } = this
  if (emitter == null) {
    return
  }
  if (this.busy) {
    emitter.emit(PoolEvents.busy, this.info)
  }
  // The `full` event is only emitted by dynamic pools.
  if (this.type === PoolTypes.dynamic && this.full) {
    emitter.emit(PoolEvents.full, this.info)
  }
}
800
0ebe2a9f
JB
/**
 * Replaces the usage object of the given worker node.
 *
 * @param workerNode - The worker node.
 * @param workerUsage - The worker usage to store on the worker node.
 */
private setWorkerNodeTasksUsage (
  workerNode: WorkerNode<Worker, Data>,
  workerUsage: WorkerUsage
): void {
  workerNode.workerUsage = workerUsage
}
813
/**
 * Appends a worker node for the given worker to the pool worker nodes.
 *
 * @param worker - The worker.
 * @returns The worker nodes length.
 */
private pushWorkerNode (worker: Worker): number {
  const workerNodesLength = this.workerNodes.push({
    worker,
    workerUsage: this.getWorkerUsage(),
    tasksQueue: new Queue<Task<Data>>()
  })
  // Now that the worker node has a key, swap in a usage object bound to it.
  const workerNodeKey = this.getWorkerNodeKey(worker)
  const workerNode = this.workerNodes[workerNodeKey]
  this.setWorkerNodeTasksUsage(workerNode, this.getWorkerUsage(workerNodeKey))
  return workerNodesLength
}
c923ce56 833
8604aaab
JB
834 // /**
835 // * Sets the given worker in the pool worker nodes.
836 // *
837 // * @param workerNodeKey - The worker node key.
838 // * @param worker - The worker.
839 // * @param workerUsage - The worker usage.
840 // * @param tasksQueue - The worker task queue.
841 // */
842 // private setWorkerNode (
843 // workerNodeKey: number,
844 // worker: Worker,
845 // workerUsage: WorkerUsage,
846 // tasksQueue: Queue<Task<Data>>
847 // ): void {
848 // this.workerNodes[workerNodeKey] = {
849 // worker,
850 // workerUsage,
851 // tasksQueue
852 // }
853 // }
51fe3d3c
JB
854
/**
 * Removes the worker node of the given worker from the pool worker nodes and
 * from the worker choice strategy context. Does nothing if the worker is not
 * found.
 *
 * @param worker - The worker.
 */
private removeWorkerNode (worker: Worker): void {
  const workerNodeKey = this.getWorkerNodeKey(worker)
  if (workerNodeKey === -1) {
    return
  }
  this.workerNodes.splice(workerNodeKey, 1)
  this.workerChoiceStrategyContext.remove(workerNodeKey)
}
adc3c320 867
/**
 * Runs the before-execution hook, then sends the task to the worker of the
 * given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { worker } = this.workerNodes[workerNodeKey]
  this.sendToWorker(worker, task)
}
872
/**
 * Enqueues a task in the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the tasks queue `enqueue()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.enqueue(task)
}

/**
 * Dequeues a task from the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The value returned by the tasks queue `dequeue()` call.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.dequeue()
}

/**
 * Reads the `size` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.size
}

/**
 * Reads the `maxSize` of the tasks queue of the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue `maxSize` value.
 */
private tasksMaxQueueSize (workerNodeKey: number): number {
  const { tasksQueue } = this.workerNodes[workerNodeKey]
  return tasksQueue.maxSize
}
888
416fd65c
JB
/**
 * Flushes the tasks queue of the given worker node: every queued task is
 * dequeued and executed, then the queue is cleared.
 *
 * @param workerNodeKey - The worker node key.
 */
private flushTasksQueue (workerNodeKey: number): void {
  // Drain until the queue is empty. A counting loop compared against the live
  // queue size stops halfway, because each dequeue shrinks the size while the
  // counter grows, and the remaining tasks would then be dropped by clear().
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    this.executeTask(
      workerNodeKey,
      this.dequeueTask(workerNodeKey) as Task<Data>
    )
  }
  this.workerNodes[workerNodeKey].tasksQueue.clear()
}
900
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
b6b32453
JB
906
/**
 * Sends the given worker a `statistics` message whose `runTime` and `elu`
 * fields are the aggregate requirements of the current worker choice strategy.
 *
 * @param worker - The worker.
 */
private setWorkerStatistics (worker: Worker): void {
  const runTime =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime
      .aggregate
  const elu =
    this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
      .aggregate
  this.sendToWorker(worker, { statistics: { runTime, elu } })
}
8604aaab 918
/**
 * Builds a fresh worker usage object with every counter and measurement set
 * to zero.
 *
 * `tasks.queued` and `tasks.maxQueued` are getters: they read the tasks queue
 * of the worker node on each access, and return `0` when no worker node key
 * is given.
 *
 * @param workerNodeKey - The key of the worker node the usage is bound to, if any.
 * @returns The fresh worker usage.
 */
private getWorkerUsage (workerNodeKey?: number): WorkerUsage {
  const readQueueSize = (): number =>
    workerNodeKey != null ? this.tasksQueueSize(workerNodeKey) : 0
  const readMaxQueueSize = (): number =>
    workerNodeKey != null ? this.tasksMaxQueueSize(workerNodeKey) : 0

  const tasks: WorkerUsage['tasks'] = {
    executed: 0,
    executing: 0,
    get queued (): number {
      return readQueueSize()
    },
    get maxQueued (): number {
      return readMaxQueueSize()
    },
    failed: 0
  }
  const runTime: WorkerUsage['runTime'] = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  const waitTime: WorkerUsage['waitTime'] = {
    aggregate: 0,
    average: 0,
    median: 0,
    history: new CircularArray()
  }
  const elu: WorkerUsage['elu'] = {
    idle: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    active: {
      aggregate: 0,
      average: 0,
      median: 0,
      history: new CircularArray()
    },
    utilization: 0
  }
  return { tasks, runTime, waitTime, elu }
}
c97c7edb 967}