fix: refine type definition for transferList
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
76d91ea0 10 Task
d35e5717 11} from '../utility-types.js'
bbeadd16 12import {
ded253e2 13 average,
ff128cc9 14 DEFAULT_TASK_NAME,
bbeadd16 15 EMPTY_FUNCTION,
463226a4 16 exponentialDelay,
59317253 17 isKillBehavior,
0d80593b 18 isPlainObject,
90d6701c 19 max,
afe0d5bf 20 median,
90d6701c 21 min,
463226a4
JB
22 round,
23 sleep
d35e5717 24} from '../utils.js'
d35e5717 25import type { TaskFunction } from '../worker/task-functions.js'
ded253e2 26import { KillBehaviors } from '../worker/worker-options.js'
c4855468 27import {
65d7a1c9 28 type IPool,
c4855468 29 PoolEvents,
6b27d407 30 type PoolInfo,
c4855468 31 type PoolOptions,
6b27d407
JB
32 type PoolType,
33 PoolTypes,
4b628b48 34 type TasksQueueOptions
d35e5717 35} from './pool.js'
a35560ba 36import {
f0d7f803 37 Measurements,
a35560ba 38 WorkerChoiceStrategies,
a20f0ba5
JB
39 type WorkerChoiceStrategy,
40 type WorkerChoiceStrategyOptions
d35e5717
JB
41} from './selection-strategies/selection-strategies-types.js'
42import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context.js'
bde6b5d7
JB
43import {
44 checkFilePath,
45 checkValidTasksQueueOptions,
bfc75cca 46 checkValidWorkerChoiceStrategy,
32b141fd 47 getDefaultTasksQueueOptions,
c329fd41
JB
48 updateEluWorkerUsage,
49 updateRunTimeWorkerUsage,
50 updateTaskStatisticsWorkerUsage,
51 updateWaitTimeWorkerUsage,
87347ea8 52 waitWorkerNodeEvents
d35e5717 53} from './utils.js'
ded253e2
JB
54import { version } from './version.js'
55import type {
56 IWorker,
57 IWorkerNode,
58 WorkerInfo,
59 WorkerNodeEventDetail,
60 WorkerType
61} from './worker.js'
62import { WorkerNode } from './worker-node.js'
23ccf9d7 63
729c563d 64/**
ea7a90d3 65 * Base class that implements some shared logic for all poolifier pools.
729c563d 66 *
38e795c1 67 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
68 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
69 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 70 */
c97c7edb 71export abstract class AbstractPool<
f06e48d8 72 Worker extends IWorker,
d3c8a1a8
S
73 Data = unknown,
74 Response = unknown
c4855468 75> implements IPool<Worker, Data, Response> {
afc003b2 76 /** @inheritDoc */
4b628b48 77 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 78
afc003b2 79 /** @inheritDoc */
f80125ca 80 public emitter?: EventEmitterAsyncResource
7c0ba920 81
be0676b3 82 /**
419e8121 83 * The task execution response promise map:
2740a743 84 * - `key`: The message id of each submitted task.
a3445496 85 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 86 *
a3445496 87 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 88 */
501aea93
JB
89 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
90 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 91
a35560ba 92 /**
51fe3d3c 93 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba 94 */
c63a35a0 95 protected workerChoiceStrategyContext?: WorkerChoiceStrategyContext<
78cea37e
JB
96 Worker,
97 Data,
98 Response
a35560ba
S
99 >
100
72ae84a2
JB
101 /**
102 * The task functions added at runtime map:
103 * - `key`: The task function name.
104 * - `value`: The task function itself.
105 */
106 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
107
15b176e0
JB
108 /**
109 * Whether the pool is started or not.
110 */
111 private started: boolean
47352846
JB
112 /**
113 * Whether the pool is starting or not.
114 */
115 private starting: boolean
711623b8
JB
116 /**
117 * Whether the pool is destroying or not.
118 */
119 private destroying: boolean
b362a929
JB
120 /**
121 * Whether the minimum number of workers is starting or not.
122 */
123 private startingMinimumNumberOfWorkers: boolean
55082af9
JB
124 /**
125 * Whether the pool ready event has been emitted or not.
126 */
127 private readyEventEmitted: boolean
afe0d5bf
JB
128 /**
129 * The start timestamp of the pool.
130 */
131 private readonly startTimestamp
132
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first, then the task dispatching methods are bound,
 * the optional event emitter and the worker choice strategy context are created,
 * and the pool is started when `opts.startWorkers` is enabled.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods that are handed over as callbacks
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
    Worker,
    Data,
    Response
  >(
    this,
    this.opts.workerChoiceStrategy,
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()

  // Lifecycle flags must be initialized before the pool is possibly started
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  this.startingMinimumNumberOfWorkers = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
189
26ce26ca
JB
/**
 * Checks that a fixed pool is not given a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is fixed and a maximum number of workers is specified.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && this.maximumNumberOfWorkers != null) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
197
c63a35a0
JB
/**
 * Checks that the minimum number of workers is a valid value for the pool type.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
217
/**
 * Validates the pool options and fills in the default value of the unset ones.
 *
 * @param opts - The pool options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  const { workerChoiceStrategyOptions } = opts
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  // Tasks queue options are only validated and defaulted when the queue is enabled
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
243
/**
 * Validates the worker choice strategy options. Unset options are always valid.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to check.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the weights count differs from the pool maximum size or the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  // One weight is expected per worker node the pool can hold
  const expectedWeights =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  if (weights != null && Object.keys(weights).length !== expectedWeights) {
    throw new Error(
      'Invalid worker choice strategy options: must have a weight for each worker node'
    )
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
275
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
281
/** @inheritDoc */
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const taskStatisticsRequirements =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements()
  // Counts the worker nodes matching the given predicate
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Sums a numeric value picked from each worker node
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the given measurement history of every worker node
  const mergeHistories = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (histories, workerNode) =>
        histories.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    strategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategyContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only reported when both run time and wait time are aggregated
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      )
    }),
    ...(tasksQueueEnabled && {
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      )
    }),
    ...(tasksQueueEnabled && {
      backPressure: this.hasBackPressure()
    }),
    ...(tasksQueueEnabled && {
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(average(mergeHistories('runTime')))
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(median(mergeHistories('runTime')))
        })
      }
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(average(mergeHistories('waitTime')))
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(median(mergeHistories('waitTime')))
        })
      }
    })
  }
}
08f3f44c 448
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number of
 * non dynamic worker nodes flagged ready reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
