fix: guarantee the minimum number of workers on started pool
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
76d91ea0 10 Task
d35e5717 11} from '../utility-types.js'
bbeadd16 12import {
ded253e2 13 average,
ff128cc9 14 DEFAULT_TASK_NAME,
bbeadd16 15 EMPTY_FUNCTION,
463226a4 16 exponentialDelay,
59317253 17 isKillBehavior,
0d80593b 18 isPlainObject,
90d6701c 19 max,
afe0d5bf 20 median,
90d6701c 21 min,
463226a4
JB
22 round,
23 sleep
d35e5717 24} from '../utils.js'
d35e5717 25import type { TaskFunction } from '../worker/task-functions.js'
ded253e2 26import { KillBehaviors } from '../worker/worker-options.js'
c4855468 27import {
65d7a1c9 28 type IPool,
c4855468 29 PoolEvents,
6b27d407 30 type PoolInfo,
c4855468 31 type PoolOptions,
6b27d407
JB
32 type PoolType,
33 PoolTypes,
4b628b48 34 type TasksQueueOptions
d35e5717 35} from './pool.js'
a35560ba 36import {
f0d7f803 37 Measurements,
a35560ba 38 WorkerChoiceStrategies,
a20f0ba5
JB
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
d35e5717
JB
41} from './selection-strategies/selection-strategies-types.js'
42import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
bde6b5d7
JB
43import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
bfc75cca 46 checkValidWorkerChoiceStrategy,
32b141fd 47 getDefaultTasksQueueOptions,
c329fd41
JB
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
87347ea8 52 waitWorkerNodeEvents
d35e5717 53} from './utils.js'
ded253e2
JB
54import { version } from './version.js'
55import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61} from './worker.js'
62import { WorkerNode } from './worker-node.js'
23ccf9d7 63
729c563d 64/**
ea7a90d3 65 * Base class that implements some shared logic for all poolifier pools.
729c563d 66 *
38e795c1 67 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 70 */
c97c7edb 71export abstract class AbstractPool<
f06e48d8 72 Worker extends IWorker,
d3c8a1a8
S
73 Data = unknown,
74 Response = unknown
c4855468 75> implements IPool<Worker, Data, Response> {
afc003b2 76 /** @inheritDoc */
4b628b48 77 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 78
afc003b2 79 /** @inheritDoc */
f80125ca 80 public emitter?: EventEmitterAsyncResource
7c0ba920 81
be0676b3 82 /**
419e8121 83 * The task execution response promise map:
2740a743 84 * - `key`: The message id of each submitted task.
a3445496 85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 86 *
a3445496 87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 88 */
501aea93
JB
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 91
a35560ba 92 /**
51fe3d3c 93 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 94 */
c63a35a0 95 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
78cea37e
JB
96 Worker,
97 Data,
98 Response
a35560ba
S
99 >
100
72ae84a2
JB
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
15b176e0
JB
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
47352846
JB
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
711623b8
JB
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
55082af9
JB
120 /**
121 * Whether the pool ready event has been emitted or not.
122 */
123 private readyEventEmitted: boolean
afe0d5bf
JB
124 /**
125 * The start timestamp of the pool.
126 */
127 private readonly startTimestamp
128
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the arguments, builds the worker choice strategy context, runs
 * the setup hook and, when `opts.startWorkers` is `true`, starts the pool.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  const calledFromMain = this.isMain()
  if (!calledFromMain) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation. checkPoolOptions() also fills in the options defaults.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind these methods to the pool instance so `this` is preserved when
  // they are invoked detached from it.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  const strategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(this, this.opts.workerChoiceStrategy, this.opts.workerChoiceStrategyOptions)
  this.workerChoiceStrategyContext = strategyContext

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before start() reads them.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
184
26ce26ca
JB
/**
 * Checks that a fixed pool is not instantiated with a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
192
c63a35a0
JB
/**
 * Checks the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number is not defined.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number is negative, or zero on a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
212
/**
 * Validates the pool options and stores the defaulted values on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy: validate first, then default to round robin.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
238
/**
 * Validates the worker choice strategy options. Undefined options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count does not match the pool size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    // One weight is required per worker node the pool can hold.
    const expectedWeights =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeights) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
270
/**
 * Creates the pool event emitter, named after the pool type and worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
276
/** @inheritDoc */
public get info (): PoolInfo {
  const statsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true

  // Sums a numeric value computed for each worker node.
  const sumOverWorkerNodes = (
    valueOf: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode, workerNodeKey) =>
        total + valueOf(workerNode, workerNodeKey),
      0
    )

  // Concatenates the measurement history of every worker node.
  const measurementHistory = (
    measurement: 'runTime' | 'waitTime'
  ): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) =>
        histories.concat(workerNode.usage[measurement].history),
      []
    )

  // Builds the pool wide statistics of a measurement; average and median
  // are only included when the strategy requires them.
  const measurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    requirements: { average: boolean, median: boolean }
  ) => ({
    minimum: round(
      min(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement].minimum ?? Infinity
        )
      )
    ),
    maximum: round(
      max(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement].maximum ?? -Infinity
        )
      )
    ),
    ...(requirements.average && {
      average: round(average(measurementHistory(measurement)))
    }),
    ...(requirements.median && {
      median: round(median(measurementHistory(measurement)))
    })
  })

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    ...(statsRequirements?.runTime.aggregate === true &&
      statsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: sumOverWorkerNodes(workerNode =>
      workerNode.usage.tasks.executing === 0 ? 1 : 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: sumOverWorkerNodes(workerNode =>
        workerNode.info.stealing ? 1 : 0
      )
    }),
    busyWorkerNodes: sumOverWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey) ? 1 : 0
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related counters, only reported when the queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(statsRequirements?.runTime.aggregate === true && {
      runTime: measurementStatistics('runTime', statsRequirements.runTime)
    }),
    ...(statsRequirements?.waitTime.aggregate === true && {
      waitTime: measurementStatistics('waitTime', statsRequirements.waitTime)
    })
  }
}
08f3f44c 443
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number
 * of ready, non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
