perf: build worker choice strategies policy and task requirements on
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
31847469
JB
10 Task,
11 TaskFunctionProperties
d35e5717 12} from '../utility-types.js'
bbeadd16 13import {
ded253e2 14 average,
31847469 15 buildTaskFunctionProperties,
ff128cc9 16 DEFAULT_TASK_NAME,
bbeadd16 17 EMPTY_FUNCTION,
463226a4 18 exponentialDelay,
59317253 19 isKillBehavior,
0d80593b 20 isPlainObject,
90d6701c 21 max,
afe0d5bf 22 median,
90d6701c 23 min,
463226a4
JB
24 round,
25 sleep
d35e5717 26} from '../utils.js'
31847469
JB
27import type {
28 TaskFunction,
29 TaskFunctionObject
30} from '../worker/task-functions.js'
ded253e2 31import { KillBehaviors } from '../worker/worker-options.js'
c4855468 32import {
65d7a1c9 33 type IPool,
c4855468 34 PoolEvents,
6b27d407 35 type PoolInfo,
c4855468 36 type PoolOptions,
6b27d407
JB
37 type PoolType,
38 PoolTypes,
4b628b48 39 type TasksQueueOptions
d35e5717 40} from './pool.js'
a35560ba 41import {
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
d35e5717 46} from './selection-strategies/selection-strategies-types.js'
bcfb06ce 47import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
bde6b5d7
JB
48import {
49 checkFilePath,
50 checkValidTasksQueueOptions,
bfc75cca 51 checkValidWorkerChoiceStrategy,
32b141fd 52 getDefaultTasksQueueOptions,
c329fd41
JB
53 updateEluWorkerUsage,
54 updateRunTimeWorkerUsage,
55 updateTaskStatisticsWorkerUsage,
56 updateWaitTimeWorkerUsage,
87347ea8 57 waitWorkerNodeEvents
d35e5717 58} from './utils.js'
ded253e2
JB
59import { version } from './version.js'
60import type {
61 IWorker,
62 IWorkerNode,
63 WorkerInfo,
64 WorkerNodeEventDetail,
65 WorkerType
66} from './worker.js'
67import { WorkerNode } from './worker-node.js'
23ccf9d7 68
729c563d 69/**
ea7a90d3 70 * Base class that implements some shared logic for all poolifier pools.
729c563d 71 *
38e795c1 72 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
73 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
74 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 75 */
c97c7edb 76export abstract class AbstractPool<
f06e48d8 77 Worker extends IWorker,
d3c8a1a8
S
78 Data = unknown,
79 Response = unknown
c4855468 80> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/**
 * @inheritDoc
 *
 * Only instantiated by the constructor when the `enableEvents` pool option is `true`.
 */
public emitter?: EventEmitterAsyncResource

/**
 * The task execution response promise map:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
 *
 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
 */
protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
  new Map<string, PromiseResponseWrapper<Response>>()

/**
 * Worker choice strategies context referencing worker choice algorithms implementation.
 *
 * Built in the constructor from the default worker choice strategy and the worker choice strategy options.
 */
protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
Worker,
Data,
Response
>

/**
 * The task functions added at runtime map:
 * - `key`: The task function name.
 * - `value`: The task function object.
 *
 * Entries are added by `addTaskFunction()` and deleted by `removeTaskFunction()`.
 */
private readonly taskFunctions: Map<
string,
TaskFunctionObject<Data, Response>
>

/**
 * Whether the pool is started or not.
 * Initialized to `false` in the constructor.
 */
private started: boolean
/**
 * Whether the pool is starting or not.
 * Initialized to `false` in the constructor.
 */
private starting: boolean
/**
 * Whether the pool is destroying or not.
 * Initialized to `false` in the constructor.
 */
private destroying: boolean
/**
 * Whether the minimum number of workers is starting or not.
 * Initialized to `false` in the constructor.
 */
private startingMinimumNumberOfWorkers: boolean
/**
 * Whether the pool ready event has been emitted or not.
 * Initialized to `false` in the constructor.
 */
private readyEventEmitted: boolean
/**
 * The start timestamp of the pool.
 * Taken from `performance.now()` as the last constructor statement; used as the origin of the utilization computation.
 */