466
8e8d9101
JB
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it holds no worker node.
 */
protected get empty (): boolean {
  const hasNoWorkerNode = this.workerNodes.length === 0
  return hasNoWorkerNode && this.minimumNumberOfWorkers === 0
}
473
/**
 * The approximate pool utilization.
 *
 * Ratio between the aggregated tasks run time plus wait time of all worker nodes
 * and the pool time capacity, i.e. the time elapsed since the start timestamp
 * multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolTimeCapacity =
    elapsedTime * (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const workerNode of this.workerNodes) {
    totalTasksRunTime += workerNode.usage.runTime.aggregate ?? 0
    totalTasksWaitTime += workerNode.usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
495
/**
 * The pool type, provided by the concrete pool implementation.
 *
 * It is compared against `PoolTypes.fixed` when the constructor arguments are validated.
 */
protected abstract get type (): PoolType
502
/**
 * The worker type, provided by the concrete pool implementation.
 *
 * It is part of the event emitter name and is reported in the pool information.
 */
protected abstract get worker (): WorkerType
507
6b813701
JB
/**
 * Checks that a message received from a worker carries the id of a worker known to the pool.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  if (this.getWorkerNodeKeyByWorkerId(workerId) === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
523
aa9eede8
JB
/**
 * Looks up the key of the worker node whose worker has the given id.
 *
 * @param workerId - The worker id.
 * @returns The key of the first matching worker node, `-1` if none matches.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
535
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  this.opts.workerChoiceStrategy = workerChoiceStrategy
  this.workerChoiceStrategyContext?.setWorkerChoiceStrategy(
    this.opts.workerChoiceStrategy
  )
  if (workerChoiceStrategyOptions != null) {
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  }
  // Reset each worker node usage, then send it a statistics message
  this.workerNodes.forEach((workerNode, workerNodeKey) => {
    workerNode.resetUsage()
    this.sendStatisticsMessageToWorker(workerNodeKey)
  })
}
554
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Unset options leave the currently stored ones untouched
  const optionsProvided = workerChoiceStrategyOptions != null
  if (optionsProvided) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }
  this.workerChoiceStrategyContext?.setOptions(
    this.opts.workerChoiceStrategyOptions
  )
}
567
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disablingEnabledQueue =
    this.opts.enableTasksQueue === true && !enable
  if (disablingEnabledQueue) {
    // Stop tasks stealing and flush the queued tasks before turning the queue off
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
581
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue options are dropped when the tasks queue is not enabled
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(tasksQueueOptions)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
  // Handlers are always removed first so that they are never registered twice
  this.unsetTaskStealing()
  if (this.opts.tasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
608
/**
 * Builds the tasks queue options by overriding the default ones with the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaximumSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
619
/**
 * Sets the tasks queue back pressure size of every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
625
d6ca1416
JB
/**
 * Registers the idle event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
631
/**
 * Removes the idle event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
640
/**
 * Registers the back pressure event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
649
/**
 * Removes the back pressure event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
658
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full once it holds at least as many worker nodes as its maximum size.
 */
