docs: update copyright year
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
f18fd12b 5import { AsyncResource } from 'node:async_hooks'
5c4d16da
JB
6import type {
7 MessageValue,
8 PromiseResponseWrapper,
76d91ea0 9 Task
d35e5717 10} from '../utility-types.js'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16 13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
d35e5717
JB
23} from '../utils.js'
24import { KillBehaviors } from '../worker/worker-options.js'
25import type { TaskFunction } from '../worker/task-functions.js'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
d35e5717 34} from './pool.js'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
9f95d5eb 39 WorkerNodeEventDetail,
67f3f2d6 40 WorkerType
d35e5717 41} from './worker.js'
a35560ba 42import {
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
d35e5717
JB
47} from './selection-strategies/selection-strategies-types.js'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
49import { version } from './version.js'
50import { WorkerNode } from './worker-node.js'
bde6b5d7
JB
51import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
bfc75cca 54 checkValidWorkerChoiceStrategy,
32b141fd 55 getDefaultTasksQueueOptions,
c329fd41
JB
56 updateEluWorkerUsage,
57 updateRunTimeWorkerUsage,
58 updateTaskStatisticsWorkerUsage,
59 updateWaitTimeWorkerUsage,
87347ea8 60 waitWorkerNodeEvents
d35e5717 61} from './utils.js'
23ccf9d7 62
729c563d 63/**
ea7a90d3 64 * Base class that implements some shared logic for all poolifier pools.
729c563d 65 *
38e795c1 66 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 69 */
c97c7edb 70export abstract class AbstractPool<
f06e48d8 71 Worker extends IWorker,
d3c8a1a8
S
72 Data = unknown,
73 Response = unknown
c4855468 74> implements IPool<Worker, Data, Response> {
afc003b2 75 /** @inheritDoc */
4b628b48 76 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 77
afc003b2 78 /** @inheritDoc */
f80125ca 79 public emitter?: EventEmitterAsyncResource
7c0ba920 80
be0676b3 81 /**
419e8121 82 * The task execution response promise map:
2740a743 83 * - `key`: The message id of each submitted task.
a3445496 84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 85 *
a3445496 86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 87 */
501aea93
JB
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 90
a35560ba 91 /**
51fe3d3c 92 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 93 */
c63a35a0 94 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
78cea37e
JB
95 Worker,
96 Data,
97 Response
a35560ba
S
98 >
99
72ae84a2
JB
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
15b176e0
JB
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
47352846
JB
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
711623b8
JB
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
55082af9
JB
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
afe0d5bf
JB
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
729c563d
S
128 /**
129 * Constructs a new poolifier pool.
130 *
00e1bdeb 131 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
029715f0 132 * @param filePath - Path to the worker file.
38e795c1 133 * @param opts - Options for the pool.
00e1bdeb 134 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
729c563d 135 */
c97c7edb 136 public constructor (
26ce26ca 137 protected readonly minimumNumberOfWorkers: number,
b4213b7f 138 protected readonly filePath: string,
26ce26ca
JB
139 protected readonly opts: PoolOptions<Worker>,
140 protected readonly maximumNumberOfWorkers?: number
c97c7edb 141 ) {
78cea37e 142 if (!this.isMain()) {
04f45163 143 throw new Error(
8c6d4acf 144 'Cannot start a pool from a worker with the same type as the pool'
04f45163 145 )
c97c7edb 146 }
26ce26ca 147 this.checkPoolType()
bde6b5d7 148 checkFilePath(this.filePath)
26ce26ca 149 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
7c0ba920 150 this.checkPoolOptions(this.opts)
1086026a 151
7254e419
JB
152 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
153 this.executeTask = this.executeTask.bind(this)
154 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 155
6bd72cd0 156 if (this.opts.enableEvents === true) {
b5604034 157 this.initializeEventEmitter()
7c0ba920 158 }
d59df138
JB
159 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
160 Worker,
161 Data,
162 Response
da309861
JB
163 >(
164 this,
165 this.opts.workerChoiceStrategy,
166 this.opts.workerChoiceStrategyOptions
167 )
b6b32453
JB
168
169 this.setupHook()
170
72ae84a2
JB
171 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
172
47352846 173 this.started = false
075e51d1 174 this.starting = false
711623b8 175 this.destroying = false
55082af9 176 this.readyEventEmitted = false
47352846
JB
177 if (this.opts.startWorkers === true) {
178 this.start()
179 }
afe0d5bf
JB
180
181 this.startTimestamp = performance.now()
c97c7edb
S
182 }
183
26ce26ca
JB
184 private checkPoolType (): void {
185 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
186 throw new Error(
187 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
188 )
189 }
190 }
191
c63a35a0
JB
192 private checkMinimumNumberOfWorkers (
193 minimumNumberOfWorkers: number | undefined
194 ): void {
af7f2788 195 if (minimumNumberOfWorkers == null) {
8d3782fa
JB
196 throw new Error(
197 'Cannot instantiate a pool without specifying the number of workers'
198 )
af7f2788 199 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
473c717a 200 throw new TypeError(
0d80593b 201 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa 202 )
af7f2788 203 } else if (minimumNumberOfWorkers < 0) {
473c717a 204 throw new RangeError(
8d3782fa
JB
205 'Cannot instantiate a pool with a negative number of workers'
206 )
af7f2788 207 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
2431bdb4
JB
208 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
209 }
210 }
211
7c0ba920 212 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 213 if (isPlainObject(opts)) {
47352846 214 this.opts.startWorkers = opts.startWorkers ?? true
c63a35a0 215 checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
0d80593b
JB
216 this.opts.workerChoiceStrategy =
217 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9 218 this.checkValidWorkerChoiceStrategyOptions(
c63a35a0 219 opts.workerChoiceStrategyOptions
2324f8c9 220 )
26ce26ca
JB
221 if (opts.workerChoiceStrategyOptions != null) {
222 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
8990357d 223 }
1f68cede 224 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
225 this.opts.enableEvents = opts.enableEvents ?? true
226 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
227 if (this.opts.enableTasksQueue) {
c63a35a0 228 checkValidTasksQueueOptions(opts.tasksQueueOptions)
0d80593b 229 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
c63a35a0 230 opts.tasksQueueOptions
0d80593b
JB
231 )
232 }
233 } else {
234 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 235 }
aee46736
JB
236 }
237
0d80593b 238 private checkValidWorkerChoiceStrategyOptions (
c63a35a0 239 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
0d80593b 240 ): void {
2324f8c9
JB
241 if (
242 workerChoiceStrategyOptions != null &&
243 !isPlainObject(workerChoiceStrategyOptions)
244 ) {
0d80593b
JB
245 throw new TypeError(
246 'Invalid worker choice strategy options: must be a plain object'
247 )
248 }
49be33fe 249 if (
2324f8c9 250 workerChoiceStrategyOptions?.weights != null &&
26ce26ca
JB
251 Object.keys(workerChoiceStrategyOptions.weights).length !==
252 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
49be33fe
JB
253 ) {
254 throw new Error(
255 'Invalid worker choice strategy options: must have a weight for each worker node'
256 )
257 }
f0d7f803 258 if (
2324f8c9 259 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
260 !Object.values(Measurements).includes(
261 workerChoiceStrategyOptions.measurement
262 )
263 ) {
264 throw new Error(
265 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
266 )
267 }
0d80593b
JB
268 }
269
b5604034 270 private initializeEventEmitter (): void {
5b7d00b4 271 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
272 name: `poolifier:${this.type}-${this.worker}-pool`
273 })
274 }
275
08f3f44c 276 /** @inheritDoc */
6b27d407
JB
277 public get info (): PoolInfo {
278 return {
23ccf9d7 279 version,
6b27d407 280 type: this.type,
184855e6 281 worker: this.worker,
47352846 282 started: this.started,
2431bdb4 283 ready: this.ready,
67f3f2d6
JB
284 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
285 strategy: this.opts.workerChoiceStrategy!,
0e8587d2 286 strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
26ce26ca
JB
287 minSize: this.minimumNumberOfWorkers,
288 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
289 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
f4d0a470 290 .runTime.aggregate === true &&
c63a35a0
JB
291 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
292 .waitTime.aggregate && {
293 utilization: round(this.utilization)
294 }),
6b27d407
JB
295 workerNodes: this.workerNodes.length,
296 idleWorkerNodes: this.workerNodes.reduce(
297 (accumulator, workerNode) =>
f59e1027 298 workerNode.usage.tasks.executing === 0
a4e07f72
JB
299 ? accumulator + 1
300 : accumulator,
6b27d407
JB
301 0
302 ),
5eb72b9e
JB
303 ...(this.opts.enableTasksQueue === true && {
304 stealingWorkerNodes: this.workerNodes.reduce(
305 (accumulator, workerNode) =>
306 workerNode.info.stealing ? accumulator + 1 : accumulator,
307 0
308 )
309 }),
6b27d407 310 busyWorkerNodes: this.workerNodes.reduce(
42c677c1
JB
311 (accumulator, _workerNode, workerNodeKey) =>
312 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
6b27d407
JB
313 0
314 ),
a4e07f72 315 executedTasks: this.workerNodes.reduce(
6b27d407 316 (accumulator, workerNode) =>
f59e1027 317 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
318 0
319 ),
320 executingTasks: this.workerNodes.reduce(
321 (accumulator, workerNode) =>
f59e1027 322 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
323 0
324 ),
daf86646
JB
325 ...(this.opts.enableTasksQueue === true && {
326 queuedTasks: this.workerNodes.reduce(
327 (accumulator, workerNode) =>
328 accumulator + workerNode.usage.tasks.queued,
329 0
330 )
331 }),
332 ...(this.opts.enableTasksQueue === true && {
333 maxQueuedTasks: this.workerNodes.reduce(
334 (accumulator, workerNode) =>
c63a35a0 335 accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
daf86646
JB
336 0
337 )
338 }),
a1763c54
JB
339 ...(this.opts.enableTasksQueue === true && {
340 backPressure: this.hasBackPressure()
341 }),
68cbdc84
JB
342 ...(this.opts.enableTasksQueue === true && {
343 stolenTasks: this.workerNodes.reduce(
344 (accumulator, workerNode) =>
345 accumulator + workerNode.usage.tasks.stolen,
346 0
347 )
348 }),
a4e07f72
JB
349 failedTasks: this.workerNodes.reduce(
350 (accumulator, workerNode) =>
f59e1027 351 accumulator + workerNode.usage.tasks.failed,
a4e07f72 352 0
1dcf8b7b 353 ),
26ce26ca 354 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
f4d0a470 355 .runTime.aggregate === true && {
1dcf8b7b 356 runTime: {
98e72cda 357 minimum: round(
90d6701c 358 min(
98e72cda 359 ...this.workerNodes.map(
c63a35a0 360 workerNode => workerNode.usage.runTime.minimum ?? Infinity
98e72cda 361 )
1dcf8b7b
JB
362 )
363 ),
98e72cda 364 maximum: round(
90d6701c 365 max(
98e72cda 366 ...this.workerNodes.map(
c63a35a0 367 workerNode => workerNode.usage.runTime.maximum ?? -Infinity
98e72cda 368 )
1dcf8b7b 369 )
98e72cda 370 ),
c63a35a0 371 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
3baa0837
JB
372 .runTime.average && {
373 average: round(
374 average(
375 this.workerNodes.reduce<number[]>(
376 (accumulator, workerNode) =>
377 accumulator.concat(workerNode.usage.runTime.history),
378 []
379 )
98e72cda 380 )
dc021bcc 381 )
3baa0837 382 }),
c63a35a0 383 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
98e72cda
JB
384 .runTime.median && {
385 median: round(
386 median(
3baa0837
JB
387 this.workerNodes.reduce<number[]>(
388 (accumulator, workerNode) =>
389 accumulator.concat(workerNode.usage.runTime.history),
390 []
98e72cda
JB
391 )
392 )
393 )
394 })
1dcf8b7b
JB
395 }
396 }),
26ce26ca 397 ...(this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
f4d0a470 398 .waitTime.aggregate === true && {
1dcf8b7b 399 waitTime: {
98e72cda 400 minimum: round(
90d6701c 401 min(
98e72cda 402 ...this.workerNodes.map(
c63a35a0 403 workerNode => workerNode.usage.waitTime.minimum ?? Infinity
98e72cda 404 )
1dcf8b7b
JB
405 )
406 ),
98e72cda 407 maximum: round(
90d6701c 408 max(
98e72cda 409 ...this.workerNodes.map(
c63a35a0 410 workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
98e72cda 411 )
1dcf8b7b 412 )
98e72cda 413 ),
c63a35a0 414 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
3baa0837
JB
415 .waitTime.average && {
416 average: round(
417 average(
418 this.workerNodes.reduce<number[]>(
419 (accumulator, workerNode) =>
420 accumulator.concat(workerNode.usage.waitTime.history),
421 []
422 )
98e72cda 423 )
dc021bcc 424 )
3baa0837 425 }),
c63a35a0 426 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
98e72cda
JB
427 .waitTime.median && {
428 median: round(
429 median(
3baa0837
JB
430 this.workerNodes.reduce<number[]>(
431 (accumulator, workerNode) =>
432 accumulator.concat(workerNode.usage.waitTime.history),
433 []
98e72cda
JB
434 )
435 )
436 )
437 })
1dcf8b7b
JB
438 }
439 })
6b27d407
JB
440 }
441 }
08f3f44c 442
aa9eede8
JB
443 /**
444 * The pool readiness boolean status.
445 */
2431bdb4 446 private get ready (): boolean {
8e8d9101
JB
447 if (this.empty) {
448 return false
449 }
2431bdb4 450 return (
b97d82d8
JB
451 this.workerNodes.reduce(
452 (accumulator, workerNode) =>
453 !workerNode.info.dynamic && workerNode.info.ready
454 ? accumulator + 1
455 : accumulator,
456 0
26ce26ca 457 ) >= this.minimumNumberOfWorkers
2431bdb4
JB
458 )
459 }
460
8e8d9101
JB
461 /**
462 * The pool emptiness boolean status.
463 */
464 protected get empty (): boolean {
fb43035c 465 return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
8e8d9101
JB
466 }
467
afe0d5bf 468 /**
aa9eede8 469 * The approximate pool utilization.
afe0d5bf
JB
470 *
471 * @returns The pool utilization.
472 */
473 private get utilization (): number {
8e5ca040 474 const poolTimeCapacity =
26ce26ca
JB
475 (performance.now() - this.startTimestamp) *
476 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
afe0d5bf
JB
477 const totalTasksRunTime = this.workerNodes.reduce(
478 (accumulator, workerNode) =>
c63a35a0 479 accumulator + (workerNode.usage.runTime.aggregate ?? 0),
afe0d5bf
JB
480 0
481 )
482 const totalTasksWaitTime = this.workerNodes.reduce(
483 (accumulator, workerNode) =>
c63a35a0 484 accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
afe0d5bf
JB
485 0
486 )
8e5ca040 487 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
488 }
489
8881ae32 490 /**
aa9eede8 491 * The pool type.
8881ae32
JB
492 *
493 * If it is `'dynamic'`, it provides the `max` property.
494 */
495 protected abstract get type (): PoolType
496
184855e6 497 /**
aa9eede8 498 * The worker type.
184855e6
JB
499 */
500 protected abstract get worker (): WorkerType
501
6b813701
JB
502 /**
503 * Checks if the worker id sent in the received message from a worker is valid.
504 *
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 */
4d159167 508 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
509 if (message.workerId == null) {
510 throw new Error('Worker message received without worker id')
8e5a7cf9 511 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
512 throw new Error(
513 `Worker message received from unknown worker '${message.workerId}'`
514 )
515 }
516 }
517
aa9eede8
JB
518 /**
519 * Gets the worker node key given its worker id.
520 *
521 * @param workerId - The worker id.
aad6fb64 522 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 523 */
72ae84a2 524 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 525 return this.workerNodes.findIndex(
041dc05b 526 workerNode => workerNode.info.id === workerId
aad6fb64 527 )
aa9eede8
JB
528 }
529
afc003b2 530 /** @inheritDoc */
a35560ba 531 public setWorkerChoiceStrategy (
59219cbb
JB
532 workerChoiceStrategy: WorkerChoiceStrategy,
533 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 534 ): void {
bde6b5d7 535 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 536 this.opts.workerChoiceStrategy = workerChoiceStrategy
c63a35a0 537 this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
b6b32453
JB
538 this.opts.workerChoiceStrategy
539 )
540 if (workerChoiceStrategyOptions != null) {
541 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
542 }
aa9eede8 543 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 544 workerNode.resetUsage()
9edb9717 545 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 546 }
a20f0ba5
JB
547 }
548
549 /** @inheritDoc */
550 public setWorkerChoiceStrategyOptions (
c63a35a0 551 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
a20f0ba5 552 ): void {
0d80593b 553 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
26ce26ca
JB
554 if (workerChoiceStrategyOptions != null) {
555 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
8990357d 556 }
c63a35a0 557 this.workerChoiceStrategyContext?.setOptions(
a20f0ba5 558 this.opts.workerChoiceStrategyOptions
a35560ba
S
559 )
560 }
561
a20f0ba5 562 /** @inheritDoc */
8f52842f
JB
563 public enableTasksQueue (
564 enable: boolean,
565 tasksQueueOptions?: TasksQueueOptions
566 ): void {
a20f0ba5 567 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
568 this.unsetTaskStealing()
569 this.unsetTasksStealingOnBackPressure()
ef41a6e6 570 this.flushTasksQueues()
a20f0ba5
JB
571 }
572 this.opts.enableTasksQueue = enable
c63a35a0 573 this.setTasksQueueOptions(tasksQueueOptions)
a20f0ba5
JB
574 }
575
576 /** @inheritDoc */
c63a35a0
JB
577 public setTasksQueueOptions (
578 tasksQueueOptions: TasksQueueOptions | undefined
579 ): void {
a20f0ba5 580 if (this.opts.enableTasksQueue === true) {
bde6b5d7 581 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
582 this.opts.tasksQueueOptions =
583 this.buildTasksQueueOptions(tasksQueueOptions)
67f3f2d6
JB
584 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
585 this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
d6ca1416 586 if (this.opts.tasksQueueOptions.taskStealing === true) {
9f99eb9b 587 this.unsetTaskStealing()
d6ca1416
JB
588 this.setTaskStealing()
589 } else {
590 this.unsetTaskStealing()
591 }
592 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
9f99eb9b 593 this.unsetTasksStealingOnBackPressure()
d6ca1416
JB
594 this.setTasksStealingOnBackPressure()
595 } else {
596 this.unsetTasksStealingOnBackPressure()
597 }
5baee0d7 598 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
599 delete this.opts.tasksQueueOptions
600 }
601 }
602
9b38ab2d 603 private buildTasksQueueOptions (
c63a35a0 604 tasksQueueOptions: TasksQueueOptions | undefined
9b38ab2d
JB
605 ): TasksQueueOptions {
606 return {
26ce26ca
JB
607 ...getDefaultTasksQueueOptions(
608 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
609 ),
9b38ab2d
JB
610 ...tasksQueueOptions
611 }
612 }
613
5b49e864 614 private setTasksQueueSize (size: number): void {
20c6f652 615 for (const workerNode of this.workerNodes) {
ff3f866a 616 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
617 }
618 }
619
d6ca1416
JB
620 private setTaskStealing (): void {
621 for (const [workerNodeKey] of this.workerNodes.entries()) {
e44639e9 622 this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
d6ca1416
JB
623 }
624 }
625
626 private unsetTaskStealing (): void {
627 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 628 this.workerNodes[workerNodeKey].off(
e44639e9
JB
629 'idle',
630 this.handleWorkerNodeIdleEvent
9f95d5eb 631 )
d6ca1416
JB
632 }
633 }
634
635 private setTasksStealingOnBackPressure (): void {
636 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 637 this.workerNodes[workerNodeKey].on(
b5e75be8 638 'backPressure',
e44639e9 639 this.handleWorkerNodeBackPressureEvent
9f95d5eb 640 )
d6ca1416
JB
641 }
642 }
643
644 private unsetTasksStealingOnBackPressure (): void {
645 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 646 this.workerNodes[workerNodeKey].off(
b5e75be8 647 'backPressure',
e44639e9 648 this.handleWorkerNodeBackPressureEvent
9f95d5eb 649 )
d6ca1416
JB
650 }
651 }
652
c319c66b
JB
653 /**
654 * Whether the pool is full or not.
655 *
656 * The pool filling boolean status.
657 */
dea903a8 658 protected get full (): boolean {
26ce26ca
JB
659 return (
660 this.workerNodes.length >=
661 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
662 )
dea903a8 663 }
c2ade475 664
c319c66b
JB
665 /**
666 * Whether the pool is busy or not.
667 *
668 * The pool busyness boolean status.
669 */
670 protected abstract get busy (): boolean
7c0ba920 671
6c6afb84 672 /**
3d76750a 673 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
674 *
675 * @returns Worker nodes busyness boolean status.
676 */
c2ade475 677 protected internalBusy (): boolean {
3d76750a
JB
678 if (this.opts.enableTasksQueue === true) {
679 return (
680 this.workerNodes.findIndex(
041dc05b 681 workerNode =>
3d76750a
JB
682 workerNode.info.ready &&
683 workerNode.usage.tasks.executing <
67f3f2d6
JB
684 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
685 this.opts.tasksQueueOptions!.concurrency!
3d76750a
JB
686 ) === -1
687 )
3d76750a 688 }
419e8121
JB
689 return (
690 this.workerNodes.findIndex(
691 workerNode =>
692 workerNode.info.ready && workerNode.usage.tasks.executing === 0
693 ) === -1
694 )
cb70b19d
JB
695 }
696
42c677c1
JB
697 private isWorkerNodeBusy (workerNodeKey: number): boolean {
698 if (this.opts.enableTasksQueue === true) {
699 return (
700 this.workerNodes[workerNodeKey].usage.tasks.executing >=
67f3f2d6
JB
701 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
702 this.opts.tasksQueueOptions!.concurrency!
42c677c1
JB
703 )
704 }
705 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
706 }
707
e81c38f2 708 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
709 workerNodeKey: number,
710 message: MessageValue<Data>
711 ): Promise<boolean> {
72ae84a2 712 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
713 const taskFunctionOperationListener = (
714 message: MessageValue<Response>
715 ): void => {
716 this.checkMessageWorkerId(message)
1851fed0 717 const workerId = this.getWorkerInfo(workerNodeKey)?.id
72ae84a2 718 if (
ae036c3e
JB
719 message.taskFunctionOperationStatus != null &&
720 message.workerId === workerId
72ae84a2 721 ) {
ae036c3e
JB
722 if (message.taskFunctionOperationStatus) {
723 resolve(true)
c63a35a0 724 } else {
ae036c3e
JB
725 reject(
726 new Error(
c63a35a0 727 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
ae036c3e 728 )
72ae84a2 729 )
ae036c3e
JB
730 }
731 this.deregisterWorkerMessageListener(
732 this.getWorkerNodeKeyByWorkerId(message.workerId),
733 taskFunctionOperationListener
72ae84a2
JB
734 )
735 }
ae036c3e
JB
736 }
737 this.registerWorkerMessageListener(
738 workerNodeKey,
739 taskFunctionOperationListener
740 )
72ae84a2
JB
741 this.sendToWorker(workerNodeKey, message)
742 })
743 }
744
745 private async sendTaskFunctionOperationToWorkers (
adee6053 746 message: MessageValue<Data>
e81c38f2
JB
747 ): Promise<boolean> {
748 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
749 const responsesReceived = new Array<MessageValue<Response>>()
750 const taskFunctionOperationsListener = (
751 message: MessageValue<Response>
752 ): void => {
753 this.checkMessageWorkerId(message)
754 if (message.taskFunctionOperationStatus != null) {
755 responsesReceived.push(message)
756 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 757 if (
e81c38f2
JB
758 responsesReceived.every(
759 message => message.taskFunctionOperationStatus === true
760 )
761 ) {
762 resolve(true)
763 } else if (
e81c38f2
JB
764 responsesReceived.some(
765 message => message.taskFunctionOperationStatus === false
766 )
767 ) {
b0b55f57
JB
768 const errorResponse = responsesReceived.find(
769 response => response.taskFunctionOperationStatus === false
770 )
e81c38f2
JB
771 reject(
772 new Error(
b0b55f57 773 `Task function operation '${
e81c38f2 774 message.taskFunctionOperation as string
c63a35a0
JB
775 }' failed on worker ${errorResponse?.workerId} with error: '${
776 errorResponse?.workerError?.message
b0b55f57 777 }'`
e81c38f2
JB
778 )
779 )
780 }
ae036c3e
JB
781 this.deregisterWorkerMessageListener(
782 this.getWorkerNodeKeyByWorkerId(message.workerId),
783 taskFunctionOperationsListener
784 )
e81c38f2 785 }
ae036c3e
JB
786 }
787 }
788 for (const [workerNodeKey] of this.workerNodes.entries()) {
789 this.registerWorkerMessageListener(
790 workerNodeKey,
791 taskFunctionOperationsListener
792 )
72ae84a2 793 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
794 }
795 })
6703b9f4
JB
796 }
797
798 /** @inheritDoc */
799 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
800 for (const workerNode of this.workerNodes) {
801 if (
802 Array.isArray(workerNode.info.taskFunctionNames) &&
803 workerNode.info.taskFunctionNames.includes(name)
804 ) {
805 return true
806 }
807 }
808 return false
6703b9f4
JB
809 }
810
811 /** @inheritDoc */
e81c38f2
JB
812 public async addTaskFunction (
813 name: string,
3feeab69 814 fn: TaskFunction<Data, Response>
e81c38f2 815 ): Promise<boolean> {
3feeab69
JB
816 if (typeof name !== 'string') {
817 throw new TypeError('name argument must be a string')
818 }
819 if (typeof name === 'string' && name.trim().length === 0) {
820 throw new TypeError('name argument must not be an empty string')
821 }
822 if (typeof fn !== 'function') {
823 throw new TypeError('fn argument must be a function')
824 }
adee6053 825 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
826 taskFunctionOperation: 'add',
827 taskFunctionName: name,
3feeab69 828 taskFunction: fn.toString()
6703b9f4 829 })
adee6053
JB
830 this.taskFunctions.set(name, fn)
831 return opResult
6703b9f4
JB
832 }
833
834 /** @inheritDoc */
e81c38f2 835 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
836 if (!this.taskFunctions.has(name)) {
837 throw new Error(
16248b23 838 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
839 )
840 }
adee6053 841 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
842 taskFunctionOperation: 'remove',
843 taskFunctionName: name
844 })
adee6053
JB
845 this.deleteTaskFunctionWorkerUsages(name)
846 this.taskFunctions.delete(name)
847 return opResult
6703b9f4
JB
848 }
849
90d7d101 850 /** @inheritDoc */
6703b9f4 851 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
852 for (const workerNode of this.workerNodes) {
853 if (
6703b9f4
JB
854 Array.isArray(workerNode.info.taskFunctionNames) &&
855 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 856 ) {
6703b9f4 857 return workerNode.info.taskFunctionNames
f2dbbf95 858 }
90d7d101 859 }
f2dbbf95 860 return []
90d7d101
JB
861 }
862
6703b9f4 863 /** @inheritDoc */
e81c38f2 864 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 865 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
866 taskFunctionOperation: 'default',
867 taskFunctionName: name
868 })
6703b9f4
JB
869 }
870
adee6053
JB
871 private deleteTaskFunctionWorkerUsages (name: string): void {
872 for (const workerNode of this.workerNodes) {
873 workerNode.deleteTaskFunctionWorkerUsage(name)
874 }
875 }
876
375f7504
JB
877 private shallExecuteTask (workerNodeKey: number): boolean {
878 return (
879 this.tasksQueueSize(workerNodeKey) === 0 &&
880 this.workerNodes[workerNodeKey].usage.tasks.executing <
67f3f2d6
JB
881 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
882 this.opts.tasksQueueOptions!.concurrency!
375f7504
JB
883 )
884 }
885
afc003b2 886 /** @inheritDoc */
7d91a8cd
JB
887 public async execute (
888 data?: Data,
889 name?: string,
890 transferList?: TransferListItem[]
891 ): Promise<Response> {
52b71763 892 return await new Promise<Response>((resolve, reject) => {
15b176e0 893 if (!this.started) {
47352846 894 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 895 return
15b176e0 896 }
711623b8
JB
897 if (this.destroying) {
898 reject(new Error('Cannot execute a task on destroying pool'))
899 return
900 }
7d91a8cd
JB
901 if (name != null && typeof name !== 'string') {
902 reject(new TypeError('name argument must be a string'))
9d2d0da1 903 return
7d91a8cd 904 }
90d7d101
JB
905 if (
906 name != null &&
907 typeof name === 'string' &&
908 name.trim().length === 0
909 ) {
f58b60b9 910 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 911 return
90d7d101 912 }
b558f6b5
JB
913 if (transferList != null && !Array.isArray(transferList)) {
914 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 915 return
b558f6b5
JB
916 }
917 const timestamp = performance.now()
918 const workerNodeKey = this.chooseWorkerNode()
501aea93 919 const task: Task<Data> = {
52b71763
JB
920 name: name ?? DEFAULT_TASK_NAME,
921 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
922 data: data ?? ({} as Data),
7d91a8cd 923 transferList,
52b71763 924 timestamp,
7629bdf1 925 taskId: randomUUID()
52b71763 926 }
67f3f2d6
JB
927 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
928 this.promiseResponseMap.set(task.taskId!, {
2e81254d
JB
929 resolve,
930 reject,
f18fd12b
JB
931 workerNodeKey,
932 ...(this.emitter != null && {
933 asyncResource: new AsyncResource('poolifier:task', {
934 triggerAsyncId: this.emitter.asyncId,
935 requireManualDestroy: true
936 })
937 })
2e81254d 938 })
52b71763 939 if (
4e377863
JB
940 this.opts.enableTasksQueue === false ||
941 (this.opts.enableTasksQueue === true &&
375f7504 942 this.shallExecuteTask(workerNodeKey))
52b71763 943 ) {
501aea93 944 this.executeTask(workerNodeKey, task)
4e377863
JB
945 } else {
946 this.enqueueTask(workerNodeKey, task)
52b71763 947 }
2e81254d 948 })
280c2a77 949 }
c97c7edb 950
47352846
JB
951 /** @inheritdoc */
952 public start (): void {
711623b8
JB
953 if (this.started) {
954 throw new Error('Cannot start an already started pool')
955 }
956 if (this.starting) {
957 throw new Error('Cannot start an already starting pool')
958 }
959 if (this.destroying) {
960 throw new Error('Cannot start a destroying pool')
961 }
47352846
JB
962 this.starting = true
963 while (
964 this.workerNodes.reduce(
965 (accumulator, workerNode) =>
966 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
967 0
26ce26ca 968 ) < this.minimumNumberOfWorkers
47352846
JB
969 ) {
970 this.createAndSetupWorkerNode()
971 }
972 this.starting = false
973 this.started = true
974 }
975
afc003b2 976 /** @inheritDoc */
c97c7edb 977 public async destroy (): Promise<void> {
711623b8
JB
978 if (!this.started) {
979 throw new Error('Cannot destroy an already destroyed pool')
980 }
981 if (this.starting) {
982 throw new Error('Cannot destroy an starting pool')
983 }
984 if (this.destroying) {
985 throw new Error('Cannot destroy an already destroying pool')
986 }
987 this.destroying = true
1fbcaa7c 988 await Promise.all(
14c39df1 989 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
aa9eede8 990 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
991 })
992 )
33e6bb4c 993 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 994 this.emitter?.emitDestroy()
78f60f82 995 this.emitter?.removeAllListeners()
55082af9 996 this.readyEventEmitted = false
711623b8 997 this.destroying = false
15b176e0 998 this.started = false
c97c7edb
S
999 }
1000
26ce26ca 1001 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
9edb9717 1002 await new Promise<void>((resolve, reject) => {
c63a35a0
JB
1003 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1004 if (this.workerNodes[workerNodeKey] == null) {
ad3836ed 1005 resolve()
85b2561d
JB
1006 return
1007 }
ae036c3e
JB
1008 const killMessageListener = (message: MessageValue<Response>): void => {
1009 this.checkMessageWorkerId(message)
1e3214b6
JB
1010 if (message.kill === 'success') {
1011 resolve()
1012 } else if (message.kill === 'failure') {
72ae84a2
JB
1013 reject(
1014 new Error(
c63a35a0 1015 `Kill message handling failed on worker ${message.workerId}`
72ae84a2
JB
1016 )
1017 )
1e3214b6 1018 }
ae036c3e 1019 }
51d9cfbd 1020 // FIXME: should be registered only once
ae036c3e 1021 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1022 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1023 })
1e3214b6
JB
1024 }
1025
4a6952ff 1026 /**
aa9eede8 1027 * Terminates the worker node given its worker node key.
4a6952ff 1028 *
aa9eede8 1029 * @param workerNodeKey - The worker node key.
4a6952ff 1030 */
07e0c9e5
JB
1031 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1032 this.flagWorkerNodeAsNotReady(workerNodeKey)
87347ea8 1033 const flushedTasks = this.flushTasksQueue(workerNodeKey)
07e0c9e5 1034 const workerNode = this.workerNodes[workerNodeKey]
32b141fd
JB
1035 await waitWorkerNodeEvents(
1036 workerNode,
1037 'taskFinished',
1038 flushedTasks,
1039 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
26ce26ca
JB
1040 getDefaultTasksQueueOptions(
1041 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1042 ).tasksFinishedTimeout
32b141fd 1043 )
07e0c9e5
JB
1044 await this.sendKillMessageToWorker(workerNodeKey)
1045 await workerNode.terminate()
1046 }
c97c7edb 1047
729c563d 1048 /**
6677a3d3
JB
1049 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1050 * Can be overridden.
afc003b2
JB
1051 *
1052 * @virtual
729c563d 1053 */
280c2a77 1054 protected setupHook (): void {
965df41c 1055 /* Intentionally empty */
280c2a77 1056 }
c97c7edb 1057
729c563d 1058 /**
280c2a77
S
1059 * Should return whether the worker is the main worker or not.
1060 */
1061 protected abstract isMain (): boolean
1062
1063 /**
2e81254d 1064 * Hook executed before the worker task execution.
bf9549ae 1065 * Can be overridden.
729c563d 1066 *
f06e48d8 1067 * @param workerNodeKey - The worker node key.
1c6fe997 1068 * @param task - The task to execute.
729c563d 1069 */
1c6fe997
JB
1070 protected beforeTaskExecutionHook (
1071 workerNodeKey: number,
1072 task: Task<Data>
1073 ): void {
c63a35a0 1074 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1075 if (this.workerNodes[workerNodeKey]?.usage != null) {
94407def
JB
1076 const workerUsage = this.workerNodes[workerNodeKey].usage
1077 ++workerUsage.tasks.executing
c329fd41
JB
1078 updateWaitTimeWorkerUsage(
1079 this.workerChoiceStrategyContext,
1080 workerUsage,
1081 task
1082 )
94407def
JB
1083 }
1084 if (
1085 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
67f3f2d6
JB
1086 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1087 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
1088 null
94407def 1089 ) {
67f3f2d6 1090 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
db0e38ee 1091 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1092 workerNodeKey
67f3f2d6
JB
1093 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1094 ].getTaskFunctionWorkerUsage(task.name!)!
5623b8d5 1095 ++taskFunctionWorkerUsage.tasks.executing
c329fd41
JB
1096 updateWaitTimeWorkerUsage(
1097 this.workerChoiceStrategyContext,
1098 taskFunctionWorkerUsage,
1099 task
1100 )
b558f6b5 1101 }
c97c7edb
S
1102 }
1103
c01733f1 1104 /**
2e81254d 1105 * Hook executed after the worker task execution.
bf9549ae 1106 * Can be overridden.
c01733f1 1107 *
501aea93 1108 * @param workerNodeKey - The worker node key.
38e795c1 1109 * @param message - The received message.
c01733f1 1110 */
2e81254d 1111 protected afterTaskExecutionHook (
501aea93 1112 workerNodeKey: number,
2740a743 1113 message: MessageValue<Response>
bf9549ae 1114 ): void {
c329fd41 1115 let needWorkerChoiceStrategyUpdate = false
c63a35a0 1116 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1117 if (this.workerNodes[workerNodeKey]?.usage != null) {
94407def 1118 const workerUsage = this.workerNodes[workerNodeKey].usage
c329fd41
JB
1119 updateTaskStatisticsWorkerUsage(workerUsage, message)
1120 updateRunTimeWorkerUsage(
1121 this.workerChoiceStrategyContext,
1122 workerUsage,
1123 message
1124 )
1125 updateEluWorkerUsage(
1126 this.workerChoiceStrategyContext,
1127 workerUsage,
1128 message
1129 )
1130 needWorkerChoiceStrategyUpdate = true
94407def
JB
1131 }
1132 if (
1133 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1134 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
67f3f2d6
JB
1135 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1136 message.taskPerformance!.name
94407def
JB
1137 ) != null
1138 ) {
67f3f2d6 1139 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
db0e38ee 1140 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1141 workerNodeKey
67f3f2d6
JB
1142 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1143 ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
c329fd41
JB
1144 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1145 updateRunTimeWorkerUsage(
1146 this.workerChoiceStrategyContext,
1147 taskFunctionWorkerUsage,
1148 message
1149 )
1150 updateEluWorkerUsage(
1151 this.workerChoiceStrategyContext,
1152 taskFunctionWorkerUsage,
1153 message
1154 )
1155 needWorkerChoiceStrategyUpdate = true
1156 }
1157 if (needWorkerChoiceStrategyUpdate) {
c63a35a0 1158 this.workerChoiceStrategyContext?.update(workerNodeKey)
b558f6b5
JB
1159 }
1160 }
1161
db0e38ee
JB
1162 /**
1163 * Whether the worker node shall update its task function worker usage or not.
1164 *
1165 * @param workerNodeKey - The worker node key.
1166 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1167 */
1168 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1169 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1170 return (
1851fed0 1171 workerInfo != null &&
6703b9f4
JB
1172 Array.isArray(workerInfo.taskFunctionNames) &&
1173 workerInfo.taskFunctionNames.length > 2
b558f6b5 1174 )
f1c06930
JB
1175 }
1176
280c2a77 1177 /**
f06e48d8 1178 * Chooses a worker node for the next task.
280c2a77 1179 *
6c6afb84 1180 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1181 *
aa9eede8 1182 * @returns The chosen worker node key
280c2a77 1183 */
6c6afb84 1184 private chooseWorkerNode (): number {
930dcf12 1185 if (this.shallCreateDynamicWorker()) {
aa9eede8 1186 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1187 if (
c63a35a0
JB
1188 this.workerChoiceStrategyContext?.getStrategyPolicy()
1189 .dynamicWorkerUsage === true
6c6afb84 1190 ) {
aa9eede8 1191 return workerNodeKey
6c6afb84 1192 }
17393ac8 1193 }
c63a35a0 1194 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
5d240702 1195 return this.workerChoiceStrategyContext!.execute()
930dcf12
JB
1196 }
1197
6c6afb84
JB
1198 /**
1199 * Conditions for dynamic worker creation.
1200 *
1201 * @returns Whether to create a dynamic worker or not.
1202 */
9d9fb7b6 1203 protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1204
280c2a77 1205 /**
aa9eede8 1206 * Sends a message to worker given its worker node key.
280c2a77 1207 *
aa9eede8 1208 * @param workerNodeKey - The worker node key.
38e795c1 1209 * @param message - The message.
7d91a8cd 1210 * @param transferList - The optional array of transferable objects.
280c2a77
S
1211 */
1212 protected abstract sendToWorker (
aa9eede8 1213 workerNodeKey: number,
7d91a8cd
JB
1214 message: MessageValue<Data>,
1215 transferList?: TransferListItem[]
280c2a77
S
1216 ): void
1217
4a6952ff 1218 /**
aa9eede8 1219 * Creates a new, completely set up worker node.
4a6952ff 1220 *
aa9eede8 1221 * @returns New, completely set up worker node key.
4a6952ff 1222 */
aa9eede8 1223 protected createAndSetupWorkerNode (): number {
c3719753
JB
1224 const workerNode = this.createWorkerNode()
1225 workerNode.registerWorkerEventHandler(
1226 'online',
1227 this.opts.onlineHandler ?? EMPTY_FUNCTION
1228 )
1229 workerNode.registerWorkerEventHandler(
1230 'message',
1231 this.opts.messageHandler ?? EMPTY_FUNCTION
1232 )
1233 workerNode.registerWorkerEventHandler(
1234 'error',
1235 this.opts.errorHandler ?? EMPTY_FUNCTION
1236 )
1237 workerNode.registerWorkerEventHandler('error', (error: Error) => {
07e0c9e5 1238 workerNode.info.ready = false
2a69b8c5 1239 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1240 if (
b6bfca01 1241 this.started &&
711623b8 1242 !this.destroying &&
9b38ab2d 1243 this.opts.restartWorkerOnError === true
15b176e0 1244 ) {
07e0c9e5 1245 if (workerNode.info.dynamic) {
aa9eede8 1246 this.createAndSetupDynamicWorkerNode()
8a1260a3 1247 } else {
aa9eede8 1248 this.createAndSetupWorkerNode()
8a1260a3 1249 }
5baee0d7 1250 }
3c9123c7
JB
1251 if (
1252 this.started &&
1253 !this.destroying &&
1254 this.opts.enableTasksQueue === true
1255 ) {
9974369e 1256 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
19dbc45b 1257 }
b4f8aca8
JB
1258 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1259 workerNode?.terminate().catch(error => {
07e0c9e5
JB
1260 this.emitter?.emit(PoolEvents.error, error)
1261 })
5baee0d7 1262 })
c3719753
JB
1263 workerNode.registerWorkerEventHandler(
1264 'exit',
1265 this.opts.exitHandler ?? EMPTY_FUNCTION
1266 )
1267 workerNode.registerOnceWorkerEventHandler('exit', () => {
9974369e 1268 this.removeWorkerNode(workerNode)
a974afa6 1269 })
c3719753 1270 const workerNodeKey = this.addWorkerNode(workerNode)
aa9eede8 1271 this.afterWorkerNodeSetup(workerNodeKey)
aa9eede8 1272 return workerNodeKey
c97c7edb 1273 }
be0676b3 1274
930dcf12 1275 /**
aa9eede8 1276 * Creates a new, completely set up dynamic worker node.
930dcf12 1277 *
aa9eede8 1278 * @returns New, completely set up dynamic worker node key.
930dcf12 1279 */
aa9eede8
JB
1280 protected createAndSetupDynamicWorkerNode (): number {
1281 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1282 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1283 this.checkMessageWorkerId(message)
aa9eede8
JB
1284 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1285 message.workerId
aad6fb64 1286 )
2b6b412f 1287 const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
81c02522 1288 // Kill message received from worker
930dcf12
JB
1289 if (
1290 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1291 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1292 ((this.opts.enableTasksQueue === false &&
aa9eede8 1293 workerUsage.tasks.executing === 0) ||
7b56f532 1294 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1295 workerUsage.tasks.executing === 0 &&
1296 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1297 ) {
f3827d5d 1298 // Flag the worker node as not ready immediately
ae3ab61d 1299 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1300 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1301 this.emitter?.emit(PoolEvents.error, error)
1302 })
930dcf12
JB
1303 }
1304 })
aa9eede8 1305 this.sendToWorker(workerNodeKey, {
e9dd5b66 1306 checkActive: true
21f710aa 1307 })
72ae84a2
JB
1308 if (this.taskFunctions.size > 0) {
1309 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1310 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1311 taskFunctionOperation: 'add',
1312 taskFunctionName,
1313 taskFunction: taskFunction.toString()
1314 }).catch(error => {
1315 this.emitter?.emit(PoolEvents.error, error)
1316 })
1317 }
1318 }
e44639e9
JB
1319 const workerNode = this.workerNodes[workerNodeKey]
1320 workerNode.info.dynamic = true
b1aae695 1321 if (
c63a35a0
JB
1322 this.workerChoiceStrategyContext?.getStrategyPolicy()
1323 .dynamicWorkerReady === true ||
1324 this.workerChoiceStrategyContext?.getStrategyPolicy()
1325 .dynamicWorkerUsage === true
b1aae695 1326 ) {
e44639e9 1327 workerNode.info.ready = true
b5e113f6 1328 }
33e6bb4c 1329 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1330 return workerNodeKey
930dcf12
JB
1331 }
1332
a2ed5053 1333 /**
aa9eede8 1334 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1335 *
aa9eede8 1336 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1337 * @param listener - The message listener callback.
1338 */
85aeb3f3
JB
1339 protected abstract registerWorkerMessageListener<
1340 Message extends Data | Response
aa9eede8
JB
1341 >(
1342 workerNodeKey: number,
1343 listener: (message: MessageValue<Message>) => void
1344 ): void
a2ed5053 1345
ae036c3e
JB
1346 /**
1347 * Registers once a listener callback on the worker given its worker node key.
1348 *
1349 * @param workerNodeKey - The worker node key.
1350 * @param listener - The message listener callback.
1351 */
1352 protected abstract registerOnceWorkerMessageListener<
1353 Message extends Data | Response
1354 >(
1355 workerNodeKey: number,
1356 listener: (message: MessageValue<Message>) => void
1357 ): void
1358
1359 /**
1360 * Deregisters a listener callback on the worker given its worker node key.
1361 *
1362 * @param workerNodeKey - The worker node key.
1363 * @param listener - The message listener callback.
1364 */
1365 protected abstract deregisterWorkerMessageListener<
1366 Message extends Data | Response
1367 >(
1368 workerNodeKey: number,
1369 listener: (message: MessageValue<Message>) => void
1370 ): void
1371
a2ed5053 1372 /**
aa9eede8 1373 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1374 * Can be overridden.
1375 *
aa9eede8 1376 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1377 */
aa9eede8 1378 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1379 // Listen to worker messages.
fcd39179
JB
1380 this.registerWorkerMessageListener(
1381 workerNodeKey,
3a776b6d 1382 this.workerMessageListener
fcd39179 1383 )
85aeb3f3 1384 // Send the startup message to worker.
aa9eede8 1385 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1386 // Send the statistics message to worker.
1387 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1388 if (this.opts.enableTasksQueue === true) {
dbd73092 1389 if (this.opts.tasksQueueOptions?.taskStealing === true) {
e1c2dba7 1390 this.workerNodes[workerNodeKey].on(
e44639e9
JB
1391 'idle',
1392 this.handleWorkerNodeIdleEvent
9f95d5eb 1393 )
47352846
JB
1394 }
1395 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
e1c2dba7 1396 this.workerNodes[workerNodeKey].on(
b5e75be8 1397 'backPressure',
e44639e9 1398 this.handleWorkerNodeBackPressureEvent
9f95d5eb 1399 )
47352846 1400 }
72695f86 1401 }
d2c73f82
JB
1402 }
1403
85aeb3f3 1404 /**
aa9eede8
JB
1405 * Sends the startup message to worker given its worker node key.
1406 *
1407 * @param workerNodeKey - The worker node key.
1408 */
1409 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1410
1411 /**
9edb9717 1412 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1413 *
aa9eede8 1414 * @param workerNodeKey - The worker node key.
85aeb3f3 1415 */
9edb9717 1416 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1417 this.sendToWorker(workerNodeKey, {
1418 statistics: {
1419 runTime:
c63a35a0 1420 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
f4d0a470 1421 .runTime.aggregate ?? false,
c63a35a0 1422 elu:
f4d0a470 1423 this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
c63a35a0 1424 .aggregate ?? false
72ae84a2 1425 }
aa9eede8
JB
1426 })
1427 }
a2ed5053 1428
5eb72b9e
JB
1429 private cannotStealTask (): boolean {
1430 return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1431 }
1432
42c677c1
JB
1433 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1434 if (this.shallExecuteTask(workerNodeKey)) {
1435 this.executeTask(workerNodeKey, task)
1436 } else {
1437 this.enqueueTask(workerNodeKey, task)
1438 }
1439 }
1440
a2ed5053 1441 private redistributeQueuedTasks (workerNodeKey: number): void {
0d033538 1442 if (workerNodeKey === -1 || this.cannotStealTask()) {
cb71d660
JB
1443 return
1444 }
a2ed5053 1445 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1446 const destinationWorkerNodeKey = this.workerNodes.reduce(
1447 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1448 return workerNode.info.ready &&
1449 workerNode.usage.tasks.queued <
1450 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1451 ? workerNodeKey
1452 : minWorkerNodeKey
1453 },
1454 0
1455 )
42c677c1
JB
1456 this.handleTask(
1457 destinationWorkerNodeKey,
67f3f2d6
JB
1458 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1459 this.dequeueTask(workerNodeKey)!
42c677c1 1460 )
dd951876
JB
1461 }
1462 }
1463
b1838604
JB
1464 private updateTaskStolenStatisticsWorkerUsage (
1465 workerNodeKey: number,
b1838604
JB
1466 taskName: string
1467 ): void {
1a880eca 1468 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1469 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1470 if (workerNode?.usage != null) {
b1838604
JB
1471 ++workerNode.usage.tasks.stolen
1472 }
1473 if (
1474 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1475 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1476 ) {
67f3f2d6
JB
1477 const taskFunctionWorkerUsage =
1478 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1479 workerNode.getTaskFunctionWorkerUsage(taskName)!
b1838604
JB
1480 ++taskFunctionWorkerUsage.tasks.stolen
1481 }
1482 }
1483
463226a4 1484 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1485 workerNodeKey: number
463226a4
JB
1486 ): void {
1487 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1488 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1489 if (workerNode?.usage != null) {
463226a4
JB
1490 ++workerNode.usage.tasks.sequentiallyStolen
1491 }
f1f77f45
JB
1492 }
1493
1494 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1495 workerNodeKey: number,
1496 taskName: string
1497 ): void {
1498 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1499 if (
1500 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1501 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1502 ) {
67f3f2d6
JB
1503 const taskFunctionWorkerUsage =
1504 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1505 workerNode.getTaskFunctionWorkerUsage(taskName)!
463226a4
JB
1506 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1507 }
1508 }
1509
1510 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1511 workerNodeKey: number
463226a4
JB
1512 ): void {
1513 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1514 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1515 if (workerNode?.usage != null) {
463226a4
JB
1516 workerNode.usage.tasks.sequentiallyStolen = 0
1517 }
f1f77f45
JB
1518 }
1519
1520 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1521 workerNodeKey: number,
1522 taskName: string
1523 ): void {
1524 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1525 if (
1526 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1527 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1528 ) {
67f3f2d6
JB
1529 const taskFunctionWorkerUsage =
1530 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1531 workerNode.getTaskFunctionWorkerUsage(taskName)!
463226a4
JB
1532 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1533 }
1534 }
1535
e44639e9 1536 private readonly handleWorkerNodeIdleEvent = (
e1c2dba7 1537 eventDetail: WorkerNodeEventDetail,
463226a4 1538 previousStolenTask?: Task<Data>
9f95d5eb 1539 ): void => {
e1c2dba7 1540 const { workerNodeKey } = eventDetail
463226a4
JB
1541 if (workerNodeKey == null) {
1542 throw new Error(
c63a35a0 1543 "WorkerNode event detail 'workerNodeKey' property must be defined"
463226a4
JB
1544 )
1545 }
c63a35a0 1546 const workerInfo = this.getWorkerInfo(workerNodeKey)
5eb72b9e
JB
1547 if (
1548 this.cannotStealTask() ||
c63a35a0
JB
1549 (this.info.stealingWorkerNodes ?? 0) >
1550 Math.floor(this.workerNodes.length / 2)
5eb72b9e 1551 ) {
1851fed0 1552 if (workerInfo != null && previousStolenTask != null) {
c63a35a0 1553 workerInfo.stealing = false
5eb72b9e
JB
1554 }
1555 return
1556 }
463226a4
JB
1557 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1558 if (
1851fed0 1559 workerInfo != null &&
463226a4
JB
1560 previousStolenTask != null &&
1561 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1562 (workerNodeTasksUsage.executing > 0 ||
1563 this.tasksQueueSize(workerNodeKey) > 0)
1564 ) {
c63a35a0 1565 workerInfo.stealing = false
67f3f2d6 1566 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
f1f77f45 1567 for (const taskName of this.workerNodes[workerNodeKey].info
67f3f2d6 1568 .taskFunctionNames!) {
f1f77f45
JB
1569 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1570 workerNodeKey,
1571 taskName
1572 )
1573 }
1574 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
463226a4
JB
1575 return
1576 }
1851fed0
JB
1577 if (workerInfo == null) {
1578 throw new Error(
1579 `Worker node with key '${workerNodeKey}' not found in pool`
1580 )
1581 }
c63a35a0 1582 workerInfo.stealing = true
463226a4 1583 const stolenTask = this.workerNodeStealTask(workerNodeKey)
f1f77f45
JB
1584 if (
1585 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1586 stolenTask != null
1587 ) {
67f3f2d6 1588 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
f1f77f45
JB
1589 const taskFunctionTasksWorkerUsage = this.workerNodes[
1590 workerNodeKey
67f3f2d6
JB
1591 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1592 ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
f1f77f45
JB
1593 if (
1594 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1595 (previousStolenTask != null &&
1596 previousStolenTask.name === stolenTask.name &&
1597 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1598 ) {
1599 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1600 workerNodeKey,
67f3f2d6
JB
1601 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1602 stolenTask.name!
f1f77f45
JB
1603 )
1604 } else {
1605 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1606 workerNodeKey,
67f3f2d6
JB
1607 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1608 stolenTask.name!
f1f77f45
JB
1609 )
1610 }
1611 }
463226a4
JB
1612 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1613 .then(() => {
e44639e9 1614 this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
463226a4
JB
1615 return undefined
1616 })
2e8cfbb7
JB
1617 .catch(error => {
1618 this.emitter?.emit(PoolEvents.error, error)
1619 })
463226a4
JB
1620 }
1621
1622 private readonly workerNodeStealTask = (
1623 workerNodeKey: number
1624 ): Task<Data> | undefined => {
dd951876 1625 const workerNodes = this.workerNodes
a6b3272b 1626 .slice()
dd951876
JB
1627 .sort(
1628 (workerNodeA, workerNodeB) =>
1629 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1630 )
f201a0cd 1631 const sourceWorkerNode = workerNodes.find(
463226a4
JB
1632 (sourceWorkerNode, sourceWorkerNodeKey) =>
1633 sourceWorkerNode.info.ready &&
5eb72b9e 1634 !sourceWorkerNode.info.stealing &&
463226a4
JB
1635 sourceWorkerNodeKey !== workerNodeKey &&
1636 sourceWorkerNode.usage.tasks.queued > 0
f201a0cd
JB
1637 )
1638 if (sourceWorkerNode != null) {
67f3f2d6
JB
1639 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1640 const task = sourceWorkerNode.popTask()!
42c677c1 1641 this.handleTask(workerNodeKey, task)
f1f77f45 1642 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
67f3f2d6
JB
1643 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1644 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
463226a4 1645 return task
72695f86
JB
1646 }
1647 }
1648
e44639e9 1649 private readonly handleWorkerNodeBackPressureEvent = (
e1c2dba7 1650 eventDetail: WorkerNodeEventDetail
9f95d5eb 1651 ): void => {
5eb72b9e
JB
1652 if (
1653 this.cannotStealTask() ||
c63a35a0
JB
1654 (this.info.stealingWorkerNodes ?? 0) >
1655 Math.floor(this.workerNodes.length / 2)
5eb72b9e 1656 ) {
cb71d660
JB
1657 return
1658 }
e1c2dba7 1659 const { workerId } = eventDetail
f778c355 1660 const sizeOffset = 1
67f3f2d6
JB
1661 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1662 if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
68dbcdc0
JB
1663 return
1664 }
72695f86 1665 const sourceWorkerNode =
b5e75be8 1666 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1667 const workerNodes = this.workerNodes
a6b3272b 1668 .slice()
72695f86
JB
1669 .sort(
1670 (workerNodeA, workerNodeB) =>
1671 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1672 )
1673 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1674 if (
0bc68267 1675 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1676 workerNode.info.ready &&
5eb72b9e 1677 !workerNode.info.stealing &&
b5e75be8 1678 workerNode.info.id !== workerId &&
0bc68267 1679 workerNode.usage.tasks.queued <
67f3f2d6
JB
1680 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1681 this.opts.tasksQueueOptions!.size! - sizeOffset
72695f86 1682 ) {
c63a35a0 1683 const workerInfo = this.getWorkerInfo(workerNodeKey)
1851fed0
JB
1684 if (workerInfo == null) {
1685 throw new Error(
1686 `Worker node with key '${workerNodeKey}' not found in pool`
1687 )
1688 }
c63a35a0 1689 workerInfo.stealing = true
67f3f2d6
JB
1690 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1691 const task = sourceWorkerNode.popTask()!
42c677c1 1692 this.handleTask(workerNodeKey, task)
67f3f2d6
JB
1693 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1694 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
c63a35a0 1695 workerInfo.stealing = false
10ecf8fd 1696 }
a2ed5053
JB
1697 }
1698 }
1699
be0676b3 1700 /**
fcd39179 1701 * This method is the message listener registered on each worker.
be0676b3 1702 */
3a776b6d
JB
1703 protected readonly workerMessageListener = (
1704 message: MessageValue<Response>
1705 ): void => {
fcd39179 1706 this.checkMessageWorkerId(message)
b641345c
JB
1707 const { workerId, ready, taskId, taskFunctionNames } = message
1708 if (ready != null && taskFunctionNames != null) {
fcd39179
JB
1709 // Worker ready response received from worker
1710 this.handleWorkerReadyResponse(message)
b641345c 1711 } else if (taskId != null) {
fcd39179
JB
1712 // Task execution response received from worker
1713 this.handleTaskExecutionResponse(message)
b641345c 1714 } else if (taskFunctionNames != null) {
fcd39179 1715 // Task function names message received from worker
1851fed0 1716 const workerInfo = this.getWorkerInfo(
b641345c 1717 this.getWorkerNodeKeyByWorkerId(workerId)
1851fed0
JB
1718 )
1719 if (workerInfo != null) {
1720 workerInfo.taskFunctionNames = taskFunctionNames
1721 }
6b272951
JB
1722 }
1723 }
1724
8e8d9101
JB
1725 private checkAndEmitReadyEvent (): void {
1726 if (!this.readyEventEmitted && this.ready) {
1727 this.emitter?.emit(PoolEvents.ready, this.info)
1728 this.readyEventEmitted = true
1729 }
1730 }
1731
10e2aa7e 1732 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
463226a4 1733 const { workerId, ready, taskFunctionNames } = message
e44639e9 1734 if (ready == null || !ready) {
c63a35a0 1735 throw new Error(`Worker ${workerId} failed to initialize`)
f05ed93c 1736 }
e44639e9
JB
1737 const workerNode =
1738 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
1739 workerNode.info.ready = ready
1740 workerNode.info.taskFunctionNames = taskFunctionNames
8e8d9101 1741 this.checkAndEmitReadyEvent()
6b272951
JB
1742 }
1743
1744 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
463226a4 1745 const { workerId, taskId, workerError, data } = message
67f3f2d6
JB
1746 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1747 const promiseResponse = this.promiseResponseMap.get(taskId!)
6b272951 1748 if (promiseResponse != null) {
f18fd12b 1749 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
9b358e72 1750 const workerNode = this.workerNodes[workerNodeKey]
6703b9f4
JB
1751 if (workerError != null) {
1752 this.emitter?.emit(PoolEvents.taskError, workerError)
f18fd12b
JB
1753 asyncResource != null
1754 ? asyncResource.runInAsyncScope(
1755 reject,
1756 this.emitter,
1757 workerError.message
1758 )
1759 : reject(workerError.message)
6b272951 1760 } else {
f18fd12b
JB
1761 asyncResource != null
1762 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1763 : resolve(data as Response)
6b272951 1764 }
f18fd12b 1765 asyncResource?.emitDestroy()
501aea93 1766 this.afterTaskExecutionHook(workerNodeKey, message)
67f3f2d6
JB
1767 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1768 this.promiseResponseMap.delete(taskId!)
cdaecaee
JB
1769 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1770 workerNode?.emit('taskFinished', taskId)
85b2561d 1771 if (this.opts.enableTasksQueue === true && !this.destroying) {
9b358e72 1772 const workerNodeTasksUsage = workerNode.usage.tasks
463226a4
JB
1773 if (
1774 this.tasksQueueSize(workerNodeKey) > 0 &&
1775 workerNodeTasksUsage.executing <
67f3f2d6
JB
1776 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1777 this.opts.tasksQueueOptions!.concurrency!
463226a4 1778 ) {
67f3f2d6
JB
1779 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1780 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
463226a4
JB
1781 }
1782 if (
1783 workerNodeTasksUsage.executing === 0 &&
1784 this.tasksQueueSize(workerNodeKey) === 0 &&
1785 workerNodeTasksUsage.sequentiallyStolen === 0
1786 ) {
e44639e9 1787 workerNode.emit('idle', {
7f0e1334 1788 workerId,
e1c2dba7
JB
1789 workerNodeKey
1790 })
463226a4 1791 }
be0676b3
APA
1792 }
1793 }
be0676b3 1794 }
7c0ba920 1795
a1763c54 1796 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1797 if (this.busy) {
1798 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1799 }
1800 }
1801
1802 private checkAndEmitTaskQueuingEvents (): void {
1803 if (this.hasBackPressure()) {
1804 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1805 }
1806 }
1807
d0878034
JB
1808 /**
1809 * Emits dynamic worker creation events.
1810 */
1811 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1812
8a1260a3 1813 /**
aa9eede8 1814 * Gets the worker information given its worker node key.
8a1260a3
JB
1815 *
1816 * @param workerNodeKey - The worker node key.
3f09ed9f 1817 * @returns The worker information.
8a1260a3 1818 */
1851fed0
JB
1819 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
1820 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1821 }
1822
a05c10de 1823 /**
c3719753 1824 * Creates a worker node.
ea7a90d3 1825 *
c3719753 1826 * @returns The created worker node.
ea7a90d3 1827 */
c3719753 1828 private createWorkerNode (): IWorkerNode<Worker, Data> {
671d5154 1829 const workerNode = new WorkerNode<Worker, Data>(
c3719753
JB
1830 this.worker,
1831 this.filePath,
1832 {
1833 env: this.opts.env,
1834 workerOptions: this.opts.workerOptions,
1835 tasksQueueBackPressureSize:
32b141fd 1836 this.opts.tasksQueueOptions?.size ??
26ce26ca
JB
1837 getDefaultTasksQueueOptions(
1838 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1839 ).size
c3719753 1840 }
671d5154 1841 )
b97d82d8 1842 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1843 if (this.starting) {
1844 workerNode.info.ready = true
1845 }
c3719753
JB
1846 return workerNode
1847 }
1848
1849 /**
1850 * Adds the given worker node in the pool worker nodes.
1851 *
1852 * @param workerNode - The worker node.
1853 * @returns The added worker node key.
1854 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
1855 */
1856 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
aa9eede8 1857 this.workerNodes.push(workerNode)
c3719753 1858 const workerNodeKey = this.workerNodes.indexOf(workerNode)
aa9eede8 1859 if (workerNodeKey === -1) {
86ed0598 1860 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1861 }
1862 return workerNodeKey
ea7a90d3 1863 }
c923ce56 1864
8e8d9101
JB
1865 private checkAndEmitEmptyEvent (): void {
1866 if (this.empty) {
1867 this.emitter?.emit(PoolEvents.empty, this.info)
1868 this.readyEventEmitted = false
1869 }
1870 }
1871
51fe3d3c 1872 /**
9974369e 1873 * Removes the worker node from the pool worker nodes.
51fe3d3c 1874 *
9974369e 1875 * @param workerNode - The worker node.
51fe3d3c 1876 */
9974369e
JB
1877 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
1878 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1f68cede
JB
1879 if (workerNodeKey !== -1) {
1880 this.workerNodes.splice(workerNodeKey, 1)
c63a35a0 1881 this.workerChoiceStrategyContext?.remove(workerNodeKey)
1f68cede 1882 }
8e8d9101 1883 this.checkAndEmitEmptyEvent()
51fe3d3c 1884 }
adc3c320 1885
ae3ab61d 1886 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1851fed0
JB
1887 const workerInfo = this.getWorkerInfo(workerNodeKey)
1888 if (workerInfo != null) {
1889 workerInfo.ready = false
1890 }
ae3ab61d
JB
1891 }
1892
9e844245
JB
1893 private hasBackPressure (): boolean {
1894 return (
1895 this.opts.enableTasksQueue === true &&
1896 this.workerNodes.findIndex(
041dc05b 1897 workerNode => !workerNode.hasBackPressure()
a1763c54 1898 ) === -1
9e844245 1899 )
e2b31e32
JB
1900 }
1901
b0a4db63 1902 /**
aa9eede8 1903 * Executes the given task on the worker given its worker node key.
b0a4db63 1904 *
aa9eede8 1905 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1906 * @param task - The task to execute.
1907 */
2e81254d 1908 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1909 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1910 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1911 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1912 }
1913
f9f00b5f 1914 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1915 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1916 this.checkAndEmitTaskQueuingEvents()
1917 return tasksQueueSize
adc3c320
JB
1918 }
1919
416fd65c 1920 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1921 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1922 }
1923
416fd65c 1924 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1925 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1926 }
1927
87347ea8
JB
1928 protected flushTasksQueue (workerNodeKey: number): number {
1929 let flushedTasks = 0
920278a2 1930 while (this.tasksQueueSize(workerNodeKey) > 0) {
67f3f2d6
JB
1931 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1932 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
87347ea8 1933 ++flushedTasks
ff733df7 1934 }
4b628b48 1935 this.workerNodes[workerNodeKey].clearTasksQueue()
87347ea8 1936 return flushedTasks
ff733df7
JB
1937 }
1938
ef41a6e6
JB
1939 private flushTasksQueues (): void {
1940 for (const [workerNodeKey] of this.workerNodes.entries()) {
1941 this.flushTasksQueue(workerNodeKey)
1942 }
1943 }
c97c7edb 1944}