private readonly startTimestamp
140
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * The arguments are validated first (pool type, worker file path, minimum number of workers, pool options),
 * then the event emitter (if enabled) and the worker choice strategies context are created,
 * and finally the pool is started when the `startWorkers` option is `true`.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If `isMain()` returns `false`, or if one of the validation helpers rejects its argument.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }
  // Validation: each helper throws on invalid input. checkPoolOptions() also fills in option defaults on this.opts.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Bind the methods to this instance so they keep their `this` when passed around as callbacks.
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.enqueueTask = this.enqueueTask.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }
  // The default worker choice strategy has been set by checkPoolOptions() above, hence the non-null assertion.
  this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
  Worker,
  Data,
  Response
  >(
    this,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    [this.opts.workerChoiceStrategy!],
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()

  // Lifecycle flags all start as `false`; start() is invoked right after when requested.
  this.started = false
  this.starting = false
  this.destroying = false
  this.readyEventEmitted = false
  this.startingMinimumNumberOfWorkers = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  // Origin of the elapsed time used by the utilization getter.
  this.startTimestamp = performance.now()
}
198
26ce26ca
JB
/**
 * Ensures that a fixed pool is not instantiated with a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and `maximumNumberOfWorkers` is defined.
 */
private checkPoolType (): void {
  if (this.type !== PoolTypes.fixed) {
    return
  }
  if (this.maximumNumberOfWorkers == null) {
    return
  }
  throw new Error(
    'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
  )
}
206
c63a35a0
JB
/**
 * Validates the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  // Every failing check throws, so the checks can simply run one after the other.
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  const isFixedPool = this.type === PoolTypes.fixed
  if (isFixedPool && minimumNumberOfWorkers === 0) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
226
/**
 * Validates the pool options and stores them, with defaults applied, on `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  const { workerChoiceStrategyOptions } = opts
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when the tasks queue is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (!this.opts.enableTasksQueue) {
    return
  }
  checkValidTasksQueueOptions(opts.tasksQueueOptions)
  this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
    opts.tasksQueueOptions
  )
}
252
/**
 * Validates worker choice strategy options. Undefined options are accepted as is.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights differs from the pool maximum size, or if the measurement is not one of `Measurements`.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (workerChoiceStrategyOptions == null) {
    return
  }
  if (!isPlainObject(workerChoiceStrategyOptions)) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const { weights, measurement } = workerChoiceStrategyOptions
  if (weights != null) {
    // One weight is expected per worker node the pool can hold.
    const expectedWeightsCount =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeightsCount) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
284
/**
 * Creates the pool event emitter, named after the pool and worker types.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
290
/** @inheritDoc */
public get info (): PoolInfo {
  // NOTE(review): the task statistics requirements are read once here instead of at each use;
  // this assumes getTaskStatisticsRequirements() is a side-effect-free read — confirm.
  const taskStatisticsRequirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    predicate: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        predicate(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )
  // Sums a numeric value picked from each worker node.
  const sumOverWorkerNodes = (
    pick: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + pick(workerNode),
      0
    )
  // Concatenates the given measurement history of every worker node.
  const gatherHistory = (measurement: 'runTime' | 'waitTime'): number[] =>
    this.workerNodes.reduce<number[]>(
      (history, workerNode) =>
        history.concat(workerNode.usage[measurement].history),
      []
    )
  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    defaultStrategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only reported when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Tasks queue related counters are only reported when the tasks queue is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.runTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.runTime.average && {
          average: round(average(gatherHistory('runTime')))
        }),
        ...(taskStatisticsRequirements.runTime.median && {
          median: round(median(gatherHistory('runTime')))
        })
      }
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: {
        minimum: round(
          min(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.minimum ?? Infinity
            )
          )
        ),
        maximum: round(
          max(
            ...this.workerNodes.map(
              workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
            )
          )
        ),
        ...(taskStatisticsRequirements.waitTime.average && {
          average: round(average(gatherHistory('waitTime')))
        }),
        ...(taskStatisticsRequirements.waitTime.median && {
          median: round(median(gatherHistory('waitTime')))
        })
      }
    })
  }
}
08f3f44c 457
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number of
 * non-dynamic worker nodes flagged ready reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
475
8e8d9101
JB
/**
 * The pool emptiness boolean status.
 *
 * The pool is empty when its minimum number of workers is zero and it currently holds no worker node.
 */
protected get empty (): boolean {
  if (this.minimumNumberOfWorkers !== 0) {
    return false
  }
  return this.workerNodes.length === 0
}
482
/**
 * The approximate pool utilization.
 *
 * Computed as the aggregated tasks run time plus wait time of all worker nodes,
 * divided by the time elapsed since the pool start timestamp multiplied by the pool maximum size.
 *
 * @returns The pool utilization.
 */