protected get full (): boolean {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaximumSize
}
c2ade475 670
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The busyness criterion is defined by the concrete pool implementation.
 */
protected abstract get busy (): boolean
7c0ba920 677
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With the tasks queue enabled the quota is the tasks queue concurrency, otherwise
 * it is a single task. Only worker nodes flagged ready are considered.
 *
 * @returns Worker nodes busyness boolean status.
 */
protected internalBusy (): boolean {
  if (this.opts.enableTasksQueue === true) {
    const hasWorkerNodeBelowConcurrency = this.workerNodes.some(
      workerNode =>
        workerNode.info.ready &&
        workerNode.usage.tasks.executing <
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          this.opts.tasksQueueOptions!.concurrency!
    )
    return !hasWorkerNodeBelowConcurrency
  }
  const hasIdleWorkerNode = this.workerNodes.some(
    workerNode =>
      workerNode.info.ready && workerNode.usage.tasks.executing === 0
  )
  return !hasIdleWorkerNode
}
702
42c677c1
JB
/**
 * Whether the given worker node is executing its tasks quota or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least the tasks queue concurrency when the tasks queue is enabled, or at least one task otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
713
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` on a successful operation status. It rejects on a failed one.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not the operation status of the targeted worker
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
750
/**
 * Sends a task function operation message to every worker node and waits for all their status responses.
 *
 * @param message - The task function operation message.
 * @returns A promise resolving to `true` once as many status responses as worker nodes are received and all are successful. It rejects if one of them is a failed status.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      // Only operation status messages are accounted as responses
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
      // The listener is registered on every worker node: remove it from all of
      // them, not only from the sender of the last response, to avoid leaking it.
      // NOTE(review): assumes deregistering a listener that is not registered on
      // a worker node (e.g. one created meanwhile) is a no-op - confirm.
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
    }
    for (const workerNodeKey of this.workerNodes.keys()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
803
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // The task function is known as soon as one worker node lists its name
  return this.workerNodes.some(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.includes(name)
  )
}
816
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  if (typeof fn !== 'function') {
    throw new TypeError('fn argument must be a function')
  }
  // The task function is sent to the workers as its source code string
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionName: name,
    taskFunction: fn.toString()
  })
  this.taskFunctions.set(name, fn)
  return opResult
}
839
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added at runtime through the pool can be removed
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionName: name
  })
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  return opResult
}
855
/** @inheritDoc */
public listTaskFunctionNames (): string[] {
  // The names are taken from the first worker node listing at least one of them
  const workerNodeWithNames = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionNames) &&
      workerNode.info.taskFunctionNames.length > 0
  )
  return workerNodeWithNames?.info.taskFunctionNames ?? []
}
868
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionName: name
  })
  return opResult
}
876
adee6053
JB
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
882
375f7504
JB
/**
 * Whether a task shall be executed right away on the given worker node instead of being enqueued.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and it executes less tasks than the tasks queue concurrency.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
891
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      rejection = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (name != null && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode()
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so that the task response can settle it later
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const executeImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (executeImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 956
60dc0a9c
JB
/**
 * Creates worker nodes until the number of non dynamic worker nodes reaches the minimum number of workers.
 * The `startingMinimumNumberOfWorkers` flag is raised while the creation loop runs.
 */
