feat: use priority queue for task queueing
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
31847469
JB
10 Task,
11 TaskFunctionProperties
d35e5717 12} from '../utility-types.js'
bbeadd16 13import {
ded253e2 14 average,
31847469 15 buildTaskFunctionProperties,
ff128cc9 16 DEFAULT_TASK_NAME,
bbeadd16 17 EMPTY_FUNCTION,
463226a4 18 exponentialDelay,
59317253 19 isKillBehavior,
0d80593b 20 isPlainObject,
90d6701c 21 max,
afe0d5bf 22 median,
90d6701c 23 min,
463226a4
JB
24 round,
25 sleep
d35e5717 26} from '../utils.js'
31847469
JB
27import type {
28 TaskFunction,
29 TaskFunctionObject
30} from '../worker/task-functions.js'
ded253e2 31import { KillBehaviors } from '../worker/worker-options.js'
c4855468 32import {
65d7a1c9 33 type IPool,
c4855468 34 PoolEvents,
6b27d407 35 type PoolInfo,
c4855468 36 type PoolOptions,
6b27d407
JB
37 type PoolType,
38 PoolTypes,
4b628b48 39 type TasksQueueOptions
d35e5717 40} from './pool.js'
a35560ba 41import {
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
d35e5717 46} from './selection-strategies/selection-strategies-types.js'
bcfb06ce 47import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
bde6b5d7
JB
48import {
49 checkFilePath,
95d1a734 50 checkValidPriority,
bde6b5d7 51 checkValidTasksQueueOptions,
bfc75cca 52 checkValidWorkerChoiceStrategy,
32b141fd 53 getDefaultTasksQueueOptions,
c329fd41
JB
54 updateEluWorkerUsage,
55 updateRunTimeWorkerUsage,
56 updateTaskStatisticsWorkerUsage,
57 updateWaitTimeWorkerUsage,
87347ea8 58 waitWorkerNodeEvents
d35e5717 59} from './utils.js'
ded253e2
JB
60import { version } from './version.js'
61import type {
62 IWorker,
63 IWorkerNode,
64 WorkerInfo,
65 WorkerNodeEventDetail,
66 WorkerType
67} from './worker.js'
68import { WorkerNode } from './worker-node.js'
23ccf9d7 69
729c563d 70/**
ea7a90d3 71 * Base class that implements some shared logic for all poolifier pools.
729c563d 72 *
38e795c1 73 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
74 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
75 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 76 */
c97c7edb 77export abstract class AbstractPool<
f06e48d8 78 Worker extends IWorker,
d3c8a1a8
S
79 Data = unknown,
80 Response = unknown
c4855468 81> implements IPool<Worker, Data, Response> {
/** @inheritDoc */
public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []

/** @inheritDoc */
public emitter?: EventEmitterAsyncResource

/**
 * Pending task execution responses, indexed by task message id:
 * - `key`: The message id of each submitted task.
 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
 *
 * When a message is received from a worker, the entry bound to its message id gives access to the promise resolve/reject callbacks.
 */
protected promiseResponseMap = new Map<
  string,
  PromiseResponseWrapper<Response>
>()

/**
 * Worker choice strategies context referencing worker choice algorithms implementation.
 * Assigned in the constructor.
 */
protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
  Worker,
  Data,
  Response
>

/**
 * Task functions added at runtime, indexed by name:
 * - `key`: The task function name.
 * - `value`: The task function object.
 */
private readonly taskFunctions: Map<
  string,
  TaskFunctionObject<Data, Response>
>

/** Whether the pool is started or not. */
private started: boolean
/** Whether the pool is starting or not. */
private starting: boolean
/** Whether the pool is destroying or not. */
private destroying: boolean
/** Whether the minimum number of workers is starting or not. */
private startingMinimumNumberOfWorkers: boolean
/** Whether the pool ready event has been emitted or not. */
private readyEventEmitted: boolean
/** The start timestamp of the pool, taken from `performance.now()` in the constructor. */
private readonly startTimestamp
141
729c563d
S
/**
 * Constructs a new poolifier pool.
 *
 * Validates the execution context and the arguments, sets up the optional
 * event emitter and the worker choice strategies context, then starts the
 * pool when the `startWorkers` option is enabled.
 *
 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
 * @param filePath - Path to the worker file.
 * @param opts - Options for the pool.
 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool is not instantiated from the main context.
 */
public constructor (
  protected readonly minimumNumberOfWorkers: number,
  protected readonly filePath: string,
  protected readonly opts: PoolOptions<Worker>,
  protected readonly maximumNumberOfWorkers?: number
) {
  if (!this.isMain()) {
    throw new Error(
      'Cannot start a pool from a worker with the same type as the pool'
    )
  }

  // Arguments validation, each check throws on invalid input.
  this.checkPoolType()
  checkFilePath(this.filePath)
  this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
  this.checkPoolOptions(this.opts)

  // Keep `this` bound when these methods are passed around as callbacks.
  this.enqueueTask = this.enqueueTask.bind(this)
  this.executeTask = this.executeTask.bind(this)
  this.chooseWorkerNode = this.chooseWorkerNode.bind(this)

  if (this.opts.enableEvents === true) {
    this.initializeEventEmitter()
  }

  this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
    Worker,
    Data,
    Response
  >(
    this,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    [this.opts.workerChoiceStrategy!],
    this.opts.workerChoiceStrategyOptions
  )

  this.setupHook()

  this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()

  // Lifecycle flags must be initialized before the pool is possibly started.
  this.readyEventEmitted = false
  this.startingMinimumNumberOfWorkers = false
  this.destroying = false
  this.starting = false
  this.started = false
  if (this.opts.startWorkers === true) {
    this.start()
  }

  this.startTimestamp = performance.now()
}
199
26ce26ca
JB
/**
 * Checks that a fixed pool is not instantiated with a maximum number of workers.
 *
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the pool type is fixed and a maximum number of workers is defined.
 */
private checkPoolType (): void {
  const isFixedPool = this.type === PoolTypes.fixed
  const hasMaximumNumberOfWorkers = this.maximumNumberOfWorkers != null
  if (isFixedPool && hasMaximumNumberOfWorkers) {
    throw new Error(
      'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
    )
  }
}
207
c63a35a0
JB
/**
 * Checks the minimum number of workers given at pool instantiation.
 *
 * @param minimumNumberOfWorkers - The minimum number of workers to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of workers is not specified.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the number of workers is not a safe integer.
 * @throws {@link https://nodejs.org/api/errors.html#class-rangeerror} If the number of workers is negative, or zero for a fixed pool.
 */
private checkMinimumNumberOfWorkers (
  minimumNumberOfWorkers: number | undefined
): void {
  if (minimumNumberOfWorkers == null) {
    throw new Error(
      'Cannot instantiate a pool without specifying the number of workers'
    )
  }
  if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
    throw new TypeError(
      'Cannot instantiate a pool with a non safe integer number of workers'
    )
  }
  if (minimumNumberOfWorkers < 0) {
    throw new RangeError(
      'Cannot instantiate a pool with a negative number of workers'
    )
  }
  if (minimumNumberOfWorkers === 0 && this.type === PoolTypes.fixed) {
    throw new RangeError('Cannot instantiate a fixed pool with zero worker')
  }
}
227
/**
 * Validates the pool options and writes the defaulted values into `this.opts`.
 *
 * @param opts - The pool options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the pool options are not a plain object.
 */
private checkPoolOptions (opts: PoolOptions<Worker>): void {
  if (!isPlainObject(opts)) {
    throw new TypeError('Invalid pool options: must be a plain object')
  }
  this.opts.startWorkers = opts.startWorkers ?? true

  // Worker choice strategy and its options.
  checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
  this.opts.workerChoiceStrategy =
    opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
  this.checkValidWorkerChoiceStrategyOptions(opts.workerChoiceStrategyOptions)
  if (opts.workerChoiceStrategyOptions != null) {
    this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
  }

  this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
  this.opts.enableEvents = opts.enableEvents ?? true

  // Tasks queue options are only validated and built when queueing is enabled.
  this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
  if (this.opts.enableTasksQueue) {
    checkValidTasksQueueOptions(opts.tasksQueueOptions)
    this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
      opts.tasksQueueOptions
    )
  }
}
253
/**
 * Validates the worker choice strategy options.
 *
 * @param workerChoiceStrategyOptions - The worker choice strategy options to validate.
 * @throws {@link https://nodejs.org/api/errors.html#class-typeerror} If the options are defined but not a plain object.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the number of weights does not match the pool maximum size, or if the measurement is unknown.
 */
private checkValidWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): void {
  if (
    workerChoiceStrategyOptions != null &&
    !isPlainObject(workerChoiceStrategyOptions)
  ) {
    throw new TypeError(
      'Invalid worker choice strategy options: must be a plain object'
    )
  }
  const weights = workerChoiceStrategyOptions?.weights
  if (weights != null) {
    // One weight is expected for each worker node the pool can hold.
    const expectedWeightsCount =
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    if (Object.keys(weights).length !== expectedWeightsCount) {
      throw new Error(
        'Invalid worker choice strategy options: must have a weight for each worker node'
      )
    }
  }
  const measurement = workerChoiceStrategyOptions?.measurement
  if (
    measurement != null &&
    !Object.values(Measurements).includes(measurement)
  ) {
    throw new Error(
      `Invalid worker choice strategy options: invalid measurement '${measurement}'`
    )
  }
}
285
/**
 * Creates the pool event emitter, named after the pool type and the worker type.
 */
private initializeEventEmitter (): void {
  const name = `poolifier:${this.type}-${this.worker}-pool`
  this.emitter = new EventEmitterAsyncResource({ name })
}
291
/** @inheritDoc */
public get info (): PoolInfo {
  const tasksQueueEnabled = this.opts.enableTasksQueue === true
  const taskStatisticsRequirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()

  // Sums a numeric value read on each worker node.
  const sumOverWorkerNodes = (
    valueOf: (workerNode: IWorkerNode<Worker, Data>) => number
  ): number =>
    this.workerNodes.reduce(
      (total, workerNode) => total + valueOf(workerNode),
      0
    )

  // Counts the worker nodes matching a predicate.
  const countWorkerNodes = (
    matches: (
      workerNode: IWorkerNode<Worker, Data>,
      workerNodeKey: number
    ) => boolean
  ): number =>
    this.workerNodes.reduce(
      (count, workerNode, workerNodeKey) =>
        matches(workerNode, workerNodeKey) ? count + 1 : count,
      0
    )

  // Builds the pool wide statistics of a measurement from the worker nodes usage.
  const buildMeasurementStatistics = (
    measurement: 'runTime' | 'waitTime',
    requirements: { average: boolean, median: boolean }
  ): {
    minimum: number
    maximum: number
    average?: number
    median?: number
  } => {
    // Concatenates the measurement history of all the worker nodes.
    const gatherHistory = (): number[] => {
      const history: number[] = []
      return history.concat(
        ...this.workerNodes.map(
          workerNode => workerNode.usage[measurement].history
        )
      )
    }
    return {
      minimum: round(
        min(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement].minimum ?? Infinity
          )
        )
      ),
      maximum: round(
        max(
          ...this.workerNodes.map(
            workerNode => workerNode.usage[measurement].maximum ?? -Infinity
          )
        )
      ),
      ...(requirements.average && {
        average: round(average(gatherHistory()))
      }),
      ...(requirements.median && {
        median: round(median(gatherHistory()))
      })
    }
  }

  return {
    version,
    type: this.type,
    worker: this.worker,
    started: this.started,
    ready: this.ready,
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    defaultStrategy: this.opts.workerChoiceStrategy!,
    strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
    minSize: this.minimumNumberOfWorkers,
    maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
    // Utilization is only exposed when both run time and wait time are aggregated.
    ...(taskStatisticsRequirements?.runTime.aggregate === true &&
      taskStatisticsRequirements.waitTime.aggregate && {
      utilization: round(this.utilization)
    }),
    workerNodes: this.workerNodes.length,
    idleWorkerNodes: countWorkerNodes(
      workerNode => workerNode.usage.tasks.executing === 0
    ),
    ...(tasksQueueEnabled && {
      stealingWorkerNodes: countWorkerNodes(
        workerNode => workerNode.info.stealing
      )
    }),
    busyWorkerNodes: countWorkerNodes((_workerNode, workerNodeKey) =>
      this.isWorkerNodeBusy(workerNodeKey)
    ),
    executedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executed
    ),
    executingTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.executing
    ),
    // Queue related figures are only exposed when tasks queueing is enabled.
    ...(tasksQueueEnabled && {
      queuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.queued
      ),
      maxQueuedTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.maxQueued ?? 0
      ),
      backPressure: this.hasBackPressure(),
      stolenTasks: sumOverWorkerNodes(
        workerNode => workerNode.usage.tasks.stolen
      )
    }),
    failedTasks: sumOverWorkerNodes(
      workerNode => workerNode.usage.tasks.failed
    ),
    ...(taskStatisticsRequirements?.runTime.aggregate === true && {
      runTime: buildMeasurementStatistics(
        'runTime',
        taskStatisticsRequirements.runTime
      )
    }),
    ...(taskStatisticsRequirements?.waitTime.aggregate === true && {
      waitTime: buildMeasurementStatistics(
        'waitTime',
        taskStatisticsRequirements.waitTime
      )
    })
  }
}
08f3f44c 458
aa9eede8
JB
/**
 * The pool readiness boolean status.
 *
 * An empty pool is never ready. Otherwise the pool is ready once the number
 * of ready non dynamic worker nodes reaches the minimum number of workers.
 */
private get ready (): boolean {
  if (this.empty) {
    return false
  }
  const readyStaticWorkerNodes = this.workerNodes.filter(
    workerNode => !workerNode.info.dynamic && workerNode.info.ready
  ).length
  return readyStaticWorkerNodes >= this.minimumNumberOfWorkers
}
476
8e8d9101
JB
/**
 * The pool emptiness boolean status: `true` when the pool has no worker
 * node and its minimum number of workers is zero.
 */
protected get empty (): boolean {
  const hasNoWorkerNode = this.workerNodes.length === 0
  return hasNoWorkerNode && this.minimumNumberOfWorkers === 0
}
483
/**
 * The approximate pool utilization.
 *
 * Computed as the cumulated tasks run time and wait time of all the worker
 * nodes divided by the pool time capacity, i.e. the time elapsed since the
 * pool start timestamp multiplied by the maximum number of workers.
 *
 * @returns The pool utilization, `0` if the pool time capacity is not a strictly positive number.
 */
private get utilization (): number {
  const poolTimeCapacity =
    (performance.now() - this.startTimestamp) *
    (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
  // Guards against NaN (start timestamp not yet assigned) and against a
  // division by zero (no elapsed time or no worker), which would otherwise
  // leak NaN or Infinity into the pool information.
  if (!(poolTimeCapacity > 0)) {
    return 0
  }
  const totalTasksRunTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.runTime.aggregate ?? 0),
    0
  )
  const totalTasksWaitTime = this.workerNodes.reduce(
    (accumulator, workerNode) =>
      accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
    0
  )
  return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
}
505
/**
 * The pool type.
 *
 * If it is `'dynamic'`, it provides the `max` property.
 *
 * Abstract: each concrete pool class supplies its own value.
 */
protected abstract get type (): PoolType
512
/**
 * The worker type.
 *
 * Abstract: each concrete pool class supplies its own value.
 */
protected abstract get worker (): WorkerType
517
6b813701
JB
/**
 * Checks if the worker id sent in the received message from a worker is valid.
 *
 * @param message - The received message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is missing or does not belong to a pool worker node.
 */
private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
  const { workerId } = message
  if (workerId == null) {
    throw new Error('Worker message received without worker id')
  }
  const isKnownWorker = this.getWorkerNodeKeyByWorkerId(workerId) !== -1
  if (!isKnownWorker) {
    throw new Error(
      `Worker message received from unknown worker '${workerId}'`
    )
  }
}
533
aa9eede8
JB
/**
 * Gets the worker node key given its worker id.
 *
 * @param workerId - The worker id.
 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
 */