461
8e8d9101
JB
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when it holds no worker node and its minimum number of workers is zero.
 */
protected get empty (): boolean {
  const hasNoWorkerNode = this.workerNodes.length === 0
  return hasNoWorkerNode && this.minimumNumberOfWorkers === 0
}
468
/**
 * The approximate pool utilization.
 *
 * Computed as the tasks run time plus wait time accumulated by all worker
 * nodes, divided by the time elapsed since the pool start timestamp
 * multiplied by the maximum pool size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / (elapsedTime * poolSize)
}
490
8881ae32 491 /**
aa9eede8 492 * The pool type.
8881ae32
JB
493 *
494 * If it is `'dynamic'`, it provides the `max` property.
495 */
496 protected abstract get type (): PoolType
497
184855e6 498 /**
aa9eede8 499 * The worker type.
184855e6
JB
500 */
501 protected abstract get worker (): WorkerType
502
6b813701
JB
/**
 * Checks that the worker id carried by a message received from a worker is
 * defined and belongs to a worker node of this pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or unknown.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
518
aa9eede8
JB
/**
 * Looks up the key (index in the pool worker nodes) of the worker node with the given worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
530
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset every worker node usage and send each worker a statistics message.
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
549
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Undefined options keep the ones currently stored on the pool.
  const effectiveOptions =
    workerChoiceStrategyOptions ?? this.opts.workerChoiceStrategyOptions
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  this.workerChoiceStrategyContext?.setOptions(effectiveOptions)
}
562
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const wasEnabled = this.opts.enableTasksQueue === true
  if (wasEnabled && !enable) {
    // Switching the queue off: stop task stealing and flush the queues.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
576
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue not enabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Handlers are always removed first so they are never attached twice.
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
603
/**
 * Builds the tasks queue options: the defaults for the maximum pool size,
 * overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  )
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
614
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
620
d6ca1416
JB
/**
 * Attaches the `'idle'` event handler to every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
626
/**
 * Detaches the `'idle'` event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
635
/**
 * Attaches the `'backPressure'` event handler to every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
644
/**
 * Detaches the `'backPressure'` event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
653
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const maximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= maximumSize
}
c2ade475 665
c319c66b
JB
666 /**
667 * Whether the pool is busy or not.
668 *
669 * The pool busyness boolean status.
670 */
671 protected abstract get busy (): boolean
7c0ba920 672
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * The quota is the tasks queue concurrency when the tasks queue is enabled,
 * one task otherwise. Worker nodes that are not ready are ignored.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const hasSpareCapacity = (
    workerNode: IWorkerNode<Worker, Data>
  ): boolean => {
    if (!workerNode.info.ready) {
      return false
    }
    if (tasksQueueEnabled) {
      return (
        workerNode.usage.tasks.executing <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.concurrency!
      )
    }
    return workerNode.usage.tasks.executing === 0
  }
  // Busy means no ready worker node has spare capacity.
  return !this.workerNodes.some(hasSpareCapacity)
}
697
42c677c1
JB
/**
 * Whether the worker node with the given key is busy or not.
 *
 * With the tasks queue enabled, busy means executing at least `concurrency`
 * tasks; otherwise it means executing at least one task.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node is busy, `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue === true) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
  }
  return executingTasks > 0
}
708
/**
 * Sends a task function operation message to one worker and waits for its response.
 *
 * @param workerNodeKey - The key of the worker node to send the message to.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when the worker reports success, rejected when it reports failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not an operation status from the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
745
/**
 * Sends a task function operation message to every worker of the pool and
 * waits for all of them to report their operation status.
 *
 * The operation listener is removed from every worker node once all the
 * expected responses are received, not only from the last responder.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` when every worker reports success (or when the pool has no worker node), rejected when at least one worker reports failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    // Snapshot the number of recipients: the worker nodes list may change
    // while responses are pending, and only these workers get the message.
    const expectedResponses = this.workerNodes.length
    if (expectedResponses === 0) {
      // No worker to notify: settle now instead of waiting forever.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length < expectedResponses) {
        return
      }
      // All responses are in: detach the listener from every worker node.
      // NOTE(review): assumes deregistering a listener from a worker node it
      // was never registered on is a no-op - confirm in the pool implementations.
      for (const [workerNodeKey] of this.workerNodes.entries()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        received => received.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
    }
    for (const [workerNodeKey] of this.workerNodes.entries()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
798
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // True as soon as one worker node lists the task function name.
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
811
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  // name is known to be a string from here on.
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source text.
  const addOperation: MessageValue<Data> = {
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(addOperation)
  this.taskFunctions.set(name, fn)
  return opResult
}
834
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const handledByPool = this.taskFunctions.has(name)
  if (!handledByPool) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const removeOperation: MessageValue<Data> = {
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers(
    removeOperation
  )
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
850
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The first worker node with a non empty task function names list is used.
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
863
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const defaultOperation: MessageValue<Data> = {
    taskFunctionOperation: 'default',
    taskFunctionName: name
  }
  return await this.sendTaskFunctionOperationToWorkers(defaultOperation)
}
871
adee6053
JB
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
877
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node
 * instead of being enqueued: its tasks queue must be empty and it must be
 * executing fewer tasks than the tasks queue concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  return (
    this.workerNodes[workerNodeKey].usage.tasks.executing <
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.opts.tasksQueueOptions!.concurrency!
  )
}
886
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments checks: the first failing one rejects the promise.
    const findUsageError = (): Error | undefined => {
      if (!this.started) {
        return new Error('Cannot execute a task on not started pool')
      }
      if (this.destroying) {
        return new Error('Cannot execute a task on destroying pool')
      }
      if (name != null && typeof name !== 'string') {
        return new TypeError('name argument must be a string')
      }
      if (name != null && name.trim().length === 0) {
        return new TypeError('name argument must not be an empty string')
      }
      if (transferList != null && !Array.isArray(transferList)) {
        return new TypeError('transferList argument must be an array')
      }
      return undefined
    }
    const usageError = findUsageError()
    if (usageError != null) {
      reject(usageError)
      return
    }

    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the worker response can settle the task.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })

    // Execute immediately when the tasks queue is disabled, or enabled with
    // room on the chosen worker node; enqueue otherwise.
    const executeNow =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeNow) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 951
60dc0a9c
JB
/**
 * Creates worker nodes until the number of non-dynamic worker nodes reaches
 * the minimum number of workers.
 */