private get utilization (): number {
  const elapsedTime = performance.now() - this.startTimestamp
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const poolTimeCapacity = elapsedTime * poolMaximumSize
  let totalTasksRunTime = 0
  let totalTasksWaitTime = 0
  for (const { usage } of this.workerNodes) {
    totalTasksRunTime += usage.runTime.aggregate ?? 0
    totalTasksWaitTime += usage.waitTime.aggregate ?? 0
  }
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
504
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Abstract: has no implementation in this class. Within this class it is compared
 * against `PoolTypes.fixed` by the constructor checks and reported in the pool information.
 */
protected abstract get type (): PoolType
511
/**
 * The worker type.
 *
 * Abstract: has no implementation in this class. Within this class it is used
 * to name the event emitter and reported in the pool information.
 */
protected abstract get worker (): WorkerType
516
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not match any worker node of the pool.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  if (workerNodeKey === -1) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
532
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  const hasWorkerId = (workerNode: IWorkerNode<Worker, Data>): boolean =>
    workerNode.info.id === workerId
  return this.workerNodes.findIndex(hasWorkerId)
}
544
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // Options are applied first so a strategy change below picks them up.
  const optionsUpdated =
    workerChoiceStrategyOptions != null &&
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const strategyChanged =
    workerChoiceStrategy !== this.opts.workerChoiceStrategy
  if (strategyChanged) {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
  }
  if (!optionsUpdated && !strategyChanged) {
    return
  }
  // Something changed: resynchronize the strategies and notify every worker.
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
575
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): boolean {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  // Nothing to apply: report that no update happened.
  if (workerChoiceStrategyOptions == null) {
    return false
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const strategiesContext = this.workerChoiceStrategiesContext
  strategiesContext?.setOptions(this.opts.workerChoiceStrategyOptions)
  strategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
  return true
}
597
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Going from enabled to disabled: stop tasks stealing and flush the queued tasks.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
611
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queue disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Handlers are always removed first so enabling never registers them twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
638
/**
 * Builds the tasks queue options: the defaults for the pool maximum size, overridden by the given options.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults, if any.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaximumSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
649
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
655
/**
 * Registers the worker node `'idle'` event handler on every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
661
/**
 * Removes the worker node `'idle'` event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
670
/**
 * Registers the worker node `'backPressure'` event handler on every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
679
/**
 * Removes the worker node `'backPressure'` event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
688
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when it holds at least as many worker nodes as its maximum size
 * (the maximum number of workers, or the minimum one when no maximum is defined).
 */
protected get full (): boolean {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaximumSize
}
c2ade475 700
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Abstract: has no implementation in this class.
 * NOTE(review): `internalBusy()` is presumably meant to be used by implementations — confirm against the concrete pools.
 */
protected abstract get busy (): boolean
7c0ba920 707
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * A worker node has spare capacity when it is ready and executes fewer tasks than
 * the tasks queue concurrency (tasks queue enabled) or no task at all (tasks queue disabled).
 *
 * @returns `true` if no worker node has spare capacity, `false` otherwise.
 */
protected internalBusy (): boolean {
  const hasSpareCapacity =
    this.opts.enableTasksQueue === true
      ? (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready &&
          workerNode.usage.tasks.executing <
            // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
            this.opts.tasksQueueOptions!.concurrency!
      : (workerNode: IWorkerNode<Worker, Data>): boolean =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
  return !this.workerNodes.some(hasSpareCapacity)
}
732
42c677c1
JB
/**
 * Whether the given worker node is busy or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least one task (tasks queue disabled) or at least as many tasks as the tasks queue concurrency (tasks queue enabled), `false` otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  if (this.opts.enableTasksQueue !== true) {
    return executing > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing >= this.opts.tasksQueueOptions!.concurrency!
}
743
/**
 * Sends a task function operation message to one worker node and waits for its answer.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message to send.
 * @returns A promise resolving to `true` when the worker reported success; it rejects with an error when the worker reported a failure.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore messages that are not an operation status coming from the targeted worker.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        const failure = new Error(
          `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
        )
        reject(failure)
      }
      // The operation is settled: the listener is no longer needed.
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
780
/**
 * Sends a task function operation message to every worker node and waits for all of them to answer.
 *
 * The same message listener is registered on each worker node. Once as many operation status
 * responses as there are worker nodes have been received, the listener is removed from every
 * worker node and the promise is settled.
 *
 * @param message - The task function operation message to send.
 * @returns A promise resolving to `true` when every worker node reported success, or immediately when the pool has no worker node; it rejects with an error when at least one worker node reported a failure.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  // Without worker nodes no response can ever arrive: settle right away instead of hanging forever.
  // NOTE(review): resolves `true` since no worker can fail; confirm that worker nodes created later are sent the registered task functions.
  if (this.workerNodes.length === 0) {
    return true
  }
  return await new Promise<boolean>((resolve, reject) => {
    const responsesReceived: Array<MessageValue<Response>> = []
    // Removes the given listener from all worker nodes, not only from the last responder.
    const deregisterFromWorkerNodes = (
      listener: (response: MessageValue<Response>) => void
    ): void => {
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.deregisterWorkerMessageListener(workerNodeKey, listener)
      }
    }
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // All worker nodes answered: clean up the listeners, then settle.
      deregisterFromWorkerNodes(taskFunctionOperationsListener)
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
        return
      }
      reject(
        new Error(
          // The operation name comes from the request, which always carries it.
          `Task function operation '${
            message.taskFunctionOperation as string
          }' failed on worker ${errorResponse.workerId} with error: '${
            errorResponse.workerError?.message
          }'`
        )
      )
    }
    try {
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.registerWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
        this.sendToWorker(workerNodeKey, message)
      }
    } catch (error) {
      // Do not leave listeners behind when registering or sending fails.
      deregisterFromWorkerNodes(taskFunctionOperationsListener)
      reject(error)
    }
  })
}
833
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  // Look the name up in the task functions properties reported by the worker nodes.
  for (const taskFunctionProperties of this.listTaskFunctionsProperties()) {
    if (taskFunctionProperties.name === name) {
      return true
    }
  }
  return false
}
840
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // A bare function is wrapped into a task function object.
  const taskFunctionObject: TaskFunctionObject<Data, Response> =
    typeof fn === 'function' ? { taskFunction: fn } : fn
  if (typeof taskFunctionObject.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  // The task function is sent to the workers as its source text.
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionProperties: buildTaskFunctionProperties(
      name,
      taskFunctionObject
    ),
    taskFunction: taskFunctionObject.taskFunction.toString()
  })
  this.taskFunctions.set(name, taskFunctionObject)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies()
  )
  return opResult
}
869
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  // Only task functions added through the pool can be removed.
  const handledOnPoolSide = this.taskFunctions.has(name)
  if (!handledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionProperties
  })
  // The workers acknowledged the removal: forget the function and its usage statistics.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies()
  )
  return opResult
}
891
/** @inheritDoc */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  // The first worker node reporting a non-empty list is used as the source.
  const reportingWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionsProperties) &&
      workerNode.info.taskFunctionsProperties.length > 0
  )
  return reportingWorkerNode?.info.taskFunctionsProperties ?? []
}
904
bcfb06ce
JB
/**
 * Gets task function strategy, if any.
 *
 * @param name - The task function name.
 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
 */
