Merge dependabot/npm_and_yarn/examples/typescript/http-server-pool/express-hybrid...
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
ded253e2 1import { AsyncResource } from 'node:async_hooks'
2845f2a5 2import { randomUUID } from 'node:crypto'
ded253e2 3import { EventEmitterAsyncResource } from 'node:events'
62c15a68 4import { performance } from 'node:perf_hooks'
ef083f7b 5import type { TransferListItem } from 'node:worker_threads'
ded253e2 6
5c4d16da
JB
7import type {
8 MessageValue,
9 PromiseResponseWrapper,
31847469
JB
10 Task,
11 TaskFunctionProperties
d35e5717 12} from '../utility-types.js'
bbeadd16 13import {
ded253e2 14 average,
31847469 15 buildTaskFunctionProperties,
ff128cc9 16 DEFAULT_TASK_NAME,
bbeadd16 17 EMPTY_FUNCTION,
463226a4 18 exponentialDelay,
59317253 19 isKillBehavior,
0d80593b 20 isPlainObject,
90d6701c 21 max,
afe0d5bf 22 median,
90d6701c 23 min,
463226a4
JB
24 round,
25 sleep
d35e5717 26} from '../utils.js'
31847469
JB
27import type {
28 TaskFunction,
29 TaskFunctionObject
30} from '../worker/task-functions.js'
ded253e2 31import { KillBehaviors } from '../worker/worker-options.js'
c4855468 32import {
65d7a1c9 33 type IPool,
c4855468 34 PoolEvents,
6b27d407 35 type PoolInfo,
c4855468 36 type PoolOptions,
6b27d407
JB
37 type PoolType,
38 PoolTypes,
4b628b48 39 type TasksQueueOptions
d35e5717 40} from './pool.js'
a35560ba 41import {
f0d7f803 42 Measurements,
a35560ba 43 WorkerChoiceStrategies,
a20f0ba5
JB
44 type WorkerChoiceStrategy,
45 type WorkerChoiceStrategyOptions
d35e5717 46} from './selection-strategies/selection-strategies-types.js'
bcfb06ce 47import { WorkerChoiceStrategiesContext } from './selection-strategies/worker-choice-strategies-context.js'
bde6b5d7
JB
48import {
49 checkFilePath,
95d1a734 50 checkValidPriority,
bde6b5d7 51 checkValidTasksQueueOptions,
bfc75cca 52 checkValidWorkerChoiceStrategy,
32b141fd 53 getDefaultTasksQueueOptions,
c329fd41
JB
54 updateEluWorkerUsage,
55 updateRunTimeWorkerUsage,
56 updateTaskStatisticsWorkerUsage,
57 updateWaitTimeWorkerUsage,
87347ea8 58 waitWorkerNodeEvents
d35e5717 59} from './utils.js'
ded253e2
JB
60import { version } from './version.js'
61import type {
62 IWorker,
63 IWorkerNode,
64 WorkerInfo,
65 WorkerNodeEventDetail,
66 WorkerType
67} from './worker.js'
68import { WorkerNode } from './worker-node.js'
23ccf9d7 69
729c563d 70/**
ea7a90d3 71 * Base class that implements some shared logic for all poolifier pools.
729c563d 72 *
38e795c1 73 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
74 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
75 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 76 */
c97c7edb 77export abstract class AbstractPool<
f06e48d8 78 Worker extends IWorker,
d3c8a1a8
S
79 Data = unknown,
80 Response = unknown
c4855468 81> implements IPool<Worker, Data, Response> {
afc003b2 82 /** @inheritDoc */
4b628b48 83 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 84
afc003b2 85 /** @inheritDoc */
f80125ca 86 public emitter?: EventEmitterAsyncResource
7c0ba920 87
be0676b3 88 /**
419e8121 89 * The task execution response promise map:
2740a743 90 * - `key`: The message id of each submitted task.
bcfb06ce 91 * - `value`: An object that contains task's worker node key, execution response promise resolve and reject callbacks, async resource.
be0676b3 92 *
a3445496 93 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 94 */
501aea93
JB
95 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
96 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 97
a35560ba 98 /**
bcfb06ce 99 * Worker choice strategies context referencing worker choice algorithms implementation.
a35560ba 100 */
bcfb06ce 101 protected workerChoiceStrategiesContext?: WorkerChoiceStrategiesContext<
78cea37e
JB
102 Worker,
103 Data,
104 Response
a35560ba
S
105 >
106
72ae84a2
JB
107 /**
108 * The task functions added at runtime map:
109 * - `key`: The task function name.
31847469 110 * - `value`: The task function object.
72ae84a2 111 */
31847469
JB
112 private readonly taskFunctions: Map<
113 string,
114 TaskFunctionObject<Data, Response>
115 >
72ae84a2 116
15b176e0
JB
117 /**
118 * Whether the pool is started or not.
119 */
120 private started: boolean
47352846
JB
121 /**
122 * Whether the pool is starting or not.
123 */
124 private starting: boolean
711623b8
JB
125 /**
126 * Whether the pool is destroying or not.
127 */
128 private destroying: boolean
b362a929
JB
129 /**
130 * Whether the minimum number of workers is starting or not.
131 */
132 private startingMinimumNumberOfWorkers: boolean
55082af9
JB
133 /**
134 * Whether the pool ready event has been emitted or not.
135 */
136 private readyEventEmitted: boolean
afe0d5bf
JB
137 /**
138 * The start timestamp of the pool.
139 */
97dc65d9 140 private startTimestamp?: number
afe0d5bf 141
729c563d
S
142 /**
143 * Constructs a new poolifier pool.
144 *
00e1bdeb 145 * @param minimumNumberOfWorkers - Minimum number of workers that this pool manages.
029715f0 146 * @param filePath - Path to the worker file.
38e795c1 147 * @param opts - Options for the pool.
00e1bdeb 148 * @param maximumNumberOfWorkers - Maximum number of workers that this pool manages.
729c563d 149 */
c97c7edb 150 public constructor (
26ce26ca 151 protected readonly minimumNumberOfWorkers: number,
b4213b7f 152 protected readonly filePath: string,
26ce26ca
JB
153 protected readonly opts: PoolOptions<Worker>,
154 protected readonly maximumNumberOfWorkers?: number
c97c7edb 155 ) {
78cea37e 156 if (!this.isMain()) {
04f45163 157 throw new Error(
8c6d4acf 158 'Cannot start a pool from a worker with the same type as the pool'
04f45163 159 )
c97c7edb 160 }
26ce26ca 161 this.checkPoolType()
bde6b5d7 162 checkFilePath(this.filePath)
26ce26ca 163 this.checkMinimumNumberOfWorkers(this.minimumNumberOfWorkers)
7c0ba920 164 this.checkPoolOptions(this.opts)
1086026a 165
7254e419
JB
166 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
167 this.executeTask = this.executeTask.bind(this)
168 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 169
6bd72cd0 170 if (this.opts.enableEvents === true) {
e0843544 171 this.initEventEmitter()
7c0ba920 172 }
bcfb06ce 173 this.workerChoiceStrategiesContext = new WorkerChoiceStrategiesContext<
d59df138
JB
174 Worker,
175 Data,
176 Response
da309861
JB
177 >(
178 this,
bcfb06ce
JB
179 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
180 [this.opts.workerChoiceStrategy!],
da309861
JB
181 this.opts.workerChoiceStrategyOptions
182 )
b6b32453
JB
183
184 this.setupHook()
185
31847469 186 this.taskFunctions = new Map<string, TaskFunctionObject<Data, Response>>()
72ae84a2 187
47352846 188 this.started = false
075e51d1 189 this.starting = false
711623b8 190 this.destroying = false
55082af9 191 this.readyEventEmitted = false
b362a929 192 this.startingMinimumNumberOfWorkers = false
47352846
JB
193 if (this.opts.startWorkers === true) {
194 this.start()
195 }
c97c7edb
S
196 }
197
26ce26ca
JB
198 private checkPoolType (): void {
199 if (this.type === PoolTypes.fixed && this.maximumNumberOfWorkers != null) {
200 throw new Error(
201 'Cannot instantiate a fixed pool with a maximum number of workers specified at initialization'
202 )
203 }
204 }
205
c63a35a0
JB
206 private checkMinimumNumberOfWorkers (
207 minimumNumberOfWorkers: number | undefined
208 ): void {
af7f2788 209 if (minimumNumberOfWorkers == null) {
8d3782fa
JB
210 throw new Error(
211 'Cannot instantiate a pool without specifying the number of workers'
212 )
af7f2788 213 } else if (!Number.isSafeInteger(minimumNumberOfWorkers)) {
473c717a 214 throw new TypeError(
0d80593b 215 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa 216 )
af7f2788 217 } else if (minimumNumberOfWorkers < 0) {
473c717a 218 throw new RangeError(
8d3782fa
JB
219 'Cannot instantiate a pool with a negative number of workers'
220 )
af7f2788 221 } else if (this.type === PoolTypes.fixed && minimumNumberOfWorkers === 0) {
2431bdb4
JB
222 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
223 }
224 }
225
7c0ba920 226 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 227 if (isPlainObject(opts)) {
47352846 228 this.opts.startWorkers = opts.startWorkers ?? true
c63a35a0 229 checkValidWorkerChoiceStrategy(opts.workerChoiceStrategy)
0d80593b
JB
230 this.opts.workerChoiceStrategy =
231 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9 232 this.checkValidWorkerChoiceStrategyOptions(
c63a35a0 233 opts.workerChoiceStrategyOptions
2324f8c9 234 )
26ce26ca
JB
235 if (opts.workerChoiceStrategyOptions != null) {
236 this.opts.workerChoiceStrategyOptions = opts.workerChoiceStrategyOptions
8990357d 237 }
1f68cede 238 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
239 this.opts.enableEvents = opts.enableEvents ?? true
240 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
241 if (this.opts.enableTasksQueue) {
c63a35a0 242 checkValidTasksQueueOptions(opts.tasksQueueOptions)
0d80593b 243 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
c63a35a0 244 opts.tasksQueueOptions
0d80593b
JB
245 )
246 }
247 } else {
248 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 249 }
aee46736
JB
250 }
251
0d80593b 252 private checkValidWorkerChoiceStrategyOptions (
c63a35a0 253 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
0d80593b 254 ): void {
2324f8c9
JB
255 if (
256 workerChoiceStrategyOptions != null &&
257 !isPlainObject(workerChoiceStrategyOptions)
258 ) {
0d80593b
JB
259 throw new TypeError(
260 'Invalid worker choice strategy options: must be a plain object'
261 )
262 }
49be33fe 263 if (
2324f8c9 264 workerChoiceStrategyOptions?.weights != null &&
26ce26ca
JB
265 Object.keys(workerChoiceStrategyOptions.weights).length !==
266 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
49be33fe
JB
267 ) {
268 throw new Error(
269 'Invalid worker choice strategy options: must have a weight for each worker node'
270 )
271 }
f0d7f803 272 if (
2324f8c9 273 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
274 !Object.values(Measurements).includes(
275 workerChoiceStrategyOptions.measurement
276 )
277 ) {
278 throw new Error(
279 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
280 )
281 }
0d80593b
JB
282 }
283
e0843544 284 private initEventEmitter (): void {
5b7d00b4 285 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
286 name: `poolifier:${this.type}-${this.worker}-pool`
287 })
288 }
289
08f3f44c 290 /** @inheritDoc */
6b27d407
JB
291 public get info (): PoolInfo {
292 return {
23ccf9d7 293 version,
6b27d407 294 type: this.type,
184855e6 295 worker: this.worker,
47352846 296 started: this.started,
2431bdb4 297 ready: this.ready,
67f3f2d6 298 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
bcfb06ce
JB
299 defaultStrategy: this.opts.workerChoiceStrategy!,
300 strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
26ce26ca
JB
301 minSize: this.minimumNumberOfWorkers,
302 maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
bcfb06ce 303 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
f4d0a470 304 .runTime.aggregate === true &&
bcfb06ce 305 this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
c63a35a0
JB
306 .waitTime.aggregate && {
307 utilization: round(this.utilization)
308 }),
6b27d407
JB
309 workerNodes: this.workerNodes.length,
310 idleWorkerNodes: this.workerNodes.reduce(
311 (accumulator, workerNode) =>
f59e1027 312 workerNode.usage.tasks.executing === 0
a4e07f72
JB
313 ? accumulator + 1
314 : accumulator,
6b27d407
JB
315 0
316 ),
5eb72b9e
JB
317 ...(this.opts.enableTasksQueue === true && {
318 stealingWorkerNodes: this.workerNodes.reduce(
319 (accumulator, workerNode) =>
320 workerNode.info.stealing ? accumulator + 1 : accumulator,
321 0
322 )
323 }),
6b27d407 324 busyWorkerNodes: this.workerNodes.reduce(
42c677c1
JB
325 (accumulator, _workerNode, workerNodeKey) =>
326 this.isWorkerNodeBusy(workerNodeKey) ? accumulator + 1 : accumulator,
6b27d407
JB
327 0
328 ),
a4e07f72 329 executedTasks: this.workerNodes.reduce(
6b27d407 330 (accumulator, workerNode) =>
f59e1027 331 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
332 0
333 ),
334 executingTasks: this.workerNodes.reduce(
335 (accumulator, workerNode) =>
f59e1027 336 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
337 0
338 ),
daf86646
JB
339 ...(this.opts.enableTasksQueue === true && {
340 queuedTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + workerNode.usage.tasks.queued,
343 0
344 )
345 }),
346 ...(this.opts.enableTasksQueue === true && {
347 maxQueuedTasks: this.workerNodes.reduce(
348 (accumulator, workerNode) =>
c63a35a0 349 accumulator + (workerNode.usage.tasks.maxQueued ?? 0),
daf86646
JB
350 0
351 )
352 }),
a1763c54
JB
353 ...(this.opts.enableTasksQueue === true && {
354 backPressure: this.hasBackPressure()
355 }),
68cbdc84
JB
356 ...(this.opts.enableTasksQueue === true && {
357 stolenTasks: this.workerNodes.reduce(
358 (accumulator, workerNode) =>
359 accumulator + workerNode.usage.tasks.stolen,
360 0
361 )
362 }),
a4e07f72
JB
363 failedTasks: this.workerNodes.reduce(
364 (accumulator, workerNode) =>
f59e1027 365 accumulator + workerNode.usage.tasks.failed,
a4e07f72 366 0
1dcf8b7b 367 ),
bcfb06ce 368 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
f4d0a470 369 .runTime.aggregate === true && {
1dcf8b7b 370 runTime: {
98e72cda 371 minimum: round(
90d6701c 372 min(
98e72cda 373 ...this.workerNodes.map(
c63a35a0 374 workerNode => workerNode.usage.runTime.minimum ?? Infinity
98e72cda 375 )
1dcf8b7b
JB
376 )
377 ),
98e72cda 378 maximum: round(
90d6701c 379 max(
98e72cda 380 ...this.workerNodes.map(
c63a35a0 381 workerNode => workerNode.usage.runTime.maximum ?? -Infinity
98e72cda 382 )
1dcf8b7b 383 )
98e72cda 384 ),
bcfb06ce 385 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
3baa0837
JB
386 .runTime.average && {
387 average: round(
388 average(
389 this.workerNodes.reduce<number[]>(
390 (accumulator, workerNode) =>
391 accumulator.concat(workerNode.usage.runTime.history),
392 []
393 )
98e72cda 394 )
dc021bcc 395 )
3baa0837 396 }),
bcfb06ce 397 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
98e72cda
JB
398 .runTime.median && {
399 median: round(
400 median(
3baa0837
JB
401 this.workerNodes.reduce<number[]>(
402 (accumulator, workerNode) =>
403 accumulator.concat(workerNode.usage.runTime.history),
404 []
98e72cda
JB
405 )
406 )
407 )
408 })
1dcf8b7b
JB
409 }
410 }),
bcfb06ce 411 ...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
f4d0a470 412 .waitTime.aggregate === true && {
1dcf8b7b 413 waitTime: {
98e72cda 414 minimum: round(
90d6701c 415 min(
98e72cda 416 ...this.workerNodes.map(
c63a35a0 417 workerNode => workerNode.usage.waitTime.minimum ?? Infinity
98e72cda 418 )
1dcf8b7b
JB
419 )
420 ),
98e72cda 421 maximum: round(
90d6701c 422 max(
98e72cda 423 ...this.workerNodes.map(
c63a35a0 424 workerNode => workerNode.usage.waitTime.maximum ?? -Infinity
98e72cda 425 )
1dcf8b7b 426 )
98e72cda 427 ),
bcfb06ce 428 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
3baa0837
JB
429 .waitTime.average && {
430 average: round(
431 average(
432 this.workerNodes.reduce<number[]>(
433 (accumulator, workerNode) =>
434 accumulator.concat(workerNode.usage.waitTime.history),
435 []
436 )
98e72cda 437 )
dc021bcc 438 )
3baa0837 439 }),
bcfb06ce 440 ...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
98e72cda
JB
441 .waitTime.median && {
442 median: round(
443 median(
3baa0837
JB
444 this.workerNodes.reduce<number[]>(
445 (accumulator, workerNode) =>
446 accumulator.concat(workerNode.usage.waitTime.history),
447 []
98e72cda
JB
448 )
449 )
450 )
451 })
1dcf8b7b
JB
452 }
453 })
6b27d407
JB
454 }
455 }
08f3f44c 456
aa9eede8
JB
457 /**
458 * The pool readiness boolean status.
459 */
2431bdb4 460 private get ready (): boolean {
8e8d9101
JB
461 if (this.empty) {
462 return false
463 }
2431bdb4 464 return (
b97d82d8
JB
465 this.workerNodes.reduce(
466 (accumulator, workerNode) =>
467 !workerNode.info.dynamic && workerNode.info.ready
468 ? accumulator + 1
469 : accumulator,
470 0
26ce26ca 471 ) >= this.minimumNumberOfWorkers
2431bdb4
JB
472 )
473 }
474
8e8d9101
JB
475 /**
476 * The pool emptiness boolean status.
477 */
478 protected get empty (): boolean {
fb43035c 479 return this.minimumNumberOfWorkers === 0 && this.workerNodes.length === 0
8e8d9101
JB
480 }
481
afe0d5bf 482 /**
aa9eede8 483 * The approximate pool utilization.
afe0d5bf
JB
484 *
485 * @returns The pool utilization.
486 */
487 private get utilization (): number {
97dc65d9
JB
488 if (this.startTimestamp == null) {
489 return 0
490 }
8e5ca040 491 const poolTimeCapacity =
26ce26ca
JB
492 (performance.now() - this.startTimestamp) *
493 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
afe0d5bf
JB
494 const totalTasksRunTime = this.workerNodes.reduce(
495 (accumulator, workerNode) =>
c63a35a0 496 accumulator + (workerNode.usage.runTime.aggregate ?? 0),
afe0d5bf
JB
497 0
498 )
499 const totalTasksWaitTime = this.workerNodes.reduce(
500 (accumulator, workerNode) =>
c63a35a0 501 accumulator + (workerNode.usage.waitTime.aggregate ?? 0),
afe0d5bf
JB
502 0
503 )
8e5ca040 504 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
505 }
506
8881ae32 507 /**
aa9eede8 508 * The pool type.
8881ae32
JB
509 *
510 * If it is `'dynamic'`, it provides the `max` property.
511 */
512 protected abstract get type (): PoolType
513
184855e6 514 /**
aa9eede8 515 * The worker type.
184855e6
JB
516 */
517 protected abstract get worker (): WorkerType
518
6b813701
JB
519 /**
520 * Checks if the worker id sent in the received message from a worker is valid.
521 *
522 * @param message - The received message.
523 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
524 */
4d159167 525 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
526 if (message.workerId == null) {
527 throw new Error('Worker message received without worker id')
8e5a7cf9 528 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
529 throw new Error(
530 `Worker message received from unknown worker '${message.workerId}'`
531 )
532 }
533 }
534
aa9eede8
JB
535 /**
536 * Gets the worker node key given its worker id.
537 *
538 * @param workerId - The worker id.
aad6fb64 539 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 540 */
72ae84a2 541 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 542 return this.workerNodes.findIndex(
041dc05b 543 workerNode => workerNode.info.id === workerId
aad6fb64 544 )
aa9eede8
JB
545 }
546
afc003b2 547 /** @inheritDoc */
a35560ba 548 public setWorkerChoiceStrategy (
59219cbb
JB
549 workerChoiceStrategy: WorkerChoiceStrategy,
550 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 551 ): void {
19b8be8b 552 let requireSync = false
bde6b5d7 553 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
85bbc7ab 554 if (workerChoiceStrategyOptions != null) {
423f3b7a 555 requireSync = !this.setWorkerChoiceStrategyOptions(
19b8be8b
JB
556 workerChoiceStrategyOptions
557 )
85bbc7ab
JB
558 }
559 if (workerChoiceStrategy !== this.opts.workerChoiceStrategy) {
560 this.opts.workerChoiceStrategy = workerChoiceStrategy
561 this.workerChoiceStrategiesContext?.setDefaultWorkerChoiceStrategy(
562 this.opts.workerChoiceStrategy,
563 this.opts.workerChoiceStrategyOptions
564 )
19b8be8b
JB
565 requireSync = true
566 }
567 if (requireSync) {
85bbc7ab
JB
568 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
569 this.getWorkerWorkerChoiceStrategies(),
570 this.opts.workerChoiceStrategyOptions
571 )
19b8be8b 572 for (const workerNodeKey of this.workerNodes.keys()) {
85bbc7ab
JB
573 this.sendStatisticsMessageToWorker(workerNodeKey)
574 }
59219cbb 575 }
a20f0ba5
JB
576 }
577
578 /** @inheritDoc */
579 public setWorkerChoiceStrategyOptions (
c63a35a0 580 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions | undefined
19b8be8b 581 ): boolean {
0d80593b 582 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
26ce26ca
JB
583 if (workerChoiceStrategyOptions != null) {
584 this.opts.workerChoiceStrategyOptions = workerChoiceStrategyOptions
19b8be8b
JB
585 this.workerChoiceStrategiesContext?.setOptions(
586 this.opts.workerChoiceStrategyOptions
587 )
588 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
589 this.getWorkerWorkerChoiceStrategies(),
590 this.opts.workerChoiceStrategyOptions
591 )
592 for (const workerNodeKey of this.workerNodes.keys()) {
593 this.sendStatisticsMessageToWorker(workerNodeKey)
594 }
595 return true
8990357d 596 }
19b8be8b 597 return false
a35560ba
S
598 }
599
a20f0ba5 600 /** @inheritDoc */
8f52842f
JB
601 public enableTasksQueue (
602 enable: boolean,
603 tasksQueueOptions?: TasksQueueOptions
604 ): void {
a20f0ba5 605 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
606 this.unsetTaskStealing()
607 this.unsetTasksStealingOnBackPressure()
ef41a6e6 608 this.flushTasksQueues()
a20f0ba5
JB
609 }
610 this.opts.enableTasksQueue = enable
c63a35a0 611 this.setTasksQueueOptions(tasksQueueOptions)
a20f0ba5
JB
612 }
613
614 /** @inheritDoc */
c63a35a0
JB
615 public setTasksQueueOptions (
616 tasksQueueOptions: TasksQueueOptions | undefined
617 ): void {
a20f0ba5 618 if (this.opts.enableTasksQueue === true) {
bde6b5d7 619 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
620 this.opts.tasksQueueOptions =
621 this.buildTasksQueueOptions(tasksQueueOptions)
67f3f2d6
JB
622 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
623 this.setTasksQueueSize(this.opts.tasksQueueOptions.size!)
d6ca1416 624 if (this.opts.tasksQueueOptions.taskStealing === true) {
9f99eb9b 625 this.unsetTaskStealing()
d6ca1416
JB
626 this.setTaskStealing()
627 } else {
628 this.unsetTaskStealing()
629 }
630 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
9f99eb9b 631 this.unsetTasksStealingOnBackPressure()
d6ca1416
JB
632 this.setTasksStealingOnBackPressure()
633 } else {
634 this.unsetTasksStealingOnBackPressure()
635 }
5baee0d7 636 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
637 delete this.opts.tasksQueueOptions
638 }
639 }
640
9b38ab2d 641 private buildTasksQueueOptions (
c63a35a0 642 tasksQueueOptions: TasksQueueOptions | undefined
9b38ab2d
JB
643 ): TasksQueueOptions {
644 return {
26ce26ca
JB
645 ...getDefaultTasksQueueOptions(
646 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
647 ),
9b38ab2d
JB
648 ...tasksQueueOptions
649 }
650 }
651
5b49e864 652 private setTasksQueueSize (size: number): void {
20c6f652 653 for (const workerNode of this.workerNodes) {
ff3f866a 654 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
655 }
656 }
657
d6ca1416 658 private setTaskStealing (): void {
19b8be8b 659 for (const workerNodeKey of this.workerNodes.keys()) {
e44639e9 660 this.workerNodes[workerNodeKey].on('idle', this.handleWorkerNodeIdleEvent)
d6ca1416
JB
661 }
662 }
663
664 private unsetTaskStealing (): void {
19b8be8b 665 for (const workerNodeKey of this.workerNodes.keys()) {
e1c2dba7 666 this.workerNodes[workerNodeKey].off(
e44639e9
JB
667 'idle',
668 this.handleWorkerNodeIdleEvent
9f95d5eb 669 )
d6ca1416
JB
670 }
671 }
672
673 private setTasksStealingOnBackPressure (): void {
19b8be8b 674 for (const workerNodeKey of this.workerNodes.keys()) {
e1c2dba7 675 this.workerNodes[workerNodeKey].on(
b5e75be8 676 'backPressure',
e44639e9 677 this.handleWorkerNodeBackPressureEvent
9f95d5eb 678 )
d6ca1416
JB
679 }
680 }
681
682 private unsetTasksStealingOnBackPressure (): void {
19b8be8b 683 for (const workerNodeKey of this.workerNodes.keys()) {
e1c2dba7 684 this.workerNodes[workerNodeKey].off(
b5e75be8 685 'backPressure',
e44639e9 686 this.handleWorkerNodeBackPressureEvent
9f95d5eb 687 )
d6ca1416
JB
688 }
689 }
690
c319c66b
JB
691 /**
692 * Whether the pool is full or not.
693 *
694 * The pool filling boolean status.
695 */
dea903a8 696 protected get full (): boolean {
26ce26ca
JB
697 return (
698 this.workerNodes.length >=
699 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
700 )
dea903a8 701 }
c2ade475 702
c319c66b
JB
703 /**
704 * Whether the pool is busy or not.
705 *
706 * The pool busyness boolean status.
707 */
708 protected abstract get busy (): boolean
7c0ba920 709
6c6afb84 710 /**
3d76750a 711 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
712 *
713 * @returns Worker nodes busyness boolean status.
714 */
c2ade475 715 protected internalBusy (): boolean {
3d76750a
JB
716 if (this.opts.enableTasksQueue === true) {
717 return (
718 this.workerNodes.findIndex(
041dc05b 719 workerNode =>
3d76750a
JB
720 workerNode.info.ready &&
721 workerNode.usage.tasks.executing <
67f3f2d6
JB
722 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
723 this.opts.tasksQueueOptions!.concurrency!
3d76750a
JB
724 ) === -1
725 )
3d76750a 726 }
419e8121
JB
727 return (
728 this.workerNodes.findIndex(
729 workerNode =>
730 workerNode.info.ready && workerNode.usage.tasks.executing === 0
731 ) === -1
732 )
cb70b19d
JB
733 }
734
42c677c1
JB
735 private isWorkerNodeBusy (workerNodeKey: number): boolean {
736 if (this.opts.enableTasksQueue === true) {
737 return (
738 this.workerNodes[workerNodeKey].usage.tasks.executing >=
67f3f2d6
JB
739 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
740 this.opts.tasksQueueOptions!.concurrency!
42c677c1
JB
741 )
742 }
743 return this.workerNodes[workerNodeKey].usage.tasks.executing > 0
744 }
745
e81c38f2 746 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
747 workerNodeKey: number,
748 message: MessageValue<Data>
749 ): Promise<boolean> {
72ae84a2 750 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
751 const taskFunctionOperationListener = (
752 message: MessageValue<Response>
753 ): void => {
754 this.checkMessageWorkerId(message)
1851fed0 755 const workerId = this.getWorkerInfo(workerNodeKey)?.id
72ae84a2 756 if (
ae036c3e
JB
757 message.taskFunctionOperationStatus != null &&
758 message.workerId === workerId
72ae84a2 759 ) {
ae036c3e
JB
760 if (message.taskFunctionOperationStatus) {
761 resolve(true)
c63a35a0 762 } else {
ae036c3e
JB
763 reject(
764 new Error(
c63a35a0 765 `Task function operation '${message.taskFunctionOperation}' failed on worker ${message.workerId} with error: '${message.workerError?.message}'`
ae036c3e 766 )
72ae84a2 767 )
ae036c3e
JB
768 }
769 this.deregisterWorkerMessageListener(
770 this.getWorkerNodeKeyByWorkerId(message.workerId),
771 taskFunctionOperationListener
72ae84a2
JB
772 )
773 }
ae036c3e
JB
774 }
775 this.registerWorkerMessageListener(
776 workerNodeKey,
777 taskFunctionOperationListener
778 )
72ae84a2
JB
779 this.sendToWorker(workerNodeKey, message)
780 })
781 }
782
783 private async sendTaskFunctionOperationToWorkers (
adee6053 784 message: MessageValue<Data>
e81c38f2
JB
785 ): Promise<boolean> {
786 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
787 const responsesReceived = new Array<MessageValue<Response>>()
788 const taskFunctionOperationsListener = (
789 message: MessageValue<Response>
790 ): void => {
791 this.checkMessageWorkerId(message)
792 if (message.taskFunctionOperationStatus != null) {
793 responsesReceived.push(message)
794 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 795 if (
e81c38f2
JB
796 responsesReceived.every(
797 message => message.taskFunctionOperationStatus === true
798 )
799 ) {
800 resolve(true)
801 } else if (
e81c38f2
JB
802 responsesReceived.some(
803 message => message.taskFunctionOperationStatus === false
804 )
805 ) {
b0b55f57
JB
806 const errorResponse = responsesReceived.find(
807 response => response.taskFunctionOperationStatus === false
808 )
e81c38f2
JB
809 reject(
810 new Error(
b0b55f57 811 `Task function operation '${
e81c38f2 812 message.taskFunctionOperation as string
c63a35a0
JB
813 }' failed on worker ${errorResponse?.workerId} with error: '${
814 errorResponse?.workerError?.message
b0b55f57 815 }'`
e81c38f2
JB
816 )
817 )
818 }
ae036c3e
JB
819 this.deregisterWorkerMessageListener(
820 this.getWorkerNodeKeyByWorkerId(message.workerId),
821 taskFunctionOperationsListener
822 )
e81c38f2 823 }
ae036c3e
JB
824 }
825 }
19b8be8b 826 for (const workerNodeKey of this.workerNodes.keys()) {
ae036c3e
JB
827 this.registerWorkerMessageListener(
828 workerNodeKey,
829 taskFunctionOperationsListener
830 )
72ae84a2 831 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
832 }
833 })
6703b9f4
JB
834 }
835
836 /** @inheritDoc */
837 public hasTaskFunction (name: string): boolean {
85bbc7ab
JB
838 return this.listTaskFunctionsProperties().some(
839 taskFunctionProperties => taskFunctionProperties.name === name
840 )
6703b9f4
JB
841 }
842
843 /** @inheritDoc */
e81c38f2
JB
844 public async addTaskFunction (
845 name: string,
31847469 846 fn: TaskFunction<Data, Response> | TaskFunctionObject<Data, Response>
e81c38f2 847 ): Promise<boolean> {
3feeab69
JB
848 if (typeof name !== 'string') {
849 throw new TypeError('name argument must be a string')
850 }
851 if (typeof name === 'string' && name.trim().length === 0) {
852 throw new TypeError('name argument must not be an empty string')
853 }
31847469
JB
854 if (typeof fn === 'function') {
855 fn = { taskFunction: fn } satisfies TaskFunctionObject<Data, Response>
856 }
857 if (typeof fn.taskFunction !== 'function') {
858 throw new TypeError('taskFunction property must be a function')
3feeab69 859 }
95d1a734
JB
860 checkValidPriority(fn.priority)
861 checkValidWorkerChoiceStrategy(fn.strategy)
adee6053 862 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4 863 taskFunctionOperation: 'add',
31847469
JB
864 taskFunctionProperties: buildTaskFunctionProperties(name, fn),
865 taskFunction: fn.taskFunction.toString()
6703b9f4 866 })
adee6053 867 this.taskFunctions.set(name, fn)
85bbc7ab
JB
868 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
869 this.getWorkerWorkerChoiceStrategies()
870 )
9a55fa8c
JB
871 for (const workerNodeKey of this.workerNodes.keys()) {
872 this.sendStatisticsMessageToWorker(workerNodeKey)
873 }
adee6053 874 return opResult
6703b9f4
JB
875 }
876
877 /** @inheritDoc */
e81c38f2 878 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
879 if (!this.taskFunctions.has(name)) {
880 throw new Error(
16248b23 881 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
882 )
883 }
adee6053 884 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4 885 taskFunctionOperation: 'remove',
31847469
JB
886 taskFunctionProperties: buildTaskFunctionProperties(
887 name,
888 this.taskFunctions.get(name)
889 )
6703b9f4 890 })
9a55fa8c
JB
891 for (const workerNode of this.workerNodes) {
892 workerNode.deleteTaskFunctionWorkerUsage(name)
893 }
adee6053 894 this.taskFunctions.delete(name)
85bbc7ab
JB
895 this.workerChoiceStrategiesContext?.syncWorkerChoiceStrategies(
896 this.getWorkerWorkerChoiceStrategies()
897 )
9a55fa8c
JB
898 for (const workerNodeKey of this.workerNodes.keys()) {
899 this.sendStatisticsMessageToWorker(workerNodeKey)
900 }
adee6053 901 return opResult
6703b9f4
JB
902 }
903
90d7d101 904 /** @inheritDoc */
31847469 905 public listTaskFunctionsProperties (): TaskFunctionProperties[] {
f2dbbf95
JB
906 for (const workerNode of this.workerNodes) {
907 if (
31847469
JB
908 Array.isArray(workerNode.info.taskFunctionsProperties) &&
909 workerNode.info.taskFunctionsProperties.length > 0
f2dbbf95 910 ) {
31847469 911 return workerNode.info.taskFunctionsProperties
f2dbbf95 912 }
90d7d101 913 }
f2dbbf95 914 return []
90d7d101
JB
915 }
916
bcfb06ce
JB
917 /**
918 * Gets task function strategy, if any.
919 *
920 * @param name - The task function name.
921 * @returns The task function worker choice strategy if the task function worker choice strategy is defined, `undefined` otherwise.
922 */
923 private readonly getTaskFunctionWorkerWorkerChoiceStrategy = (
924 name?: string
925 ): WorkerChoiceStrategy | undefined => {
926 if (name != null) {
927 return this.listTaskFunctionsProperties().find(
928 (taskFunctionProperties: TaskFunctionProperties) =>
929 taskFunctionProperties.name === name
930 )?.strategy
931 }
932 }
933
95d1a734
JB
934 /**
935 * Gets worker node task function priority, if any.
936 *
937 * @param workerNodeKey - The worker node key.
938 * @param name - The task function name.
939 * @returns The task function worker choice priority if the task function worker choice priority is defined, `undefined` otherwise.
940 */
941 private readonly getWorkerNodeTaskFunctionPriority = (
942 workerNodeKey: number,
943 name?: string
944 ): number | undefined => {
945 if (name != null) {
946 return this.getWorkerInfo(workerNodeKey)?.taskFunctionsProperties?.find(
947 (taskFunctionProperties: TaskFunctionProperties) =>
948 taskFunctionProperties.name === name
949 )?.priority
950 }
951 }
952
85bbc7ab
JB
953 /**
954 * Gets the worker choice strategies registered in this pool.
955 *
956 * @returns The worker choice strategies.
957 */
958 private readonly getWorkerWorkerChoiceStrategies =
959 (): Set<WorkerChoiceStrategy> => {
960 return new Set([
961 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
962 this.opts.workerChoiceStrategy!,
963 ...(this.listTaskFunctionsProperties()
964 .map(
965 (taskFunctionProperties: TaskFunctionProperties) =>
966 taskFunctionProperties.strategy
967 )
968 .filter(
969 (strategy: WorkerChoiceStrategy | undefined) => strategy != null
970 ) as WorkerChoiceStrategy[])
971 ])
972 }
973
6703b9f4 974 /** @inheritDoc */
e81c38f2 975 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 976 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4 977 taskFunctionOperation: 'default',
31847469
JB
978 taskFunctionProperties: buildTaskFunctionProperties(
979 name,
980 this.taskFunctions.get(name)
981 )
6703b9f4 982 })
6703b9f4
JB
983 }
984
375f7504
JB
985 private shallExecuteTask (workerNodeKey: number): boolean {
986 return (
987 this.tasksQueueSize(workerNodeKey) === 0 &&
988 this.workerNodes[workerNodeKey].usage.tasks.executing <
67f3f2d6
JB
989 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
990 this.opts.tasksQueueOptions!.concurrency!
375f7504
JB
991 )
992 }
993
afc003b2 994 /** @inheritDoc */
7d91a8cd
JB
995 public async execute (
996 data?: Data,
997 name?: string,
6a3ecc50 998 transferList?: readonly TransferListItem[]
7d91a8cd 999 ): Promise<Response> {
52b71763 1000 return await new Promise<Response>((resolve, reject) => {
15b176e0 1001 if (!this.started) {
47352846 1002 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 1003 return
15b176e0 1004 }
711623b8
JB
1005 if (this.destroying) {
1006 reject(new Error('Cannot execute a task on destroying pool'))
1007 return
1008 }
7d91a8cd
JB
1009 if (name != null && typeof name !== 'string') {
1010 reject(new TypeError('name argument must be a string'))
9d2d0da1 1011 return
7d91a8cd 1012 }
90d7d101
JB
1013 if (
1014 name != null &&
1015 typeof name === 'string' &&
1016 name.trim().length === 0
1017 ) {
f58b60b9 1018 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 1019 return
90d7d101 1020 }
b558f6b5
JB
1021 if (transferList != null && !Array.isArray(transferList)) {
1022 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 1023 return
b558f6b5
JB
1024 }
1025 const timestamp = performance.now()
95d1a734 1026 const taskFunctionStrategy =
bcfb06ce 1027 this.getTaskFunctionWorkerWorkerChoiceStrategy(name)
95d1a734 1028 const workerNodeKey = this.chooseWorkerNode(taskFunctionStrategy)
501aea93 1029 const task: Task<Data> = {
52b71763
JB
1030 name: name ?? DEFAULT_TASK_NAME,
1031 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
1032 data: data ?? ({} as Data),
95d1a734
JB
1033 priority: this.getWorkerNodeTaskFunctionPriority(workerNodeKey, name),
1034 strategy: taskFunctionStrategy,
7d91a8cd 1035 transferList,
52b71763 1036 timestamp,
7629bdf1 1037 taskId: randomUUID()
52b71763 1038 }
67f3f2d6
JB
1039 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1040 this.promiseResponseMap.set(task.taskId!, {
2e81254d
JB
1041 resolve,
1042 reject,
f18fd12b
JB
1043 workerNodeKey,
1044 ...(this.emitter != null && {
1045 asyncResource: new AsyncResource('poolifier:task', {
1046 triggerAsyncId: this.emitter.asyncId,
1047 requireManualDestroy: true
1048 })
1049 })
2e81254d 1050 })
52b71763 1051 if (
4e377863
JB
1052 this.opts.enableTasksQueue === false ||
1053 (this.opts.enableTasksQueue === true &&
375f7504 1054 this.shallExecuteTask(workerNodeKey))
52b71763 1055 ) {
501aea93 1056 this.executeTask(workerNodeKey, task)
4e377863
JB
1057 } else {
1058 this.enqueueTask(workerNodeKey, task)
52b71763 1059 }
2e81254d 1060 })
280c2a77 1061 }
c97c7edb 1062
60dc0a9c
JB
1063 /**
1064 * Starts the minimum number of workers.
1065 */
e0843544 1066 private startMinimumNumberOfWorkers (initWorkerNodeUsage = false): void {
b362a929 1067 this.startingMinimumNumberOfWorkers = true
60dc0a9c
JB
1068 while (
1069 this.workerNodes.reduce(
1070 (accumulator, workerNode) =>
1071 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
1072 0
1073 ) < this.minimumNumberOfWorkers
1074 ) {
e0843544
JB
1075 const workerNodeKey = this.createAndSetupWorkerNode()
1076 initWorkerNodeUsage &&
1077 this.initWorkerNodeUsage(this.workerNodes[workerNodeKey])
60dc0a9c 1078 }
b362a929 1079 this.startingMinimumNumberOfWorkers = false
60dc0a9c
JB
1080 }
1081
47352846
JB
1082 /** @inheritdoc */
1083 public start (): void {
711623b8
JB
1084 if (this.started) {
1085 throw new Error('Cannot start an already started pool')
1086 }
1087 if (this.starting) {
1088 throw new Error('Cannot start an already starting pool')
1089 }
1090 if (this.destroying) {
1091 throw new Error('Cannot start a destroying pool')
1092 }
47352846 1093 this.starting = true
60dc0a9c 1094 this.startMinimumNumberOfWorkers()
2008bccd 1095 this.startTimestamp = performance.now()
47352846
JB
1096 this.starting = false
1097 this.started = true
1098 }
1099
afc003b2 1100 /** @inheritDoc */
c97c7edb 1101 public async destroy (): Promise<void> {
711623b8
JB
1102 if (!this.started) {
1103 throw new Error('Cannot destroy an already destroyed pool')
1104 }
1105 if (this.starting) {
1106 throw new Error('Cannot destroy an starting pool')
1107 }
1108 if (this.destroying) {
1109 throw new Error('Cannot destroy an already destroying pool')
1110 }
1111 this.destroying = true
1fbcaa7c 1112 await Promise.all(
14c39df1 1113 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
aa9eede8 1114 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
1115 })
1116 )
33e6bb4c 1117 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 1118 this.emitter?.emitDestroy()
55082af9 1119 this.readyEventEmitted = false
2008bccd 1120 delete this.startTimestamp
711623b8 1121 this.destroying = false
15b176e0 1122 this.started = false
c97c7edb
S
1123 }
1124
26ce26ca 1125 private async sendKillMessageToWorker (workerNodeKey: number): Promise<void> {
9edb9717 1126 await new Promise<void>((resolve, reject) => {
c63a35a0
JB
1127 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1128 if (this.workerNodes[workerNodeKey] == null) {
ad3836ed 1129 resolve()
85b2561d
JB
1130 return
1131 }
ae036c3e
JB
1132 const killMessageListener = (message: MessageValue<Response>): void => {
1133 this.checkMessageWorkerId(message)
1e3214b6
JB
1134 if (message.kill === 'success') {
1135 resolve()
1136 } else if (message.kill === 'failure') {
72ae84a2
JB
1137 reject(
1138 new Error(
c63a35a0 1139 `Kill message handling failed on worker ${message.workerId}`
72ae84a2
JB
1140 )
1141 )
1e3214b6 1142 }
ae036c3e 1143 }
51d9cfbd 1144 // FIXME: should be registered only once
ae036c3e 1145 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1146 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1147 })
1e3214b6
JB
1148 }
1149
4a6952ff 1150 /**
aa9eede8 1151 * Terminates the worker node given its worker node key.
4a6952ff 1152 *
aa9eede8 1153 * @param workerNodeKey - The worker node key.
4a6952ff 1154 */
07e0c9e5
JB
1155 protected async destroyWorkerNode (workerNodeKey: number): Promise<void> {
1156 this.flagWorkerNodeAsNotReady(workerNodeKey)
87347ea8 1157 const flushedTasks = this.flushTasksQueue(workerNodeKey)
07e0c9e5 1158 const workerNode = this.workerNodes[workerNodeKey]
32b141fd
JB
1159 await waitWorkerNodeEvents(
1160 workerNode,
1161 'taskFinished',
1162 flushedTasks,
1163 this.opts.tasksQueueOptions?.tasksFinishedTimeout ??
26ce26ca
JB
1164 getDefaultTasksQueueOptions(
1165 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
1166 ).tasksFinishedTimeout
32b141fd 1167 )
07e0c9e5
JB
1168 await this.sendKillMessageToWorker(workerNodeKey)
1169 await workerNode.terminate()
1170 }
c97c7edb 1171
729c563d 1172 /**
6677a3d3
JB
1173 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1174 * Can be overridden.
afc003b2
JB
1175 *
1176 * @virtual
729c563d 1177 */
280c2a77 1178 protected setupHook (): void {
965df41c 1179 /* Intentionally empty */
280c2a77 1180 }
c97c7edb 1181
729c563d 1182 /**
5bec72fd
JB
1183 * Returns whether the worker is the main worker or not.
1184 *
1185 * @returns `true` if the worker is the main worker, `false` otherwise.
280c2a77
S
1186 */
1187 protected abstract isMain (): boolean
1188
1189 /**
2e81254d 1190 * Hook executed before the worker task execution.
bf9549ae 1191 * Can be overridden.
729c563d 1192 *
f06e48d8 1193 * @param workerNodeKey - The worker node key.
1c6fe997 1194 * @param task - The task to execute.
729c563d 1195 */
1c6fe997
JB
1196 protected beforeTaskExecutionHook (
1197 workerNodeKey: number,
1198 task: Task<Data>
1199 ): void {
c63a35a0 1200 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1201 if (this.workerNodes[workerNodeKey]?.usage != null) {
94407def
JB
1202 const workerUsage = this.workerNodes[workerNodeKey].usage
1203 ++workerUsage.tasks.executing
c329fd41 1204 updateWaitTimeWorkerUsage(
bcfb06ce 1205 this.workerChoiceStrategiesContext,
c329fd41
JB
1206 workerUsage,
1207 task
1208 )
94407def
JB
1209 }
1210 if (
1211 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
67f3f2d6
JB
1212 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1213 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(task.name!) !=
1214 null
94407def 1215 ) {
67f3f2d6 1216 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
db0e38ee 1217 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1218 workerNodeKey
67f3f2d6
JB
1219 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1220 ].getTaskFunctionWorkerUsage(task.name!)!
5623b8d5 1221 ++taskFunctionWorkerUsage.tasks.executing
c329fd41 1222 updateWaitTimeWorkerUsage(
bcfb06ce 1223 this.workerChoiceStrategiesContext,
c329fd41
JB
1224 taskFunctionWorkerUsage,
1225 task
1226 )
b558f6b5 1227 }
c97c7edb
S
1228 }
1229
c01733f1 1230 /**
2e81254d 1231 * Hook executed after the worker task execution.
bf9549ae 1232 * Can be overridden.
c01733f1 1233 *
501aea93 1234 * @param workerNodeKey - The worker node key.
38e795c1 1235 * @param message - The received message.
c01733f1 1236 */
2e81254d 1237 protected afterTaskExecutionHook (
501aea93 1238 workerNodeKey: number,
2740a743 1239 message: MessageValue<Response>
bf9549ae 1240 ): void {
9a55fa8c 1241 let needWorkerChoiceStrategiesUpdate = false
c63a35a0 1242 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1243 if (this.workerNodes[workerNodeKey]?.usage != null) {
94407def 1244 const workerUsage = this.workerNodes[workerNodeKey].usage
c329fd41
JB
1245 updateTaskStatisticsWorkerUsage(workerUsage, message)
1246 updateRunTimeWorkerUsage(
bcfb06ce 1247 this.workerChoiceStrategiesContext,
c329fd41
JB
1248 workerUsage,
1249 message
1250 )
1251 updateEluWorkerUsage(
bcfb06ce 1252 this.workerChoiceStrategiesContext,
c329fd41
JB
1253 workerUsage,
1254 message
1255 )
9a55fa8c 1256 needWorkerChoiceStrategiesUpdate = true
94407def
JB
1257 }
1258 if (
1259 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1260 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
67f3f2d6
JB
1261 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1262 message.taskPerformance!.name
94407def
JB
1263 ) != null
1264 ) {
67f3f2d6 1265 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
db0e38ee 1266 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1267 workerNodeKey
67f3f2d6
JB
1268 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1269 ].getTaskFunctionWorkerUsage(message.taskPerformance!.name)!
c329fd41
JB
1270 updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1271 updateRunTimeWorkerUsage(
bcfb06ce 1272 this.workerChoiceStrategiesContext,
c329fd41
JB
1273 taskFunctionWorkerUsage,
1274 message
1275 )
1276 updateEluWorkerUsage(
bcfb06ce 1277 this.workerChoiceStrategiesContext,
c329fd41
JB
1278 taskFunctionWorkerUsage,
1279 message
1280 )
9a55fa8c 1281 needWorkerChoiceStrategiesUpdate = true
c329fd41 1282 }
9a55fa8c 1283 if (needWorkerChoiceStrategiesUpdate) {
bcfb06ce 1284 this.workerChoiceStrategiesContext?.update(workerNodeKey)
b558f6b5
JB
1285 }
1286 }
1287
db0e38ee
JB
1288 /**
1289 * Whether the worker node shall update its task function worker usage or not.
1290 *
1291 * @param workerNodeKey - The worker node key.
1292 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1293 */
1294 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1295 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1296 return (
1851fed0 1297 workerInfo != null &&
31847469
JB
1298 Array.isArray(workerInfo.taskFunctionsProperties) &&
1299 workerInfo.taskFunctionsProperties.length > 2
b558f6b5 1300 )
f1c06930
JB
1301 }
1302
280c2a77 1303 /**
2be9b405 1304 * Chooses a worker node for the next task given the worker choice strategy.
280c2a77 1305 *
bcfb06ce 1306 * @param workerChoiceStrategy - The worker choice strategy.
aa9eede8 1307 * @returns The chosen worker node key
280c2a77 1308 */
bcfb06ce
JB
1309 private chooseWorkerNode (
1310 workerChoiceStrategy?: WorkerChoiceStrategy
1311 ): number {
930dcf12 1312 if (this.shallCreateDynamicWorker()) {
aa9eede8 1313 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1314 if (
bcfb06ce
JB
1315 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
1316 true
6c6afb84 1317 ) {
aa9eede8 1318 return workerNodeKey
6c6afb84 1319 }
17393ac8 1320 }
c63a35a0 1321 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
bcfb06ce 1322 return this.workerChoiceStrategiesContext!.execute(workerChoiceStrategy)
930dcf12
JB
1323 }
1324
6c6afb84
JB
1325 /**
1326 * Conditions for dynamic worker creation.
1327 *
1328 * @returns Whether to create a dynamic worker or not.
1329 */
9d9fb7b6 1330 protected abstract shallCreateDynamicWorker (): boolean
c97c7edb 1331
280c2a77 1332 /**
aa9eede8 1333 * Sends a message to worker given its worker node key.
280c2a77 1334 *
aa9eede8 1335 * @param workerNodeKey - The worker node key.
38e795c1 1336 * @param message - The message.
7d91a8cd 1337 * @param transferList - The optional array of transferable objects.
280c2a77
S
1338 */
1339 protected abstract sendToWorker (
aa9eede8 1340 workerNodeKey: number,
7d91a8cd 1341 message: MessageValue<Data>,
6a3ecc50 1342 transferList?: readonly TransferListItem[]
280c2a77
S
1343 ): void
1344
e0843544
JB
1345 /**
1346 * Initializes the worker node usage with sensible default values gathered during runtime.
1347 *
1348 * @param workerNode - The worker node.
1349 */
1350 private initWorkerNodeUsage (workerNode: IWorkerNode<Worker, Data>): void {
1351 if (
1352 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1353 .runTime.aggregate === true
1354 ) {
1355 workerNode.usage.runTime.aggregate = min(
1356 ...this.workerNodes.map(
1357 workerNode => workerNode.usage.runTime.aggregate ?? Infinity
1358 )
1359 )
1360 }
1361 if (
1362 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1363 .waitTime.aggregate === true
1364 ) {
1365 workerNode.usage.waitTime.aggregate = min(
1366 ...this.workerNodes.map(
1367 workerNode => workerNode.usage.waitTime.aggregate ?? Infinity
1368 )
1369 )
1370 }
1371 if (
1372 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
1373 .aggregate === true
1374 ) {
1375 workerNode.usage.elu.active.aggregate = min(
1376 ...this.workerNodes.map(
1377 workerNode => workerNode.usage.elu.active.aggregate ?? Infinity
1378 )
1379 )
1380 }
1381 }
1382
4a6952ff 1383 /**
aa9eede8 1384 * Creates a new, completely set up worker node.
4a6952ff 1385 *
aa9eede8 1386 * @returns New, completely set up worker node key.
4a6952ff 1387 */
aa9eede8 1388 protected createAndSetupWorkerNode (): number {
c3719753
JB
1389 const workerNode = this.createWorkerNode()
1390 workerNode.registerWorkerEventHandler(
1391 'online',
1392 this.opts.onlineHandler ?? EMPTY_FUNCTION
1393 )
1394 workerNode.registerWorkerEventHandler(
1395 'message',
1396 this.opts.messageHandler ?? EMPTY_FUNCTION
1397 )
1398 workerNode.registerWorkerEventHandler(
1399 'error',
1400 this.opts.errorHandler ?? EMPTY_FUNCTION
1401 )
b271fce0 1402 workerNode.registerOnceWorkerEventHandler('error', (error: Error) => {
07e0c9e5 1403 workerNode.info.ready = false
2a69b8c5 1404 this.emitter?.emit(PoolEvents.error, error)
15b176e0 1405 if (
b6bfca01 1406 this.started &&
711623b8 1407 !this.destroying &&
9b38ab2d 1408 this.opts.restartWorkerOnError === true
15b176e0 1409 ) {
07e0c9e5 1410 if (workerNode.info.dynamic) {
aa9eede8 1411 this.createAndSetupDynamicWorkerNode()
b362a929 1412 } else if (!this.startingMinimumNumberOfWorkers) {
e0843544 1413 this.startMinimumNumberOfWorkers(true)
8a1260a3 1414 }
5baee0d7 1415 }
3c9123c7
JB
1416 if (
1417 this.started &&
1418 !this.destroying &&
1419 this.opts.enableTasksQueue === true
1420 ) {
9974369e 1421 this.redistributeQueuedTasks(this.workerNodes.indexOf(workerNode))
19dbc45b 1422 }
b4f8aca8 1423 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
07f6f7b1 1424 workerNode?.terminate().catch((error: unknown) => {
07e0c9e5
JB
1425 this.emitter?.emit(PoolEvents.error, error)
1426 })
5baee0d7 1427 })
c3719753
JB
1428 workerNode.registerWorkerEventHandler(
1429 'exit',
1430 this.opts.exitHandler ?? EMPTY_FUNCTION
1431 )
1432 workerNode.registerOnceWorkerEventHandler('exit', () => {
9974369e 1433 this.removeWorkerNode(workerNode)
b362a929
JB
1434 if (
1435 this.started &&
1436 !this.startingMinimumNumberOfWorkers &&
1437 !this.destroying
1438 ) {
e0843544 1439 this.startMinimumNumberOfWorkers(true)
60dc0a9c 1440 }
a974afa6 1441 })
c3719753 1442 const workerNodeKey = this.addWorkerNode(workerNode)
aa9eede8 1443 this.afterWorkerNodeSetup(workerNodeKey)
aa9eede8 1444 return workerNodeKey
c97c7edb 1445 }
be0676b3 1446
930dcf12 1447 /**
aa9eede8 1448 * Creates a new, completely set up dynamic worker node.
930dcf12 1449 *
aa9eede8 1450 * @returns New, completely set up dynamic worker node key.
930dcf12 1451 */
aa9eede8
JB
1452 protected createAndSetupDynamicWorkerNode (): number {
1453 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1454 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1455 this.checkMessageWorkerId(message)
aa9eede8
JB
1456 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1457 message.workerId
aad6fb64 1458 )
2b6b412f 1459 const workerUsage = this.workerNodes[localWorkerNodeKey]?.usage
81c02522 1460 // Kill message received from worker
930dcf12
JB
1461 if (
1462 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1463 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1464 ((this.opts.enableTasksQueue === false &&
aa9eede8 1465 workerUsage.tasks.executing === 0) ||
7b56f532 1466 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1467 workerUsage.tasks.executing === 0 &&
1468 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1469 ) {
f3827d5d 1470 // Flag the worker node as not ready immediately
ae3ab61d 1471 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
07f6f7b1 1472 this.destroyWorkerNode(localWorkerNodeKey).catch((error: unknown) => {
5270d253
JB
1473 this.emitter?.emit(PoolEvents.error, error)
1474 })
930dcf12
JB
1475 }
1476 })
aa9eede8 1477 this.sendToWorker(workerNodeKey, {
e9dd5b66 1478 checkActive: true
21f710aa 1479 })
72ae84a2 1480 if (this.taskFunctions.size > 0) {
31847469 1481 for (const [taskFunctionName, taskFunctionObject] of this.taskFunctions) {
72ae84a2
JB
1482 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1483 taskFunctionOperation: 'add',
31847469
JB
1484 taskFunctionProperties: buildTaskFunctionProperties(
1485 taskFunctionName,
1486 taskFunctionObject
1487 ),
1488 taskFunction: taskFunctionObject.taskFunction.toString()
07f6f7b1 1489 }).catch((error: unknown) => {
72ae84a2
JB
1490 this.emitter?.emit(PoolEvents.error, error)
1491 })
1492 }
1493 }
e44639e9
JB
1494 const workerNode = this.workerNodes[workerNodeKey]
1495 workerNode.info.dynamic = true
b1aae695 1496 if (
bcfb06ce
JB
1497 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerReady ===
1498 true ||
1499 this.workerChoiceStrategiesContext?.getPolicy().dynamicWorkerUsage ===
1500 true
b1aae695 1501 ) {
e44639e9 1502 workerNode.info.ready = true
b5e113f6 1503 }
e0843544 1504 this.initWorkerNodeUsage(workerNode)
33e6bb4c 1505 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1506 return workerNodeKey
930dcf12
JB
1507 }
1508
a2ed5053 1509 /**
aa9eede8 1510 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1511 *
aa9eede8 1512 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1513 * @param listener - The message listener callback.
1514 */
85aeb3f3
JB
1515 protected abstract registerWorkerMessageListener<
1516 Message extends Data | Response
aa9eede8
JB
1517 >(
1518 workerNodeKey: number,
1519 listener: (message: MessageValue<Message>) => void
1520 ): void
a2ed5053 1521
ae036c3e
JB
1522 /**
1523 * Registers once a listener callback on the worker given its worker node key.
1524 *
1525 * @param workerNodeKey - The worker node key.
1526 * @param listener - The message listener callback.
1527 */
1528 protected abstract registerOnceWorkerMessageListener<
1529 Message extends Data | Response
1530 >(
1531 workerNodeKey: number,
1532 listener: (message: MessageValue<Message>) => void
1533 ): void
1534
1535 /**
1536 * Deregisters a listener callback on the worker given its worker node key.
1537 *
1538 * @param workerNodeKey - The worker node key.
1539 * @param listener - The message listener callback.
1540 */
1541 protected abstract deregisterWorkerMessageListener<
1542 Message extends Data | Response
1543 >(
1544 workerNodeKey: number,
1545 listener: (message: MessageValue<Message>) => void
1546 ): void
1547
a2ed5053 1548 /**
aa9eede8 1549 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1550 * Can be overridden.
1551 *
aa9eede8 1552 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1553 */
aa9eede8 1554 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1555 // Listen to worker messages.
fcd39179
JB
1556 this.registerWorkerMessageListener(
1557 workerNodeKey,
3a776b6d 1558 this.workerMessageListener
fcd39179 1559 )
85aeb3f3 1560 // Send the startup message to worker.
aa9eede8 1561 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1562 // Send the statistics message to worker.
1563 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1564 if (this.opts.enableTasksQueue === true) {
dbd73092 1565 if (this.opts.tasksQueueOptions?.taskStealing === true) {
e1c2dba7 1566 this.workerNodes[workerNodeKey].on(
e44639e9
JB
1567 'idle',
1568 this.handleWorkerNodeIdleEvent
9f95d5eb 1569 )
47352846
JB
1570 }
1571 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
e1c2dba7 1572 this.workerNodes[workerNodeKey].on(
b5e75be8 1573 'backPressure',
e44639e9 1574 this.handleWorkerNodeBackPressureEvent
9f95d5eb 1575 )
47352846 1576 }
72695f86 1577 }
d2c73f82
JB
1578 }
1579
85aeb3f3 1580 /**
aa9eede8
JB
1581 * Sends the startup message to worker given its worker node key.
1582 *
1583 * @param workerNodeKey - The worker node key.
1584 */
1585 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1586
1587 /**
9edb9717 1588 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1589 *
aa9eede8 1590 * @param workerNodeKey - The worker node key.
85aeb3f3 1591 */
9edb9717 1592 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1593 this.sendToWorker(workerNodeKey, {
1594 statistics: {
1595 runTime:
bcfb06ce 1596 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
f4d0a470 1597 .runTime.aggregate ?? false,
c63a35a0 1598 elu:
bcfb06ce
JB
1599 this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1600 .elu.aggregate ?? false
72ae84a2 1601 }
aa9eede8
JB
1602 })
1603 }
a2ed5053 1604
5eb72b9e
JB
1605 private cannotStealTask (): boolean {
1606 return this.workerNodes.length <= 1 || this.info.queuedTasks === 0
1607 }
1608
42c677c1
JB
1609 private handleTask (workerNodeKey: number, task: Task<Data>): void {
1610 if (this.shallExecuteTask(workerNodeKey)) {
1611 this.executeTask(workerNodeKey, task)
1612 } else {
1613 this.enqueueTask(workerNodeKey, task)
1614 }
1615 }
1616
a2ed5053 1617 private redistributeQueuedTasks (workerNodeKey: number): void {
0d033538 1618 if (workerNodeKey === -1 || this.cannotStealTask()) {
cb71d660
JB
1619 return
1620 }
a2ed5053 1621 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1622 const destinationWorkerNodeKey = this.workerNodes.reduce(
1623 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1624 return workerNode.info.ready &&
1625 workerNode.usage.tasks.queued <
1626 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1627 ? workerNodeKey
1628 : minWorkerNodeKey
1629 },
1630 0
1631 )
42c677c1
JB
1632 this.handleTask(
1633 destinationWorkerNodeKey,
67f3f2d6
JB
1634 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1635 this.dequeueTask(workerNodeKey)!
42c677c1 1636 )
dd951876
JB
1637 }
1638 }
1639
b1838604
JB
1640 private updateTaskStolenStatisticsWorkerUsage (
1641 workerNodeKey: number,
b1838604
JB
1642 taskName: string
1643 ): void {
1a880eca 1644 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1645 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1646 if (workerNode?.usage != null) {
b1838604
JB
1647 ++workerNode.usage.tasks.stolen
1648 }
1649 if (
1650 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1651 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1652 ) {
67f3f2d6
JB
1653 const taskFunctionWorkerUsage =
1654 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1655 workerNode.getTaskFunctionWorkerUsage(taskName)!
b1838604
JB
1656 ++taskFunctionWorkerUsage.tasks.stolen
1657 }
1658 }
1659
463226a4 1660 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1661 workerNodeKey: number
463226a4
JB
1662 ): void {
1663 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1664 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1665 if (workerNode?.usage != null) {
463226a4
JB
1666 ++workerNode.usage.tasks.sequentiallyStolen
1667 }
f1f77f45
JB
1668 }
1669
1670 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1671 workerNodeKey: number,
1672 taskName: string
1673 ): void {
1674 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1675 if (
1676 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1677 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1678 ) {
67f3f2d6
JB
1679 const taskFunctionWorkerUsage =
1680 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1681 workerNode.getTaskFunctionWorkerUsage(taskName)!
463226a4
JB
1682 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1683 }
1684 }
1685
1686 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1687 workerNodeKey: number
463226a4
JB
1688 ): void {
1689 const workerNode = this.workerNodes[workerNodeKey]
c63a35a0 1690 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
efabc98d 1691 if (workerNode?.usage != null) {
463226a4
JB
1692 workerNode.usage.tasks.sequentiallyStolen = 0
1693 }
f1f77f45
JB
1694 }
1695
1696 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1697 workerNodeKey: number,
1698 taskName: string
1699 ): void {
1700 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1701 if (
1702 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1703 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1704 ) {
67f3f2d6
JB
1705 const taskFunctionWorkerUsage =
1706 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1707 workerNode.getTaskFunctionWorkerUsage(taskName)!
463226a4
JB
1708 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1709 }
1710 }
1711
e44639e9 1712 private readonly handleWorkerNodeIdleEvent = (
e1c2dba7 1713 eventDetail: WorkerNodeEventDetail,
463226a4 1714 previousStolenTask?: Task<Data>
9f95d5eb 1715 ): void => {
e1c2dba7 1716 const { workerNodeKey } = eventDetail
463226a4
JB
1717 if (workerNodeKey == null) {
1718 throw new Error(
c63a35a0 1719 "WorkerNode event detail 'workerNodeKey' property must be defined"
463226a4
JB
1720 )
1721 }
c63a35a0 1722 const workerInfo = this.getWorkerInfo(workerNodeKey)
5eb72b9e
JB
1723 if (
1724 this.cannotStealTask() ||
c63a35a0
JB
1725 (this.info.stealingWorkerNodes ?? 0) >
1726 Math.floor(this.workerNodes.length / 2)
5eb72b9e 1727 ) {
1851fed0 1728 if (workerInfo != null && previousStolenTask != null) {
c63a35a0 1729 workerInfo.stealing = false
5eb72b9e
JB
1730 }
1731 return
1732 }
463226a4
JB
1733 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1734 if (
1851fed0 1735 workerInfo != null &&
463226a4
JB
1736 previousStolenTask != null &&
1737 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1738 (workerNodeTasksUsage.executing > 0 ||
1739 this.tasksQueueSize(workerNodeKey) > 0)
1740 ) {
c63a35a0 1741 workerInfo.stealing = false
67f3f2d6 1742 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
31847469 1743 for (const taskFunctionProperties of workerInfo.taskFunctionsProperties!) {
f1f77f45
JB
1744 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1745 workerNodeKey,
31847469 1746 taskFunctionProperties.name
f1f77f45
JB
1747 )
1748 }
1749 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
463226a4
JB
1750 return
1751 }
1851fed0
JB
1752 if (workerInfo == null) {
1753 throw new Error(
1754 `Worker node with key '${workerNodeKey}' not found in pool`
1755 )
1756 }
c63a35a0 1757 workerInfo.stealing = true
463226a4 1758 const stolenTask = this.workerNodeStealTask(workerNodeKey)
f1f77f45
JB
1759 if (
1760 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1761 stolenTask != null
1762 ) {
67f3f2d6 1763 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
f1f77f45
JB
1764 const taskFunctionTasksWorkerUsage = this.workerNodes[
1765 workerNodeKey
67f3f2d6
JB
1766 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1767 ].getTaskFunctionWorkerUsage(stolenTask.name!)!.tasks
f1f77f45
JB
1768 if (
1769 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1770 (previousStolenTask != null &&
1771 previousStolenTask.name === stolenTask.name &&
1772 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1773 ) {
1774 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1775 workerNodeKey,
67f3f2d6
JB
1776 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1777 stolenTask.name!
f1f77f45
JB
1778 )
1779 } else {
1780 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1781 workerNodeKey,
67f3f2d6
JB
1782 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1783 stolenTask.name!
f1f77f45
JB
1784 )
1785 }
1786 }
463226a4
JB
1787 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1788 .then(() => {
e44639e9 1789 this.handleWorkerNodeIdleEvent(eventDetail, stolenTask)
463226a4
JB
1790 return undefined
1791 })
07f6f7b1 1792 .catch((error: unknown) => {
2e8cfbb7
JB
1793 this.emitter?.emit(PoolEvents.error, error)
1794 })
463226a4
JB
1795 }
1796
1797 private readonly workerNodeStealTask = (
1798 workerNodeKey: number
1799 ): Task<Data> | undefined => {
dd951876 1800 const workerNodes = this.workerNodes
a6b3272b 1801 .slice()
dd951876
JB
1802 .sort(
1803 (workerNodeA, workerNodeB) =>
1804 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1805 )
f201a0cd 1806 const sourceWorkerNode = workerNodes.find(
463226a4
JB
1807 (sourceWorkerNode, sourceWorkerNodeKey) =>
1808 sourceWorkerNode.info.ready &&
5eb72b9e 1809 !sourceWorkerNode.info.stealing &&
463226a4
JB
1810 sourceWorkerNodeKey !== workerNodeKey &&
1811 sourceWorkerNode.usage.tasks.queued > 0
f201a0cd
JB
1812 )
1813 if (sourceWorkerNode != null) {
67f3f2d6 1814 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
95d1a734 1815 const task = sourceWorkerNode.dequeueTask(1)!
42c677c1 1816 this.handleTask(workerNodeKey, task)
f1f77f45 1817 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
67f3f2d6
JB
1818 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1819 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
463226a4 1820 return task
72695f86
JB
1821 }
1822 }
1823
e44639e9 1824 private readonly handleWorkerNodeBackPressureEvent = (
e1c2dba7 1825 eventDetail: WorkerNodeEventDetail
9f95d5eb 1826 ): void => {
5eb72b9e
JB
1827 if (
1828 this.cannotStealTask() ||
c63a35a0
JB
1829 (this.info.stealingWorkerNodes ?? 0) >
1830 Math.floor(this.workerNodes.length / 2)
5eb72b9e 1831 ) {
cb71d660
JB
1832 return
1833 }
e1c2dba7 1834 const { workerId } = eventDetail
f778c355 1835 const sizeOffset = 1
67f3f2d6
JB
1836 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1837 if (this.opts.tasksQueueOptions!.size! <= sizeOffset) {
68dbcdc0
JB
1838 return
1839 }
72695f86 1840 const sourceWorkerNode =
b5e75be8 1841 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1842 const workerNodes = this.workerNodes
a6b3272b 1843 .slice()
72695f86
JB
1844 .sort(
1845 (workerNodeA, workerNodeB) =>
1846 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1847 )
1848 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1849 if (
0bc68267 1850 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1851 workerNode.info.ready &&
5eb72b9e 1852 !workerNode.info.stealing &&
b5e75be8 1853 workerNode.info.id !== workerId &&
0bc68267 1854 workerNode.usage.tasks.queued <
67f3f2d6
JB
1855 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1856 this.opts.tasksQueueOptions!.size! - sizeOffset
72695f86 1857 ) {
c63a35a0 1858 const workerInfo = this.getWorkerInfo(workerNodeKey)
1851fed0
JB
1859 if (workerInfo == null) {
1860 throw new Error(
1861 `Worker node with key '${workerNodeKey}' not found in pool`
1862 )
1863 }
c63a35a0 1864 workerInfo.stealing = true
67f3f2d6 1865 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
95d1a734 1866 const task = sourceWorkerNode.dequeueTask(1)!
42c677c1 1867 this.handleTask(workerNodeKey, task)
67f3f2d6
JB
1868 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1869 this.updateTaskStolenStatisticsWorkerUsage(workerNodeKey, task.name!)
c63a35a0 1870 workerInfo.stealing = false
10ecf8fd 1871 }
a2ed5053
JB
1872 }
1873 }
1874
be0676b3 1875 /**
fcd39179 1876 * This method is the message listener registered on each worker.
be0676b3 1877 */
3a776b6d
JB
1878 protected readonly workerMessageListener = (
1879 message: MessageValue<Response>
1880 ): void => {
fcd39179 1881 this.checkMessageWorkerId(message)
31847469
JB
1882 const { workerId, ready, taskId, taskFunctionsProperties } = message
1883 if (ready != null && taskFunctionsProperties != null) {
fcd39179
JB
1884 // Worker ready response received from worker
1885 this.handleWorkerReadyResponse(message)
31847469
JB
1886 } else if (taskFunctionsProperties != null) {
1887 // Task function properties message received from worker
9a55fa8c
JB
1888 const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1889 const workerInfo = this.getWorkerInfo(workerNodeKey)
1851fed0 1890 if (workerInfo != null) {
31847469 1891 workerInfo.taskFunctionsProperties = taskFunctionsProperties
9a55fa8c 1892 this.sendStatisticsMessageToWorker(workerNodeKey)
1851fed0 1893 }
31847469
JB
1894 } else if (taskId != null) {
1895 // Task execution response received from worker
1896 this.handleTaskExecutionResponse(message)
6b272951
JB
1897 }
1898 }
1899
8e8d9101
JB
1900 private checkAndEmitReadyEvent (): void {
1901 if (!this.readyEventEmitted && this.ready) {
1902 this.emitter?.emit(PoolEvents.ready, this.info)
1903 this.readyEventEmitted = true
1904 }
1905 }
1906
10e2aa7e 1907 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
31847469 1908 const { workerId, ready, taskFunctionsProperties } = message
e44639e9 1909 if (ready == null || !ready) {
c63a35a0 1910 throw new Error(`Worker ${workerId} failed to initialize`)
f05ed93c 1911 }
9a55fa8c
JB
1912 const workerNodeKey = this.getWorkerNodeKeyByWorkerId(workerId)
1913 const workerNode = this.workerNodes[workerNodeKey]
e44639e9 1914 workerNode.info.ready = ready
31847469 1915 workerNode.info.taskFunctionsProperties = taskFunctionsProperties
9a55fa8c 1916 this.sendStatisticsMessageToWorker(workerNodeKey)
8e8d9101 1917 this.checkAndEmitReadyEvent()
6b272951
JB
1918 }
1919
1920 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
463226a4 1921 const { workerId, taskId, workerError, data } = message
67f3f2d6
JB
1922 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1923 const promiseResponse = this.promiseResponseMap.get(taskId!)
6b272951 1924 if (promiseResponse != null) {
f18fd12b 1925 const { resolve, reject, workerNodeKey, asyncResource } = promiseResponse
9b358e72 1926 const workerNode = this.workerNodes[workerNodeKey]
6703b9f4
JB
1927 if (workerError != null) {
1928 this.emitter?.emit(PoolEvents.taskError, workerError)
f18fd12b
JB
1929 asyncResource != null
1930 ? asyncResource.runInAsyncScope(
1931 reject,
1932 this.emitter,
1933 workerError.message
1934 )
1935 : reject(workerError.message)
6b272951 1936 } else {
f18fd12b
JB
1937 asyncResource != null
1938 ? asyncResource.runInAsyncScope(resolve, this.emitter, data)
1939 : resolve(data as Response)
6b272951 1940 }
f18fd12b 1941 asyncResource?.emitDestroy()
501aea93 1942 this.afterTaskExecutionHook(workerNodeKey, message)
67f3f2d6
JB
1943 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1944 this.promiseResponseMap.delete(taskId!)
cdaecaee
JB
1945 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1946 workerNode?.emit('taskFinished', taskId)
16d7e943
JB
1947 if (
1948 this.opts.enableTasksQueue === true &&
1949 !this.destroying &&
1950 // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
1951 workerNode != null
1952 ) {
9b358e72 1953 const workerNodeTasksUsage = workerNode.usage.tasks
463226a4
JB
1954 if (
1955 this.tasksQueueSize(workerNodeKey) > 0 &&
1956 workerNodeTasksUsage.executing <
67f3f2d6
JB
1957 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1958 this.opts.tasksQueueOptions!.concurrency!
463226a4 1959 ) {
67f3f2d6
JB
1960 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1961 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
463226a4
JB
1962 }
1963 if (
1964 workerNodeTasksUsage.executing === 0 &&
1965 this.tasksQueueSize(workerNodeKey) === 0 &&
1966 workerNodeTasksUsage.sequentiallyStolen === 0
1967 ) {
e44639e9 1968 workerNode.emit('idle', {
7f0e1334 1969 workerId,
e1c2dba7
JB
1970 workerNodeKey
1971 })
463226a4 1972 }
be0676b3
APA
1973 }
1974 }
be0676b3 1975 }
7c0ba920 1976
a1763c54 1977 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1978 if (this.busy) {
1979 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1980 }
1981 }
1982
1983 private checkAndEmitTaskQueuingEvents (): void {
1984 if (this.hasBackPressure()) {
1985 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1986 }
1987 }
1988
d0878034
JB
1989 /**
1990 * Emits dynamic worker creation events.
1991 */
1992 protected abstract checkAndEmitDynamicWorkerCreationEvents (): void
33e6bb4c 1993
8a1260a3 1994 /**
aa9eede8 1995 * Gets the worker information given its worker node key.
8a1260a3
JB
1996 *
1997 * @param workerNodeKey - The worker node key.
3f09ed9f 1998 * @returns The worker information.
8a1260a3 1999 */
1851fed0
JB
2000 protected getWorkerInfo (workerNodeKey: number): WorkerInfo | undefined {
2001 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
2002 }
2003
a05c10de 2004 /**
c3719753 2005 * Creates a worker node.
ea7a90d3 2006 *
c3719753 2007 * @returns The created worker node.
ea7a90d3 2008 */
c3719753 2009 private createWorkerNode (): IWorkerNode<Worker, Data> {
671d5154 2010 const workerNode = new WorkerNode<Worker, Data>(
c3719753
JB
2011 this.worker,
2012 this.filePath,
2013 {
2014 env: this.opts.env,
2015 workerOptions: this.opts.workerOptions,
2016 tasksQueueBackPressureSize:
32b141fd 2017 this.opts.tasksQueueOptions?.size ??
26ce26ca
JB
2018 getDefaultTasksQueueOptions(
2019 this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers
95d1a734
JB
2020 ).size,
2021 tasksQueueBucketSize:
2022 (this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers) * 2
c3719753 2023 }
671d5154 2024 )
b97d82d8 2025 // Flag the worker node as ready at pool startup.
d2c73f82
JB
2026 if (this.starting) {
2027 workerNode.info.ready = true
2028 }
c3719753
JB
2029 return workerNode
2030 }
2031
2032 /**
2033 * Adds the given worker node in the pool worker nodes.
2034 *
2035 * @param workerNode - The worker node.
2036 * @returns The added worker node key.
2037 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
2038 */
2039 private addWorkerNode (workerNode: IWorkerNode<Worker, Data>): number {
aa9eede8 2040 this.workerNodes.push(workerNode)
c3719753 2041 const workerNodeKey = this.workerNodes.indexOf(workerNode)
aa9eede8 2042 if (workerNodeKey === -1) {
86ed0598 2043 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
2044 }
2045 return workerNodeKey
ea7a90d3 2046 }
c923ce56 2047
8e8d9101
JB
2048 private checkAndEmitEmptyEvent (): void {
2049 if (this.empty) {
2050 this.emitter?.emit(PoolEvents.empty, this.info)
2051 this.readyEventEmitted = false
2052 }
2053 }
2054
51fe3d3c 2055 /**
9974369e 2056 * Removes the worker node from the pool worker nodes.
51fe3d3c 2057 *
9974369e 2058 * @param workerNode - The worker node.
51fe3d3c 2059 */
9974369e
JB
2060 private removeWorkerNode (workerNode: IWorkerNode<Worker, Data>): void {
2061 const workerNodeKey = this.workerNodes.indexOf(workerNode)
1f68cede
JB
2062 if (workerNodeKey !== -1) {
2063 this.workerNodes.splice(workerNodeKey, 1)
bcfb06ce 2064 this.workerChoiceStrategiesContext?.remove(workerNodeKey)
1f68cede 2065 }
8e8d9101 2066 this.checkAndEmitEmptyEvent()
51fe3d3c 2067 }
adc3c320 2068
ae3ab61d 2069 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1851fed0
JB
2070 const workerInfo = this.getWorkerInfo(workerNodeKey)
2071 if (workerInfo != null) {
2072 workerInfo.ready = false
2073 }
ae3ab61d
JB
2074 }
2075
9e844245
JB
2076 private hasBackPressure (): boolean {
2077 return (
2078 this.opts.enableTasksQueue === true &&
2079 this.workerNodes.findIndex(
041dc05b 2080 workerNode => !workerNode.hasBackPressure()
a1763c54 2081 ) === -1
9e844245 2082 )
e2b31e32
JB
2083 }
2084
b0a4db63 2085 /**
aa9eede8 2086 * Executes the given task on the worker given its worker node key.
b0a4db63 2087 *
aa9eede8 2088 * @param workerNodeKey - The worker node key.
b0a4db63
JB
2089 * @param task - The task to execute.
2090 */
2e81254d 2091 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 2092 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 2093 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 2094 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
2095 }
2096
f9f00b5f 2097 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
2098 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
2099 this.checkAndEmitTaskQueuingEvents()
2100 return tasksQueueSize
adc3c320
JB
2101 }
2102
95d1a734
JB
2103 private dequeueTask (
2104 workerNodeKey: number,
2105 bucket?: number
2106 ): Task<Data> | undefined {
2107 return this.workerNodes[workerNodeKey].dequeueTask(bucket)
adc3c320
JB
2108 }
2109
416fd65c 2110 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 2111 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
2112 }
2113
87347ea8
JB
2114 protected flushTasksQueue (workerNodeKey: number): number {
2115 let flushedTasks = 0
920278a2 2116 while (this.tasksQueueSize(workerNodeKey) > 0) {
67f3f2d6
JB
2117 // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
2118 this.executeTask(workerNodeKey, this.dequeueTask(workerNodeKey)!)
87347ea8 2119 ++flushedTasks
ff733df7 2120 }
4b628b48 2121 this.workerNodes[workerNodeKey].clearTasksQueue()
87347ea8 2122 return flushedTasks
ff733df7
JB
2123 }
2124
ef41a6e6 2125 private flushTasksQueues (): void {
19b8be8b 2126 for (const workerNodeKey of this.workerNodes.keys()) {
ef41a6e6
JB
2127 this.flushTasksQueue(workerNodeKey)
2128 }
2129 }
c97c7edb 2130}