private startMinimumNumberOfWorkers (): void {
  // Recomputed on every iteration since each creation grows `workerNodes`.
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
}
966
47352846
JB
/** @inheritdoc */
public start (): void {
  // Refuse to start from any state other than "stopped".
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  }
  if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  }
  if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  try {
    this.startMinimumNumberOfWorkers()
  } finally {
    // Always leave the transient 'starting' state, even when worker creation
    // throws: otherwise every later start() would fail with
    // 'Cannot start an already starting pool' and the pool could never be
    // started again.
    this.starting = false
  }
  // Only reached when the minimum number of workers has been created.
  this.started = true
}
983
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Pool states from which a destroy is refused, checked in this order.
  const refusals: Array<[boolean, string]> = [
    [!this.started, 'Cannot destroy an already destroyed pool'],
    [this.starting, 'Cannot destroy an starting pool'],
    [this.destroying, 'Cannot destroy an already destroying pool']
  ]
  for (const [refused, reason] of refusals) {
    if (refused) {
      throw new Error(reason)
    }
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently and wait for all of them.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  // Reset the pool state flags.
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1007
/**
 * Sends a kill message to the worker given its worker node key and settles
 * once the worker answers: resolves on a `'success'` kill response, rejects
 * on a `'failure'` one. Resolves immediately if no worker node exists at the
 * given key.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const killMessageListener = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${message.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1032
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is first flagged as not ready and its tasks queue is flushed.
 * The value returned by the flush is then handed to `waitWorkerNodeEvents()`
 * for the 'taskFinished' event, together with a timeout. Finally the kill
 * message is sent and the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Use the configured timeout, else the default one for this pool size.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1054
/**
 * Hook run by the abstract constructor before the worker nodes are created.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1064
/**
 * Tells whether the worker is the main worker.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1069
/**
 * Hook run before a task is executed on a worker node.
 * Increments the executing tasks counter and updates the wait time, both in
 * the worker node usage and, when applicable, in the usage of the task
 * function named by the task.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    ++usage.tasks.executing
    updateWaitTimeWorkerUsage(this.workerChoiceStrategyContext, usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (workerNode.getTaskFunctionWorkerUsage(task.name!) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(task.name!)!
  ++taskFunctionUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionUsage,
    task
  )
}
1110
/**
 * Hook run after a task has been executed on a worker node.
 * Updates the task statistics, run time and ELU, both in the worker node usage
 * and, when applicable, in the usage of the executed task function. The worker
 * choice strategy context is then updated if any usage was touched.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  let updatedUsages = 0
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    ++updatedUsages
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    ) != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    )!
    updateTaskStatisticsWorkerUsage(taskFunctionUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionUsage,
      message
    )
    ++updatedUsages
  }
  if (updatedUsages > 0) {
    this.workerChoiceStrategyContext?.update(workerNodeKey)
  }
}
1168
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 * This is the case when the worker information holds an array of more than two
 * task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames = this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1183
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and its key is
 * returned directly if the strategy policy enables `dynamicWorkerUsage`.
 * In every other case the choice is delegated to the worker choice strategy
 * context (the default worker choice strategy uses a round robin algorithm to
 * distribute the tasks).
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (!this.shallCreateDynamicWorker()) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    return this.workerChoiceStrategyContext!.execute()
  }
  const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
  const useDynamicWorker =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  return useDynamicWorker
    ? dynamicWorkerNodeKey
    : // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.workerChoiceStrategyContext!.execute()
}
1204
6c6afb84
JB
/**
 * Decides whether a dynamic worker must be created.
 *
 * @returns `true` if a dynamic worker shall be created, `false` otherwise.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1211
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: TransferListItem[]
): void
1224
/**
 * Creates a new, completely set up worker node.
 *
 * The user supplied handlers (or no-op ones) are registered for the 'online',
 * 'message', 'error' and 'exit' worker events, followed by the pool's own
 * 'error' and 'exit' handling, before the worker node is added to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts
  workerNode.registerWorkerEventHandler(
    'online',
    onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    errorHandler ?? EMPTY_FUNCTION
  )
  // Pool level error handling: flag the node as not ready, optionally start a
  // replacement, move its queued tasks away and terminate it.
  workerNode.registerWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const poolIsRunning = (): boolean => this.started && !this.destroying
    if (poolIsRunning() && this.opts.restartWorkerOnError === true) {
      // The replacement is of the same kind (dynamic or not) as the failed one.
      if (!workerNode.info.dynamic) {
        this.createAndSetupWorkerNode()
      } else {
        this.createAndSetupDynamicWorkerNode()
      }
    }
    if (poolIsRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  workerNode.registerWorkerEventHandler('exit', exitHandler ?? EMPTY_FUNCTION)
  // On exit, drop the node from the pool and top the pool up again if needed.
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
    if (this.started && !this.destroying) {
      this.startMinimumNumberOfWorkers()
    }
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1284
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener is registered
 * that destroys the worker node when it reports a kill behavior. The worker is
 * asked to check its activity, the pool's task functions are sent to it and it
 * is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    // Resolve the key again: it may differ from the one at creation time.
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // Only evaluated for a soft kill: nothing executing and, with the tasks
    // queue enabled, nothing queued either.
    const hasNoPendingWork = (): boolean =>
      (this.opts.enableTasksQueue === false &&
        workerUsage.tasks.executing === 0) ||
      (this.opts.enableTasksQueue === true &&
        workerUsage.tasks.executing === 0 &&
        this.tasksQueueSize(localWorkerNodeKey) === 0)
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoPendingWork())
    if (!shallDestroy) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send the task functions registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  // Some strategy policies want the dynamic worker usable without waiting for
  // its ready message.
  const readyImmediately =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  if (readyImmediately) {
    dynamicWorkerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1342
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1355
ae036c3e
JB
/**
 * Registers a one-time message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1368
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1381
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener on the worker, sends it the startup and
 * statistics messages and, when the tasks queue is enabled, subscribes to the
 * worker node events used for tasks stealing.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1413
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1420
/**
 * Sends the statistics message to worker given its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task
 * statistics requirements of the worker choice strategy context, or `false`
 * when they are not available.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const shallAggregate = (measurement: 'runTime' | 'elu'): boolean =>
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()[
      measurement
    ].aggregate ?? false
  const runTime = shallAggregate('runTime')
  const elu = shallAggregate('elu')
  this.sendToWorker(workerNodeKey, {
    statistics: { runTime, elu }
  })
}
a2ed5053 1438
5eb72b9e
JB
/**
 * Whether tasks stealing is pointless: the pool has at most one worker node
 * or no queued tasks at all.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1442
42c677c1
JB
/**
 * Executes the task on the given worker node if it shall be executed right
 * away, enqueues it on that worker node otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1450
/**
 * Moves every task queued on the given worker node to the other worker nodes.
 *
 * Each task goes to the other worker node with the fewest queued tasks. Ready
 * worker nodes are preferred; a not ready one is only used when no other
 * worker node is ready. The source worker node is never a destination:
 * handing a dequeued task back to it would re-enqueue the task there and the
 * loop would never end.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  // Returns the key of the least queued ready worker node other than the
  // source, falling back to the least queued other worker node, or -1.
  const chooseDestinationWorkerNodeKey = (): number => {
    let readyWorkerNodeKey = -1
    let fallbackWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      const { queued } = candidate.usage.tasks
      if (
        fallbackWorkerNodeKey === -1 ||
        queued < this.workerNodes[fallbackWorkerNodeKey].usage.tasks.queued
      ) {
        fallbackWorkerNodeKey = candidateKey
      }
      if (
        candidate.info.ready &&
        (readyWorkerNodeKey === -1 ||
          queued < this.workerNodes[readyWorkerNodeKey].usage.tasks.queued)
      ) {
        readyWorkerNodeKey = candidateKey
      }
    }
    return readyWorkerNodeKey !== -1 ? readyWorkerNodeKey : fallbackWorkerNodeKey
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    const destinationWorkerNodeKey = chooseDestinationWorkerNodeKey()
    if (destinationWorkerNodeKey === -1) {
      // No other worker node to hand the tasks to: stop instead of spinning.
      return
    }
    this.handleTask(
      destinationWorkerNodeKey,
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.dequeueTask(workerNodeKey)!
    )
  }
}
1473
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node usage and, when
 * applicable, of the usage of the given task function on that worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The name of the stolen task.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    workerNode.usage.tasks.stolen += 1
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionUsage.tasks.stolen += 1
}
1493
/**
 * Increments the sequentially stolen tasks counter of the worker node usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage == null) {
    return
  }
  workerNode.usage.tasks.sequentiallyStolen += 1
}
1503
/**
 * Increments the sequentially stolen tasks counter of the usage of the given
 * task function on the worker node, when that usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionUsage.tasks.sequentiallyStolen += 1
}
1519
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage == null) {
    return
  }
  workerNode.usage.tasks.sequentiallyStolen = 0
}
1529
/**
 * Resets to zero the sequentially stolen tasks counter of the usage of the
 * given task function on the worker node, when that usage shall be updated and exists.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
1545
/**
 * Handler of the worker node 'idle' event: makes the idle worker node steal a
 * task, then schedules itself again after a delay derived from the number of
 * sequentially stolen tasks.
 *
 * The stealing sequence stops when no task can be stolen, when too many worker
 * nodes are already stealing, or when the worker node got work of its own
 * again; in the latter case the sequentially stolen counters are reset.
 *
 * @param eventDetail - The worker node event detail; its `workerNodeKey` must be defined.
 * @param previousStolenTask - The task stolen by the previous run of the sequence, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  const stealingNotAllowed =
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  if (stealingNotAllowed) {
    // End of a running stealing sequence: clear the stealing flag.
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    // The worker node has work again: stop stealing and reset the counters.
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskFunctionName of workerInfo.taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
    // The task function sequence goes on when it just starts or when the same
    // task function was stolen by the previous run.
    const sequenceContinues =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (!sequenceContinues) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  // Run again after a delay computed from the sequentially stolen tasks count.
  sleep(exponentialDelay(tasksUsage.sequentiallyStolen))
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1630
/**
 * Makes the given worker node steal one task from the most loaded eligible
 * worker node.
 *
 * A source worker node is eligible when it is ready, not stealing itself, has
 * queued tasks and is not the stealing worker node. The stealing worker node
 * is excluded by identity: the candidates are searched in a copy sorted by
 * queue size, whose indexes are not worker node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Most queued worker nodes first, on a copy so the pool order is untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode == null) {
    return undefined
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const task = sourceWorkerNode.popTask()!
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1657
/**
 * Handler of the worker node 'backPressure' event: hands tasks queued on the
 * worker node that emitted the event over to the other worker nodes.
 *
 * Candidates are visited from the least to the most queued one. A candidate
 * takes one task when it is ready, not stealing, not the source itself and its
 * queue is below the configured tasks queue size minus an offset of one.
 * The candidates are iterated in a sorted copy, so the real worker node key of
 * each one is looked up in the pool instead of using the position in that copy.
 *
 * @param eventDetail - The worker node event detail holding the source `workerId`.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Least queued worker nodes first, on a copy so the pool order is untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // Key of the candidate in the pool, not its position in the sorted copy.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1708
/**
 * Message listener registered on each worker.
 * Dispatches the received message according to its content: worker ready
 * response, task execution response or task function names update.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1733
8e8d9101
JB
/**
 * Emits the pool ready event, at most once, as soon as the pool is ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1740
/**
 * Handles the ready response received from a worker: stores the ready flag and
 * the task function names in the worker node information, then emits the pool
 * ready event if appropriate.
 *
 * @param message - The ready response message.
 * @throws Error if the worker reports that it is not ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready !== true) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info } = this.workerNodes[workerNodeKey]
  info.ready = ready
  info.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1752
/**
 * Handles a task execution response sent by a worker.
 *
 * Settles the promise registered for the task id, runs the after task
 * execution hook, unregisters the promise and notifies the worker node that
 * the task is finished. When the tasks queue is enabled and the pool is not
 * being destroyed, it also starts the next queued task if the configured
 * concurrency allows it and emits `idle` on a worker node that has nothing
 * left to execute.
 *
 * @param message - The message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    // No pending promise is registered for this task id
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  // Start the next queued task while staying under the configured concurrency
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Nothing executing, nothing queued, nothing sequentially stolen: the worker node is idle
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
7c0ba920 1809
/**
 * Emits the pool `busy` event with the pool information when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1815
/**
 * Emits the pool `backPressure` event with the pool information when the pool
 * has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1821
d0878034
JB
/**
 * Checks whether dynamic worker creation events must be emitted and emits them.
 * The implementation is provided by the concrete pool classes.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1826
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
1836
/**
 * Builds a new worker node from the pool worker type, file path and options.
 * The node is not added to the pool worker nodes here.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  // Use the configured tasks queue size, else the default one computed from the pool size
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env: this.opts.env,
    workerOptions: this.opts.workerOptions,
    tasksQueueBackPressureSize
  })
  // Worker nodes created while the pool is starting are flagged as ready upfront.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
1862
/**
 * Appends the given worker node to the pool worker nodes and returns its key.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const addedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1878
8e8d9101
JB
/**
 * Emits the pool `empty` event with the pool information when the pool is
 * empty, and resets the ready event emission flag so `ready` can be emitted again.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1885
/**
 * Deletes the given worker node from the pool worker nodes, removes its key
 * from the worker choice strategy context, then emits the pool `empty`
 * event if applicable. An unknown worker node only triggers the empty check.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const isKnownWorkerNode = removedWorkerNodeKey !== -1
  if (isKnownWorkerNode) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
adc3c320 1899
/**
 * Flags the worker at the given worker node key as not ready.
 * Does nothing if no worker information is found for that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1906
9e844245
JB
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1915
/**
 * Runs the before task execution hook, sends the task with its transfer list
 * to the worker at the given worker node key, then emits the task execution
 * events if applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  // The hook must run before the task is handed over to the worker
  this.beforeTaskExecutionHook(workerNodeKey, task)
  this.sendToWorker(workerNodeKey, task, task.transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1927
/**
 * Enqueues the task on the worker node at the given key, then emits the task
 * queuing events if applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node when enqueuing the task.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
1933
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1937
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1941
87347ea8
JB
/**
 * Executes every task queued on the worker node at the given key, then clears
 * its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of tasks dequeued and executed.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1952
ef41a6e6
JB
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1958}