private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
  name?: string
): WorkerChoiceStrategy | undefined => {
  if (name == null) {
    return undefined
  }
  const matchingTaskFunctionProperties =
    this.listTaskFunctionsProperties().find(
      (taskFunctionProperties: TaskFunctionProperties) =>
        taskFunctionProperties.name === name
    )
  return matchingTaskFunctionProperties?.strategy
}
921
/**
 * Collects the distinct worker choice strategies used by this pool:
 * the pool option strategy plus every strategy defined on the listed task functions.
 *
 * @returns The set of worker choice strategies.
 */
private readonly getWorkerWorkerChoiceStrategies =
  (): Set<WorkerChoiceStrategy> => {
    const strategies = new Set<WorkerChoiceStrategy>([
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.workerChoiceStrategy!
    ])
    for (const { strategy } of this.listTaskFunctionsProperties()) {
      // Task functions without a dedicated strategy contribute nothing.
      if (strategy != null) {
        strategies.add(strategy)
      }
    }
    return strategies
  }
942
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  // The outcome of the 'default' operation sent to the workers is returned as is.
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties
  })
}
953
/**
 * Deletes the worker usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
959
/**
 * Whether a task can be executed right away on the given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node tasks queue is empty and its number of executing tasks is below the tasks queue `concurrency` option, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const { executing } = this.workerNodes[workerNodeKey].usage.tasks
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executing < this.opts.tasksQueueOptions!.concurrency!
}
968
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check decides the rejection.
    const findRejectionReason = (): Error | undefined => {
      if (!this.started) {
        return new Error('Cannot execute a task on not started pool')
      }
      if (this.destroying) {
        return new Error('Cannot execute a task on destroying pool')
      }
      if (name != null) {
        if (typeof name !== 'string') {
          return new TypeError('name argument must be a string')
        }
        if (name.trim().length === 0) {
          return new TypeError('name argument must not be an empty string')
        }
      }
      if (transferList != null && !Array.isArray(transferList)) {
        return new TypeError('transferList argument must be an array')
      }
      return undefined
    }
    const rejectionReason = findRejectionReason()
    if (rejectionReason != null) {
      reject(rejectionReason)
      return
    }
    // The timestamp is taken before the worker node is chosen.
    const timestamp = performance.now()
    const workerNodeKey = this.chooseWorkerNode(
      this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
    )
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Register the promise settlers under the task id; an async resource
    // is attached only when the pool has an emitter.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null && {
        asyncResource: new AsyncResource('poolifier:task', {
          triggerAsyncId: this.emitter.asyncId,
          requireManualDestroy: true
        })
      })
    })
    const runImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (runImmediately) {
      this.executeTask(workerNodeKey, task)
      return
    }
    this.enqueueTask(workerNodeKey, task)
  })
}
c97c7edb 1035
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches the minimum number of workers.
 * The `startingMinimumNumberOfWorkers` flag is raised for the duration of the loop.
 */