private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
  for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
    if (workerNode.info.id === workerId) {
      return workerNodeKey
    }
  }
  return -1
}
545
/** @inheritDoc */
public setWorkerChoiceStrategy (
  workerChoiceStrategy: WorkerChoiceStrategy,
  workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
): void {
  checkValidWorkerChoiceStrategy(workerChoiceStrategy)
  // The options, if any, are applied first.
  const optionsUpdated =
    workerChoiceStrategyOptions != null &&
    this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  const strategyChanged =
    workerChoiceStrategy !== this.opts.workerChoiceStrategy
  if (strategyChanged) {
    this.opts.workerChoiceStrategy = workerChoiceStrategy
    this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
      this.opts.workerChoiceStrategy,
      this.opts.workerChoiceStrategyOptions
    )
  }
  if (!optionsUpdated && !strategyChanged) {
    return
  }
  // Synchronize the strategies in use and notify every worker node.
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
}
576
/** @inheritDoc */
public setWorkerChoiceStrategyOptions (
  workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
): boolean {
  this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
  if (workerChoiceStrategyOptions == null) {
    // Nothing to apply.
    return false
  }
  this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
  const strategiesContext = this.workerChoiceStrategiesContext
  strategiesContext?.setOptions(this.opts.workerChoiceStrategyOptions)
  strategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies(),
    this.opts.workerChoiceStrategyOptions
  )
  for (const workerNodeKey of this.workerNodes.keys()) {
    this.sendStatisticsMessageToWorker(workerNodeKey)
  }
  return true
}
598
/** @inheritDoc */
public enableTasksQueue (
  enable: boolean,
  tasksQueueOptions?: TasksQueueOptions
): void {
  const disabling = this.opts.enableTasksQueue === true && !enable
  if (disabling) {
    // Detach the stealing handlers and flush the queued tasks before
    // switching tasks queueing off.
    this.unsetTaskStealing()
    this.unsetTasksStealingOnBackPressure()
    this.flushTasksQueues()
  }
  this.opts.enableTasksQueue = enable
  this.setTasksQueueOptions(tasksQueueOptions)
}
612
/** @inheritDoc */
public setTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): void {
  if (this.opts.enableTasksQueue !== true) {
    // Tasks queueing is disabled: drop any previously stored options.
    if (this.opts.tasksQueueOptions != null) {
      delete this.opts.tasksQueueOptions
    }
    return
  }
  checkValidTasksQueueOptions(tasksQueueOptions)
  const builtTasksQueueOptions =
    this.buildTasksQueueOptions(tasksQueueOptions)
  this.opts.tasksQueueOptions = builtTasksQueueOptions
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.setTasksQueueSize(builtTasksQueueOptions.size!)
  // Handlers are always detached first so that they are never attached twice.
  this.unsetTaskStealing()
  if (builtTasksQueueOptions.taskStealing === true) {
    this.setTaskStealing()
  }
  this.unsetTasksStealingOnBackPressure()
  if (builtTasksQueueOptions.tasksStealingOnBackPressure === true) {
    this.setTasksStealingOnBackPressure()
  }
}
639
/**
 * Builds the tasks queue options by overriding the default ones with the given ones.
 *
 * @param tasksQueueOptions - The tasks queue options overriding the defaults, if any.
 * @returns The merged tasks queue options.
 */
private buildTasksQueueOptions (
  tasksQueueOptions: TasksQueueOptions | undefined
): TasksQueueOptions {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  const defaultTasksQueueOptions = getDefaultTasksQueueOptions(poolMaximumSize)
  return { ...defaultTasksQueueOptions, ...tasksQueueOptions }
}
650
/**
 * Sets the tasks queue back pressure size on every worker node.
 *
 * @param size - The tasks queue back pressure size.
 */
private setTasksQueueSize (size: number): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.tasksQueueBackPressureSize = size
  })
}
656
/**
 * Attaches the worker node `idle` event handler to every worker node.
 */
private setTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('idle', this.handleWorkerNodeIdleEvent)
  }
}
662
/**
 * Detaches the worker node `idle` event handler from every worker node.
 */
private unsetTaskStealing (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('idle', this.handleWorkerNodeIdleEvent)
  }
}
671
/**
 * Attaches the worker node `backPressure` event handler to every worker node.
 */
private setTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.on('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
680
/**
 * Detaches the worker node `backPressure` event handler from every worker node.
 */
private unsetTasksStealingOnBackPressure (): void {
  for (const workerNode of this.workerNodes) {
    workerNode.off('backPressure', this.handleWorkerNodeBackPressureEvent)
  }
}
689
c319c66b
JB
/**
 * Whether the pool is full or not.
 *
 * The pool is full when its number of worker nodes has reached the maximum
 * number of workers, or the minimum number of workers if no maximum is defined.
 */
protected get full (): boolean {
  const poolMaximumSize =
    this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  return this.workerNodes.length >= poolMaximumSize
}
c2ade475 701
c319c66b
JB
/**
 * Whether the pool is busy or not.
 *
 * The pool busyness boolean status.
 *
 * Abstract: each concrete pool class supplies its own implementation.
 */
protected abstract get busy (): boolean
7c0ba920 708
/**
 * Whether worker nodes are executing concurrently their tasks quota or not.
 *
 * With tasks queueing enabled, a ready worker node is available while it
 * executes fewer tasks than the tasks queue concurrency. Without it, a ready
 * worker node is available only while it executes no task.
 *
 * @returns Worker nodes busyness boolean status: `true` if no ready worker node is available.
 */
protected internalBusy (): boolean {
  const hasAvailableWorkerNode =
    this.opts.enableTasksQueue === true
      ? this.workerNodes.some(
        workerNode =>
          workerNode.info.ready &&
            workerNode.usage.tasks.executing <
              // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
              this.opts.tasksQueueOptions!.concurrency!
      )
      : this.workerNodes.some(
        workerNode =>
          workerNode.info.ready && workerNode.usage.tasks.executing === 0
      )
  return !hasAvailableWorkerNode
}
733
42c677c1
JB
/**
 * Whether the given worker node is busy or not.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node executes at least the tasks queue concurrency number of tasks when tasks queueing is enabled, or at least one task otherwise.
 */
private isWorkerNodeBusy (workerNodeKey: number): boolean {
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  if (this.opts.enableTasksQueue !== true) {
    return executingTasks > 0
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks >= this.opts.tasksQueueOptions!.concurrency!
}
744
/**
 * Sends a task function operation message to one worker node and waits for its status response.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if the operation succeeded on the worker, rejected with an error otherwise.
 */
private async sendTaskFunctionOperationToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    const taskFunctionOperationListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      const expectedWorkerId = this.getWorkerInfo(workerNodeKey)?.id
      // Ignore the messages that are not the awaited operation status.
      if (
        response.taskFunctionOperationStatus == null ||
        response.workerId !== expectedWorkerId
      ) {
        return
      }
      if (response.taskFunctionOperationStatus) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${response.taskFunctionOperation}' failed on worker ${response.workerId} with error: '${response.workerError?.message}'`
          )
        )
      }
      this.deregisterWorkerMessageListener(
        this.getWorkerNodeKeyByWorkerId(response.workerId),
        taskFunctionOperationListener
      )
    }
    this.registerWorkerMessageListener(
      workerNodeKey,
      taskFunctionOperationListener
    )
    this.sendToWorker(workerNodeKey, message)
  })
}
781
/**
 * Sends a task function operation message to all the worker nodes and waits for the status response of each of them.
 *
 * The same listener is registered on every worker node. Once all the
 * responses are received, it is deregistered from every worker node.
 *
 * @param message - The task function operation message.
 * @returns A promise resolved with `true` if the operation succeeded on all the worker nodes or if the pool has no worker node, rejected with an error if it failed on at least one worker node.
 */
