docs: refine README.md
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
76d91ea0 10 Task
d35e5717 11} from '../utility-types.js'
bbeadd16 12import {
ded253e2 13 average,
ff128cc9 14 DEFAULT_TASK_NAME,
bbeadd16 15 EMPTY_FUNCTION,
463226a4 16 exponentialDelay,
59317253 17 isKillBehavior,
0d80593b 18 isPlainObject,
90d6701c 19 max,
afe0d5bf 20 median,
90d6701c 21 min,
463226a4
JB
22 round,
23 sleep
d35e5717 24} from '../utils.js'
d35e5717 25import type { TaskFunction } from '../worker/task-functions.js'
ded253e2 26import { KillBehaviors } from '../worker/worker-options.js'
c4855468 27import {
65d7a1c9 28 type IPool,
c4855468 29 PoolEvents,
6b27d407 30 type PoolInfo,
c4855468 31 type PoolOptions,
6b27d407
JB
32 type PoolType,
33 PoolTypes,
4b628b48 34 type TasksQueueOptions
d35e5717 35} from './pool.js'
a35560ba 36import {
f0d7f803 37 Measurements,
a35560ba 38 WorkerChoiceStrategies,
a20f0ba5
JB
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
d35e5717
JB
41} from './selection-strategies/selection-strategies-types.js'
42import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
bde6b5d7
JB
43import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
bfc75cca 46 checkValidWorkerChoiceStrategy,
32b141fd 47 getDefaultTasksQueueOptions,
c329fd41
JB
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
87347ea8 52 waitWorkerNodeEvents
d35e5717 53} from './utils.js'
ded253e2
JB
54import { version } from './version.js'
55import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61} from './worker.js'
62import { WorkerNode } from './worker-node.js'
23ccf9d7 63
729c563d 64/**
ea7a90d3 65 * Base class that implements some shared logic for all poolifier pools.
729c563d 66 *
38e795c1 67 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 70 */
c97c7edb 71export abstract class AbstractPool<
f06e48d8 72 Worker extends IWorker,
d3c8a1a8
S
73 Data = unknown,
74 Response = unknown
c4855468 75> implements IPool<Worker, Data, Response> {
afc003b2 76 /** @inheritDoc */
4b628b48 77 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 78
afc003b2 79 /** @inheritDoc */
f80125ca 80 public emitter?: EventEmitterAsyncResource
7c0ba920 81
be0676b3 82 /**
419e8121 83 * The task execution response promise map:
2740a743 84 * - `key`: The message id of each submitted task.
a3445496 85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 86 *
a3445496 87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 88 */
501aea93
JB
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 91
a35560ba 92 /**
51fe3d3c 93 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 94 */
c63a35a0 95 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
78cea37e
JB
96 Worker,
97 Data,
98 Response
a35560ba
S
99 >
100
72ae84a2
JB
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
15b176e0
JB
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
47352846
JB
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
711623b8
JB
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
b362a929
JB
120 /**
121 * Whether the minimum number of workers is starting or not.
122 */
123 private startingMinimumNumberOfWorkers: boolean
55082af9
JB
124 /**
125 * Whether the pool ready event has been emitted or not.
126 */
127 private readyEventEmitted: boolean
afe0d5bf
JB
128 /**
129 * The start timestamp of the pool.
130 */
131 private readonly startTimestamp
132
729c563d
S
/**
 * Builds a new poolifier pool.
 *
 * Validates the calling context, the pool type, the worker file path, the minimum number of workers
 * and the pool options, then creates the optional event emitter and the worker choice strategy context.
 * Workers are started immediately when `opts.startWorkers` is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` reports that the pool is not created from the main context.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  const createdFromMain = this.isMain()
  if (!createdFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation: each check throws on invalid input.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // These methods are bound to the pool instance so they keep their `this` when passed around.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const { workerChoiceStrategy, workerChoiceStrategyOptions } = this.opts
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, workerChoiceStrategy, workerChoiceStrategyOptions)

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before the pool is possibly started below.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  this.startingMinimumNumberOfWorkers = false

  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Reference point used to compute the pool utilization.
  this.startTimestamp = performance.now()
}
189
26ce26ca
JB
/**
 * Ensures that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (!isFixedPool || this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
197
c63a35a0
JB
/**
 * Validates the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
217
/**
 * Validates the pool options and stores them, with defaults applied, into `this.opts`.
 *
 * Defaults applied when an option is not set: workers started, round robin worker choice strategy,
 * worker restart on error enabled, events enabled and tasks queue disabled.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  const { workerChoiceStrategyOptions, tasksQueueOptions } = opts
  this.opts.startWorkers = opts.startWorkers ?? true
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and built when the tasks queue is enabled.
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(tasksQueueOptions)
    this.opts.tasksQueueOptions =
      this.buildTasksQueueOptions(tasksQueueOptions)
  }
}
243
/**
 * Validates the worker choice strategy options. Nullish options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights do not cover each worker node or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    // One weight is expected per worker node the pool can hold.
    const expectedWeightsCount =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeightsCount) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
275
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
281
08f3f44c 282 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Counts the worker nodes matching the given predicate.
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Sums a numeric value picked on each worker node.
  const sumOverWorkerNodes = (
    selector: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + selector(workerNode),
      0
    )
  // Concatenates the measurement history of all the worker nodes.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // The utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related figures are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(average(gatherHistory('runTime')))
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(median(gatherHistory('runTime')))
        })
      }
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(average(gatherHistory('waitTime')))
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(median(gatherHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 448
aa9eede8
JB
/**
 * Whether the pool is ready or not.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number of ready,
 * non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
466
8e8d9101
JB
/**
 * Whether the pool is empty or not: no minimum number of workers and no worker node.
 */
protected get empty (): boolean {
  const hasNoMinimum = this.minimumNumberOfWorkers === 0
  return hasNoMinimum && this.workerNodes.length === 0
}
473
/**
 * The approximate pool utilization.
 *
 * Computed as the tasks aggregate run time plus wait time over all the worker nodes,
 * divided by the time elapsed since the pool start timestamp multiplied by the maximum pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) * maximumSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
495
8881ae32 496 /**
aa9eede8 497 * The pool type.
8881ae32
JB
498 *
499 * If it is `'dynamic'`, it provides the `max` property.
500 */
501 protected abstract get type (): PoolType
502
184855e6 503 /**
aa9eede8 504 * The worker type.
184855e6
JB
505 */
506 protected abstract get worker (): WorkerType
507
6b813701
JB
/**
 * Ensures that a message received from a worker carries the id of a worker known by the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
523
aa9eede8
JB
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
535
afc003b2 536 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Usage is reset on every worker node, then each worker is sent a statistics message.
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.workerNodes[workerNodeKey].resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
554
555 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Nullish options leave the currently stored options untouched.
  const optionsGiven = workerChoiceStrategyOptions != null
  if (optionsGiven) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const currentOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext?.setOptions(currentOptions)
}
567
a20f0ba5 568 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Task stealing is switched off and the queues are flushed before disabling the tasks queue.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
581
582 /** @inheritDoc */
c63a35a0
JB
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: stored tasks queue options are dropped.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Listeners are always removed first so that they are never registered twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
608
/**
 * Builds the tasks queue options: the given options override the default ones.
 *
 * @param tasksQueueOptions - The user supplied tasks queue options, if any.
 * @returns The default tasks queue options, computed from the maximum pool size, merged with the given ones.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(maximumSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
619
/**
 * Applies the given size as the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
625
d6ca1416
JB
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
631
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
640
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
649
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
658
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once the number of worker nodes reaches the maximum number of workers,
 * or the minimum number of workers when no maximum is defined.
 */
protected get full (): boolean {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
c2ade475 670
c319c66b
JB
671 /**
672 * Whether the pool is busy or not.
673 *
674 * The pool busyness boolean status.
675 */
676 protected abstract get busy (): boolean
7c0ba920 677
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency, otherwise it is one task:
 * the pool is busy when no ready worker node is below its quota.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasAvailableWorkerNode = this.workerNodes.some(workerNode =>
    tasksQueueEnabled
      ? workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
      : workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasAvailableWorkerNode
}
702
42c677c1
JB
/**
 * Whether the given worker node is busy or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least the tasks queue concurrency number of tasks when the tasks queue is enabled, or at least one task otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue !== true) {
    return executing > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing >= this.opts.tasksQueueOptions!.concurrency!
}
713
/**
 * Sends a task function operation message to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The key of the targeted worker node.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` when the worker reports a successful operation, rejecting with an error otherwise.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const onOperationResponse = (response: MessageValue<Response>): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Messages that are not an operation status from the targeted worker are ignored.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      // The answer has been received: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        onOperationResponse
      )
    }
    this.registerWorkerMessageListener(workerNodeKey, onOperationResponse)
    this.sendToWorker(workerNodeKey, message)
  })
}
750
/**
 * Sends a task function operation message to every worker node and waits for all of them to answer.
 *
 * Resolves immediately when the pool has no worker node, since no answer can ever be received.
 * Once all the answers are collected, the shared listener is removed from every worker node.
 *
 * @param message - The task function operation message to broadcast.
 * @returns A promise resolving to `true` when every worker reports a successful operation, rejecting with an error built from the first failed answer otherwise.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Number of worker nodes the operation is sent to.
    const targetedWorkerNodesCount = this.workerNodes.length
    if (targetedWorkerNodesCount === 0) {
      // Nothing to wait for: without this guard the promise would never settle.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      message: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(message)
      if (message.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(message)
      // Worker nodes added after the broadcast never answer and removed ones cannot answer anymore:
      // wait for the smallest of the targeted and the current worker nodes count.
      const expectedResponsesCount = Math.min(
        targetedWorkerNodesCount,
        this.workerNodes.length
      )
      if (responsesReceived.length < expectedResponsesCount) {
        return
      }
      // The listener was registered on every worker node, so it is removed from all of them,
      // not only from the one that answered last.
      // NOTE(review): assumes deregistering a listener that is not registered on a worker node is a no-op — confirm.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        response => response.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              message.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
803
804 /** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name.
  return this.workerNodes.some(workerNode => {
    const { taskFunctionNames } = workerNode.info
    return Array.isArray(taskFunctionNames) && taskFunctionNames.includes(name)
  })
}
816
817 /** @inheritDoc */
e81c38f2
JB
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  const isBlankName = name.trim().length === 0
  if (isBlankName) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The function is sent to the workers as its source code string.
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  // Only reached when the operation did not reject: the function is then tracked on the pool side.
  this.taskFunctions.set(name, fn)
  return opResult
}
839
840 /** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult =
    await this.sendTaskFunctionOperationToWorkers(operationMessage)
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
855
90d7d101 856 /** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names are taken from the first worker node exposing a non empty list.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
868
6703b9f4 869 /** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const operationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(operationMessage)
}
876
adee6053
JB
/**
 * Deletes the usage kept for the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
882
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node instead of being queued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and the worker node executes less tasks than the tasks queue concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
891
afc003b2 892 /** @inheritDoc */
7d91a8cd
JB
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state checks.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments checks.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // The promise callbacks are stored under the task id so that they can be retrieved later from that id.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 956
60dc0a9c
JB
/**
 * Starts the minimum number of workers.
 *
 * Creates worker nodes until the number of non dynamic worker nodes reaches
 * the minimum number of workers.
 */
private startMinimumNumberOfWorkers (): void {
  this.startingMinimumNumberOfWorkers = true
  try {
    while (
      this.workerNodes.reduce(
        (accumulator, workerNode) =>
          !workerNode.info.dynamic ? accumulator + 1 : accumulator,
        0
      ) < this.minimumNumberOfWorkers
    ) {
      this.createAndSetupWorkerNode()
    }
  } finally {
    // Always reset the flag: the worker 'error' and 'exit' handlers skip the
    // minimum number of workers restart while it is set.
    this.startingMinimumNumberOfWorkers = false
  }
}
973
47352846
JB
/** @inheritdoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    this.startMinimumNumberOfWorkers()
  } finally {
    // Reset the flag even on failure, otherwise any later start() attempt
    // would be rejected as 'already starting'.
    this.starting = false
  }
  // Only reached when the minimum number of workers has been started.
  this.started = true
}
990
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  try {
    // Destroy all the worker nodes concurrently.
    await Promise.all(
      this.workerNodes.map(async (_workerNode, workerNodeKey) => {
        await this.destroyWorkerNode(workerNodeKey)
      })
    )
    this.emitter?.emit(PoolEvents.destroy, this.info)
    this.emitter?.emitDestroy()
    this.readyEventEmitted = false
  } finally {
    // Reset the flag even when a worker node destruction fails, otherwise
    // execute(), start() and destroy() would all be rejected forever.
    this.destroying = false
  }
  // Only reached when every worker node has been destroyed.
  this.started = false
}
1014
/**
 * Sends the kill message to the worker given its worker node key and waits for its kill response.
 *
 * Resolves on a `'success'` kill response (or immediately if the worker node
 * does not exist), rejects on a `'failure'` kill response.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${message.workerId}`
            )
          )
          break
        default:
          // Not a kill response: ignore
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1039
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready, its tasks queue is flushed, then
 * the 'taskFinished' events are awaited (bounded by the tasks finished timeout)
 * before the kill message is sent and the worker node terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default tasks queue options when no timeout is configured
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1061
/**
 * Setup hook meant to run code before the worker nodes are created in the abstract constructor.
 * The default implementation does nothing; it can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1071
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1076
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counters and updates the wait time usage
 * of the worker node and, when tracked, of the task function.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    task.name!
  )
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
1117
/**
 * Hook executed after the worker task execution.
 * Updates the task statistics, run time and ELU usage of the worker node and,
 * when tracked, of the task function, then updates the worker choice strategy
 * if any usage has been updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  let needWorkerChoiceStrategyUpdate = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      message
    )
    needWorkerChoiceStrategyUpdate = true
  }
  if (this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    )
    if (taskFunctionWorkerUsage != null) {
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategyContext,
        taskFunctionWorkerUsage,
        message
      )
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (!needWorkerChoiceStrategyUpdate) {
    return
  }
  this.workerChoiceStrategyContext?.update(workerNodeKey)
}
1175
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * This is the case when the worker information exists and lists more than
 * two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return false
  }
  const { taskFunctionNames } = workerInfo
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1190
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and chosen if
 * the worker choice strategy policy allows dynamic worker usage. Otherwise
 * the choice is delegated to the worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return this.workerChoiceStrategyContext!.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const useDynamicWorker =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.workerChoiceStrategyContext!.execute()
}
1211
6c6afb84
JB
/**
 * Conditions for dynamic worker creation.
 * Must be implemented by the concrete pool.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1218
/**
 * Sends a message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1231
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers and the pool internal 'error' and 'exit'
 * handlers are registered on the worker node before it is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const isPoolRunning = (): boolean => this.started && !this.destroying
  // Pool internal handler for the first worker 'error' event
  const handleWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  // Pool internal handler for the first worker 'exit' event
  const handleWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers()
    }
  }
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', handleWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', handleWorkerExit)
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1295
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * A message listener handling the kill messages sent by the worker is
 * registered, the worker is asked to check its activity, the pool task
 * functions are sent to it and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Evaluated lazily, only when the kill message is not a hard kill
    const isSoftKillOnIdleWorkerNode = (): boolean =>
      isKillBehavior(KillBehaviors.SOFT, message.kill) &&
      ((this.opts.enableTasksQueue === false &&
        workerUsage.tasks.executing === 0) ||
        (this.opts.enableTasksQueue === true &&
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0))
    // Kill message received from worker
    if (
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      isSoftKillOnIdleWorkerNode()
    ) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send the task functions registered on the pool to the new worker
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const strategyPolicy = this.workerChoiceStrategyContext?.getStrategyPolicy()
  if (
    strategyPolicy?.dynamicWorkerReady === true ||
    strategyPolicy?.dynamicWorkerUsage === true
  ) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1353
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1366
ae036c3e
JB
/**
 * Registers once a message listener callback on the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1379
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1392
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics
 * messages and, when the tasks queue is enabled, wires the task stealing
 * event handlers.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  // Task stealing handlers are only wired when the related options are enabled.
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1424
/**
 * Sends the startup message to the worker identified by its worker node key.
 * Must be implemented by the concrete pool.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1431
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the run time and ELU aggregate flags read from the
 * worker choice strategy task statistics requirements, `false` when they
 * are not available.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const runTime = taskStatisticsRequirements?.runTime.aggregate ?? false
  const elu = taskStatisticsRequirements?.elu.aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
a2ed5053 1449
5eb72b9e
JB
/**
 * Whether task stealing is impossible: the pool has at most one worker node
 * or reports no queued tasks.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1453
42c677c1
JB
/**
 * Executes the task on the given worker node if it shall be executed right away, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1461
/**
 * Redistributes the tasks queued on the given worker node to the other worker nodes.
 *
 * Each queued task is handed to the ready worker node, other than the source
 * one, with the fewest queued tasks. The source worker node is never selected
 * as destination: doing so would hand the task back to the worker node being
 * drained.
 *
 * @param workerNodeKey - The source worker node key, `-1` if the worker node is not in the pool.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  // cannotStealTask() returned false, so the pool has at least two worker
  // nodes and a seed key different from the source one always exists.
  // As before, the seed itself is not checked for readiness.
  const seedWorkerNodeKey = workerNodeKey === 0 ? 1 : 0
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = this.workerNodes.reduce(
      (minWorkerNodeKey, workerNode, candidateWorkerNodeKey, workerNodes) => {
        return candidateWorkerNodeKey !== workerNodeKey &&
          workerNode.info.ready &&
          workerNode.usage.tasks.queued <
            workerNodes[minWorkerNodeKey].usage.tasks.queued
          ? candidateWorkerNodeKey
          : minWorkerNodeKey
      },
      seedWorkerNodeKey
    )
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1484
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node and, when tracked, of the task function.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1504
/**
 * Increments the sequentially stolen tasks counter of the worker node, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1514
/**
 * Increments the sequentially stolen tasks counter of the task function worker usage, when tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1530
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node, if its usage exists.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1540
/**
 * Resets to zero the sequentially stolen tasks counter of the task function worker usage, when tracked.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1556
/**
 * Handles the worker node 'idle' event: the idle worker node steals a task
 * from another worker node, then the handler re-invokes itself after an
 * exponential delay computed from the sequentially stolen tasks counter.
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // Stop stealing when nothing can be stolen or too many worker nodes are stealing
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // Stop the stealing sequence once the worker node has work of its own
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskName of workerInfo.taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stolenTaskName = stolenTask.name!
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)!.tasks
    const stealingSequenceContinues =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (stealingSequenceContinues) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  // Re-run the handler after a delay derived from the sequentially stolen tasks counter
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1641
/**
 * Steals a task for the given worker node from the ready, non stealing worker
 * node with the most queued tasks.
 *
 * The candidates are searched in a copy of the worker nodes sorted by
 * descending number of queued tasks. The stealing worker node is excluded by
 * identity: positions in the sorted copy are not pool worker node keys and
 * must not be compared with `workerNodeKey`.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
1668
/**
 * Handles the worker node 'backPressure' event: tasks queued on the worker
 * node under back pressure are handed to the other ready, non stealing worker
 * nodes whose queue is below the configured size minus one.
 *
 * The candidates are iterated over a copy of the worker nodes sorted by
 * ascending number of queued tasks. The pool worker node key of each
 * candidate is looked up in the pool worker nodes, since positions in the
 * sorted copy are not pool worker node keys.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Resolve the real pool key of the destination worker node
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1719
/**
 * This method is the message listener registered on each worker.
 * It dispatches the worker ready responses, the task execution responses and
 * the task function names messages.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1744
8e8d9101
JB
/**
 * Emits the pool ready event, at most once until the flag is reset, when the pool is ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1751
/**
 * Handles the worker ready response: stores the ready flag and the task
 * function names in the worker node information, then checks whether the
 * pool ready event shall be emitted.
 *
 * @param message - The worker ready response message.
 * @throws Error if the worker reports it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info: workerInfo } = this.workerNodes[workerNodeKey]
  workerInfo.ready = ready
  workerInfo.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1763
/**
 * Handles a task execution response received from a worker.
 *
 * Looks up the promise response registered under the message task id and, if
 * found: settles it (rejecting with the worker error message, or resolving
 * with the data), runs the after task execution hook, removes the entry from
 * the promise response map and emits `taskFinished` on the worker node.
 * When the tasks queue is enabled and the pool is not being destroyed, it then
 * executes the next queued task if the concurrency limit allows it, and emits
 * `idle` on the worker node once it has nothing executing, queued or
 * sequentially stolen.
 *
 * @param message - The task execution message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  if (canExecuteQueuedTask) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Evaluated after the potential task execution above, on purpose.
  const isIdle =
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
7c0ba920 1820
/**
 * Emits the pool `busy` event with the pool info when the pool reports itself busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1826
/**
 * Emits the pool `backPressure` event with the pool info when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1832
d0878034
JB
/**
 * Emits the events related to dynamic worker creation.
 * Abstract: no implementation is provided here.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1837
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` when no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
1847
/**
 * Builds a new worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size is the configured tasks queue size when
 * set, otherwise the default computed from the maximum (or, failing that,
 * minimum) number of workers.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    {
      env: this.opts.env,
      workerOptions: this.opts.workerOptions,
      tasksQueueBackPressureSize
    }
  )
  if (this.starting) {
    // At pool startup, the worker node is flagged as ready right away.
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1873
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The key (index) at which the worker node is found after insertion.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1889
8e8d9101
JB
/**
 * Emits the pool `empty` event with the pool info when the pool reports itself
 * empty, and resets the ready event flag so `ready` can be emitted again.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1896
/**
 * Removes the given worker node from the pool worker nodes, if present, and
 * removes its key from the worker choice strategy context. The empty event
 * check runs whether or not the worker node was found.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isKnownWorkerNode = removedWorkerNodeKey !== -1
  if (isKnownWorkerNode) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
adc3c320 1910
/**
 * Marks the worker at the given worker node key as not ready.
 * Does nothing when no worker information exists for that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1917
9e844245
JB
/**
 * Tells whether the pool has back pressure: the tasks queue must be enabled
 * and no worker node may be free of back pressure.
 *
 * @returns `true` if the tasks queue is enabled and every worker node has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1926
/**
 * Runs the before task execution hook, sends the task (with its transfer list)
 * to the worker at the given worker node key, then checks for task execution
 * events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1938
/**
 * Enqueues the task on the worker node at the given key, then checks for task
 * queuing events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()` call.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSizeAfterEnqueue = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSizeAfterEnqueue
}
1944
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1948
/**
 * Reads the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1952
87347ea8
JB
/**
 * Executes every task queued on the worker node at the given key, one by one,
 * then clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of tasks dequeued and executed.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1963
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1969}