perf: stop continuous task stealing at task executing
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
3d6dd312 3import { existsSync } from 'node:fs'
7d91a8cd 4import { type TransferListItem } from 'node:worker_threads'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
ff3f866a
JB
8 Task,
9 Writable
5c4d16da 10} from '../utility-types'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16
JB
13 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
14 EMPTY_FUNCTION,
dc021bcc 15 average,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
afe0d5bf 18 median,
e4f20deb
JB
19 round,
20 updateMeasurementStatistics
bbeadd16 21} from '../utils'
59317253 22import { KillBehaviors } from '../worker/worker-options'
c4855468 23import {
65d7a1c9 24 type IPool,
7c5a1080 25 PoolEmitter,
c4855468 26 PoolEvents,
6b27d407 27 type PoolInfo,
c4855468 28 type PoolOptions,
6b27d407
JB
29 type PoolType,
30 PoolTypes,
4b628b48 31 type TasksQueueOptions
c4855468 32} from './pool'
bbfa38a2
JB
33import type {
34 IWorker,
35 IWorkerNode,
36 WorkerInfo,
37 WorkerType,
38 WorkerUsage
e102732c 39} from './worker'
a35560ba 40import {
008512c7 41 type MeasurementStatisticsRequirements,
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
bdaf31cd
JB
46} from './selection-strategies/selection-strategies-types'
47import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 48import { version } from './version'
4b628b48 49import { WorkerNode } from './worker-node'
23ccf9d7 50
729c563d 51/**
ea7a90d3 52 * Base class that implements some shared logic for all poolifier pools.
729c563d 53 *
38e795c1 54 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
55 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
56 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 57 */
c97c7edb 58export abstract class AbstractPool<
f06e48d8 59 Worker extends IWorker,
d3c8a1a8
S
60 Data = unknown,
61 Response = unknown
c4855468 62> implements IPool<Worker, Data, Response> {
afc003b2 63 /** @inheritDoc */
4b628b48 64 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 65
afc003b2 66 /** @inheritDoc */
7c0ba920
JB
67 public readonly emitter?: PoolEmitter
68
be0676b3 69 /**
52b71763 70 * The task execution response promise map.
be0676b3 71 *
2740a743 72 * - `key`: The message id of each submitted task.
a3445496 73 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 74 *
a3445496 75 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 76 */
501aea93
JB
77 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
78 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 79
a35560ba 80 /**
51fe3d3c 81 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
82 */
83 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
84 Worker,
85 Data,
86 Response
a35560ba
S
87 >
88
8735b4e5
JB
89 /**
90 * Dynamic pool maximum size property placeholder.
91 */
92 protected readonly max?: number
93
075e51d1 94 /**
adc9cc64 95 * Whether the pool is starting or not.
075e51d1
JB
96 */
97 private readonly starting: boolean
15b176e0
JB
98 /**
99 * Whether the pool is started or not.
100 */
101 private started: boolean
afe0d5bf
JB
102 /**
103 * The start timestamp of the pool.
104 */
105 private readonly startTimestamp
106
729c563d
S
107 /**
108 * Constructs a new poolifier pool.
109 *
38e795c1 110 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 111 * @param filePath - Path to the worker file.
38e795c1 112 * @param opts - Options for the pool.
729c563d 113 */
c97c7edb 114 public constructor (
b4213b7f
JB
115 protected readonly numberOfWorkers: number,
116 protected readonly filePath: string,
117 protected readonly opts: PoolOptions<Worker>
c97c7edb 118 ) {
78cea37e 119 if (!this.isMain()) {
04f45163 120 throw new Error(
8c6d4acf 121 'Cannot start a pool from a worker with the same type as the pool'
04f45163 122 )
c97c7edb 123 }
8d3782fa 124 this.checkNumberOfWorkers(this.numberOfWorkers)
c510fea7 125 this.checkFilePath(this.filePath)
7c0ba920 126 this.checkPoolOptions(this.opts)
1086026a 127
7254e419
JB
128 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
129 this.executeTask = this.executeTask.bind(this)
130 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 131
6bd72cd0 132 if (this.opts.enableEvents === true) {
7c0ba920
JB
133 this.emitter = new PoolEmitter()
134 }
d59df138
JB
135 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
136 Worker,
137 Data,
138 Response
da309861
JB
139 >(
140 this,
141 this.opts.workerChoiceStrategy,
142 this.opts.workerChoiceStrategyOptions
143 )
b6b32453
JB
144
145 this.setupHook()
146
075e51d1 147 this.starting = true
e761c033 148 this.startPool()
075e51d1 149 this.starting = false
15b176e0 150 this.started = true
afe0d5bf
JB
151
152 this.startTimestamp = performance.now()
c97c7edb
S
153 }
154
a35560ba 155 private checkFilePath (filePath: string): void {
ffcbbad8
JB
156 if (
157 filePath == null ||
3d6dd312 158 typeof filePath !== 'string' ||
ffcbbad8
JB
159 (typeof filePath === 'string' && filePath.trim().length === 0)
160 ) {
c510fea7
APA
161 throw new Error('Please specify a file with a worker implementation')
162 }
3d6dd312
JB
163 if (!existsSync(filePath)) {
164 throw new Error(`Cannot find the worker file '${filePath}'`)
165 }
c510fea7
APA
166 }
167
8d3782fa
JB
168 private checkNumberOfWorkers (numberOfWorkers: number): void {
169 if (numberOfWorkers == null) {
170 throw new Error(
171 'Cannot instantiate a pool without specifying the number of workers'
172 )
78cea37e 173 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 174 throw new TypeError(
0d80593b 175 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
176 )
177 } else if (numberOfWorkers < 0) {
473c717a 178 throw new RangeError(
8d3782fa
JB
179 'Cannot instantiate a pool with a negative number of workers'
180 )
6b27d407 181 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
182 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
183 }
184 }
185
186 protected checkDynamicPoolSize (min: number, max: number): void {
079de991 187 if (this.type === PoolTypes.dynamic) {
a5ed75b7 188 if (max == null) {
e695d66f 189 throw new TypeError(
a5ed75b7
JB
190 'Cannot instantiate a dynamic pool without specifying the maximum pool size'
191 )
192 } else if (!Number.isSafeInteger(max)) {
2761efb4
JB
193 throw new TypeError(
194 'Cannot instantiate a dynamic pool with a non safe integer maximum pool size'
195 )
196 } else if (min > max) {
079de991
JB
197 throw new RangeError(
198 'Cannot instantiate a dynamic pool with a maximum pool size inferior to the minimum pool size'
199 )
b97d82d8 200 } else if (max === 0) {
079de991 201 throw new RangeError(
d640b48b 202 'Cannot instantiate a dynamic pool with a maximum pool size equal to zero'
079de991
JB
203 )
204 } else if (min === max) {
205 throw new RangeError(
206 'Cannot instantiate a dynamic pool with a minimum pool size equal to the maximum pool size. Use a fixed pool instead'
207 )
208 }
8d3782fa
JB
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b
JB
213 if (isPlainObject(opts)) {
214 this.opts.workerChoiceStrategy =
215 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
216 this.checkValidWorkerChoiceStrategy(this.opts.workerChoiceStrategy)
8990357d
JB
217 this.opts.workerChoiceStrategyOptions = {
218 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
219 ...opts.workerChoiceStrategyOptions
220 }
49be33fe
JB
221 this.checkValidWorkerChoiceStrategyOptions(
222 this.opts.workerChoiceStrategyOptions
223 )
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
228 this.checkValidTasksQueueOptions(
229 opts.tasksQueueOptions as TasksQueueOptions
230 )
231 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
232 opts.tasksQueueOptions as TasksQueueOptions
233 )
234 }
235 } else {
236 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 237 }
aee46736
JB
238 }
239
240 private checkValidWorkerChoiceStrategy (
241 workerChoiceStrategy: WorkerChoiceStrategy
242 ): void {
243 if (!Object.values(WorkerChoiceStrategies).includes(workerChoiceStrategy)) {
b529c323 244 throw new Error(
aee46736 245 `Invalid worker choice strategy '${workerChoiceStrategy}'`
b529c323
JB
246 )
247 }
7c0ba920
JB
248 }
249
0d80593b
JB
250 private checkValidWorkerChoiceStrategyOptions (
251 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
252 ): void {
253 if (!isPlainObject(workerChoiceStrategyOptions)) {
254 throw new TypeError(
255 'Invalid worker choice strategy options: must be a plain object'
256 )
257 }
8990357d
JB
258 if (
259 workerChoiceStrategyOptions.choiceRetries != null &&
260 !Number.isSafeInteger(workerChoiceStrategyOptions.choiceRetries)
261 ) {
262 throw new TypeError(
263 'Invalid worker choice strategy options: choice retries must be an integer'
264 )
265 }
266 if (
267 workerChoiceStrategyOptions.choiceRetries != null &&
268 workerChoiceStrategyOptions.choiceRetries <= 0
269 ) {
270 throw new RangeError(
271 `Invalid worker choice strategy options: choice retries '${workerChoiceStrategyOptions.choiceRetries}' must be greater than zero`
272 )
273 }
49be33fe
JB
274 if (
275 workerChoiceStrategyOptions.weights != null &&
6b27d407 276 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
277 ) {
278 throw new Error(
279 'Invalid worker choice strategy options: must have a weight for each worker node'
280 )
281 }
f0d7f803
JB
282 if (
283 workerChoiceStrategyOptions.measurement != null &&
284 !Object.values(Measurements).includes(
285 workerChoiceStrategyOptions.measurement
286 )
287 ) {
288 throw new Error(
289 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
290 )
291 }
0d80593b
JB
292 }
293
a20f0ba5 294 private checkValidTasksQueueOptions (
ff3f866a 295 tasksQueueOptions: Writable<TasksQueueOptions>
a20f0ba5 296 ): void {
0d80593b
JB
297 if (tasksQueueOptions != null && !isPlainObject(tasksQueueOptions)) {
298 throw new TypeError('Invalid tasks queue options: must be a plain object')
299 }
f0d7f803
JB
300 if (
301 tasksQueueOptions?.concurrency != null &&
302 !Number.isSafeInteger(tasksQueueOptions.concurrency)
303 ) {
304 throw new TypeError(
20c6f652 305 'Invalid worker node tasks concurrency: must be an integer'
f0d7f803
JB
306 )
307 }
308 if (
309 tasksQueueOptions?.concurrency != null &&
310 tasksQueueOptions.concurrency <= 0
311 ) {
e695d66f 312 throw new RangeError(
20c6f652
JB
313 `Invalid worker node tasks concurrency: ${tasksQueueOptions.concurrency} is a negative integer or zero`
314 )
315 }
316 if (
317 tasksQueueOptions?.queueMaxSize != null &&
ff3f866a 318 tasksQueueOptions?.size != null
20c6f652 319 ) {
ff3f866a
JB
320 throw new Error(
321 'Invalid tasks queue options: cannot specify both queueMaxSize and size'
20c6f652
JB
322 )
323 }
ff3f866a
JB
324 if (tasksQueueOptions?.queueMaxSize != null) {
325 tasksQueueOptions.size = tasksQueueOptions.queueMaxSize
326 }
20c6f652 327 if (
ff3f866a
JB
328 tasksQueueOptions?.size != null &&
329 !Number.isSafeInteger(tasksQueueOptions.size)
20c6f652 330 ) {
ff3f866a
JB
331 throw new TypeError(
332 'Invalid worker node tasks queue max size: must be an integer'
333 )
334 }
335 if (tasksQueueOptions?.size != null && tasksQueueOptions.size <= 0) {
20c6f652 336 throw new RangeError(
ff3f866a 337 `Invalid worker node tasks queue max size: ${tasksQueueOptions.size} is a negative integer or zero`
a20f0ba5
JB
338 )
339 }
340 }
341
e761c033
JB
342 private startPool (): void {
343 while (
344 this.workerNodes.reduce(
345 (accumulator, workerNode) =>
346 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
347 0
348 ) < this.numberOfWorkers
349 ) {
aa9eede8 350 this.createAndSetupWorkerNode()
e761c033
JB
351 }
352 }
353
08f3f44c 354 /** @inheritDoc */
6b27d407
JB
355 public get info (): PoolInfo {
356 return {
23ccf9d7 357 version,
6b27d407 358 type: this.type,
184855e6 359 worker: this.worker,
2431bdb4
JB
360 ready: this.ready,
361 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
362 minSize: this.minSize,
363 maxSize: this.maxSize,
c05f0d50
JB
364 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
365 .runTime.aggregate &&
1305e9a8
JB
366 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
367 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
368 workerNodes: this.workerNodes.length,
369 idleWorkerNodes: this.workerNodes.reduce(
370 (accumulator, workerNode) =>
f59e1027 371 workerNode.usage.tasks.executing === 0
a4e07f72
JB
372 ? accumulator + 1
373 : accumulator,
6b27d407
JB
374 0
375 ),
376 busyWorkerNodes: this.workerNodes.reduce(
377 (accumulator, workerNode) =>
f59e1027 378 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
379 0
380 ),
a4e07f72 381 executedTasks: this.workerNodes.reduce(
6b27d407 382 (accumulator, workerNode) =>
f59e1027 383 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
384 0
385 ),
386 executingTasks: this.workerNodes.reduce(
387 (accumulator, workerNode) =>
f59e1027 388 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
389 0
390 ),
daf86646
JB
391 ...(this.opts.enableTasksQueue === true && {
392 queuedTasks: this.workerNodes.reduce(
393 (accumulator, workerNode) =>
394 accumulator + workerNode.usage.tasks.queued,
395 0
396 )
397 }),
398 ...(this.opts.enableTasksQueue === true && {
399 maxQueuedTasks: this.workerNodes.reduce(
400 (accumulator, workerNode) =>
401 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
402 0
403 )
404 }),
a1763c54
JB
405 ...(this.opts.enableTasksQueue === true && {
406 backPressure: this.hasBackPressure()
407 }),
68cbdc84
JB
408 ...(this.opts.enableTasksQueue === true && {
409 stolenTasks: this.workerNodes.reduce(
410 (accumulator, workerNode) =>
411 accumulator + workerNode.usage.tasks.stolen,
412 0
413 )
414 }),
a4e07f72
JB
415 failedTasks: this.workerNodes.reduce(
416 (accumulator, workerNode) =>
f59e1027 417 accumulator + workerNode.usage.tasks.failed,
a4e07f72 418 0
1dcf8b7b
JB
419 ),
420 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
421 .runTime.aggregate && {
422 runTime: {
98e72cda
JB
423 minimum: round(
424 Math.min(
425 ...this.workerNodes.map(
8ebe6c30 426 (workerNode) => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 427 )
1dcf8b7b
JB
428 )
429 ),
98e72cda
JB
430 maximum: round(
431 Math.max(
432 ...this.workerNodes.map(
8ebe6c30 433 (workerNode) => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 434 )
1dcf8b7b 435 )
98e72cda 436 ),
3baa0837
JB
437 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
438 .runTime.average && {
439 average: round(
440 average(
441 this.workerNodes.reduce<number[]>(
442 (accumulator, workerNode) =>
443 accumulator.concat(workerNode.usage.runTime.history),
444 []
445 )
98e72cda 446 )
dc021bcc 447 )
3baa0837 448 }),
98e72cda
JB
449 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
450 .runTime.median && {
451 median: round(
452 median(
3baa0837
JB
453 this.workerNodes.reduce<number[]>(
454 (accumulator, workerNode) =>
455 accumulator.concat(workerNode.usage.runTime.history),
456 []
98e72cda
JB
457 )
458 )
459 )
460 })
1dcf8b7b
JB
461 }
462 }),
463 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
464 .waitTime.aggregate && {
465 waitTime: {
98e72cda
JB
466 minimum: round(
467 Math.min(
468 ...this.workerNodes.map(
8ebe6c30 469 (workerNode) => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 470 )
1dcf8b7b
JB
471 )
472 ),
98e72cda
JB
473 maximum: round(
474 Math.max(
475 ...this.workerNodes.map(
8ebe6c30 476 (workerNode) => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 477 )
1dcf8b7b 478 )
98e72cda 479 ),
3baa0837
JB
480 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
481 .waitTime.average && {
482 average: round(
483 average(
484 this.workerNodes.reduce<number[]>(
485 (accumulator, workerNode) =>
486 accumulator.concat(workerNode.usage.waitTime.history),
487 []
488 )
98e72cda 489 )
dc021bcc 490 )
3baa0837 491 }),
98e72cda
JB
492 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
493 .waitTime.median && {
494 median: round(
495 median(
3baa0837
JB
496 this.workerNodes.reduce<number[]>(
497 (accumulator, workerNode) =>
498 accumulator.concat(workerNode.usage.waitTime.history),
499 []
98e72cda
JB
500 )
501 )
502 )
503 })
1dcf8b7b
JB
504 }
505 })
6b27d407
JB
506 }
507 }
08f3f44c 508
aa9eede8
JB
509 /**
510 * The pool readiness boolean status.
511 */
2431bdb4
JB
512 private get ready (): boolean {
513 return (
b97d82d8
JB
514 this.workerNodes.reduce(
515 (accumulator, workerNode) =>
516 !workerNode.info.dynamic && workerNode.info.ready
517 ? accumulator + 1
518 : accumulator,
519 0
520 ) >= this.minSize
2431bdb4
JB
521 )
522 }
523
afe0d5bf 524 /**
aa9eede8 525 * The approximate pool utilization.
afe0d5bf
JB
526 *
527 * @returns The pool utilization.
528 */
529 private get utilization (): number {
8e5ca040 530 const poolTimeCapacity =
fe7d90db 531 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
532 const totalTasksRunTime = this.workerNodes.reduce(
533 (accumulator, workerNode) =>
71514351 534 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
535 0
536 )
537 const totalTasksWaitTime = this.workerNodes.reduce(
538 (accumulator, workerNode) =>
71514351 539 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
540 0
541 )
8e5ca040 542 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
543 }
544
8881ae32 545 /**
aa9eede8 546 * The pool type.
8881ae32
JB
547 *
548 * If it is `'dynamic'`, it provides the `max` property.
549 */
550 protected abstract get type (): PoolType
551
184855e6 552 /**
aa9eede8 553 * The worker type.
184855e6
JB
554 */
555 protected abstract get worker (): WorkerType
556
c2ade475 557 /**
aa9eede8 558 * The pool minimum size.
c2ade475 559 */
8735b4e5
JB
560 protected get minSize (): number {
561 return this.numberOfWorkers
562 }
ff733df7
JB
563
564 /**
aa9eede8 565 * The pool maximum size.
ff733df7 566 */
8735b4e5
JB
567 protected get maxSize (): number {
568 return this.max ?? this.numberOfWorkers
569 }
a35560ba 570
6b813701
JB
571 /**
572 * Checks if the worker id sent in the received message from a worker is valid.
573 *
574 * @param message - The received message.
575 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
576 */
21f710aa 577 private checkMessageWorkerId (message: MessageValue<Response>): void {
310de0aa
JB
578 if (message.workerId == null) {
579 throw new Error('Worker message received without worker id')
580 } else if (
21f710aa 581 message.workerId != null &&
aad6fb64 582 this.getWorkerNodeKeyByWorkerId(message.workerId) === -1
21f710aa
JB
583 ) {
584 throw new Error(
585 `Worker message received from unknown worker '${message.workerId}'`
586 )
587 }
588 }
589
ffcbbad8 590 /**
f06e48d8 591 * Gets the given worker its worker node key.
ffcbbad8
JB
592 *
593 * @param worker - The worker.
f59e1027 594 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 595 */
aad6fb64 596 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 597 return this.workerNodes.findIndex(
8ebe6c30 598 (workerNode) => workerNode.worker === worker
f06e48d8 599 )
bf9549ae
JB
600 }
601
aa9eede8
JB
602 /**
603 * Gets the worker node key given its worker id.
604 *
605 * @param workerId - The worker id.
aad6fb64 606 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 607 */
aad6fb64
JB
608 private getWorkerNodeKeyByWorkerId (workerId: number): number {
609 return this.workerNodes.findIndex(
8ebe6c30 610 (workerNode) => workerNode.info.id === workerId
aad6fb64 611 )
aa9eede8
JB
612 }
613
afc003b2 614 /** @inheritDoc */
a35560ba 615 public setWorkerChoiceStrategy (
59219cbb
JB
616 workerChoiceStrategy: WorkerChoiceStrategy,
617 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 618 ): void {
aee46736 619 this.checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 620 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
621 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
622 this.opts.workerChoiceStrategy
623 )
624 if (workerChoiceStrategyOptions != null) {
625 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
626 }
aa9eede8 627 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 628 workerNode.resetUsage()
9edb9717 629 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 630 }
a20f0ba5
JB
631 }
632
633 /** @inheritDoc */
634 public setWorkerChoiceStrategyOptions (
635 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
636 ): void {
0d80593b 637 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
638 this.opts.workerChoiceStrategyOptions = {
639 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
640 ...workerChoiceStrategyOptions
641 }
a20f0ba5
JB
642 this.workerChoiceStrategyContext.setOptions(
643 this.opts.workerChoiceStrategyOptions
a35560ba
S
644 )
645 }
646
a20f0ba5 647 /** @inheritDoc */
8f52842f
JB
648 public enableTasksQueue (
649 enable: boolean,
650 tasksQueueOptions?: TasksQueueOptions
651 ): void {
a20f0ba5 652 if (this.opts.enableTasksQueue === true && !enable) {
ef41a6e6 653 this.flushTasksQueues()
a20f0ba5
JB
654 }
655 this.opts.enableTasksQueue = enable
8f52842f 656 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
657 }
658
659 /** @inheritDoc */
8f52842f 660 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 661 if (this.opts.enableTasksQueue === true) {
8f52842f
JB
662 this.checkValidTasksQueueOptions(tasksQueueOptions)
663 this.opts.tasksQueueOptions =
664 this.buildTasksQueueOptions(tasksQueueOptions)
ff3f866a 665 this.setTasksQueueMaxSize(this.opts.tasksQueueOptions.size as number)
5baee0d7 666 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
667 delete this.opts.tasksQueueOptions
668 }
669 }
670
ff3f866a 671 private setTasksQueueMaxSize (size: number): void {
20c6f652 672 for (const workerNode of this.workerNodes) {
ff3f866a 673 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
674 }
675 }
676
a20f0ba5
JB
677 private buildTasksQueueOptions (
678 tasksQueueOptions: TasksQueueOptions
679 ): TasksQueueOptions {
680 return {
20c6f652 681 ...{
ff3f866a 682 size: Math.pow(this.maxSize, 2),
20c6f652
JB
683 concurrency: 1
684 },
685 ...tasksQueueOptions
a20f0ba5
JB
686 }
687 }
688
c319c66b
JB
689 /**
690 * Whether the pool is full or not.
691 *
692 * The pool filling boolean status.
693 */
dea903a8
JB
694 protected get full (): boolean {
695 return this.workerNodes.length >= this.maxSize
696 }
c2ade475 697
c319c66b
JB
698 /**
699 * Whether the pool is busy or not.
700 *
701 * The pool busyness boolean status.
702 */
703 protected abstract get busy (): boolean
7c0ba920 704
6c6afb84 705 /**
3d76750a 706 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
707 *
708 * @returns Worker nodes busyness boolean status.
709 */
c2ade475 710 protected internalBusy (): boolean {
3d76750a
JB
711 if (this.opts.enableTasksQueue === true) {
712 return (
713 this.workerNodes.findIndex(
8ebe6c30 714 (workerNode) =>
3d76750a
JB
715 workerNode.info.ready &&
716 workerNode.usage.tasks.executing <
717 (this.opts.tasksQueueOptions?.concurrency as number)
718 ) === -1
719 )
720 } else {
721 return (
722 this.workerNodes.findIndex(
8ebe6c30 723 (workerNode) =>
3d76750a
JB
724 workerNode.info.ready && workerNode.usage.tasks.executing === 0
725 ) === -1
726 )
727 }
cb70b19d
JB
728 }
729
90d7d101
JB
730 /** @inheritDoc */
731 public listTaskFunctions (): string[] {
f2dbbf95
JB
732 for (const workerNode of this.workerNodes) {
733 if (
734 Array.isArray(workerNode.info.taskFunctions) &&
735 workerNode.info.taskFunctions.length > 0
736 ) {
737 return workerNode.info.taskFunctions
738 }
90d7d101 739 }
f2dbbf95 740 return []
90d7d101
JB
741 }
742
afc003b2 743 /** @inheritDoc */
7d91a8cd
JB
744 public async execute (
745 data?: Data,
746 name?: string,
747 transferList?: TransferListItem[]
748 ): Promise<Response> {
52b71763 749 return await new Promise<Response>((resolve, reject) => {
15b176e0
JB
750 if (!this.started) {
751 reject(new Error('Cannot execute a task on destroyed pool'))
752 }
7d91a8cd
JB
753 if (name != null && typeof name !== 'string') {
754 reject(new TypeError('name argument must be a string'))
755 }
90d7d101
JB
756 if (
757 name != null &&
758 typeof name === 'string' &&
759 name.trim().length === 0
760 ) {
f58b60b9 761 reject(new TypeError('name argument must not be an empty string'))
90d7d101 762 }
b558f6b5
JB
763 if (transferList != null && !Array.isArray(transferList)) {
764 reject(new TypeError('transferList argument must be an array'))
765 }
766 const timestamp = performance.now()
767 const workerNodeKey = this.chooseWorkerNode()
94407def 768 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
cea399c8
JB
769 if (
770 name != null &&
a5d15204
JB
771 Array.isArray(workerInfo.taskFunctions) &&
772 !workerInfo.taskFunctions.includes(name)
cea399c8 773 ) {
90d7d101
JB
774 reject(
775 new Error(`Task function '${name}' is not registered in the pool`)
776 )
777 }
501aea93 778 const task: Task<Data> = {
52b71763
JB
779 name: name ?? DEFAULT_TASK_NAME,
780 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
781 data: data ?? ({} as Data),
7d91a8cd 782 transferList,
52b71763 783 timestamp,
a5d15204 784 workerId: workerInfo.id as number,
7629bdf1 785 taskId: randomUUID()
52b71763 786 }
7629bdf1 787 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
788 resolve,
789 reject,
501aea93 790 workerNodeKey
2e81254d 791 })
52b71763 792 if (
4e377863
JB
793 this.opts.enableTasksQueue === false ||
794 (this.opts.enableTasksQueue === true &&
033f1776 795 this.tasksQueueSize(workerNodeKey) === 0 &&
4e377863 796 this.workerNodes[workerNodeKey].usage.tasks.executing <
b5e113f6 797 (this.opts.tasksQueueOptions?.concurrency as number))
52b71763 798 ) {
501aea93 799 this.executeTask(workerNodeKey, task)
4e377863
JB
800 } else {
801 this.enqueueTask(workerNodeKey, task)
52b71763 802 }
2e81254d 803 })
280c2a77 804 }
c97c7edb 805
afc003b2 806 /** @inheritDoc */
c97c7edb 807 public async destroy (): Promise<void> {
1fbcaa7c 808 await Promise.all(
81c02522 809 this.workerNodes.map(async (_, workerNodeKey) => {
aa9eede8 810 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
811 })
812 )
33e6bb4c 813 this.emitter?.emit(PoolEvents.destroy, this.info)
15b176e0 814 this.started = false
c97c7edb
S
815 }
816
1e3214b6
JB
817 protected async sendKillMessageToWorker (
818 workerNodeKey: number,
819 workerId: number
820 ): Promise<void> {
9edb9717 821 await new Promise<void>((resolve, reject) => {
1e3214b6
JB
822 this.registerWorkerMessageListener(workerNodeKey, (message) => {
823 if (message.kill === 'success') {
824 resolve()
825 } else if (message.kill === 'failure') {
e1af34e6 826 reject(new Error(`Worker ${workerId} kill message handling failed`))
1e3214b6
JB
827 }
828 })
9edb9717 829 this.sendToWorker(workerNodeKey, { kill: true, workerId })
1e3214b6 830 })
1e3214b6
JB
831 }
832
4a6952ff 833 /**
aa9eede8 834 * Terminates the worker node given its worker node key.
4a6952ff 835 *
aa9eede8 836 * @param workerNodeKey - The worker node key.
4a6952ff 837 */
81c02522 838 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 839
729c563d 840 /**
6677a3d3
JB
841 * Setup hook to execute code before worker nodes are created in the abstract constructor.
842 * Can be overridden.
afc003b2
JB
843 *
844 * @virtual
729c563d 845 */
280c2a77 846 protected setupHook (): void {
033f1776 847 /** Intentionally empty */
280c2a77 848 }
c97c7edb 849
729c563d 850 /**
280c2a77
S
851 * Should return whether the worker is the main worker or not.
852 */
853 protected abstract isMain (): boolean
854
855 /**
2e81254d 856 * Hook executed before the worker task execution.
bf9549ae 857 * Can be overridden.
729c563d 858 *
f06e48d8 859 * @param workerNodeKey - The worker node key.
1c6fe997 860 * @param task - The task to execute.
729c563d 861 */
1c6fe997
JB
862 protected beforeTaskExecutionHook (
863 workerNodeKey: number,
864 task: Task<Data>
865 ): void {
94407def
JB
866 if (this.workerNodes[workerNodeKey]?.usage != null) {
867 const workerUsage = this.workerNodes[workerNodeKey].usage
868 ++workerUsage.tasks.executing
869 this.updateWaitTimeWorkerUsage(workerUsage, task)
870 }
871 if (
872 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
873 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
874 task.name as string
875 ) != null
876 ) {
db0e38ee 877 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 878 workerNodeKey
db0e38ee 879 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
880 ++taskFunctionWorkerUsage.tasks.executing
881 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 882 }
c97c7edb
S
883 }
884
c01733f1 885 /**
2e81254d 886 * Hook executed after the worker task execution.
bf9549ae 887 * Can be overridden.
c01733f1 888 *
501aea93 889 * @param workerNodeKey - The worker node key.
38e795c1 890 * @param message - The received message.
c01733f1 891 */
2e81254d 892 protected afterTaskExecutionHook (
501aea93 893 workerNodeKey: number,
2740a743 894 message: MessageValue<Response>
bf9549ae 895 ): void {
94407def
JB
896 if (this.workerNodes[workerNodeKey]?.usage != null) {
897 const workerUsage = this.workerNodes[workerNodeKey].usage
898 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
899 this.updateRunTimeWorkerUsage(workerUsage, message)
900 this.updateEluWorkerUsage(workerUsage, message)
901 }
902 if (
903 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
904 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 905 message.taskPerformance?.name as string
94407def
JB
906 ) != null
907 ) {
db0e38ee 908 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 909 workerNodeKey
db0e38ee 910 ].getTaskFunctionWorkerUsage(
0628755c 911 message.taskPerformance?.name as string
b558f6b5 912 ) as WorkerUsage
db0e38ee
JB
913 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
914 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
915 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
916 }
917 }
918
db0e38ee
JB
919 /**
920 * Whether the worker node shall update its task function worker usage or not.
921 *
922 * @param workerNodeKey - The worker node key.
923 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
924 */
925 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 926 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 927 return (
94407def 928 workerInfo != null &&
a5d15204 929 Array.isArray(workerInfo.taskFunctions) &&
db0e38ee 930 workerInfo.taskFunctions.length > 2
b558f6b5 931 )
f1c06930
JB
932 }
933
934 private updateTaskStatisticsWorkerUsage (
935 workerUsage: WorkerUsage,
936 message: MessageValue<Response>
937 ): void {
a4e07f72 938 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
939 if (
940 workerTaskStatistics.executing != null &&
941 workerTaskStatistics.executing > 0
942 ) {
943 --workerTaskStatistics.executing
5bb5be17 944 }
98e72cda
JB
945 if (message.taskError == null) {
946 ++workerTaskStatistics.executed
947 } else {
a4e07f72 948 ++workerTaskStatistics.failed
2740a743 949 }
f8eb0a2a
JB
950 }
951
a4e07f72
JB
952 private updateRunTimeWorkerUsage (
953 workerUsage: WorkerUsage,
f8eb0a2a
JB
954 message: MessageValue<Response>
955 ): void {
dc021bcc
JB
956 if (message.taskError != null) {
957 return
958 }
e4f20deb
JB
959 updateMeasurementStatistics(
960 workerUsage.runTime,
961 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 962 message.taskPerformance?.runTime ?? 0
e4f20deb 963 )
f8eb0a2a
JB
964 }
965
a4e07f72
JB
966 private updateWaitTimeWorkerUsage (
967 workerUsage: WorkerUsage,
1c6fe997 968 task: Task<Data>
f8eb0a2a 969 ): void {
1c6fe997
JB
970 const timestamp = performance.now()
971 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
972 updateMeasurementStatistics(
973 workerUsage.waitTime,
974 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 975 taskWaitTime
e4f20deb 976 )
c01733f1 977 }
978
a4e07f72 979 private updateEluWorkerUsage (
5df69fab 980 workerUsage: WorkerUsage,
62c15a68
JB
981 message: MessageValue<Response>
982 ): void {
dc021bcc
JB
983 if (message.taskError != null) {
984 return
985 }
008512c7
JB
986 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
987 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
988 updateMeasurementStatistics(
989 workerUsage.elu.active,
008512c7 990 eluTaskStatisticsRequirements,
dc021bcc 991 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
992 )
993 updateMeasurementStatistics(
994 workerUsage.elu.idle,
008512c7 995 eluTaskStatisticsRequirements,
dc021bcc 996 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 997 )
008512c7 998 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 999 if (message.taskPerformance?.elu != null) {
f7510105
JB
1000 if (workerUsage.elu.utilization != null) {
1001 workerUsage.elu.utilization =
1002 (workerUsage.elu.utilization +
1003 message.taskPerformance.elu.utilization) /
1004 2
1005 } else {
1006 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1007 }
62c15a68
JB
1008 }
1009 }
1010 }
1011
280c2a77 1012 /**
f06e48d8 1013 * Chooses a worker node for the next task.
280c2a77 1014 *
6c6afb84 1015 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1016 *
aa9eede8 1017 * @returns The chosen worker node key
280c2a77 1018 */
6c6afb84 1019 private chooseWorkerNode (): number {
930dcf12 1020 if (this.shallCreateDynamicWorker()) {
aa9eede8 1021 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1022 if (
b1aae695 1023 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1024 ) {
aa9eede8 1025 return workerNodeKey
6c6afb84 1026 }
17393ac8 1027 }
930dcf12
JB
1028 return this.workerChoiceStrategyContext.execute()
1029 }
1030
6c6afb84
JB
1031 /**
1032 * Conditions for dynamic worker creation.
1033 *
1034 * @returns Whether to create a dynamic worker or not.
1035 */
1036 private shallCreateDynamicWorker (): boolean {
930dcf12 1037 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1038 }
1039
280c2a77 1040 /**
aa9eede8 1041 * Sends a message to worker given its worker node key.
280c2a77 1042 *
aa9eede8 1043 * @param workerNodeKey - The worker node key.
38e795c1 1044 * @param message - The message.
7d91a8cd 1045 * @param transferList - The optional array of transferable objects.
280c2a77
S
1046 */
1047 protected abstract sendToWorker (
aa9eede8 1048 workerNodeKey: number,
7d91a8cd
JB
1049 message: MessageValue<Data>,
1050 transferList?: TransferListItem[]
280c2a77
S
1051 ): void
1052
729c563d 1053 /**
41344292 1054 * Creates a new worker.
6c6afb84
JB
1055 *
1056 * @returns Newly created worker.
729c563d 1057 */
280c2a77 1058 protected abstract createWorker (): Worker
c97c7edb 1059
4a6952ff 1060 /**
aa9eede8 1061 * Creates a new, completely set up worker node.
4a6952ff 1062 *
aa9eede8 1063 * @returns New, completely set up worker node key.
4a6952ff 1064 */
aa9eede8 1065 protected createAndSetupWorkerNode (): number {
bdacc2d2 1066 const worker = this.createWorker()
280c2a77 1067
fd04474e 1068 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1069 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1070 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
8ebe6c30 1071 worker.on('error', (error) => {
aad6fb64 1072 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
94407def 1073 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
9b106837 1074 workerInfo.ready = false
0dc838e3 1075 this.workerNodes[workerNodeKey].closeChannel()
2a69b8c5 1076 this.emitter?.emit(PoolEvents.error, error)
15b176e0
JB
1077 if (
1078 this.opts.restartWorkerOnError === true &&
1079 !this.starting &&
1080 this.started
1081 ) {
9b106837 1082 if (workerInfo.dynamic) {
aa9eede8 1083 this.createAndSetupDynamicWorkerNode()
8a1260a3 1084 } else {
aa9eede8 1085 this.createAndSetupWorkerNode()
8a1260a3 1086 }
5baee0d7 1087 }
19dbc45b 1088 if (this.opts.enableTasksQueue === true) {
9b106837 1089 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1090 }
5baee0d7 1091 })
a35560ba 1092 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1093 worker.once('exit', () => {
f06e48d8 1094 this.removeWorkerNode(worker)
a974afa6 1095 })
280c2a77 1096
aa9eede8 1097 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1098
aa9eede8 1099 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1100
aa9eede8 1101 return workerNodeKey
c97c7edb 1102 }
be0676b3 1103
930dcf12 1104 /**
aa9eede8 1105 * Creates a new, completely set up dynamic worker node.
930dcf12 1106 *
aa9eede8 1107 * @returns New, completely set up dynamic worker node key.
930dcf12 1108 */
aa9eede8
JB
1109 protected createAndSetupDynamicWorkerNode (): number {
1110 const workerNodeKey = this.createAndSetupWorkerNode()
8ebe6c30 1111 this.registerWorkerMessageListener(workerNodeKey, (message) => {
aa9eede8
JB
1112 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1113 message.workerId
aad6fb64 1114 )
aa9eede8 1115 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1116 // Kill message received from worker
930dcf12
JB
1117 if (
1118 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1119 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1120 ((this.opts.enableTasksQueue === false &&
aa9eede8 1121 workerUsage.tasks.executing === 0) ||
7b56f532 1122 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1123 workerUsage.tasks.executing === 0 &&
1124 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1125 ) {
5270d253
JB
1126 this.destroyWorkerNode(localWorkerNodeKey).catch((error) => {
1127 this.emitter?.emit(PoolEvents.error, error)
1128 })
930dcf12
JB
1129 }
1130 })
94407def 1131 const workerInfo = this.getWorkerInfo(workerNodeKey) as WorkerInfo
aa9eede8 1132 this.sendToWorker(workerNodeKey, {
b0a4db63 1133 checkActive: true,
21f710aa
JB
1134 workerId: workerInfo.id as number
1135 })
b5e113f6 1136 workerInfo.dynamic = true
b1aae695
JB
1137 if (
1138 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1139 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1140 ) {
b5e113f6
JB
1141 workerInfo.ready = true
1142 }
33e6bb4c 1143 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1144 return workerNodeKey
930dcf12
JB
1145 }
1146
a2ed5053 1147 /**
aa9eede8 1148 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1149 *
aa9eede8 1150 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1151 * @param listener - The message listener callback.
1152 */
85aeb3f3
JB
1153 protected abstract registerWorkerMessageListener<
1154 Message extends Data | Response
aa9eede8
JB
1155 >(
1156 workerNodeKey: number,
1157 listener: (message: MessageValue<Message>) => void
1158 ): void
a2ed5053
JB
1159
1160 /**
aa9eede8 1161 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1162 * Can be overridden.
1163 *
aa9eede8 1164 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1165 */
aa9eede8 1166 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1167 // Listen to worker messages.
aa9eede8 1168 this.registerWorkerMessageListener(workerNodeKey, this.workerListener())
85aeb3f3 1169 // Send the startup message to worker.
aa9eede8 1170 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1171 // Send the statistics message to worker.
1172 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1173 if (this.opts.enableTasksQueue === true) {
a6b3272b
JB
1174 this.workerNodes[workerNodeKey].onEmptyQueue =
1175 this.taskStealingOnEmptyQueue.bind(this)
72695f86
JB
1176 this.workerNodes[workerNodeKey].onBackPressure =
1177 this.tasksStealingOnBackPressure.bind(this)
1178 }
d2c73f82
JB
1179 }
1180
85aeb3f3 1181 /**
aa9eede8
JB
1182 * Sends the startup message to worker given its worker node key.
1183 *
1184 * @param workerNodeKey - The worker node key.
1185 */
1186 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1187
1188 /**
9edb9717 1189 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1190 *
aa9eede8 1191 * @param workerNodeKey - The worker node key.
85aeb3f3 1192 */
9edb9717 1193 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1194 this.sendToWorker(workerNodeKey, {
1195 statistics: {
1196 runTime:
1197 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1198 .runTime.aggregate,
1199 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1200 .elu.aggregate
1201 },
94407def 1202 workerId: (this.getWorkerInfo(workerNodeKey) as WorkerInfo).id as number
aa9eede8
JB
1203 })
1204 }
a2ed5053
JB
1205
1206 private redistributeQueuedTasks (workerNodeKey: number): void {
1207 while (this.tasksQueueSize(workerNodeKey) > 0) {
0bc68267 1208 let destinationWorkerNodeKey!: number
a2ed5053 1209 let minQueuedTasks = Infinity
a6b3272b 1210 for (const [workerNodeId, workerNode] of this.workerNodes.entries()) {
58baffd3 1211 if (workerNode.info.ready && workerNodeId !== workerNodeKey) {
58baffd3
JB
1212 if (workerNode.usage.tasks.queued === 0) {
1213 destinationWorkerNodeKey = workerNodeId
1214 break
1215 }
1216 if (workerNode.usage.tasks.queued < minQueuedTasks) {
1217 minQueuedTasks = workerNode.usage.tasks.queued
1218 destinationWorkerNodeKey = workerNodeId
1219 }
a2ed5053
JB
1220 }
1221 }
0bc68267 1222 if (destinationWorkerNodeKey != null) {
033f1776 1223 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
0bc68267
JB
1224 const task = {
1225 ...(this.dequeueTask(workerNodeKey) as Task<Data>),
033f1776 1226 workerId: destinationWorkerNode.info.id as number
0bc68267 1227 }
033f1776
JB
1228 if (
1229 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
1230 destinationWorkerNode.usage.tasks.executing <
1231 (this.opts.tasksQueueOptions?.concurrency as number)
1232 ) {
0bc68267
JB
1233 this.executeTask(destinationWorkerNodeKey, task)
1234 } else {
1235 this.enqueueTask(destinationWorkerNodeKey, task)
1236 }
dd951876
JB
1237 }
1238 }
1239 }
1240
1241 private taskStealingOnEmptyQueue (workerId: number): void {
a6b3272b
JB
1242 const destinationWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1243 const destinationWorkerNode = this.workerNodes[destinationWorkerNodeKey]
dd951876 1244 const workerNodes = this.workerNodes
a6b3272b 1245 .slice()
dd951876
JB
1246 .sort(
1247 (workerNodeA, workerNodeB) =>
1248 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1249 )
dd951876 1250 for (const sourceWorkerNode of workerNodes) {
0bc68267
JB
1251 if (sourceWorkerNode.usage.tasks.queued === 0) {
1252 break
1253 }
a6b3272b
JB
1254 if (
1255 sourceWorkerNode.info.ready &&
1256 sourceWorkerNode.info.id !== workerId &&
1257 sourceWorkerNode.usage.tasks.queued > 0
1258 ) {
1259 const task = {
1260 ...(sourceWorkerNode.popTask() as Task<Data>),
1261 workerId: destinationWorkerNode.info.id as number
1262 }
dd951876 1263 if (
2cb4ff45 1264 this.tasksQueueSize(destinationWorkerNodeKey) === 0 &&
a6b3272b 1265 destinationWorkerNode.usage.tasks.executing <
033f1776 1266 (this.opts.tasksQueueOptions?.concurrency as number)
dd951876 1267 ) {
2cb4ff45
JB
1268 this.executeTask(destinationWorkerNodeKey, task)
1269 } else {
1270 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876 1271 }
b2559a74
JB
1272 if (destinationWorkerNode?.usage != null) {
1273 ++destinationWorkerNode.usage.tasks.stolen
1274 }
1275 if (
1276 this.shallUpdateTaskFunctionWorkerUsage(destinationWorkerNodeKey) &&
1277 destinationWorkerNode.getTaskFunctionWorkerUsage(
1278 task.name as string
1279 ) != null
1280 ) {
68cbdc84
JB
1281 const taskFunctionWorkerUsage =
1282 destinationWorkerNode.getTaskFunctionWorkerUsage(
1283 task.name as string
1284 ) as WorkerUsage
1285 ++taskFunctionWorkerUsage.tasks.stolen
1286 }
dd951876 1287 break
72695f86
JB
1288 }
1289 }
1290 }
1291
1292 private tasksStealingOnBackPressure (workerId: number): void {
1293 const sourceWorkerNode =
1294 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1295 const workerNodes = this.workerNodes
a6b3272b 1296 .slice()
72695f86
JB
1297 .sort(
1298 (workerNodeA, workerNodeB) =>
1299 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1300 )
1301 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1302 if (
0bc68267 1303 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b
JB
1304 workerNode.info.ready &&
1305 workerNode.info.id !== workerId &&
0bc68267
JB
1306 workerNode.usage.tasks.queued <
1307 (this.opts.tasksQueueOptions?.size as number) - 1
72695f86 1308 ) {
dd951876
JB
1309 const task = {
1310 ...(sourceWorkerNode.popTask() as Task<Data>),
1311 workerId: workerNode.info.id as number
1312 }
4de3d785 1313 if (
033f1776 1314 this.tasksQueueSize(workerNodeKey) === 0 &&
4de3d785 1315 workerNode.usage.tasks.executing <
033f1776 1316 (this.opts.tasksQueueOptions?.concurrency as number)
4de3d785 1317 ) {
dd951876 1318 this.executeTask(workerNodeKey, task)
4de3d785 1319 } else {
dd951876 1320 this.enqueueTask(workerNodeKey, task)
4de3d785 1321 }
b2559a74
JB
1322 if (workerNode?.usage != null) {
1323 ++workerNode.usage.tasks.stolen
1324 }
1325 if (
1326 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1327 workerNode.getTaskFunctionWorkerUsage(task.name as string) != null
1328 ) {
68cbdc84
JB
1329 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1330 task.name as string
1331 ) as WorkerUsage
1332 ++taskFunctionWorkerUsage.tasks.stolen
1333 }
10ecf8fd 1334 }
a2ed5053
JB
1335 }
1336 }
1337
be0676b3 1338 /**
aa9eede8 1339 * This method is the listener registered for each worker message.
be0676b3 1340 *
bdacc2d2 1341 * @returns The listener function to execute when a message is received from a worker.
be0676b3
APA
1342 */
1343 protected workerListener (): (message: MessageValue<Response>) => void {
8ebe6c30 1344 return (message) => {
21f710aa 1345 this.checkMessageWorkerId(message)
a5d15204 1346 if (message.ready != null && message.taskFunctions != null) {
81c02522 1347 // Worker ready response received from worker
10e2aa7e 1348 this.handleWorkerReadyResponse(message)
7629bdf1 1349 } else if (message.taskId != null) {
81c02522 1350 // Task execution response received from worker
6b272951 1351 this.handleTaskExecutionResponse(message)
90d7d101
JB
1352 } else if (message.taskFunctions != null) {
1353 // Task functions message received from worker
94407def
JB
1354 (
1355 this.getWorkerInfo(
1356 this.getWorkerNodeKeyByWorkerId(message.workerId)
1357 ) as WorkerInfo
b558f6b5 1358 ).taskFunctions = message.taskFunctions
6b272951
JB
1359 }
1360 }
1361 }
1362
10e2aa7e 1363 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
f05ed93c
JB
1364 if (message.ready === false) {
1365 throw new Error(`Worker ${message.workerId} failed to initialize`)
1366 }
a5d15204 1367 const workerInfo = this.getWorkerInfo(
aad6fb64 1368 this.getWorkerNodeKeyByWorkerId(message.workerId)
94407def 1369 ) as WorkerInfo
a5d15204
JB
1370 workerInfo.ready = message.ready as boolean
1371 workerInfo.taskFunctions = message.taskFunctions
2431bdb4
JB
1372 if (this.emitter != null && this.ready) {
1373 this.emitter.emit(PoolEvents.ready, this.info)
1374 }
6b272951
JB
1375 }
1376
1377 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
5441aea6
JB
1378 const { taskId, taskError, data } = message
1379 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1380 if (promiseResponse != null) {
5441aea6
JB
1381 if (taskError != null) {
1382 this.emitter?.emit(PoolEvents.taskError, taskError)
1383 promiseResponse.reject(taskError.message)
6b272951 1384 } else {
5441aea6 1385 promiseResponse.resolve(data as Response)
6b272951 1386 }
501aea93
JB
1387 const workerNodeKey = promiseResponse.workerNodeKey
1388 this.afterTaskExecutionHook(workerNodeKey, message)
5441aea6 1389 this.promiseResponseMap.delete(taskId as string)
6b272951
JB
1390 if (
1391 this.opts.enableTasksQueue === true &&
b5e113f6
JB
1392 this.tasksQueueSize(workerNodeKey) > 0 &&
1393 this.workerNodes[workerNodeKey].usage.tasks.executing <
1394 (this.opts.tasksQueueOptions?.concurrency as number)
6b272951
JB
1395 ) {
1396 this.executeTask(
1397 workerNodeKey,
1398 this.dequeueTask(workerNodeKey) as Task<Data>
1399 )
be0676b3 1400 }
6b272951 1401 this.workerChoiceStrategyContext.update(workerNodeKey)
be0676b3 1402 }
be0676b3 1403 }
7c0ba920 1404
a1763c54 1405 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1406 if (this.busy) {
1407 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1408 }
1409 }
1410
1411 private checkAndEmitTaskQueuingEvents (): void {
1412 if (this.hasBackPressure()) {
1413 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1414 }
1415 }
1416
33e6bb4c
JB
1417 private checkAndEmitDynamicWorkerCreationEvents (): void {
1418 if (this.type === PoolTypes.dynamic) {
1419 if (this.full) {
1420 this.emitter?.emit(PoolEvents.full, this.info)
1421 }
1422 }
1423 }
1424
8a1260a3 1425 /**
aa9eede8 1426 * Gets the worker information given its worker node key.
8a1260a3
JB
1427 *
1428 * @param workerNodeKey - The worker node key.
3f09ed9f 1429 * @returns The worker information.
8a1260a3 1430 */
94407def
JB
1431 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1432 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1433 }
1434
a05c10de 1435 /**
b0a4db63 1436 * Adds the given worker in the pool worker nodes.
ea7a90d3 1437 *
38e795c1 1438 * @param worker - The worker.
aa9eede8
JB
1439 * @returns The added worker node key.
1440 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1441 */
b0a4db63 1442 private addWorkerNode (worker: Worker): number {
671d5154
JB
1443 const workerNode = new WorkerNode<Worker, Data>(
1444 worker,
1445 this.worker,
ff3f866a 1446 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1447 )
b97d82d8 1448 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1449 if (this.starting) {
1450 workerNode.info.ready = true
1451 }
aa9eede8 1452 this.workerNodes.push(workerNode)
aad6fb64 1453 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1454 if (workerNodeKey === -1) {
a5d15204 1455 throw new Error('Worker node added not found')
aa9eede8
JB
1456 }
1457 return workerNodeKey
ea7a90d3 1458 }
c923ce56 1459
51fe3d3c 1460 /**
f06e48d8 1461 * Removes the given worker from the pool worker nodes.
51fe3d3c 1462 *
f06e48d8 1463 * @param worker - The worker.
51fe3d3c 1464 */
416fd65c 1465 private removeWorkerNode (worker: Worker): void {
aad6fb64 1466 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1467 if (workerNodeKey !== -1) {
1468 this.workerNodes.splice(workerNodeKey, 1)
1469 this.workerChoiceStrategyContext.remove(workerNodeKey)
1470 }
51fe3d3c 1471 }
adc3c320 1472
e2b31e32
JB
1473 /** @inheritDoc */
1474 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1475 return (
e2b31e32
JB
1476 this.opts.enableTasksQueue === true &&
1477 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1478 )
1479 }
1480
1481 private hasBackPressure (): boolean {
1482 return (
1483 this.opts.enableTasksQueue === true &&
1484 this.workerNodes.findIndex(
1485 (workerNode) => !workerNode.hasBackPressure()
a1763c54 1486 ) === -1
9e844245 1487 )
e2b31e32
JB
1488 }
1489
b0a4db63 1490 /**
aa9eede8 1491 * Executes the given task on the worker given its worker node key.
b0a4db63 1492 *
aa9eede8 1493 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1494 * @param task - The task to execute.
1495 */
2e81254d 1496 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1497 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1498 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1499 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1500 }
1501
f9f00b5f 1502 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1503 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1504 this.checkAndEmitTaskQueuingEvents()
1505 return tasksQueueSize
adc3c320
JB
1506 }
1507
416fd65c 1508 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1509 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1510 }
1511
416fd65c 1512 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1513 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1514 }
1515
81c02522 1516 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1517 while (this.tasksQueueSize(workerNodeKey) > 0) {
1518 this.executeTask(
1519 workerNodeKey,
1520 this.dequeueTask(workerNodeKey) as Task<Data>
1521 )
ff733df7 1522 }
4b628b48 1523 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1524 }
1525
ef41a6e6
JB
1526 private flushTasksQueues (): void {
1527 for (const [workerNodeKey] of this.workerNodes.entries()) {
1528 this.flushTasksQueue(workerNodeKey)
1529 }
1530 }
c97c7edb 1531}