private async sendTaskFunctionOperationToWorkers (
  message: MessageValue<Data>
): Promise<boolean> {
  return await new Promise<boolean>((resolve, reject) => {
    if (this.workerNodes.length === 0) {
      // No worker node will ever respond: settle now instead of hanging forever.
      // NOTE(review): treated as a success, assuming worker nodes created later get the task functions through another path - confirm.
      resolve(true)
      return
    }
    const responsesReceived = new Array<MessageValue<Response>>()
    const taskFunctionOperationsListener = (
      response: MessageValue<Response>
    ): void => {
      this.checkMessageWorkerId(response)
      if (response.taskFunctionOperationStatus == null) {
        return
      }
      responsesReceived.push(response)
      if (responsesReceived.length !== this.workerNodes.length) {
        return
      }
      // The listener was registered on every worker node, so it must be
      // removed from every worker node, not only from the last responder.
      for (const workerNodeKey of this.workerNodes.keys()) {
        this.deregisterWorkerMessageListener(
          workerNodeKey,
          taskFunctionOperationsListener
        )
      }
      const errorResponse = responsesReceived.find(
        responseReceived =>
          responseReceived.taskFunctionOperationStatus === false
      )
      if (errorResponse == null) {
        resolve(true)
      } else {
        reject(
          new Error(
            `Task function operation '${
              response.taskFunctionOperation as string
            }' failed on worker ${errorResponse.workerId} with error: '${
              errorResponse.workerError?.message
            }'`
          )
        )
      }
    }
    for (const workerNodeKey of this.workerNodes.keys()) {
      this.registerWorkerMessageListener(
        workerNodeKey,
        taskFunctionOperationsListener
      )
      this.sendToWorker(workerNodeKey, message)
    }
  })
}
834
/** @inheritDoc */
public hasTaskFunction (name: string): boolean {
  for (const taskFunctionProperties of this.listTaskFunctionsProperties()) {
    if (taskFunctionProperties.name === name) {
      return true
    }
  }
  return false
}
841
/** @inheritDoc */
public async addTaskFunction (
  name: string,
  fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
): Promise<boolean> {
  if (typeof name !== 'string') {
    throw new TypeError('name argument must be a string')
  }
  if (name.trim().length === 0) {
    throw new TypeError('name argument must not be an empty string')
  }
  // A bare function is wrapped into a task function object.
  const taskFunctionObject: TaskFunctionObject<Data, Response> =
    typeof fn === 'function' ? { taskFunction: fn } : fn
  if (typeof taskFunctionObject.taskFunction !== 'function') {
    throw new TypeError('taskFunction property must be a function')
  }
  checkValidPriority(taskFunctionObject.priority)
  checkValidWorkerChoiceStrategy(taskFunctionObject.strategy)
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'add',
    taskFunctionProperties: buildTaskFunctionProperties(
      name,
      taskFunctionObject
    ),
    taskFunction: taskFunctionObject.taskFunction.toString()
  })
  // The pool side registry is only updated once the workers have acknowledged.
  this.taskFunctions.set(name, taskFunctionObject)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies()
  )
  return opResult
}
872
/** @inheritDoc */
public async removeTaskFunction (name: string): Promise<boolean> {
  const isHandledOnPoolSide = this.taskFunctions.has(name)
  if (!isHandledOnPoolSide) {
    throw new Error(
      'Cannot remove a task function not handled on the pool side'
    )
  }
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  const opResult = await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'remove',
    taskFunctionProperties
  })
  // Clean up the pool side state once the workers have acknowledged.
  this.deleteTaskFunctionWorkerUsages(name)
  this.taskFunctions.delete(name)
  this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
    this.getWorkerWorkerChoiceStrategies()
  )
  return opResult
}
894
/** @inheritDoc */
public listTaskFunctionsProperties (): TaskFunctionProperties[] {
  // The first worker node reporting a non empty list is used as the reference.
  const referenceWorkerNode = this.workerNodes.find(
    workerNode =>
      Array.isArray(workerNode.info.taskFunctionsProperties) &&
      workerNode.info.taskFunctionsProperties.length > 0
  )
  return referenceWorkerNode?.info.taskFunctionsProperties ?? []
}
907
bcfb06ce
JB
/**
 * Gets task function strategy, if any.
 *
 * @param name - The task function name.
 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
 */
private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
  name?: string
): WorkerChoiceStrategy | undefined => {
  if (name == null) {
    return undefined
  }
  const taskFunctionProperties = this.listTaskFunctionsProperties().find(
    (properties: TaskFunctionProperties) => properties.name === name
  )
  return taskFunctionProperties?.strategy
}
924
95d1a734
JB
/**
 * Looks up the priority of a task function as known by a given worker node.
 *
 * @param workerNodeKey - The worker node key.
 * @param name - The task function name.
 * @returns The priority found in the worker node task function properties, `undefined` if no name is given or no priority is defined.
 */
private readonly getWorkerNodeTaskFunctionPriority = (
  workerNodeKey: number,
  name?: string
): number | undefined => {
  if (name == null) {
    return undefined
  }
  const workerTaskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return workerTaskFunctionsProperties?.find(
    (properties: TaskFunctionProperties) => properties.name === name
  )?.priority
}
943
85bbc7ab
JB
/**
 * Collects the worker choice strategies in use: the pool one plus every
 * strategy defined in the listed task function properties.
 *
 * @returns The set of worker choice strategies.
 */
private readonly getWorkerWorkerChoiceStrategies =
  (): Set<WorkerChoiceStrategy> => {
    const strategies = new Set<WorkerChoiceStrategy>([
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.workerChoiceStrategy!
    ])
    for (const { strategy } of this.listTaskFunctionsProperties()) {
      // Task functions without a dedicated strategy are ignored.
      if (strategy != null) {
        strategies.add(strategy)
      }
    }
    return strategies
  }
964
/** @inheritDoc */
public async setDefaultTaskFunction (name: string): Promise<boolean> {
  const taskFunctionProperties = buildTaskFunctionProperties(
    name,
    this.taskFunctions.get(name)
  )
  return await this.sendTaskFunctionOperationToWorkers({
    taskFunctionOperation: 'default',
    taskFunctionProperties
  })
}
975
adee6053
JB
/**
 * Deletes the usage of the given task function on every worker node.
 *
 * @param name - The task function name.
 */
private deleteTaskFunctionWorkerUsages (name: string): void {
  this.workerNodes.forEach(workerNode => {
    workerNode.deleteTaskFunctionWorkerUsage(name)
  })
}
981
375f7504
JB
/**
 * Whether a task can be executed right away on the given worker node:
 * its tasks queue is empty and it executes fewer tasks than the configured concurrency.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the task shall be executed immediately, `false` otherwise.
 */
private shallExecuteTask (workerNodeKey: number): boolean {
  if (this.tasksQueueSize(workerNodeKey) !== 0) {
    return false
  }
  const executingTasks = this.workerNodes[workerNodeKey].usage.tasks.executing
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return executingTasks < this.opts.tasksQueueOptions!.concurrency!
}
990
/** @inheritDoc */
public async execute (
  data?: Data,
  name?: string,
  transferList?: readonly TransferListItem[]
): Promise<Response> {
  return await new Promise<Response>((resolve, reject) => {
    // Pool state and arguments validation: the first failing check rejects.
    let rejection: Error | undefined
    if (!this.started) {
      rejection = new Error('Cannot execute a task on not started pool')
    } else if (this.destroying) {
      rejection = new Error('Cannot execute a task on destroying pool')
    } else if (name != null && typeof name !== 'string') {
      rejection = new TypeError('name argument must be a string')
    } else if (typeof name === 'string' && name.trim().length === 0) {
      rejection = new TypeError('name argument must not be an empty string')
    } else if (transferList != null && !Array.isArray(transferList)) {
      rejection = new TypeError('transferList argument must be an array')
    }
    if (rejection != null) {
      reject(rejection)
      return
    }
    const timestamp = performance.now()
    const taskFunctionStrategy =
      this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
    const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
    const task: Task<Data> = {
      name: name ?? DEFAULT_TASK_NAME,
      // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
      data: data ?? ({} as Data),
      priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
      strategy: taskFunctionStrategy,
      transferList,
      timestamp,
      taskId: randomUUID()
    }
    // Keep the promise callbacks so the task response can settle them later.
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.promiseResponseMap.set(task.taskId!, {
      resolve,
      reject,
      workerNodeKey,
      ...(this.emitter != null
        ? {
            asyncResource: new AsyncResource('poolifier:task', {
              triggerAsyncId: this.emitter.asyncId,
              requireManualDestroy: true
            })
          }
        : {})
    })
    const runImmediately =
      this.opts.enableTasksQueue === false ||
      (this.opts.enableTasksQueue === true &&
        this.shallExecuteTask(workerNodeKey))
    if (runImmediately) {
      this.executeTask(workerNodeKey, task)
    } else {
      this.enqueueTask(workerNodeKey, task)
    }
  })
}
c97c7edb 1059
60dc0a9c
JB
/**
 * Creates worker nodes until the number of non dynamic worker nodes
 * reaches the minimum number of workers.
 */
