test: align randomInt interval
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
f18fd12b 5import { AsyncResource } from 'node:async_hooks'
5c4d16da
JB
6import type {
7 MessageValue,
8 PromiseResponseWrapper,
76d91ea0 9 Task
d35e5717 10} from '../utility-types.js'
bbeadd16 11import {
ff128cc9 12 DEFAULT_TASK_NAME,
bbeadd16 13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
d35e5717
JB
23} from '../utils.js'
24import { KillBehaviors } from '../worker/worker-options.js'
25import type { TaskFunction } from '../worker/task-functions.js'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
d35e5717 34} from './pool.js'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
38 WorkerInfo,
9f95d5eb 39 WorkerNodeEventDetail,
67f3f2d6 40 WorkerType
d35e5717 41} from './worker.js'
a35560ba 42import {
f0d7f803 43 Measurements,
a35560ba 44 WorkerChoiceStrategies,
a20f0ba5
JB
45 type WorkerChoiceStrategy,
46 type WorkerChoiceStrategyOptions
d35e5717
JB
47} from './selection-strategies/selection-strategies-types.js'
48import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
49import { version } from './version.js'
50import { WorkerNode } from './worker-node.js'
bde6b5d7
JB
51import {
52 checkFilePath,
53 checkValidTasksQueueOptions,
bfc75cca 54 checkValidWorkerChoiceStrategy,
32b141fd 55 getDefaultTasksQueueOptions,
c329fd41
JB
56 updateEluWorkerUsage,
57 updateRunTimeWorkerUsage,
58 updateTaskStatisticsWorkerUsage,
59 updateWaitTimeWorkerUsage,
87347ea8 60 waitWorkerNodeEvents
d35e5717 61} from './utils.js'
23ccf9d7 62
729c563d 63/**
ea7a90d3 64 * Base class that implements some shared logic for all poolifier pools.
729c563d 65 *
38e795c1 66 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
67 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
68 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 69 */
c97c7edb 70export abstract class AbstractPool<
f06e48d8 71 Worker extends IWorker,
d3c8a1a8
S
72 Data = unknown,
73 Response = unknown
c4855468 74> implements IPool<Worker, Data, Response> {
afc003b2 75 /** @inheritDoc */
4b628b48 76 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 77
afc003b2 78 /** @inheritDoc */
f80125ca 79 public emitter?: EventEmitterAsyncResource
7c0ba920 80
be0676b3 81 /**
419e8121 82 * The task execution response promise map:
2740a743 83 * - `key`: The message id of each submitted task.
a3445496 84 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 85 *
a3445496 86 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 87 */
501aea93
JB
88 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
89 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 90
a35560ba 91 /**
51fe3d3c 92 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 93 */
c63a35a0 94 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
78cea37e
JB
95 Worker,
96 Data,
97 Response
a35560ba
S
98 >
99
72ae84a2
JB
100 /**
101 * The task functions added at runtime map:
102 * - `key`: The task function name.
103 * - `value`: The task function itself.
104 */
105 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
106
15b176e0
JB
107 /**
108 * Whether the pool is started or not.
109 */
110 private started: boolean
47352846
JB
111 /**
112 * Whether the pool is starting or not.
113 */
114 private starting: boolean
711623b8
JB
115 /**
116 * Whether the pool is destroying or not.
117 */
118 private destroying: boolean
55082af9
JB
119 /**
120 * Whether the pool ready event has been emitted or not.
121 */
122 private readyEventEmitted: boolean
afe0d5bf
JB
123 /**
124 * The start timestamp of the pool.
125 */
126 private readonly startTimestamp
127
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, binds the task dispatch methods, creates the
 * worker choice strategy context and, when `opts.startWorkers` is `true`,
 * starts the pool.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods so they keep the pool as receiver when passed around.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
183
26ce26ca
JB
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is specified.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  const hasMaximumNumberOfWorkers = this.maximumNumberOfWorkers != null
  if (isFixedPool && hasMaximumNumberOfWorkers) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
191
c63a35a0
JB
/**
 * Checks the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
211
/**
 * Validates the pool options and fills in the defaults on `this.opts`.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
237
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count does not match the pool size or the measurement is invalid.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  // Nothing to validate when no options are given.
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    const poolSize =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== poolSize) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
269
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
275
08f3f44c 276 /** @inheritDoc */
6b27d407
JB
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    select: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + select(workerNode),
      0
    )
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Concatenates the measurement history of every worker node.
  const measurementHistory = (
    measurement: 'runTime' | 'waitTime'
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization needs both run time and wait time aggregates.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related statistics are only exposed when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(average(measurementHistory('runTime')))
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(median(measurementHistory('runTime')))
        })
      }
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(average(measurementHistory('waitTime')))
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(median(measurementHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 441
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number
 * of ready non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
459
8e8d9101
JB
/**
 * The pool emptiness boolean status: `true` when the minimum number of
 * workers is zero and no worker node exists.
 */
protected get empty (): boolean {
  const hasNoWorkerNode = this.workerNodes.length === 0
  return this.minimumNumberOfWorkers === 0 && hasNoWorkerNode
}
466
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time over all worker
 * nodes, divided by the time elapsed since the start timestamp multiplied by
 * the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // Two accumulators keep the same summation order as separate folds would.
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
488
8881ae32 489 /**
aa9eede8 490 * The pool type.
8881ae32
JB
491 *
492 * If it is `'dynamic'`, it provides the `max` property.
493 */
494 protected abstract get type (): PoolType
495
184855e6 496 /**
aa9eede8 497 * The worker type.
184855e6
JB
498 */
499 protected abstract get worker (): WorkerType
500
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
516
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first worker node whose id matches, `-1` if none matches.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
528
afc003b2 529 /** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage and send it a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
547
548 /** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Keep the current options when none are given.
  const hasNewOptions = workerChoiceStrategyOptions != null
  if (hasNewOptions) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  const effectiveOptions = this.opts.workerChoiceStrategyOptions
  this.workerChoiceStrategyContext?.setOptions(effectiveOptions)
}
560
a20f0ba5 561 /** @inheritDoc */
8f52842f
JB
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const isDisablingTasksQueue =
    this.opts.enableTasksQueue === true && !enable
  if (isDisablingTasksQueue) {
    // Turning the queue off: remove the stealing listeners and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
574
575 /** @inheritDoc */
c63a35a0
JB
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously set options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const mergedTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = mergedTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(mergedTasksQueueOptions.size!)
  // Listeners are always removed first so they are never registered twice.
  this.unsetTaskStealing()
  if (mergedTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (mergedTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
601
/**
 * Builds the tasks queue options: the defaults for the pool maximum size
 * overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaxSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
612
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
618
d6ca1416
JB
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
624
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
633
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
642
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
651
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when the number of worker nodes has reached the maximum
 * number of workers, or the minimum one when no maximum is defined.
 */
protected get full (): boolean {
  const poolMaxSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaxSize
}
c2ade475 663
c319c66b
JB
664 /**
665 * Whether the pool is busy or not.
666 *
667 * The pool busyness boolean status.
668 */
669 protected abstract get busy (): boolean
7c0ba920 670
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency,
 * otherwise a ready worker node executing no task is considered available.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.opts.tasksQueueOptions!.concurrency!
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
695
42c677c1
JB
/**
 * Whether the worker node at the given key is busy or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least its tasks queue concurrency (tasks queue enabled) or at least one task (tasks queue disabled).
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
706
/**
 * Sends a task function operation message to one worker node and waits for
 * its response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` on success and rejected with an error when the worker reports a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not the operation status of the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
743
/**
 * Sends a task function operation message to every worker node and waits
 * for all of them to respond.
 *
 * The response listener is registered on every worker node, so once all
 * responses are collected it is deregistered from every worker node too,
 * not only from the one that responded last.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker node reports success (or when the pool has no worker node), rejected with an error when at least one worker node reports a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Without worker nodes no response can ever arrive: settle right away
    // instead of leaving the promise pending forever.
    // NOTE(review): assumes worker nodes created later are sent the task functions registered on the pool side — confirm.
    if (this.workerNodes.length === 0) {
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      if (
        responsesReceived.every(
          received => received.taskFunctionOperationStatus === true
        )
      ) {
        resolve(true)
      } else if (
        responsesReceived.some(
          received => received.taskFunctionOperationStatus === false
        )
      ) {
        const errorResponse = responsesReceived.find(
          received => received.taskFunctionOperationStatus === false
        )
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse?.workerId} with error: '${
              errorResponse?.workerError?.message
            }'`
          )
        )
      }
      // Removing a listener that is not registered on a node is a no-op.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
796
797 /** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
809
810 /** @inheritDoc */
e81c38f2
JB
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const addOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    addOperationMessage
  )
  this.taskFunctions.set(name, fn)
  return opResult
}
832
833 /** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperationMessage
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
848
90d7d101 849 /** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // Use the first worker node that reports a non empty list of names.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
861
6703b9f4 862 /** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperationMessage: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(
    defaultOperationMessage
  )
}
869
adee6053
JB
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
875
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes fewer tasks than the tasks queue concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
884
afc003b2 885 /** @inheritDoc */
7d91a8cd
JB
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state checks.
    if (!this.started) {
      reject(new Error('Cannot execute a task on not started pool'))
      return
    }
    if (this.destroying) {
      reject(new Error('Cannot execute a task on destroying pool'))
      return
    }
    // Arguments checks.
    if (name != null) {
      if (typeof name !== 'string') {
        reject(new TypeError('name argument must be a string'))
        return
      }
      if (name.trim().length === 0) {
        reject(new TypeError('name argument must not be an empty string'))
        return
      }
    }
    if (transferList != null && !Array.isArray(transferList)) {
      reject(new TypeError('transferList argument must be an array'))
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const taskId = randomUUID()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId
    }
    // Keep the promise callbacks so the worker response can settle it later.
    this.promiseResponseMap.set(taskId, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const executeImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 949
47352846
JB
/** @inheritDoc */
public start (): void {
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  // Counts the worker nodes that are not flagged as dynamic
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.starting = true
  // Create worker nodes until the minimum number of workers is reached
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.starting = false
  this.started = true
}
974
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  }
  if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  const emitter = this.emitter
  emitter?.emit(PoolEvents.destroy, this.info)
  emitter?.emitDestroy()
  emitter?.removeAllListeners()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
999
/**
 * Sends the kill message to the worker given its worker node key and waits for its answer.
 *
 * The returned promise resolves when the worker answers with a `'success'` kill message,
 * or immediately if no worker node is found at the given key. It rejects when the worker
 * answers with a `'failure'` kill message.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${message.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1024
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed, then
 * `waitWorkerNodeEvents()` is awaited on its `'taskFinished'` event before the kill
 * message is sent and the worker node terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Configured timeout if any, else the default tasks queue options one
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1046
/**
 * Setup hook to execute code before worker nodes are created in the abstract constructor.
 * Can be overridden. The default implementation does nothing.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1056
/**
 * Should return whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1061
/**
 * Hook executed before the worker task execution.
 * Can be overridden.
 *
 * Increments the executing tasks counter and updates the wait time of the worker node usage
 * and, when applicable, of the worker node task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const workerUsage = workerNode.usage
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      workerUsage,
      task
    )
  }
  if (
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) ||
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(task.name!) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(task.name!)!
  ++taskFunctionWorkerUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionWorkerUsage,
    task
  )
}
1102
/**
 * Hook executed after the worker task execution.
 * Can be overridden.
 *
 * Updates the task statistics, run time and ELU of the worker node usage and, when
 * applicable, of the worker node task function usage. The worker choice strategy context
 * is updated if at least one usage was updated.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Applies the three statistics updates to the given usage
  const updateUsage = (usage: typeof workerNode.usage): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
  }
  let needWorkerChoiceStrategyUpdate = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    updateUsage(workerNode.usage)
    needWorkerChoiceStrategyUpdate = true
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    ) != null
  ) {
    updateUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      workerNode.getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
    )
    needWorkerChoiceStrategyUpdate = true
  }
  if (!needWorkerChoiceStrategyUpdate) {
    return
  }
  this.workerChoiceStrategyContext?.update(workerNodeKey)
}
1160
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 *
 * NOTE(review): the threshold of more than two task function names presumably accounts
 * for default task function entries in the list - confirm.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1175
/**
 * Chooses a worker node for the next task.
 *
 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
 * When a dynamic worker node is created and the strategy policy allows its usage, the newly
 * created worker node is chosen.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return this.workerChoiceStrategyContext!.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const useDynamicWorkerNode =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  return useDynamicWorkerNode
    ? dynamicWorkerNodeKey
    : // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.workerChoiceStrategyContext!.execute()
}
1196
6c6afb84
JB
/**
 * Conditions for dynamic worker creation.
 * Must be implemented by the concrete pool classes.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1203
/**
 * Sends a message to worker given its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1216
/**
 * Creates a new, completely set up worker node.
 *
 * The user defined `online`, `message`, `error` and `exit` handlers (or an empty function)
 * are registered on the worker node together with the pool internal `error` and `exit`
 * handlers, then the worker node is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const isPoolRunning = (): boolean => this.started && !this.destroying
  // Pool internal worker error handler
  const handleWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the failed worker node by one of the same kind
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else {
        this.createAndSetupWorkerNode()
      }
    }
    // Hand the tasks queued on the failed worker node over to the other ones
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  // Pool internal worker exit handler
  const handleWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
  }
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', handleWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', handleWorkerExit)
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1273
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * A message listener destroying the worker node on a kill message is registered, the
 * worker is asked to check its activity, the pool task functions are sent to it and the
 * worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A hard kill always destroys the worker node, a soft kill only once it has no more work
    const shallDestroyWorkerNode = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      switch (this.opts.enableTasksQueue) {
        case false:
          return workerUsage.tasks.executing === 0
        case true:
          return (
            workerUsage.tasks.executing === 0 &&
            this.tasksQueueSize(localWorkerNodeKey) === 0
          )
        default:
          return false
      }
    }
    // Kill message received from worker
    if (shallDestroyWorkerNode()) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send the pool task functions to the new worker (no iteration when there is none)
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const flagAsReady =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  if (flagAsReady) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1331
/**
 * Registers a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @typeParam Message - Type of the data carried by the message: the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1344
ae036c3e
JB
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @typeParam Message - Type of the data carried by the message: the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1357
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @typeParam Message - Type of the data carried by the message: the task data or the task response.
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1370
/**
 * Method hooked up after a worker node has been newly created.
 * Can be overridden.
 *
 * Registers the pool message listener on the worker, sends it the startup and statistics
 * messages and, when the tasks queue is enabled, subscribes the configured tasks stealing
 * handlers to the worker node events.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
1402
/**
 * Sends the startup message to worker given its worker node key.
 * Must be implemented by the concrete pool classes.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1409
/**
 * Sends the statistics message to worker given its worker node key.
 *
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements of the worker choice strategy context, `false` when not available.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.runTime
      .aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()?.elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
a2ed5053 1427
5eb72b9e
JB
/**
 * Whether tasks stealing is impossible.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1431
42c677c1
JB
/**
 * Executes the task on the given worker node if it can be executed right away,
 * enqueues it on the worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1439
/**
 * Redistributes the tasks queued on the given worker node to the other worker nodes.
 *
 * Each queued task is handed to the worker node, other than the source one, having the
 * fewest queued tasks. Ready worker nodes are preferred: a not ready one is only used
 * when no other worker node is ready. The source worker node is never a destination,
 * otherwise a task could be handed back to the worker node being emptied.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let readyWorkerNodeKey = -1
    let fallbackWorkerNodeKey = -1
    for (const [
      candidateWorkerNodeKey,
      candidateWorkerNode
    ] of this.workerNodes.entries()) {
      // Never hand a task back to the worker node being emptied
      if (candidateWorkerNodeKey === workerNodeKey) {
        continue
      }
      const queuedTasks = candidateWorkerNode.usage.tasks.queued
      if (
        fallbackWorkerNodeKey === -1 ||
        queuedTasks < this.workerNodes[fallbackWorkerNodeKey].usage.tasks.queued
      ) {
        fallbackWorkerNodeKey = candidateWorkerNodeKey
      }
      if (
        candidateWorkerNode.info.ready &&
        (readyWorkerNodeKey === -1 ||
          queuedTasks < this.workerNodes[readyWorkerNodeKey].usage.tasks.queued)
      ) {
        readyWorkerNodeKey = candidateWorkerNodeKey
      }
    }
    const destinationWorkerNodeKey =
      readyWorkerNodeKey !== -1 ? readyWorkerNodeKey : fallbackWorkerNodeKey
    if (destinationWorkerNodeKey === -1) {
      // No other worker node can take the queued tasks over
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1462
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node usage and, when applicable,
 * of the worker node task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) ||
    workerNode.getTaskFunctionWorkerUsage(taskName) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionWorkerUsage.tasks.stolen
}
1482
/**
 * Increments the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1492
/**
 * Increments the sequentially stolen tasks counter of the worker node task function usage,
 * when the worker node shall update its task function usage and has one for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) ||
    workerNode.getTaskFunctionWorkerUsage(taskName) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
}
1508
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1518
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node task function usage,
 * when the worker node shall update its task function usage and has one for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (
    !this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) ||
    workerNode.getTaskFunctionWorkerUsage(taskName) == null
  ) {
    return
  }
  const taskFunctionWorkerUsage =
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
}
1534
/**
 * Handles the `idle` event of a worker node by stealing a task for it.
 *
 * After each stealing attempt the handler calls itself again once the promise returned by
 * `sleep(exponentialDelay(...))` resolves, passing the task just stolen (if any). The cycle
 * stops when tasks cannot be stolen, when too many worker nodes are stealing, or when the
 * worker node has work again after a previous steal.
 *
 * @param eventDetail - The worker node event detail. Its `workerNodeKey` property must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 * @throws Error if the event detail has no worker node key or if the worker node is not found in the pool.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  // Fail with a descriptive error before the worker node usage is dereferenced
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node has work again: stop stealing and reset the sequential counters
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskName of this.workerNodes[workerNodeKey].info
      .taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksWorkerUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    if (
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    ) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  // Schedule the next stealing attempt
  sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch(error => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1620
/**
 * Steals a task for the given worker node from the most loaded eligible worker node.
 *
 * The source worker node is the one with the most queued tasks among those that are ready,
 * not stealing, have queued tasks and are not the destination worker node. The destination
 * is excluded by identity: the position of a worker node in the sorted copy is not its
 * worker node key.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no source worker node is eligible.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sorted copy, most queued tasks first
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.popTask()!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
1647
/**
 * Handles the `backPressure` event of a worker node by handing tasks from its queue over to
 * the other worker nodes.
 *
 * The other worker nodes are visited from the least to the most loaded one. Each eligible
 * worker node (ready, not stealing, queue below the configured size minus the offset) takes
 * one task from the source worker node. The real worker node key is kept paired with each
 * worker node while sorting, since the position in the sorted copy is not the worker node key.
 *
 * @param eventDetail - The worker node event detail carrying the id of the worker under back pressure.
 * @throws Error if the information of an eligible worker node is not found in the pool.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The source worker node is no longer in the pool: nothing to hand over
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (sourceWorkerNode == null) {
    return
  }
  // Sorted [worker node key, worker node] pairs, fewest queued tasks first
  const workerNodes = [...this.workerNodes.entries()].sort(
    ([, workerNodeA], [, workerNodeB]) =>
      workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
  )
  for (const [workerNodeKey, workerNode] of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1698
/**
 * This method is the message listener registered on each worker.
 *
 * The message is dispatched according to its content: worker ready response, task execution
 * response or task function names update.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1723
8e8d9101
JB
/**
 * Emits the pool `ready` event once, when the pool is ready and the event was not already emitted.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1730
/**
 * Handles the worker ready response: flags the worker node as ready, stores its task function
 * names and emits the pool `ready` event if needed.
 *
 * @param message - The worker ready response message.
 * @throws Error if the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  const initialized = ready ?? false
  if (!initialized) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const workerInfo = this.workerNodes[workerNodeKey].info
  workerInfo.ready = initialized
  workerInfo.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1742
/**
 * Handles a task execution response received from a worker.
 *
 * The promise registered for the task is resolved with the response data, or rejected with
 * the worker error message, within the task async resource scope when there is one. The
 * worker node usage is then updated and, when the tasks queue is enabled, the next queued
 * task is executed or the `idle` event is emitted on the worker node.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else {
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(resolve, this.emitter, data)
    } else {
      resolve(data as Response)
    }
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue === true &&
    !this.destroying &&
    // The worker node may have been removed from the pool in the meantime
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode != null
  ) {
    const workerNodeTasksUsage = workerNode.usage.tasks
    if (
      this.tasksQueueSize(workerNodeKey) > 0 &&
      workerNodeTasksUsage.executing <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.concurrency!
    ) {
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
    }
    if (
      workerNodeTasksUsage.executing === 0 &&
      this.tasksQueueSize(workerNodeKey) === 0 &&
      workerNodeTasksUsage.sequentiallyStolen === 0
    ) {
      workerNode.emit('idle', {
        workerId,
        workerNodeKey
      })
    }
  }
}
7c0ba920 1794
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1800
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1806
d0878034
JB
/**
 * Emits the events related to dynamic worker creation.
 * Declared abstract: the implementation is left to the concrete pool classes.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1811
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
1821
/**
 * Builds a new worker node from the pool worker type, file path and options.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  // Configured tasks queue size if any, otherwise the default one derived from the
  // maximum (or, when unset, the minimum) number of workers.
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const { env, workerOptions } = this.opts
  const createdWorkerNode = new WorkerNode<Worker, Data>(
    this.worker,
    this.filePath,
    { env, workerOptions, tasksQueueBackPressureSize }
  )
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    createdWorkerNode.info.ready = true
  }
  return createdWorkerNode
}
1847
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1863
8e8d9101
JB
/**
 * Emits the pool `empty` event with the pool information when the pool is empty,
 * and resets the ready event emitted flag.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1870
/**
 * Removes the given worker node from the pool worker nodes, drops its key from the
 * worker choice strategy context and emits the pool `empty` event if applicable.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isKnownWorkerNode = removedWorkerNodeKey !== -1
  if (isKnownWorkerNode) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  // The empty check runs whether or not the worker node was found.
  this.checkAndEmitEmptyEvent()
}
adc3c320 1884
/**
 * Flags the worker node at the given key as not ready.
 * Does nothing if no worker information is found for that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const info = this.getWorkerInfo(workerNodeKey)
  if (info == null) {
    return
  }
  info.ready = false
}
1891
9e844245
JB
/**
 * Whether the pool has back pressure, i.e. the tasks queue is enabled and no worker
 * node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1900
/**
 * Runs the before task execution hook, sends the given task (with its transfer list)
 * to the worker at the given worker node key, then emits task execution events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1912
/**
 * Enqueues the given task on the worker node at the given key, then emits task
 * queuing events.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node enqueue operation (its tasks queue size).
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const targetWorkerNode = this.workerNodes[workerNodeKey]
  const queuedTasksCount = targetWorkerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queuedTasksCount
}
1918
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const sourceWorkerNode = this.workerNodes[workerNodeKey]
  return sourceWorkerNode.dequeueTask()
}
1922
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const queriedWorkerNode = this.workerNodes[workerNodeKey]
  return queriedWorkerNode.tasksQueueSize()
}
1926
87347ea8
JB
/**
 * Executes every task queued on the worker node at the given key, then clears its
 * tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasksCount = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasksCount) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const queuedTask = this.dequeueTask(workerNodeKey)!
    this.executeTask(workerNodeKey, queuedTask)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasksCount
}
1937
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1943}