private startMinimumNumberOfWorkers (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.startingMinimumNumberOfWorkers = true
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.startingMinimumNumberOfWorkers = false
}
973
47352846
JB
/** @inheritdoc */
public start (): void {
  // Refuse to start when the pool is already started, starting or being destroyed.
  let refusal: string | undefined
  if (this.started) {
    refusal = 'Cannot start an already started pool'
  } else if (this.starting) {
    refusal = 'Cannot start an already starting pool'
  } else if (this.destroying) {
    refusal = 'Cannot start a destroying pool'
  }
  if (refusal != null) {
    throw new Error(refusal)
  }
  this.starting = true
  this.startMinimumNumberOfWorkers()
  this.starting = false
  this.started = true
}
990
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Refuse to destroy when the pool is not started, still starting or already being destroyed.
  let refusal: string | undefined
  if (!this.started) {
    refusal = 'Cannot destroy an already destroyed pool'
  } else if (this.starting) {
    refusal = 'Cannot destroy an starting pool'
  } else if (this.destroying) {
    refusal = 'Cannot destroy an already destroying pool'
  }
  if (refusal != null) {
    throw new Error(refusal)
  }
  this.destroying = true
  // Destroy all the worker nodes concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1014
/**
 * Sends a kill message to the worker given its worker node key and waits for its answer.
 * Resolves right away if no worker node exists at the given key, resolves on a `'success'`
 * kill answer and rejects on a `'failure'` kill answer.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (this.workerNodes[workerNodeKey] == null) {
    return
  }
  await new Promise<void>((resolve, reject) => {
    const onKillAnswer = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      const { kill, workerId } = message
      if (kill === 'success') {
        resolve()
        return
      }
      if (kill === 'failure') {
        reject(
          new Error(`Kill message handling failed on worker ${workerId}`)
        )
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillAnswer)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1039
/**
 * Terminates the worker node given its worker node key.
 *
 * The worker node is flagged as not ready and its tasks queue is flushed, then the
 * `'taskFinished'` worker node events are awaited through `waitWorkerNodeEvents()` before
 * the kill message is sent and the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default tasks queue options when no timeout is configured.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1061
/**
 * Hook meant to run code before the worker nodes are created in the abstract constructor.
 * Does nothing by default and can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  // Nothing to set up by default.
}
c97c7edb 1071
/**
 * Tells whether the worker is the main worker or not.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1078
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counter and updates the wait time on the worker node usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    const { usage } = workerNode
    ++usage.tasks.executing
    updateWaitTimeWorkerUsage(this.workerChoiceStrategyContext, usage, task)
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (workerNode.getTaskFunctionWorkerUsage(task.name!) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(task.name!)!
  ++taskFunctionUsage.tasks.executing
  updateWaitTimeWorkerUsage(
    this.workerChoiceStrategyContext,
    taskFunctionUsage,
    task
  )
}
1119
/**
 * Hook executed after the worker task execution.
 * Updates the task statistics, run time and ELU on the worker node usage and, when applicable,
 * on the task function worker usage, then updates the worker choice strategy context if any
 * usage was updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const hasWorkerUsage = workerNode?.usage != null
  if (hasWorkerUsage) {
    const { usage } = workerNode
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(this.workerChoiceStrategyContext, usage, message)
    updateEluWorkerUsage(this.workerChoiceStrategyContext, usage, message)
  }
  const hasTaskFunctionUsage =
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    ) != null
  if (hasTaskFunctionUsage) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    )!
    updateTaskStatisticsWorkerUsage(taskFunctionUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategyContext,
      taskFunctionUsage,
      message
    )
  }
  if (hasWorkerUsage || hasTaskFunctionUsage) {
    this.workerChoiceStrategyContext?.update(workerNodeKey)
  }
}
1177
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 * It is the case when the worker info holds an array of more than two task function names.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionNames =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionNames
  return Array.isArray(taskFunctionNames) && taskFunctionNames.length > 2
}
1192
/**
 * Chooses a worker node for the next task.
 *
 * When a dynamic worker shall be created, it is created first and directly chosen if the
 * strategy policy has `dynamicWorkerUsage` enabled. Otherwise the choice is delegated to the
 * worker choice strategy context.
 *
 * @returns The chosen worker node key
 */