private startMinimumNumberOfWorkers (): void {
  const countNonDynamicWorkerNodes = (): number =>
    this.workerNodes.filter(workerNode => !workerNode.info.dynamic).length
  // Flag raised for the whole duration of the creation loop.
  this.startingMinimumNumberOfWorkers = true
  while (countNonDynamicWorkerNodes() < this.minimumNumberOfWorkers) {
    this.createAndSetupWorkerNode()
  }
  this.startingMinimumNumberOfWorkers = false
}
1076
47352846
JB
/** @inheritdoc */
public start (): void {
  // Refuse to start from any state other than stopped and idle.
  let refusal: string | undefined
  if (this.started) {
    refusal = 'Cannot start an already started pool'
  } else if (this.starting) {
    refusal = 'Cannot start an already starting pool'
  } else if (this.destroying) {
    refusal = 'Cannot start a destroying pool'
  }
  if (refusal != null) {
    throw new Error(refusal)
  }
  this.starting = true
  this.startMinimumNumberOfWorkers()
  this.starting = false
  this.started = true
}
1093
/** @inheritDoc */
public async destroy (): Promise<void> {
  // Refuse to destroy a pool that is not started, still starting or already destroying.
  let refusal: string | undefined
  if (!this.started) {
    refusal = 'Cannot destroy an already destroyed pool'
  } else if (this.starting) {
    refusal = 'Cannot destroy an starting pool'
  } else if (this.destroying) {
    refusal = 'Cannot destroy an already destroying pool'
  }
  if (refusal != null) {
    throw new Error(refusal)
  }
  this.destroying = true
  // All worker nodes are destroyed concurrently.
  const workerNodesDestructions = this.workerNodes.map(
    async (_workerNode, workerNodeKey) => {
      await this.destroyWorkerNode(workerNodeKey)
    }
  )
  await Promise.all(workerNodesDestructions)
  this.emitter?.emit(PoolEvents.destroy, this.info)
  this.emitter?.emitDestroy()
  this.readyEventEmitted = false
  this.destroying = false
  this.started = false
}
1117
/**
 * Sends a kill message to the worker given its worker node key.
 * The returned promise resolves on a `'success'` kill answer and rejects on a `'failure'` one.
 * It resolves immediately if the worker node does not exist.
 *
 * @param workerNodeKey - The worker node key.
 */
private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    const workerNodeMissing = this.workerNodes[workerNodeKey] == null
    if (workerNodeMissing) {
      resolve()
      return
    }
    const onKillAnswer = (message: MessageValue<Response>): void => {
      this.checkMessageWorkerId(message)
      switch (message.kill) {
        case 'success':
          resolve()
          break
        case 'failure':
          reject(
            new Error(
              `Kill message handling failed on worker ${message.workerId}`
            )
          )
          break
      }
    }
    // FIXME: should be registered only once
    this.registerWorkerMessageListener(workerNodeKey, onKillAnswer)
    this.sendToWorker(workerNodeKey, { kill: true })
  })
}
1142
/**
 * Terminates the worker node given its worker node key.
 * The node is flagged as not ready and its tasks queue is flushed. The method then
 * waits for the flushed tasks 'taskFinished' events (bounded by a timeout),
 * sends the kill message and terminates the worker node.
 *
 * @param workerNodeKey - The worker node key.
 */
protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
  this.flagWorkerNodeAsNotReady(workerNodeKey)
  const flushedTasks = this.flushTasksQueue(workerNodeKey)
  const workerNode = this.workerNodes[workerNodeKey]
  // Fall back to the default tasks queue options if no timeout is configured.
  const tasksFinishedTimeout =
    this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
    getDefaultTasksQueueOptions(
      this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
    ).tasksFinishedTimeout
  await waitWorkerNodeEvents(
    workerNode,
    'taskFinished',
    flushedTasks,
    tasksFinishedTimeout
  )
  await this.sendKillMessageToWorker(workerNodeKey)
  await workerNode.terminate()
}
c97c7edb 1164
/**
 * Hook meant to run code before the worker nodes are created in the abstract constructor.
 * The default implementation does nothing. Can be overridden.
 *
 * @virtual
 */
protected setupHook (): void {
  /* Intentionally empty */
}
c97c7edb 1174
/**
 * Tells whether the worker is the main worker or not.
 * Must be implemented by concrete pools.
 *
 * @returns `true` if the worker is the main worker, `false` otherwise.
 */
protected abstract isMain (): boolean
1181
/**
 * Hook executed before the worker task execution.
 * Increments the executing tasks counters and updates the wait time statistics,
 * for the worker node usage and, when tracked, for the task function usage.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
protected beforeTaskExecutionHook (
  workerNodeKey: number,
  task: Task<Data>
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = workerNode?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      task
    )
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    task.name!
  )
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.executing
    updateWaitTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      taskFunctionWorkerUsage,
      task
    )
  }
}
1222
/**
 * Hook executed after the worker task execution.
 * Updates the task, run time and ELU statistics of the worker node usage and,
 * when tracked, of the task function usage, then notifies the worker choice
 * strategies context if anything was updated.
 * Can be overridden.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The received message.
 */
protected afterTaskExecutionHook (
  workerNodeKey: number,
  message: MessageValue<Response>
): void {
  let needWorkerChoiceStrategyUpdate = false
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (this.workerNodes[workerNodeKey]?.usage != null) {
    const workerUsage = this.workerNodes[workerNodeKey].usage
    updateTaskStatisticsWorkerUsage(workerUsage, message)
    updateRunTimeWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      message
    )
    updateEluWorkerUsage(
      this.workerChoiceStrategiesContext,
      workerUsage,
      message
    )
    needWorkerChoiceStrategyUpdate = true
  }
  // `taskPerformance` is optional on the message: without it the task function
  // cannot be identified, so its usage is left untouched instead of throwing.
  const taskFunctionName = message.taskPerformance?.name
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    taskFunctionName != null
  ) {
    const taskFunctionWorkerUsage =
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        taskFunctionName
      )
    if (taskFunctionWorkerUsage != null) {
      updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
      updateRunTimeWorkerUsage(
        this.workerChoiceStrategiesContext,
        taskFunctionWorkerUsage,
        message
      )
      updateEluWorkerUsage(
        this.workerChoiceStrategiesContext,
        taskFunctionWorkerUsage,
        message
      )
      needWorkerChoiceStrategyUpdate = true
    }
  }
  if (needWorkerChoiceStrategyUpdate) {
    this.workerChoiceStrategiesContext?.update(workerNodeKey)
  }
}
1280
db0e38ee
JB
/**
 * Whether the worker node shall update its task function worker usage or not.
 * This is the case when the worker node info lists more than two task function properties.
 *
 * @param workerNodeKey - The worker node key.
 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
 */
private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
  const taskFunctionsProperties =
    this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties
  return (
    Array.isArray(taskFunctionsProperties) &&
    taskFunctionsProperties.length > 2
  )
}
1295
/**
 * Chooses a worker node for the next task.
 * A dynamic worker node is created first when the pool conditions allow it. It is
 * chosen directly if the worker choice strategies policy asks for dynamic worker usage.
 *
 * @param workerChoiceStrategy - The worker choice strategy.
 * @returns The chosen worker node key
 */
private chooseWorkerNode (
  workerChoiceStrategy?: WorkerChoiceStrategy
): number {
  if (this.shallCreateDynamicWorker()) {
    const dynamicWorkerNodeKey = this.createAndSetupDynamicWorkerNode()
    const useDynamicWorker =
      this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
      true
    if (useDynamicWorker) {
      return dynamicWorkerNodeKey
    }
  }
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
}
1317
6c6afb84
JB
/**
 * Conditions for dynamic worker creation.
 * Must be implemented by concrete pools.
 *
 * @returns Whether to create a dynamic worker or not.
 */
protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1324
/**
 * Sends a message to worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param message - The message.
 * @param transferList - The optional array of transferable objects.
 */
protected abstract sendToWorker (
  workerNodeKey: number,
  message: MessageValue<Data>,
  transferList?: readonly TransferListItem[]
): void
1337
/**
 * Creates a new, completely set up worker node.
 * Registers the user provided handlers and the pool own 'error' and 'exit'
 * handlers before adding the worker node to the pool.
 *
 * @returns New, completely set up worker node key.
 */