private startMinimumNumberOfWorkers (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  this.startingMinimumNumberOfWorkers = true
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.startingMinimumNumberOfWorkers = false
}
1052
/** @inheritdoc */
public start (): void {
  // Refuse to start from any state other than "stopped and idle".
  if (this.started) {
    throw new Error('Cannot start an already started pool')
  } else if (this.starting) {
    throw new Error('Cannot start an already starting pool')
  } else if (this.destroying) {
    throw new Error('Cannot start a destroying pool')
  }
  this.starting = true
  this.startMinimumNumberOfWorkers()
  this.starting = false
  this.started = true
}
1069
/** @inheritDoc */
public async destroy (): Promise<void> {
  if (!this.started) {
    throw new Error('Cannot destroy an already destroyed pool')
  } else if (this.starting) {
    throw new Error('Cannot destroy an starting pool')
  } else if (this.destroying) {
    throw new Error('Cannot destroy an already destroying pool')
  }
  this.destroying = true
  // All worker nodes are destroyed concurrently.
  const workerNodeDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey): Promise<void> => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodeDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  // Reset the pool state flags once every worker node is gone.
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1093
/**
 * Sends the kill message to the worker given its worker node key and waits for its answer.
 * Resolves immediately when no worker node exists for the key, resolves on a `'success'`
 * kill answer and rejects on a `'failure'` kill answer.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    if (this.workerNodes[workerNodeKey] == null) {
      resolve()
      return
    }
    const onKillAnswer = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${message.workerId}`
            )
          )
          break
        default:
          // Not a kill answer: ignored.
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillAnswer)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1118
/**
 * Terminates the worker node given its worker node key.
 * The worker node is flagged as not ready and its tasks queue is flushed, then the
 * 'taskFinished' events are awaited (bounded by the tasks finished timeout) before
 * the kill message is sent and the worker node is terminated.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default tasks queue options when no timeout is configured.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1140
/**
 * Extension point meant to run code before the worker nodes are created in the abstract constructor.
 * Does nothing by default; subclasses can override it.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1150
/**
 * Tells whether the current worker is the main worker.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1157
/**
 * Hook run before a task is executed on a worker node.
 * Increments the executing tasks counter and updates the wait time on the worker usage and,
 * when applicable, on the task function worker usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // Same bookkeeping for the worker usage and the task function worker usage.
  const markTaskStarted = (usage: typeof workerNode.usage): void => {
    ++usage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      usage,
      task
    )
  }
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    markTaskStarted(workerNode.usage)
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(task.name!) != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    markTaskStarted(workerNode.getTaskFunctionWorkerUsage(task.name!)!)
  }
}
1198
/**
 * Hook run after a task has been executed on a worker node.
 * Updates the task statistics, run time and ELU on the worker usage and, when applicable,
 * on the task function worker usage, then updates the worker choice strategies context
 * if any usage was updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  let needWorkerChoiceStrategyUpdate = false
  // Same bookkeeping for the worker usage and the task function worker usage.
  const recordTaskOutcome = (usage: typeof workerNode.usage): void => {
    updateTaskStatisticsWorkerUsage(usage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      usage,
      message
    )
    updateEluWorkerUsage(this.workerChoiceStrategiesContext, usage, message)
    needWorkerChoiceStrategyUpdate = true
  }
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    recordTaskOutcome(workerNode.usage)
  }
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      message.taskPerformance!.name
    ) != null
  ) {
    recordTaskOutcome(
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      workerNode.getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
    )
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategiesContext?.update(workerNodeKey)
  }
}
1256
/**
 * Tells whether the task function worker usage of a worker node shall be updated.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node info exists and lists more than two task functions properties, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  // NOTE(review): the threshold of two presumably accounts for default task function entries - confirm.
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
1271
/**
 * Picks the worker node that will receive the next task.
 * A dynamic worker node is created first when the pool conditions require it; it is
 * returned directly when the strategies policy allows dynamic worker usage.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @returns The chosen worker node key
 */