private chooseWorkerNode (): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategyContext?.getStrategyPolicy()
        .dynamicWorkerUsage === true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategyContext!.execute()
}
1213
6c6afb84
JB
/**
 * Tells whether the conditions for a dynamic worker creation are met.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1220
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
1233
/**
 * Creates a new worker node, registers the worker event handlers on it and adds it to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const { onlineHandler, messageHandler, errorHandler, exitHandler } =
    this.opts
  // User provided handlers default to a no-op function.
  workerNode.registerWorkerEventHandler(
    'online',
    onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler('error', errorHandler ?? EMPTY_FUNCTION)
  workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    const poolIsRunning = this.started && !this.destroying
    // Replace the failed worker node if the pool is configured to do so.
    if (poolIsRunning && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    // Move the tasks queued on the failed worker node to other worker nodes.
    if (poolIsRunning && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  })
  workerNode.registerWorkerEventHandler('exit', exitHandler ?? EMPTY_FUNCTION)
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
    const shallRestoreMinimum =
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    if (shallRestoreMinimum) {
      this.startMinimumNumberOfWorkers()
    }
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1297
/**
 * Creates a new, completely set up dynamic worker node.
 *
 * On top of the regular worker node setup, a message listener destroying the worker node on
 * kill messages is registered, a `checkActive` message and the pool task functions are sent
 * to the worker, and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A hard kill always applies, a soft kill only applies to a worker node without pending work.
    const shallDestroy = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    if (shallDestroy()) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Replay the task functions registered on the pool to the new worker.
  for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionName,
      taskFunction: taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const dynamicWorkerNode = this.workerNodes[workerNodeKey]
  dynamicWorkerNode.info.dynamic = true
  const readyPerPolicy =
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerReady === true ||
    this.workerChoiceStrategyContext?.getStrategyPolicy()
      .dynamicWorkerUsage === true
  if (readyPerPolicy) {
    dynamicWorkerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1355
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1368
ae036c3e
JB
/**
 * Registers once a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1381
/**
 * Deregisters a message listener callback from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1394
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics messages to the worker
 * and, when the tasks queue is enabled, subscribes to the worker node events used for tasks stealing.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(workerNodeKey, this.workerMessageListener)
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const workerNode = this.workerNodes[workerNodeKey]
  if (this.opts.tasksQueueOptions?.taskStealing === true) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
1426
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1433
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The `runTime` and `elu` flags come from the task statistics requirements of the worker choice
 * strategy context and default to `false`.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const runTime =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().runTime
      .aggregate ?? false
  const elu =
    this.workerChoiceStrategyContext?.getTaskStatisticsRequirements().elu
      .aggregate ?? false
  this.sendToWorker(workerNodeKey, { statistics: { runTime, elu } })
}
a2ed5053 1451
5eb72b9e
JB
/**
 * Whether tasks stealing is impossible in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1455
42c677c1
JB
/**
 * Executes the task on the given worker node if it shall be executed right away, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1463
/**
 * Moves the tasks queued on the given worker node to the other worker nodes of the pool.
 *
 * For each queued task, the destination is the ready worker node with the fewest queued tasks.
 * The source worker node is never a destination: handing a task back to the worker node being
 * drained would put it back in the same queue and the loop would never end. If no other worker
 * node is ready, the other worker node with the fewest queued tasks is used so tasks are not
 * left on the drained worker node.
 *
 * @param workerNodeKey - The key of the worker node whose queued tasks are redistributed.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      if (candidateKey === workerNodeKey) {
        continue
      }
      if (destinationWorkerNodeKey === -1) {
        destinationWorkerNodeKey = candidateKey
        continue
      }
      const currentBest = this.workerNodes[destinationWorkerNodeKey]
      // A ready worker node always beats a not ready one, otherwise the shortest queue wins.
      const isBetter =
        candidate.info.ready !== currentBest.info.ready
          ? candidate.info.ready
          : candidate.usage.tasks.queued < currentBest.usage.tasks.queued
      if (isBetter) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No other worker node can receive the tasks.
      break
    }
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      break
    }
    this.handleTask(destinationWorkerNodeKey, task)
  }
}
1486
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node usage and, when applicable, of the
 * task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionUsage.tasks.stolen
}
1506
/**
 * Increments the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage == null) {
    return
  }
  ++workerNode.usage.tasks.sequentiallyStolen
}
1516
/**
 * Increments the sequentially stolen tasks counter of the task function worker usage, when applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  ++taskFunctionUsage.tasks.sequentiallyStolen
}
1532
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage == null) {
    return
  }
  workerNode.usage.tasks.sequentiallyStolen = 0
}
1542
/**
 * Resets to zero the sequentially stolen tasks counter of the task function worker usage, when applicable.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  if (workerNode.getTaskFunctionWorkerUsage(taskName) == null) {
    return
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const taskFunctionUsage = workerNode.getTaskFunctionWorkerUsage(taskName)!
  taskFunctionUsage.tasks.sequentiallyStolen = 0
}
1558
/**
 * Handler of the worker node `'idle'` event: makes the idle worker node steal a task, then
 * schedules itself again after a delay computed from the sequentially stolen tasks counter.
 *
 * The stealing stops when tasks cannot be stolen, when too many worker nodes are already
 * stealing, or when the worker node got work again after a previous steal.
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  const stealingNotAllowed =
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  if (stealingNotAllowed) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work again after stealing: stop stealing and reset the counters.
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const taskFunctionName of workerInfo.taskFunctionNames!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stolenTaskName = stolenTask.name!
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const taskFunctionTasksUsage = this.workerNodes[
      workerNodeKey
    ].getTaskFunctionWorkerUsage(stolenTaskName)!.tasks
    const sameTaskStolenAgain =
      previousStolenTask != null &&
      previousStolenTask.name === stolenTask.name &&
      taskFunctionTasksUsage.sequentiallyStolen > 0
    if (taskFunctionTasksUsage.sequentiallyStolen === 0 || sameTaskStolenAgain) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  // Schedule the next stealing attempt; errors are reported through the pool emitter.
  const delay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(delay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1643
/**
 * Makes the given worker node steal one task from the most loaded eligible worker node.
 *
 * The source worker node is the one with the most queued tasks among the worker nodes that are
 * ready, not stealing themselves, have at least one queued task and are not the stealing worker
 * node. The stealing worker node is excluded by identity: the candidates are searched in a
 * sorted copy of the worker nodes, so positions in that copy are not worker node keys.
 *
 * @param workerNodeKey - The key of the worker node stealing a task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const stealingWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate.info.ready &&
        !candidate.info.stealing &&
        candidate !== stealingWorkerNode &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  if (task == null) {
    return undefined
  }
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1670
/**
 * Handler of the worker node `'backPressure'` event: hands tasks queued on the back pressured
 * worker node over to the other worker nodes, least loaded first.
 *
 * A worker node receives a task if it is ready, not stealing, is not the back pressured worker
 * node and has fewer queued tasks than the tasks queue size minus one. Each candidate carries
 * its own worker node key: the candidates are visited in a sorted order, so the position in
 * that order must not be used as a key into the pool worker nodes.
 *
 * @param eventDetail - The worker node event detail.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Pair each worker node with its key before sorting so the key survives the reordering.
  const candidates = this.workerNodes
    .map((workerNode, workerNodeKey) => ({ workerNode, workerNodeKey }))
    .sort(
      (candidateA, candidateB) =>
        candidateA.workerNode.usage.tasks.queued -
        candidateB.workerNode.usage.tasks.queued
    )
  for (const { workerNode, workerNodeKey } of candidates) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      const task = sourceWorkerNode.popTask()!
      this.handleTask(workerNodeKey, task)
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      workerInfo.stealing = false
    }
  }
}
1721
/**
 * This method is the message listener registered on each worker.
 * It dispatches worker ready responses, task execution responses and task function names messages.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionNames } = message
  if (ready != null && taskFunctionNames != null) {
    // Worker ready response received from worker
    this.handleWorkerReadyResponse(message)
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
    return
  }
  if (taskFunctionNames == null) {
    return
  }
  // Task function names message received from worker
  const workerInfo = this.getWorkerInfo(
    this.getWorkerNodeKeyByWorkerId(workerId)
  )
  if (workerInfo != null) {
    workerInfo.taskFunctionNames = taskFunctionNames
  }
}
1746
8e8d9101
JB
/**
 * Emits the pool ready event once the pool is ready, unless it has already been emitted.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1753
/**
 * Handles a worker ready response: stores the ready flag and the task function names on the
 * worker node info, then emits the pool ready event if applicable.
 *
 * @param message - The worker ready response message.
 * @throws If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionNames } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const { info: workerInfo } =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  workerInfo.ready = ready
  workerInfo.taskFunctionNames = taskFunctionNames
  this.checkAndEmitReadyEvent()
}
1765
/**
 * Handles a task execution response received from a worker.
 *
 * Looks up the promise response registered for the task id; when there is
 * none, nothing is done. Otherwise it settles the promise (rejecting with
 * the worker error message, or resolving with the response data), runs the
 * after task execution hook, removes the promise response entry and emits
 * `taskFinished` on the worker node. When tasks queueing is enabled,
 * `this.destroying` is not set and the worker node still exists, it may
 * also execute the next queued task and emit `idle` on the worker node.
 *
 * @param message - The task execution response message received from the worker.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  // Settle the task promise, within the async resource scope if there is one
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  // Run the next queued task if the worker node concurrency allows it
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Emit 'idle' when nothing is executing, queued or sequentially stolen
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', { workerId, workerNodeKey })
  }
}
7c0ba920 1822
/**
 * Emits the pool `busy` event, with the pool information, when `this.busy`
 * is true.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1828
/**
 * Emits the pool `backPressure` event, with the pool information, when the
 * pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1834
/**
 * Emits dynamic worker creation events.
 *
 * Abstract: no implementation is provided here, concrete subclasses must
 * supply it.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1839
/**
 * Looks up the information of the worker stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` when no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const { workerNodes } = this
  return workerNodes[workerNodeKey]?.info
}
1849
/**
 * Builds a new worker node from the pool worker type, file path and options.
 * The node is not added to the pool worker nodes here.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  // Configured tasks queue size, else the default one derived from the pool size
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env: this.opts.env,
    workerOptions: this.opts.workerOptions,
    tasksQueueBackPressureSize
  })
  if (this.starting) {
    // Flag the worker node as ready at pool startup.
    workerNode.info.ready = true
  }
  return workerNode
}
1875
/**
 * Appends the given worker node to the pool worker nodes and returns the
 * key (array index) at which it is found.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1891
/**
 * Emits the pool `empty` event, with the pool information, when
 * `this.empty` is true, and resets `readyEventEmitted` to false.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1898
/**
 * Removes the given worker node from the pool worker nodes, if present,
 * and removes its key from the worker choice strategy context. The pool
 * `empty` event check runs in every case.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  const isKnownWorkerNode = workerNodeKey !== -1
  if (isKnownWorkerNode) {
    this.workerNodes.splice(workerNodeKey, 1)
    this.workerChoiceStrategyContext?.remove(workerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
adc3c320 1912
/**
 * Sets the ready flag of the worker at the given worker node key to false.
 * Does nothing when no worker information is found for that key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
1919
/**
 * Whether the pool has back pressure: tasks queueing must be enabled and
 * every worker node must report back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
1928
/**
 * Executes the given task on the worker given its worker node key: runs the
 * before task execution hook, sends the task with its transfer list to the
 * worker, then checks whether task execution events must be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  // Read after the hook, as the original does
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
1940
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * whether task queuing events must be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return tasksQueueSize
}
1946
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, or `undefined` when the worker node returns none.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
1950
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
1954
/**
 * Flushes the tasks queue of the worker node at the given key: executes
 * every queued task until the queue size is zero, then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of tasks executed while flushing.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
1965
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 1971}