protected createAndSetupWorkerNode (): number {
  const workerNode = this.createWorkerNode()
  // Evaluated lazily each time, since the pool state may change between checks.
  const isPoolRunning = (): boolean => this.started && !this.destroying
  workerNode.registerWorkerEventHandler(
    'online',
    this.opts.onlineHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'message',
    this.opts.messageHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerWorkerEventHandler(
    'error',
    this.opts.errorHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
    workerNode.info.ready = false
    this.emitter?.emit(PoolEvents.error, error)
    // Replace the faulty worker node if the pool is configured to do so.
    if (isPoolRunning() && this.opts.restartWorkerOnError === true) {
      if (workerNode.info.dynamic) {
        this.createAndSetupDynamicWorkerNode()
      } else if (!this.startingMinimumNumberOfWorkers) {
        this.startMinimumNumberOfWorkers()
      }
    }
    // Hand the queued tasks of the faulty worker node over to other worker nodes.
    if (isPoolRunning() && this.opts.enableTasksQueue === true) {
      this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
    }
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode?.terminate().catch((terminateError: unknown) => {
      this.emitter?.emit(PoolEvents.error, terminateError)
    })
  })
  workerNode.registerWorkerEventHandler(
    'exit',
    this.opts.exitHandler ?? EMPTY_FUNCTION
  )
  workerNode.registerOnceWorkerEventHandler('exit', () => {
    this.removeWorkerNode(workerNode)
    if (
      !this.started ||
      this.startingMinimumNumberOfWorkers ||
      this.destroying
    ) {
      return
    }
    this.startMinimumNumberOfWorkers()
  })
  const workerNodeKey = this.addWorkerNode(workerNode)
  this.afterWorkerNodeSetup(workerNodeKey)
  return workerNodeKey
}
be0676b3 1401
/**
 * Creates a new, completely set up dynamic worker node.
 * Registers a listener destroying the worker node on its kill messages, asks the
 * worker to check its activity and sends it the task functions known by the pool.
 *
 * @returns New, completely set up dynamic worker node key.
 */
protected createAndSetupDynamicWorkerNode (): number {
  const workerNodeKey = this.createAndSetupWorkerNode()
  this.registerWorkerMessageListener(workerNodeKey, message => {
    this.checkMessageWorkerId(message)
    const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
      message.workerId
    )
    const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
    // A hard kill is always honored. A soft kill is honored only once the
    // worker node executes no task and, with tasks queueing enabled, has an empty queue.
    const shallDestroyWorkerNode = (): boolean => {
      if (isKillBehavior(KillBehaviors.HARD, message.kill)) {
        return true
      }
      if (!isKillBehavior(KillBehaviors.SOFT, message.kill)) {
        return false
      }
      if (this.opts.enableTasksQueue === false) {
        return workerUsage.tasks.executing === 0
      }
      if (this.opts.enableTasksQueue === true) {
        return (
          workerUsage.tasks.executing === 0 &&
          this.tasksQueueSize(localWorkerNodeKey) === 0
        )
      }
      return false
    }
    // Kill message received from worker
    if (shallDestroyWorkerNode()) {
      // Flag the worker node as not ready immediately
      this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
      this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
        this.emitter?.emit(PoolEvents.error, error)
      })
    }
  })
  this.sendToWorker(workerNodeKey, {
    checkActive: true
  })
  // Register on the new worker every task function added on the pool side.
  for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
    this.sendTaskFunctionOperationToWorker(workerNodeKey, {
      taskFunctionOperation: 'add',
      taskFunctionProperties: buildTaskFunctionProperties(
        taskFunctionName,
        taskFunctionObject
      ),
      taskFunction: taskFunctionObject.taskFunction.toString()
    }).catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
  }
  const workerNode = this.workerNodes[workerNodeKey]
  workerNode.info.dynamic = true
  const policy = this.workerChoiceStrategiesContext?.getPolicy()
  if (
    policy?.dynamicWorkerReady === true ||
    policy?.dynamicWorkerUsage === true
  ) {
    workerNode.info.ready = true
  }
  this.checkAndEmitDynamicWorkerCreationEvents()
  return workerNodeKey
}
1462
/**
 * Registers a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
a2ed5053 1475
ae036c3e
JB
/**
 * Registers once a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract registerOnceWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1488
/**
 * Deregisters a listener callback on the worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 * @param listener - The message listener callback.
 */
protected abstract deregisterWorkerMessageListener<
  Message extends Data | Response
>(
  workerNodeKey: number,
  listener: (message: MessageValue<Message>) => void
): void
1501
/**
 * Method hooked up after a worker node has been newly created.
 * Registers the pool message listener, sends the startup and statistics messages
 * and, with tasks queueing enabled, subscribes to the task stealing events.
 * Can be overridden.
 *
 * @param workerNodeKey - The newly created worker node key.
 */
protected afterWorkerNodeSetup (workerNodeKey: number): void {
  // Listen to worker messages.
  this.registerWorkerMessageListener(
    workerNodeKey,
    this.workerMessageListener
  )
  // Send the startup message to worker.
  this.sendStartupMessageToWorker(workerNodeKey)
  // Send the statistics message to worker.
  this.sendStatisticsMessageToWorker(workerNodeKey)
  if (this.opts.enableTasksQueue !== true) {
    return
  }
  const tasksQueueOptions = this.opts.tasksQueueOptions
  if (tasksQueueOptions?.taskStealing === true) {
    this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
  }
  if (tasksQueueOptions?.tasksStealingOnBackPressure === true) {
    this.workerNodes[workerNodeKey].on(
      'backPressure',
      this.handleWorkerNodeBackPressureEvent
    )
  }
}
1533
/**
 * Sends the startup message to worker given its worker node key.
 * Must be implemented by concrete pools.
 *
 * @param workerNodeKey - The worker node key.
 */
protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1540
/**
 * Sends the statistics message to worker given its worker node key.
 * The run time and ELU flags come from the aggregate requirements of the worker
 * choice strategies context, defaulting to `false` when no context is set.
 *
 * @param workerNodeKey - The worker node key.
 */
private sendStatisticsMessageToWorker (workerNodeKey: number): void {
  const taskStatisticsRequirements =
    this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
  this.sendToWorker(workerNodeKey, {
    statistics: {
      runTime: taskStatisticsRequirements?.runTime.aggregate ?? false,
      elu: taskStatisticsRequirements?.elu.aggregate ?? false
    }
  })
}
a2ed5053 1558
5eb72b9e
JB
/**
 * Whether task stealing is impossible: the pool has at most one worker node
 * or no task is queued.
 *
 * @returns `true` if no task can be stolen, `false` otherwise.
 */
private cannotStealTask (): boolean {
  if (this.workerNodes.length <= 1) {
    return true
  }
  return this.info.queuedTasks === 0
}
1562
42c677c1
JB
/**
 * Executes the task on the worker node if it can run right away, enqueues it otherwise.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to handle.
 */
private handleTask (workerNodeKey: number, task: Task<Data>): void {
  if (!this.shallExecuteTask(workerNodeKey)) {
    this.enqueueTask(workerNodeKey, task)
    return
  }
  this.executeTask(workerNodeKey, task)
}
1570
/**
 * Moves the queued tasks of a worker node, one at a time, to the ready worker
 * node with the fewest queued tasks.
 *
 * @param workerNodeKey - The key of the worker node whose tasks queue is emptied.
 */
private redistributeQueuedTasks (workerNodeKey: number): void {
  if (workerNodeKey === -1 || this.cannotStealTask()) {
    return
  }
  while (this.tasksQueueSize(workerNodeKey) > 0) {
    // The search starts from worker node key 0 and keeps the least loaded ready node.
    let destinationWorkerNodeKey = 0
    this.workerNodes.forEach((candidateWorkerNode, candidateWorkerNodeKey) => {
      if (
        candidateWorkerNode.info.ready &&
        candidateWorkerNode.usage.tasks.queued <
          this.workerNodes[destinationWorkerNodeKey].usage.tasks.queued
      ) {
        destinationWorkerNodeKey = candidateWorkerNodeKey
      }
    })
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = this.dequeueTask(workerNodeKey)!
    this.handleTask(destinationWorkerNodeKey, task)
  }
}
1593
b1838604
JB
/**
 * Increments the stolen tasks counter of the worker node usage and, when tracked,
 * of the task function usage.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The stolen task name.
 */
private updateTaskStolenStatisticsWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = workerNode?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage != null) {
    ++workerUsage.tasks.stolen
  }
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.stolen
  }
}
1613
/**
 * Increments the sequentially stolen tasks counter of the worker node usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private updateTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  ++workerUsage.tasks.sequentiallyStolen
}
1623
/**
 * Increments the sequentially stolen tasks counter of the task function usage,
 * when the worker node tracks task function usages.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
  }
}
1639
/**
 * Resets to zero the sequentially stolen tasks counter of the worker node usage.
 *
 * @param workerNodeKey - The worker node key.
 */