private chooseWorkerNode (
  workerChoiceStrategy?: WorkerChoiceStrategy
): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // Otherwise delegate the choice to the worker choice strategies context.
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
1293
/**
 * Decides whether a dynamic worker shall be created.
 *
 * @returns `true` if a dynamic worker shall be created, `false` otherwise.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1300
/**
 * Sends a message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message to send.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
1313
/**
 * Creates a worker node, wires its event handlers, adds it to the pool and runs the
 * after setup hook.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  const poolIsRunning = (): boolean => this.started && !this.destroying

  // Pool side reaction to a worker error: flag, report, optionally replace the
  // worker, redistribute its queued tasks, then terminate it.
  const onWorkerError = (error: Error): void => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    if (poolIsRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    if (poolIsRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  }

  // Pool side reaction to a worker exit: forget the worker node and restore the
  // minimum number of workers if the pool is still running.
  const onWorkerExit = (): void => {
    this.removeWorkerNode(workerNode)
    if (
      this.started &&
      !this.startingMinimumNumberOfWorkers &&
      !this.destroying
    ) {
      this.startMinimumNumberOfWorkers()
    }
  }

  // User provided handlers default to a no-op function.
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', onWorkerError)
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', onWorkerExit)

  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1377
/**
 * Creates a worker node and sets it up as a dynamic one: a kill message listener is
 * registered, the active check is requested, the pool side task functions are sent to
 * the worker and the worker node is flagged as dynamic.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A soft kill is honored only when the worker node has no work left:
    // nothing executing and, with the tasks queue enabled, nothing queued.
    const hasNoWorkLeft = (): boolean => {
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    const shallDestroy =
      isKillBehavior(KillBehaviors.HARD, message.kill) ||
      (isKillBehavior(KillBehaviors.SOFT, message.kill) && hasNoWorkLeft())
    if (!shallDestroy) {
      return
    }
    // Flag the worker node as not ready immediately
    this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
    this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Send every task function registered on the pool side to the new worker;
  // failures are reported through the pool error event.
  for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionProperties: buildTaskFunctionProperties(
        taskFunctionName,
        taskFunctionObject
      ),
      taskFunction: taskFunctionObject.taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const readyAtCreation =
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
      true ||
    this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
  if (readyAtCreation) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1438
/**
 * Registers a message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1451
/**
 * Registers a one-time message listener callback on the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1464
/**
 * Removes a message listener callback from the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1477
/**
 * Hook run once a worker node has been newly created and added to the pool.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  // Task stealing handlers are only wired when the tasks queue is enabled.
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const { tasksQueueOptions } = this.opts
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on(
      'idle',
      this.handleWorkerNodeIdleEvent
    )
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1509
/**
 * Sends the startup message to the worker identified by its worker node key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1516
/**
 * Sends the statistics message to the worker identified by its worker node key.
 * The message carries the `aggregate` flags of the run time and ELU task statistics
 * requirements, `false` when no worker choice strategies context is available.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const statistics = {
    runTime:
      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
        .runTime.aggregate ?? false,
    elu:
      this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
        .elu.aggregate ?? false
  }
  this.sendToWorker(workerNodeKey, { statistics })
}
a2ed5053 1534
/**
 * Tells whether task stealing is pointless in the current pool state.
 *
 * @returns `true` if the pool has at most one worker node or no queued tasks, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1538
/**
 * Executes the task on the given worker node if it can run right away, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1546
/**
 * Moves the tasks queued on the given worker node to the other worker nodes.
 * Each task goes to the ready worker node, other than the source one, with the fewest
 * queued tasks (lowest key on ties). Redistribution stops when no such worker node
 * exists, so that a task is never handed back to the source worker node, which would
 * otherwise re-enqueue it and loop forever.
 *
 * @param workerNodeKey - The source worker node key, `-1` is ignored.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    let destinationWorkerNodeKey = -1
    for (const [candidateKey, candidate] of this.workerNodes.entries()) {
      // The source itself and not ready worker nodes are never valid destinations.
      if (candidateKey === workerNodeKey || !candidate.info.ready) {
        continue
      }
      if (
        destinationWorkerNodeKey === -1 ||
        candidate.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateKey
      }
    }
    if (destinationWorkerNodeKey === -1) {
      // No eligible destination: leave the remaining tasks where they are.
      return
    }
    const task = this.dequeueTask(workerNodeKey)
    if (task == null) {
      return
    }
    this.handleTask(destinationWorkerNodeKey, task)
  }
}
1569
/**
 * Increments the stolen tasks counter on the worker usage and, when applicable,
 * on the task function worker usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerNode?.usage != null) {
    ++workerNode.usage.tasks.stolen
  }
  const hasTaskFunctionWorkerUsage =
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  if (hasTaskFunctionWorkerUsage) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks.stolen
  }
}
1589
/**
 * Increments the sequentially stolen tasks counter on the worker usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1599
/**
 * Increments the sequentially stolen tasks counter on the task function worker usage,
 * when task function worker usages are tracked and one exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const hasTaskFunctionWorkerUsage =
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  if (hasTaskFunctionWorkerUsage) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    ++workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks
      .sequentiallyStolen
  }
}
1615
/**
 * Resets the sequentially stolen tasks counter on the worker usage, if any.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1625
/**
 * Resets the sequentially stolen tasks counter on the task function worker usage,
 * when task function worker usages are tracked and one exists for the task name.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  const hasTaskFunctionWorkerUsage =
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    workerNode.getTaskFunctionWorkerUsage(taskName) != null
  if (hasTaskFunctionWorkerUsage) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerNode.getTaskFunctionWorkerUsage(taskName)!.tasks
      .sequentiallyStolen = 0
  }
}
1641
/**
 * Worker node 'idle' event handler: makes the idle worker node steal a task, then
 * re-invokes itself after a delay derived from the number of sequentially stolen tasks.
 *
 * @param eventDetail - The worker node event detail, its `workerNodeKey` property must be defined.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // Give up when nothing can be stolen or when more than half of the worker nodes
  // are already stealing.
  const stealingNotAllowed =
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  if (stealingNotAllowed) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const tasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node has work again after a previous steal: end the stealing
  // sequence and reset its sequentially stolen counters.
  if (
    workerInfo != null &&
    previousStolenTask != null &&
    tasksUsage.sequentiallyStolen > 0 &&
    (tasksUsage.executing > 0 || this.tasksQueueSize(workerNodeKey) > 0)
  ) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    for (const { name: taskFunctionName } of workerInfo.taskFunctionsProperties!) {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionName
      )
    }
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    const taskFunctionTasksUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )!.tasks
    // The per task function sequence continues when it is starting, or when the
    // same task function was stolen the previous time; otherwise it is reset.
    const sequenceContinues =
      taskFunctionTasksUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksUsage.sequentiallyStolen > 0)
    if (sequenceContinues) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        stolenTask.name!
      )
    }
  }
  // Schedule the next stealing attempt; errors are reported through the pool error event.
  const nextAttemptDelay = exponentialDelay(tasksUsage.sequentiallyStolen)
  sleep(nextAttemptDelay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1726
/**
 * Makes the given worker node steal one task from the ready, non stealing worker node
 * with the most queued tasks.
 *
 * The stealing worker node is excluded by identity: candidates are searched in a copy
 * sorted by queued tasks, whose indexes are not worker node keys and therefore must not
 * be compared with `workerNodeKey`.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no task could be stolen.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  const sourceWorkerNode = this.workerNodes
    .slice()
    // Most queued tasks first.
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
    .find(
      candidate =>
        candidate !== destinationWorkerNode &&
        candidate.info.ready &&
        !candidate.info.stealing &&
        candidate.usage.tasks.queued > 0
    )
  if (sourceWorkerNode == null) {
    return undefined
  }
  const task = sourceWorkerNode.popTask()
  // The queue may turn out to be empty despite the queued counter.
  if (task == null) {
    return undefined
  }
  this.handleTask(workerNodeKey, task)
  this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
  return task
}
1753
/**
 * Worker node 'backPressure' event handler: hands tasks queued on the back pressured
 * worker node over to the other ready, non stealing worker nodes whose queue is below
 * the tasks queue size minus one, least queued first.
 *
 * Destination worker nodes are visited in a copy sorted by queued tasks; their real
 * worker node key is resolved from the pool worker nodes, since indexes in the sorted
 * copy are not worker node keys.
 *
 * @param eventDetail - The worker node event detail, its `workerId` property identifies the back pressured worker.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const tasksQueueMaxSize = this.opts.tasksQueueOptions!.size!
  if (tasksQueueMaxSize <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // The worker id may not match any worker node of the pool anymore.
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (sourceWorkerNode == null) {
    return
  }
  const workerNodes = this.workerNodes
    .slice()
    // Fewest queued tasks first.
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode !== sourceWorkerNode &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued < tasksQueueMaxSize - sizeOffset
    ) {
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      const task = sourceWorkerNode.popTask()
      if (task == null) {
        // Nothing left to hand over.
        return
      }
      workerInfo.stealing = true
      try {
        this.handleTask(workerNodeKey, task)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      } finally {
        // Never leave the destination flagged as stealing, even if handling throws.
        workerInfo.stealing = false
      }
    }
  }
}
1804
/**
 * Message listener registered on each worker.
 *
 * Dispatches the received message according to the fields it carries:
 * worker ready response, task function properties update or task execution response.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
1829
8e8d9101
JB
/**
 * Emits the pool ready event once, when the pool is ready and the event
 * has not been emitted yet.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1836
/**
 * Handles a worker ready response: records the ready flag and the task
 * functions properties on the matching worker node, then checks whether
 * the pool ready event must be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info: workerInfo } = this.workerNodes[workerNodeKey]
  workerInfo.ready = ready
  workerInfo.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
1848
/**
 * Handles a task execution response received from a worker.
 *
 * Settles the promise registered for the task, runs the after task execution
 * hook, removes the promise from the response map and emits `taskFinished` on
 * the worker node. When the tasks queue is enabled and the pool is not being
 * destroyed, it then executes a queued task if the concurrency allows it and
 * emits `idle` on the worker node when it has nothing left to do.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    // No promise registered for this task: nothing to settle
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const { tasks: workerNodeTasksUsage } = workerNode.usage
  const canExecuteQueuedTask =
    this.tasksQueueSize(workerNodeKey) > 0 &&
    workerNodeTasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  if (canExecuteQueuedTask) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  const isIdle =
    workerNodeTasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    workerNodeTasksUsage.sequentiallyStolen === 0
  if (isIdle) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
7c0ba920 1905
/**
 * Emits the pool busy event when the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1911
/**
 * Emits the pool back pressure event when the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1917
d0878034
JB
/**
 * Emits dynamic worker creation events.
 *
 * Abstract: the implementation is provided by the concrete pool classes.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1922
/**
 * Looks up the information of the worker node stored at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, `undefined` if no worker node exists at that key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const { workerNodes } = this
  return workerNodes[workerNodeKey]?.info
}
1932
/**
 * Instantiates a worker node from the pool worker type, file path and options.
 *
 * The tasks queue back pressure size comes from the tasks queue options when
 * set, otherwise from the default tasks queue options computed for the pool size.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const { env, workerOptions, tasksQueueOptions } = this.opts
  const tasksQueueBackPressureSize =
    tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env,
    workerOptions,
    tasksQueueBackPressureSize
  })
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
1958
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The key of the added worker node in the pool worker nodes.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  const { workerNodes } = this
  workerNodes.push(workerNode)
  const addedWorkerNodeKey = workerNodes.indexOf(workerNode)
  if (addedWorkerNodeKey !== -1) {
    return addedWorkerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 1974
8e8d9101
JB
/**
 * Emits the pool empty event when the pool is empty and re-arms the
 * ready event emission.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
1981
/**
 * Deletes the given worker node from the pool worker nodes and from the
 * worker choice strategies context, then checks whether the pool empty
 * event must be emitted.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const removedWorkerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = removedWorkerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(removedWorkerNodeKey, 1)
    this.workerChoiceStrategiesContext?.remove(removedWorkerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
adc3c320 1995
/**
 * Clears the ready flag of the worker node at the given key, if it exists.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
2002
9e844245
JB
/**
 * Whether the pool has back pressure: the tasks queue is enabled and no
 * worker node is free of back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
2011
/**
 * Runs the before task execution hook, sends the task to the worker at the
 * given worker node key, then checks the task execution events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
2023
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * the task queuing events to emit.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const tasksQueueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return tasksQueueSize
}
2029
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The dequeued task, if any.
 */
private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask()
}
2033
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
2037
87347ea8
JB
/**
 * Executes every task queued on the worker node at the given key, then
 * clears its tasks queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
2048
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    workerNodeKey++
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 2054}