private resetTaskSequentiallyStolenStatisticsWorkerUsage (
  workerNodeKey: number
): void {
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  const workerUsage = this.workerNodes[workerNodeKey]?.usage
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  if (workerUsage == null) {
    return
  }
  workerUsage.tasks.sequentiallyStolen = 0
}
1649
/**
 * Resets to zero the sequentially stolen tasks counter of the task function usage,
 * when the worker node tracks task function usages.
 *
 * @param workerNodeKey - The worker node key.
 * @param taskName - The task function name.
 */
private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
  workerNodeKey: number,
  taskName: string
): void {
  const workerNode = this.workerNodes[workerNodeKey]
  if (!this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey)) {
    return
  }
  const taskFunctionWorkerUsage =
    workerNode.getTaskFunctionWorkerUsage(taskName)
  if (taskFunctionWorkerUsage != null) {
    taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
  }
}
1665
/**
 * Handles the worker node 'idle' event: the idle worker node steals a task from
 * another worker node, then the handler re-schedules itself after a delay that
 * depends on the number of sequentially stolen tasks.
 *
 * @param eventDetail - The worker node event detail.
 * @param previousStolenTask - The task stolen by the previous invocation, if any.
 */
private readonly handleWorkerNodeIdleEvent = (
  eventDetail: WorkerNodeEventDetail,
  previousStolenTask?: Task<Data>
): void => {
  const { workerNodeKey } = eventDetail
  if (workerNodeKey == null) {
    throw new Error(
      "WorkerNode event detail 'workerNodeKey' property must be defined"
    )
  }
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  // Stop when nothing can be stolen or too many worker nodes are already stealing.
  const stealingNotAllowed =
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  if (stealingNotAllowed) {
    if (workerInfo != null && previousStolenTask != null) {
      workerInfo.stealing = false
    }
    return
  }
  const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
  // The worker node got work after a stealing sequence: end the sequence.
  const stealingSequenceOver =
    workerInfo != null &&
    previousStolenTask != null &&
    workerNodeTasksUsage.sequentiallyStolen > 0 &&
    (workerNodeTasksUsage.executing > 0 ||
      this.tasksQueueSize(workerNodeKey) > 0)
  if (workerInfo != null && stealingSequenceOver) {
    workerInfo.stealing = false
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    workerInfo.taskFunctionsProperties!.forEach(taskFunctionProperties => {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        taskFunctionProperties.name
      )
    })
    this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    return
  }
  if (workerInfo == null) {
    throw new Error(
      `Worker node with key '${workerNodeKey}' not found in pool`
    )
  }
  workerInfo.stealing = true
  const stolenTask = this.workerNodeStealTask(workerNodeKey)
  if (
    this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
    stolenTask != null
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const stolenTaskName = stolenTask.name!
    const taskFunctionTasksWorkerUsage =
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
        stolenTaskName
      )!.tasks
    // The sequence continues if it just starts or if the same task function is stolen again.
    const sequenceContinues =
      taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
      (previousStolenTask != null &&
        previousStolenTask.name === stolenTask.name &&
        taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
    if (sequenceContinues) {
      this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    } else {
      this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
        workerNodeKey,
        stolenTaskName
      )
    }
  }
  const delay = exponentialDelay(workerNodeTasksUsage.sequentiallyStolen)
  sleep(delay)
    .then(() => {
      this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
      return undefined
    })
    .catch((error: unknown) => {
      this.emitter?.emit(PoolEvents.error, error)
    })
}
1750
/**
 * Steals one task for the given worker node from the most loaded eligible worker node.
 * A source worker node is eligible when it is ready, not stealing itself, has queued
 * tasks and is not the destination worker node.
 *
 * @param workerNodeKey - The key of the worker node stealing the task.
 * @returns The stolen task, `undefined` if no eligible source worker node is found.
 */
private readonly workerNodeStealTask = (
  workerNodeKey: number
): Task<Data> | undefined => {
  const destinationWorkerNode = this.workerNodes[workerNodeKey]
  // Sort a copy so the pool worker nodes order is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
    )
  // Indexes in the sorted copy are not worker node keys, so the destination
  // worker node is excluded by identity rather than by index.
  const sourceWorkerNode = workerNodes.find(
    sourceWorkerNode =>
      sourceWorkerNode.info.ready &&
      !sourceWorkerNode.info.stealing &&
      sourceWorkerNode !== destinationWorkerNode &&
      sourceWorkerNode.usage.tasks.queued > 0
  )
  if (sourceWorkerNode != null) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const task = sourceWorkerNode.dequeueTask(1)!
    this.handleTask(workerNodeKey, task)
    this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
    return task
  }
}
1777
/**
 * Handles a worker node back pressure event: moves tasks queued on the worker
 * node identified by the event detail to other worker nodes, visiting the
 * candidates by ascending number of queued tasks.
 *
 * Nothing is done if tasks cannot be stolen, if more than half of the worker
 * nodes are already stealing, or if the configured tasks queue size is not
 * greater than the size offset.
 *
 * @param eventDetail - The worker node event detail.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the information of a destination worker node is not found.
 */
private readonly handleWorkerNodeBackPressureEvent = (
  eventDetail: WorkerNodeEventDetail
): void => {
  if (
    this.cannotStealTask() ||
    (this.info.stealingWorkerNodes ?? 0) >
      Math.floor(this.workerNodes.length / 2)
  ) {
    return
  }
  const { workerId } = eventDetail
  const sizeOffset = 1
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
    return
  }
  const sourceWorkerNode =
    this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
  // Sort a copy (ascending queued tasks) so that the pool worker nodes order,
  // hence the worker node keys, is left untouched.
  const workerNodes = this.workerNodes
    .slice()
    .sort(
      (workerNodeA, workerNodeB) =>
        workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
    )
  for (const workerNode of workerNodes) {
    if (
      sourceWorkerNode.usage.tasks.queued > 0 &&
      workerNode.info.ready &&
      !workerNode.info.stealing &&
      workerNode.info.id !== workerId &&
      workerNode.usage.tasks.queued <
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.opts.tasksQueueOptions!.size! - sizeOffset
    ) {
      // The position in the sorted copy is not a worker node key: resolve the
      // key from the pool worker nodes so that the task is handed to, and the
      // stealing flag is set on, the worker node that was actually selected.
      const workerNodeKey = this.workerNodes.indexOf(workerNode)
      const workerInfo = this.getWorkerInfo(workerNodeKey)
      if (workerInfo == null) {
        throw new Error(
          `Worker node with key '${workerNodeKey}' not found in pool`
        )
      }
      workerInfo.stealing = true
      try {
        // NOTE(review): the argument `1` is forwarded as the `bucket` parameter
        // of the worker node dequeueTask() - confirm its exact semantics there.
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        const task = sourceWorkerNode.dequeueTask(1)!
        this.handleTask(workerNodeKey, task)
        // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
        this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
      } finally {
        // Always clear the flag so that a failure does not leave the worker
        // node marked as stealing.
        workerInfo.stealing = false
      }
    }
  }
}
1828
/**
 * Message listener registered on each worker.
 *
 * Dispatches the received message, after checking its worker id, to the
 * matching handler: worker ready response, task function properties update or
 * task execution response.
 *
 * @param message - The message received from the worker.
 */
protected readonly workerMessageListener = (
  message: MessageValue<Response>
): void => {
  this.checkMessageWorkerId(message)
  const { workerId, ready, taskId, taskFunctionsProperties } = message
  if (taskFunctionsProperties != null) {
    if (ready != null) {
      // Worker ready response received from worker
      this.handleWorkerReadyResponse(message)
      return
    }
    // Task function properties message received from worker
    const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
    const workerInfo = this.getWorkerInfo(workerNodeKey)
    if (workerInfo != null) {
      workerInfo.taskFunctionsProperties = taskFunctionsProperties
    }
    return
  }
  if (taskId != null) {
    // Task execution response received from worker
    this.handleTaskExecutionResponse(message)
  }
}
1853
8e8d9101
JB
/**
 * Emits the pool ready event, once, when the pool is ready.
 */
private checkAndEmitReadyEvent (): void {
  if (this.readyEventEmitted || !this.ready) {
    return
  }
  this.emitter?.emit(PoolEvents.ready, this.info)
  this.readyEventEmitted = true
}
1860
/**
 * Handles a worker ready response: records the ready flag and the task
 * functions properties on the worker node information, then checks whether the
 * pool ready event shall be emitted.
 *
 * @param message - The worker ready response message.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker did not report itself as ready.
 */
private handleWorkerReadyResponse (message: MessageValue<Response>): void {
  const { workerId, ready, taskFunctionsProperties } = message
  if (ready == null || !ready) {
    throw new Error(`Worker ${workerId} failed to initialize`)
  }
  const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
  const { info: workerInfo } = this.workerNodes[workerNodeKey]
  workerInfo.ready = ready
  workerInfo.taskFunctionsProperties = taskFunctionsProperties
  this.checkAndEmitReadyEvent()
}
1872
/**
 * Handles a task execution response: settles the promise registered for the
 * task, runs the after task execution hook, then, if tasks queueing is enabled,
 * executes the next queued task and emits the worker node idle event when
 * applicable.
 *
 * @param message - The task execution response message.
 */
private handleTaskExecutionResponse (message: MessageValue<Response>): void {
  const { workerId, taskId, workerError, data } = message
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  const promiseResponse = this.promiseResponseMap.get(taskId!)
  if (promiseResponse == null) {
    // No promise response registered for this task id: nothing to settle
    return
  }
  const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
  const workerNode = this.workerNodes[workerNodeKey]
  if (workerError != null) {
    this.emitter?.emit(PoolEvents.taskError, workerError)
    if (asyncResource != null) {
      asyncResource.runInAsyncScope(reject, this.emitter, workerError.message)
    } else {
      reject(workerError.message)
    }
  } else if (asyncResource != null) {
    asyncResource.runInAsyncScope(resolve, this.emitter, data)
  } else {
    resolve(data as Response)
  }
  asyncResource?.emitDestroy()
  this.afterTaskExecutionHook(workerNodeKey, message)
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  this.promiseResponseMap.delete(taskId!)
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  workerNode?.emit('taskFinished', taskId)
  if (
    this.opts.enableTasksQueue !== true ||
    this.destroying ||
    // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
    workerNode == null
  ) {
    return
  }
  const tasksUsage = workerNode.usage.tasks
  // Execute the next queued task while staying below the configured concurrency
  if (
    this.tasksQueueSize(workerNodeKey) > 0 &&
    tasksUsage.executing <
      // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
      this.opts.tasksQueueOptions!.concurrency!
  ) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  // Nothing executing, nothing queued and no sequential stealing in progress
  if (
    tasksUsage.executing === 0 &&
    this.tasksQueueSize(workerNodeKey) === 0 &&
    tasksUsage.sequentiallyStolen === 0
  ) {
    workerNode.emit('idle', {
      workerId,
      workerNodeKey
    })
  }
}
7c0ba920 1929
/**
 * Emits the pool busy event if the pool is busy.
 */
private checkAndEmitTaskExecutionEvents (): void {
  if (!this.busy) {
    return
  }
  this.emitter?.emit(PoolEvents.busy, this.info)
}
1935
/**
 * Emits the pool back pressure event if the pool has back pressure.
 */
private checkAndEmitTaskQueuingEvents (): void {
  if (!this.hasBackPressure()) {
    return
  }
  this.emitter?.emit(PoolEvents.backPressure, this.info)
}
1941
d0878034
JB
/**
 * Emits dynamic worker creation events.
 *
 * Declared abstract: the implementation is not part of this class.
 */
protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1946
/**
 * Gets the information of the worker node stored at the given worker node key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The worker information, or `undefined` if there is no worker node at the given key.
 */
protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
  return workerNode?.info
}
1956
/**
 * Creates a worker node from the pool worker type, file path and options.
 *
 * @returns The created worker node.
 */
private createWorkerNode (): IWorkerNode<Worker, Data> {
  const poolMaxSize = this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
  // Fall back to the default tasks queue size when none is configured
  const tasksQueueBackPressureSize =
    this.opts.tasksQueueOptions?.size ??
    getDefaultTasksQueueOptions(poolMaxSize).size
  const workerNode = new WorkerNode<Worker, Data>(this.worker, this.filePath, {
    env: this.opts.env,
    workerOptions: this.opts.workerOptions,
    tasksQueueBackPressureSize,
    tasksQueueBucketSize: poolMaxSize * 2
  })
  // Flag the worker node as ready at pool startup.
  if (this.starting) {
    workerNode.info.ready = true
  }
  return workerNode
}
1984
/**
 * Appends the given worker node to the pool worker nodes.
 *
 * @param workerNode - The worker node.
 * @returns The added worker node key.
 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
 */
private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
  this.workerNodes.push(workerNode)
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  if (workerNodeKey !== -1) {
    return workerNodeKey
  }
  throw new Error('Worker added not found in worker nodes')
}
c923ce56 2000
8e8d9101
JB
/**
 * Emits the pool empty event if the pool is empty and re-arms the pool ready
 * event emission.
 */
private checkAndEmitEmptyEvent (): void {
  if (!this.empty) {
    return
  }
  this.emitter?.emit(PoolEvents.empty, this.info)
  this.readyEventEmitted = false
}
2007
/**
 * Removes the given worker node from the pool worker nodes, if present, then
 * checks whether the pool empty event shall be emitted.
 *
 * @param workerNode - The worker node.
 */
private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
  const workerNodeKey = this.workerNodes.indexOf(workerNode)
  const found = workerNodeKey !== -1
  if (found) {
    this.workerNodes.splice(workerNodeKey, 1)
    this.workerChoiceStrategiesContext?.remove(workerNodeKey)
  }
  this.checkAndEmitEmptyEvent()
}
adc3c320 2021
/**
 * Flags the worker node at the given key as not ready. Does nothing if no
 * worker information is found for the key.
 *
 * @param workerNodeKey - The worker node key.
 */
protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
  const workerInfo = this.getWorkerInfo(workerNodeKey)
  if (workerInfo == null) {
    return
  }
  workerInfo.ready = false
}
2028
9e844245
JB
/**
 * Whether the pool has back pressure: tasks queueing is enabled and no worker
 * node is without back pressure.
 *
 * @returns `true` if the pool has back pressure, `false` otherwise.
 */
private hasBackPressure (): boolean {
  if (this.opts.enableTasksQueue !== true) {
    return false
  }
  return this.workerNodes.every(workerNode => workerNode.hasBackPressure())
}
2037
/**
 * Executes the given task on the worker given its worker node key: runs the
 * before task execution hook, sends the task to the worker and checks whether
 * task execution events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to execute.
 */
private executeTask (workerNodeKey: number, task: Task<Data>): void {
  this.beforeTaskExecutionHook(workerNodeKey, task)
  const { transferList } = task
  this.sendToWorker(workerNodeKey, task, transferList)
  this.checkAndEmitTaskExecutionEvents()
}
2049
/**
 * Enqueues the given task on the worker node at the given key, then checks
 * whether task queuing events shall be emitted.
 *
 * @param workerNodeKey - The worker node key.
 * @param task - The task to enqueue.
 * @returns The value returned by the worker node `enqueueTask()`.
 */
private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
  const workerNode = this.workerNodes[workerNodeKey]
  const queueSize = workerNode.enqueueTask(task)
  this.checkAndEmitTaskQueuingEvents()
  return queueSize
}
2055
95d1a734
JB
/**
 * Dequeues a task from the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @param bucket - Optional bucket, forwarded as is to the worker node `dequeueTask()`.
 * @returns The dequeued task, or `undefined` if the worker node returned none.
 */
private dequeueTask (
  workerNodeKey: number,
  bucket?: number
): Task<Data> | undefined {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.dequeueTask(bucket)
}
2062
/**
 * Gets the tasks queue size of the worker node at the given key.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The tasks queue size reported by the worker node.
 */
private tasksQueueSize (workerNodeKey: number): number {
  const workerNode = this.workerNodes[workerNodeKey]
  return workerNode.tasksQueueSize()
}
2066
87347ea8
JB
/**
 * Flushes the tasks queue of the worker node at the given key: executes every
 * queued task, then clears the queue.
 *
 * @param workerNodeKey - The worker node key.
 * @returns The number of flushed tasks.
 */
protected flushTasksQueue (workerNodeKey: number): number {
  let flushedTasks = 0
  for (; this.tasksQueueSize(workerNodeKey) > 0; ++flushedTasks) {
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
  }
  this.workerNodes[workerNodeKey].clearTasksQueue()
  return flushedTasks
}
2077
/**
 * Flushes the tasks queue of every worker node in the pool.
 */
private flushTasksQueues (): void {
  for (
    let workerNodeKey = 0;
    workerNodeKey < this.workerNodes.length;
    ++workerNodeKey
  ) {
    this.flushTasksQueue(workerNodeKey)
  }
}
c97c7edb 2083}