perf: optimize tasks stealing in corner case
[poolifier.git] / src / pools / abstract-pool.ts
CommitLineData
2845f2a5 1import { randomUUID } from 'node:crypto'
62c15a68 2import { performance } from 'node:perf_hooks'
ef083f7b 3import type { TransferListItem } from 'node:worker_threads'
f80125ca 4import { EventEmitterAsyncResource } from 'node:events'
5c4d16da
JB
5import type {
6 MessageValue,
7 PromiseResponseWrapper,
76d91ea0 8 Task
5c4d16da 9} from '../utility-types'
bbeadd16 10import {
ff128cc9 11 DEFAULT_TASK_NAME,
bbeadd16
JB
12 DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
13 EMPTY_FUNCTION,
dc021bcc 14 average,
463226a4 15 exponentialDelay,
59317253 16 isKillBehavior,
0d80593b 17 isPlainObject,
90d6701c 18 max,
afe0d5bf 19 median,
90d6701c 20 min,
463226a4
JB
21 round,
22 sleep
bbeadd16 23} from '../utils'
59317253 24import { KillBehaviors } from '../worker/worker-options'
6703b9f4 25import type { TaskFunction } from '../worker/task-functions'
c4855468 26import {
65d7a1c9 27 type IPool,
c4855468 28 PoolEvents,
6b27d407 29 type PoolInfo,
c4855468 30 type PoolOptions,
6b27d407
JB
31 type PoolType,
32 PoolTypes,
4b628b48 33 type TasksQueueOptions
c4855468 34} from './pool'
4d159167
JB
35import type {
36 IWorker,
37 IWorkerNode,
f1f77f45 38 TaskStatistics,
4d159167 39 WorkerInfo,
9f95d5eb 40 WorkerNodeEventDetail,
4d159167
JB
41 WorkerType,
42 WorkerUsage
e102732c 43} from './worker'
a35560ba 44import {
008512c7 45 type MeasurementStatisticsRequirements,
f0d7f803 46 Measurements,
a35560ba 47 WorkerChoiceStrategies,
a20f0ba5
JB
48 type WorkerChoiceStrategy,
49 type WorkerChoiceStrategyOptions
bdaf31cd
JB
50} from './selection-strategies/selection-strategies-types'
51import { WorkerChoiceStrategyContext } from './selection-strategies/worker-choice-strategy-context'
92b1feaa 52import { version } from './version'
4b628b48 53import { WorkerNode } from './worker-node'
bde6b5d7
JB
54import {
55 checkFilePath,
56 checkValidTasksQueueOptions,
bfc75cca
JB
57 checkValidWorkerChoiceStrategy,
58 updateMeasurementStatistics
bde6b5d7 59} from './utils'
23ccf9d7 60
729c563d 61/**
ea7a90d3 62 * Base class that implements some shared logic for all poolifier pools.
729c563d 63 *
38e795c1 64 * @typeParam Worker - Type of worker which manages this pool.
e102732c
JB
65 * @typeParam Data - Type of data sent to the worker. This can only be structured-cloneable data.
66 * @typeParam Response - Type of execution response. This can only be structured-cloneable data.
729c563d 67 */
c97c7edb 68export abstract class AbstractPool<
f06e48d8 69 Worker extends IWorker,
d3c8a1a8
S
70 Data = unknown,
71 Response = unknown
c4855468 72> implements IPool<Worker, Data, Response> {
afc003b2 73 /** @inheritDoc */
4b628b48 74 public readonly workerNodes: Array<IWorkerNode<Worker, Data>> = []
4a6952ff 75
afc003b2 76 /** @inheritDoc */
f80125ca 77 public emitter?: EventEmitterAsyncResource
7c0ba920 78
fcd39179
JB
79 /**
80 * Dynamic pool maximum size property placeholder.
81 */
82 protected readonly max?: number
83
be0676b3 84 /**
419e8121 85 * The task execution response promise map:
2740a743 86 * - `key`: The message id of each submitted task.
a3445496 87 * - `value`: An object that contains the worker, the execution response promise resolve and reject callbacks.
be0676b3 88 *
a3445496 89 * When we receive a message from the worker, we get a map entry with the promise resolve/reject bound to the message id.
be0676b3 90 */
501aea93
JB
91 protected promiseResponseMap: Map<string, PromiseResponseWrapper<Response>> =
92 new Map<string, PromiseResponseWrapper<Response>>()
c97c7edb 93
a35560ba 94 /**
51fe3d3c 95 * Worker choice strategy context referencing a worker choice algorithm implementation.
a35560ba
S
96 */
97 protected workerChoiceStrategyContext: WorkerChoiceStrategyContext<
78cea37e
JB
98 Worker,
99 Data,
100 Response
a35560ba
S
101 >
102
72ae84a2
JB
103 /**
104 * The task functions added at runtime map:
105 * - `key`: The task function name.
106 * - `value`: The task function itself.
107 */
108 private readonly taskFunctions: Map<string, TaskFunction<Data, Response>>
109
15b176e0
JB
110 /**
111 * Whether the pool is started or not.
112 */
113 private started: boolean
47352846
JB
114 /**
115 * Whether the pool is starting or not.
116 */
117 private starting: boolean
711623b8
JB
118 /**
119 * Whether the pool is destroying or not.
120 */
121 private destroying: boolean
55082af9
JB
122 /**
123 * Whether the pool ready event has been emitted or not.
124 */
125 private readyEventEmitted: boolean
afe0d5bf
JB
126 /**
127 * The start timestamp of the pool.
128 */
129 private readonly startTimestamp
130
729c563d
S
131 /**
132 * Constructs a new poolifier pool.
133 *
38e795c1 134 * @param numberOfWorkers - Number of workers that this pool should manage.
029715f0 135 * @param filePath - Path to the worker file.
38e795c1 136 * @param opts - Options for the pool.
729c563d 137 */
c97c7edb 138 public constructor (
b4213b7f
JB
139 protected readonly numberOfWorkers: number,
140 protected readonly filePath: string,
141 protected readonly opts: PoolOptions<Worker>
c97c7edb 142 ) {
78cea37e 143 if (!this.isMain()) {
04f45163 144 throw new Error(
8c6d4acf 145 'Cannot start a pool from a worker with the same type as the pool'
04f45163 146 )
c97c7edb 147 }
bde6b5d7 148 checkFilePath(this.filePath)
8003c026 149 this.checkNumberOfWorkers(this.numberOfWorkers)
7c0ba920 150 this.checkPoolOptions(this.opts)
1086026a 151
7254e419
JB
152 this.chooseWorkerNode = this.chooseWorkerNode.bind(this)
153 this.executeTask = this.executeTask.bind(this)
154 this.enqueueTask = this.enqueueTask.bind(this)
1086026a 155
6bd72cd0 156 if (this.opts.enableEvents === true) {
b5604034 157 this.initializeEventEmitter()
7c0ba920 158 }
d59df138
JB
159 this.workerChoiceStrategyContext = new WorkerChoiceStrategyContext<
160 Worker,
161 Data,
162 Response
da309861
JB
163 >(
164 this,
165 this.opts.workerChoiceStrategy,
166 this.opts.workerChoiceStrategyOptions
167 )
b6b32453
JB
168
169 this.setupHook()
170
72ae84a2
JB
171 this.taskFunctions = new Map<string, TaskFunction<Data, Response>>()
172
47352846 173 this.started = false
075e51d1 174 this.starting = false
711623b8 175 this.destroying = false
55082af9 176 this.readyEventEmitted = false
47352846
JB
177 if (this.opts.startWorkers === true) {
178 this.start()
179 }
afe0d5bf
JB
180
181 this.startTimestamp = performance.now()
c97c7edb
S
182 }
183
8d3782fa
JB
184 private checkNumberOfWorkers (numberOfWorkers: number): void {
185 if (numberOfWorkers == null) {
186 throw new Error(
187 'Cannot instantiate a pool without specifying the number of workers'
188 )
78cea37e 189 } else if (!Number.isSafeInteger(numberOfWorkers)) {
473c717a 190 throw new TypeError(
0d80593b 191 'Cannot instantiate a pool with a non safe integer number of workers'
8d3782fa
JB
192 )
193 } else if (numberOfWorkers < 0) {
473c717a 194 throw new RangeError(
8d3782fa
JB
195 'Cannot instantiate a pool with a negative number of workers'
196 )
6b27d407 197 } else if (this.type === PoolTypes.fixed && numberOfWorkers === 0) {
2431bdb4
JB
198 throw new RangeError('Cannot instantiate a fixed pool with zero worker')
199 }
200 }
201
7c0ba920 202 private checkPoolOptions (opts: PoolOptions<Worker>): void {
0d80593b 203 if (isPlainObject(opts)) {
47352846 204 this.opts.startWorkers = opts.startWorkers ?? true
bde6b5d7 205 checkValidWorkerChoiceStrategy(
2324f8c9
JB
206 opts.workerChoiceStrategy as WorkerChoiceStrategy
207 )
0d80593b
JB
208 this.opts.workerChoiceStrategy =
209 opts.workerChoiceStrategy ?? WorkerChoiceStrategies.ROUND_ROBIN
2324f8c9
JB
210 this.checkValidWorkerChoiceStrategyOptions(
211 opts.workerChoiceStrategyOptions as WorkerChoiceStrategyOptions
212 )
8990357d
JB
213 this.opts.workerChoiceStrategyOptions = {
214 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
215 ...opts.workerChoiceStrategyOptions
216 }
1f68cede 217 this.opts.restartWorkerOnError = opts.restartWorkerOnError ?? true
0d80593b
JB
218 this.opts.enableEvents = opts.enableEvents ?? true
219 this.opts.enableTasksQueue = opts.enableTasksQueue ?? false
220 if (this.opts.enableTasksQueue) {
bde6b5d7 221 checkValidTasksQueueOptions(opts.tasksQueueOptions as TasksQueueOptions)
0d80593b
JB
222 this.opts.tasksQueueOptions = this.buildTasksQueueOptions(
223 opts.tasksQueueOptions as TasksQueueOptions
224 )
225 }
226 } else {
227 throw new TypeError('Invalid pool options: must be a plain object')
7171d33f 228 }
aee46736
JB
229 }
230
0d80593b
JB
231 private checkValidWorkerChoiceStrategyOptions (
232 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
233 ): void {
2324f8c9
JB
234 if (
235 workerChoiceStrategyOptions != null &&
236 !isPlainObject(workerChoiceStrategyOptions)
237 ) {
0d80593b
JB
238 throw new TypeError(
239 'Invalid worker choice strategy options: must be a plain object'
240 )
241 }
8990357d 242 if (
2324f8c9 243 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 244 !Number.isSafeInteger(workerChoiceStrategyOptions.retries)
8990357d
JB
245 ) {
246 throw new TypeError(
8c0b113f 247 'Invalid worker choice strategy options: retries must be an integer'
8990357d
JB
248 )
249 }
250 if (
2324f8c9 251 workerChoiceStrategyOptions?.retries != null &&
8c0b113f 252 workerChoiceStrategyOptions.retries < 0
8990357d
JB
253 ) {
254 throw new RangeError(
8c0b113f 255 `Invalid worker choice strategy options: retries '${workerChoiceStrategyOptions.retries}' must be greater or equal than zero`
8990357d
JB
256 )
257 }
49be33fe 258 if (
2324f8c9 259 workerChoiceStrategyOptions?.weights != null &&
6b27d407 260 Object.keys(workerChoiceStrategyOptions.weights).length !== this.maxSize
49be33fe
JB
261 ) {
262 throw new Error(
263 'Invalid worker choice strategy options: must have a weight for each worker node'
264 )
265 }
f0d7f803 266 if (
2324f8c9 267 workerChoiceStrategyOptions?.measurement != null &&
f0d7f803
JB
268 !Object.values(Measurements).includes(
269 workerChoiceStrategyOptions.measurement
270 )
271 ) {
272 throw new Error(
273 `Invalid worker choice strategy options: invalid measurement '${workerChoiceStrategyOptions.measurement}'`
274 )
275 }
0d80593b
JB
276 }
277
b5604034 278 private initializeEventEmitter (): void {
5b7d00b4 279 this.emitter = new EventEmitterAsyncResource({
b5604034
JB
280 name: `poolifier:${this.type}-${this.worker}-pool`
281 })
282 }
283
08f3f44c 284 /** @inheritDoc */
6b27d407
JB
285 public get info (): PoolInfo {
286 return {
23ccf9d7 287 version,
6b27d407 288 type: this.type,
184855e6 289 worker: this.worker,
47352846 290 started: this.started,
2431bdb4
JB
291 ready: this.ready,
292 strategy: this.opts.workerChoiceStrategy as WorkerChoiceStrategy,
6b27d407
JB
293 minSize: this.minSize,
294 maxSize: this.maxSize,
c05f0d50
JB
295 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
296 .runTime.aggregate &&
1305e9a8
JB
297 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
298 .waitTime.aggregate && { utilization: round(this.utilization) }),
6b27d407
JB
299 workerNodes: this.workerNodes.length,
300 idleWorkerNodes: this.workerNodes.reduce(
301 (accumulator, workerNode) =>
f59e1027 302 workerNode.usage.tasks.executing === 0
a4e07f72
JB
303 ? accumulator + 1
304 : accumulator,
6b27d407
JB
305 0
306 ),
307 busyWorkerNodes: this.workerNodes.reduce(
308 (accumulator, workerNode) =>
f59e1027 309 workerNode.usage.tasks.executing > 0 ? accumulator + 1 : accumulator,
6b27d407
JB
310 0
311 ),
a4e07f72 312 executedTasks: this.workerNodes.reduce(
6b27d407 313 (accumulator, workerNode) =>
f59e1027 314 accumulator + workerNode.usage.tasks.executed,
a4e07f72
JB
315 0
316 ),
317 executingTasks: this.workerNodes.reduce(
318 (accumulator, workerNode) =>
f59e1027 319 accumulator + workerNode.usage.tasks.executing,
6b27d407
JB
320 0
321 ),
daf86646
JB
322 ...(this.opts.enableTasksQueue === true && {
323 queuedTasks: this.workerNodes.reduce(
324 (accumulator, workerNode) =>
325 accumulator + workerNode.usage.tasks.queued,
326 0
327 )
328 }),
329 ...(this.opts.enableTasksQueue === true && {
330 maxQueuedTasks: this.workerNodes.reduce(
331 (accumulator, workerNode) =>
332 accumulator + (workerNode.usage.tasks?.maxQueued ?? 0),
333 0
334 )
335 }),
a1763c54
JB
336 ...(this.opts.enableTasksQueue === true && {
337 backPressure: this.hasBackPressure()
338 }),
68cbdc84
JB
339 ...(this.opts.enableTasksQueue === true && {
340 stolenTasks: this.workerNodes.reduce(
341 (accumulator, workerNode) =>
342 accumulator + workerNode.usage.tasks.stolen,
343 0
344 )
345 }),
a4e07f72
JB
346 failedTasks: this.workerNodes.reduce(
347 (accumulator, workerNode) =>
f59e1027 348 accumulator + workerNode.usage.tasks.failed,
a4e07f72 349 0
1dcf8b7b
JB
350 ),
351 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
352 .runTime.aggregate && {
353 runTime: {
98e72cda 354 minimum: round(
90d6701c 355 min(
98e72cda 356 ...this.workerNodes.map(
041dc05b 357 workerNode => workerNode.usage.runTime?.minimum ?? Infinity
98e72cda 358 )
1dcf8b7b
JB
359 )
360 ),
98e72cda 361 maximum: round(
90d6701c 362 max(
98e72cda 363 ...this.workerNodes.map(
041dc05b 364 workerNode => workerNode.usage.runTime?.maximum ?? -Infinity
98e72cda 365 )
1dcf8b7b 366 )
98e72cda 367 ),
3baa0837
JB
368 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
369 .runTime.average && {
370 average: round(
371 average(
372 this.workerNodes.reduce<number[]>(
373 (accumulator, workerNode) =>
374 accumulator.concat(workerNode.usage.runTime.history),
375 []
376 )
98e72cda 377 )
dc021bcc 378 )
3baa0837 379 }),
98e72cda
JB
380 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
381 .runTime.median && {
382 median: round(
383 median(
3baa0837
JB
384 this.workerNodes.reduce<number[]>(
385 (accumulator, workerNode) =>
386 accumulator.concat(workerNode.usage.runTime.history),
387 []
98e72cda
JB
388 )
389 )
390 )
391 })
1dcf8b7b
JB
392 }
393 }),
394 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
395 .waitTime.aggregate && {
396 waitTime: {
98e72cda 397 minimum: round(
90d6701c 398 min(
98e72cda 399 ...this.workerNodes.map(
041dc05b 400 workerNode => workerNode.usage.waitTime?.minimum ?? Infinity
98e72cda 401 )
1dcf8b7b
JB
402 )
403 ),
98e72cda 404 maximum: round(
90d6701c 405 max(
98e72cda 406 ...this.workerNodes.map(
041dc05b 407 workerNode => workerNode.usage.waitTime?.maximum ?? -Infinity
98e72cda 408 )
1dcf8b7b 409 )
98e72cda 410 ),
3baa0837
JB
411 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
412 .waitTime.average && {
413 average: round(
414 average(
415 this.workerNodes.reduce<number[]>(
416 (accumulator, workerNode) =>
417 accumulator.concat(workerNode.usage.waitTime.history),
418 []
419 )
98e72cda 420 )
dc021bcc 421 )
3baa0837 422 }),
98e72cda
JB
423 ...(this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
424 .waitTime.median && {
425 median: round(
426 median(
3baa0837
JB
427 this.workerNodes.reduce<number[]>(
428 (accumulator, workerNode) =>
429 accumulator.concat(workerNode.usage.waitTime.history),
430 []
98e72cda
JB
431 )
432 )
433 )
434 })
1dcf8b7b
JB
435 }
436 })
6b27d407
JB
437 }
438 }
08f3f44c 439
aa9eede8
JB
440 /**
441 * The pool readiness boolean status.
442 */
2431bdb4
JB
443 private get ready (): boolean {
444 return (
b97d82d8
JB
445 this.workerNodes.reduce(
446 (accumulator, workerNode) =>
447 !workerNode.info.dynamic && workerNode.info.ready
448 ? accumulator + 1
449 : accumulator,
450 0
451 ) >= this.minSize
2431bdb4
JB
452 )
453 }
454
afe0d5bf 455 /**
aa9eede8 456 * The approximate pool utilization.
afe0d5bf
JB
457 *
458 * @returns The pool utilization.
459 */
460 private get utilization (): number {
8e5ca040 461 const poolTimeCapacity =
fe7d90db 462 (performance.now() - this.startTimestamp) * this.maxSize
afe0d5bf
JB
463 const totalTasksRunTime = this.workerNodes.reduce(
464 (accumulator, workerNode) =>
71514351 465 accumulator + (workerNode.usage.runTime?.aggregate ?? 0),
afe0d5bf
JB
466 0
467 )
468 const totalTasksWaitTime = this.workerNodes.reduce(
469 (accumulator, workerNode) =>
71514351 470 accumulator + (workerNode.usage.waitTime?.aggregate ?? 0),
afe0d5bf
JB
471 0
472 )
8e5ca040 473 return (totalTasksRunTime + totalTasksWaitTime) / poolTimeCapacity
afe0d5bf
JB
474 }
475
8881ae32 476 /**
aa9eede8 477 * The pool type.
8881ae32
JB
478 *
479 * If it is `'dynamic'`, it provides the `max` property.
480 */
481 protected abstract get type (): PoolType
482
184855e6 483 /**
aa9eede8 484 * The worker type.
184855e6
JB
485 */
486 protected abstract get worker (): WorkerType
487
c2ade475 488 /**
aa9eede8 489 * The pool minimum size.
c2ade475 490 */
8735b4e5
JB
491 protected get minSize (): number {
492 return this.numberOfWorkers
493 }
ff733df7
JB
494
495 /**
aa9eede8 496 * The pool maximum size.
ff733df7 497 */
8735b4e5
JB
498 protected get maxSize (): number {
499 return this.max ?? this.numberOfWorkers
500 }
a35560ba 501
6b813701
JB
502 /**
503 * Checks if the worker id sent in the received message from a worker is valid.
504 *
505 * @param message - The received message.
506 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the worker id is invalid.
507 */
4d159167 508 private checkMessageWorkerId (message: MessageValue<Data | Response>): void {
310de0aa
JB
509 if (message.workerId == null) {
510 throw new Error('Worker message received without worker id')
8e5a7cf9 511 } else if (this.getWorkerNodeKeyByWorkerId(message.workerId) === -1) {
21f710aa
JB
512 throw new Error(
513 `Worker message received from unknown worker '${message.workerId}'`
514 )
515 }
516 }
517
ffcbbad8 518 /**
f06e48d8 519 * Gets the given worker its worker node key.
ffcbbad8
JB
520 *
521 * @param worker - The worker.
f59e1027 522 * @returns The worker node key if found in the pool worker nodes, `-1` otherwise.
ffcbbad8 523 */
aad6fb64 524 private getWorkerNodeKeyByWorker (worker: Worker): number {
f06e48d8 525 return this.workerNodes.findIndex(
041dc05b 526 workerNode => workerNode.worker === worker
f06e48d8 527 )
bf9549ae
JB
528 }
529
aa9eede8
JB
530 /**
531 * Gets the worker node key given its worker id.
532 *
533 * @param workerId - The worker id.
aad6fb64 534 * @returns The worker node key if the worker id is found in the pool worker nodes, `-1` otherwise.
aa9eede8 535 */
72ae84a2 536 private getWorkerNodeKeyByWorkerId (workerId: number | undefined): number {
aad6fb64 537 return this.workerNodes.findIndex(
041dc05b 538 workerNode => workerNode.info.id === workerId
aad6fb64 539 )
aa9eede8
JB
540 }
541
afc003b2 542 /** @inheritDoc */
a35560ba 543 public setWorkerChoiceStrategy (
59219cbb
JB
544 workerChoiceStrategy: WorkerChoiceStrategy,
545 workerChoiceStrategyOptions?: WorkerChoiceStrategyOptions
a35560ba 546 ): void {
bde6b5d7 547 checkValidWorkerChoiceStrategy(workerChoiceStrategy)
b98ec2e6 548 this.opts.workerChoiceStrategy = workerChoiceStrategy
b6b32453
JB
549 this.workerChoiceStrategyContext.setWorkerChoiceStrategy(
550 this.opts.workerChoiceStrategy
551 )
552 if (workerChoiceStrategyOptions != null) {
553 this.setWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
554 }
aa9eede8 555 for (const [workerNodeKey, workerNode] of this.workerNodes.entries()) {
4b628b48 556 workerNode.resetUsage()
9edb9717 557 this.sendStatisticsMessageToWorker(workerNodeKey)
59219cbb 558 }
a20f0ba5
JB
559 }
560
561 /** @inheritDoc */
562 public setWorkerChoiceStrategyOptions (
563 workerChoiceStrategyOptions: WorkerChoiceStrategyOptions
564 ): void {
0d80593b 565 this.checkValidWorkerChoiceStrategyOptions(workerChoiceStrategyOptions)
8990357d
JB
566 this.opts.workerChoiceStrategyOptions = {
567 ...DEFAULT_WORKER_CHOICE_STRATEGY_OPTIONS,
568 ...workerChoiceStrategyOptions
569 }
a20f0ba5
JB
570 this.workerChoiceStrategyContext.setOptions(
571 this.opts.workerChoiceStrategyOptions
a35560ba
S
572 )
573 }
574
a20f0ba5 575 /** @inheritDoc */
8f52842f
JB
576 public enableTasksQueue (
577 enable: boolean,
578 tasksQueueOptions?: TasksQueueOptions
579 ): void {
a20f0ba5 580 if (this.opts.enableTasksQueue === true && !enable) {
d6ca1416
JB
581 this.unsetTaskStealing()
582 this.unsetTasksStealingOnBackPressure()
ef41a6e6 583 this.flushTasksQueues()
a20f0ba5
JB
584 }
585 this.opts.enableTasksQueue = enable
8f52842f 586 this.setTasksQueueOptions(tasksQueueOptions as TasksQueueOptions)
a20f0ba5
JB
587 }
588
589 /** @inheritDoc */
8f52842f 590 public setTasksQueueOptions (tasksQueueOptions: TasksQueueOptions): void {
a20f0ba5 591 if (this.opts.enableTasksQueue === true) {
bde6b5d7 592 checkValidTasksQueueOptions(tasksQueueOptions)
8f52842f
JB
593 this.opts.tasksQueueOptions =
594 this.buildTasksQueueOptions(tasksQueueOptions)
5b49e864 595 this.setTasksQueueSize(this.opts.tasksQueueOptions.size as number)
d6ca1416 596 if (this.opts.tasksQueueOptions.taskStealing === true) {
9f99eb9b 597 this.unsetTaskStealing()
d6ca1416
JB
598 this.setTaskStealing()
599 } else {
600 this.unsetTaskStealing()
601 }
602 if (this.opts.tasksQueueOptions.tasksStealingOnBackPressure === true) {
9f99eb9b 603 this.unsetTasksStealingOnBackPressure()
d6ca1416
JB
604 this.setTasksStealingOnBackPressure()
605 } else {
606 this.unsetTasksStealingOnBackPressure()
607 }
5baee0d7 608 } else if (this.opts.tasksQueueOptions != null) {
a20f0ba5
JB
609 delete this.opts.tasksQueueOptions
610 }
611 }
612
9b38ab2d
JB
613 private buildTasksQueueOptions (
614 tasksQueueOptions: TasksQueueOptions
615 ): TasksQueueOptions {
616 return {
617 ...{
618 size: Math.pow(this.maxSize, 2),
619 concurrency: 1,
620 taskStealing: true,
621 tasksStealingOnBackPressure: true
622 },
623 ...tasksQueueOptions
624 }
625 }
626
5b49e864 627 private setTasksQueueSize (size: number): void {
20c6f652 628 for (const workerNode of this.workerNodes) {
ff3f866a 629 workerNode.tasksQueueBackPressureSize = size
20c6f652
JB
630 }
631 }
632
d6ca1416
JB
633 private setTaskStealing (): void {
634 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 635 this.workerNodes[workerNodeKey].on(
65542a35 636 'idleWorkerNode',
e1c2dba7 637 this.handleIdleWorkerNodeEvent
9f95d5eb 638 )
d6ca1416
JB
639 }
640 }
641
642 private unsetTaskStealing (): void {
643 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 644 this.workerNodes[workerNodeKey].off(
65542a35 645 'idleWorkerNode',
e1c2dba7 646 this.handleIdleWorkerNodeEvent
9f95d5eb 647 )
d6ca1416
JB
648 }
649 }
650
651 private setTasksStealingOnBackPressure (): void {
652 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 653 this.workerNodes[workerNodeKey].on(
b5e75be8 654 'backPressure',
e1c2dba7 655 this.handleBackPressureEvent
9f95d5eb 656 )
d6ca1416
JB
657 }
658 }
659
660 private unsetTasksStealingOnBackPressure (): void {
661 for (const [workerNodeKey] of this.workerNodes.entries()) {
e1c2dba7 662 this.workerNodes[workerNodeKey].off(
b5e75be8 663 'backPressure',
e1c2dba7 664 this.handleBackPressureEvent
9f95d5eb 665 )
d6ca1416
JB
666 }
667 }
668
c319c66b
JB
669 /**
670 * Whether the pool is full or not.
671 *
672 * The pool filling boolean status.
673 */
dea903a8
JB
674 protected get full (): boolean {
675 return this.workerNodes.length >= this.maxSize
676 }
c2ade475 677
c319c66b
JB
678 /**
679 * Whether the pool is busy or not.
680 *
681 * The pool busyness boolean status.
682 */
683 protected abstract get busy (): boolean
7c0ba920 684
6c6afb84 685 /**
3d76750a 686 * Whether worker nodes are executing concurrently their tasks quota or not.
6c6afb84
JB
687 *
688 * @returns Worker nodes busyness boolean status.
689 */
c2ade475 690 protected internalBusy (): boolean {
3d76750a
JB
691 if (this.opts.enableTasksQueue === true) {
692 return (
693 this.workerNodes.findIndex(
041dc05b 694 workerNode =>
3d76750a
JB
695 workerNode.info.ready &&
696 workerNode.usage.tasks.executing <
697 (this.opts.tasksQueueOptions?.concurrency as number)
698 ) === -1
699 )
3d76750a 700 }
419e8121
JB
701 return (
702 this.workerNodes.findIndex(
703 workerNode =>
704 workerNode.info.ready && workerNode.usage.tasks.executing === 0
705 ) === -1
706 )
cb70b19d
JB
707 }
708
e81c38f2 709 private async sendTaskFunctionOperationToWorker (
72ae84a2
JB
710 workerNodeKey: number,
711 message: MessageValue<Data>
712 ): Promise<boolean> {
72ae84a2 713 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
714 const taskFunctionOperationListener = (
715 message: MessageValue<Response>
716 ): void => {
717 this.checkMessageWorkerId(message)
718 const workerId = this.getWorkerInfo(workerNodeKey).id as number
72ae84a2 719 if (
ae036c3e
JB
720 message.taskFunctionOperationStatus != null &&
721 message.workerId === workerId
72ae84a2 722 ) {
ae036c3e
JB
723 if (message.taskFunctionOperationStatus) {
724 resolve(true)
725 } else if (!message.taskFunctionOperationStatus) {
726 reject(
727 new Error(
728 `Task function operation '${
729 message.taskFunctionOperation as string
730 }' failed on worker ${message.workerId} with error: '${
731 message.workerError?.message as string
732 }'`
733 )
72ae84a2 734 )
ae036c3e
JB
735 }
736 this.deregisterWorkerMessageListener(
737 this.getWorkerNodeKeyByWorkerId(message.workerId),
738 taskFunctionOperationListener
72ae84a2
JB
739 )
740 }
ae036c3e
JB
741 }
742 this.registerWorkerMessageListener(
743 workerNodeKey,
744 taskFunctionOperationListener
745 )
72ae84a2
JB
746 this.sendToWorker(workerNodeKey, message)
747 })
748 }
749
750 private async sendTaskFunctionOperationToWorkers (
adee6053 751 message: MessageValue<Data>
e81c38f2
JB
752 ): Promise<boolean> {
753 return await new Promise<boolean>((resolve, reject) => {
ae036c3e
JB
754 const responsesReceived = new Array<MessageValue<Response>>()
755 const taskFunctionOperationsListener = (
756 message: MessageValue<Response>
757 ): void => {
758 this.checkMessageWorkerId(message)
759 if (message.taskFunctionOperationStatus != null) {
760 responsesReceived.push(message)
761 if (responsesReceived.length === this.workerNodes.length) {
e81c38f2 762 if (
e81c38f2
JB
763 responsesReceived.every(
764 message => message.taskFunctionOperationStatus === true
765 )
766 ) {
767 resolve(true)
768 } else if (
e81c38f2
JB
769 responsesReceived.some(
770 message => message.taskFunctionOperationStatus === false
771 )
772 ) {
b0b55f57
JB
773 const errorResponse = responsesReceived.find(
774 response => response.taskFunctionOperationStatus === false
775 )
e81c38f2
JB
776 reject(
777 new Error(
b0b55f57 778 `Task function operation '${
e81c38f2 779 message.taskFunctionOperation as string
b0b55f57
JB
780 }' failed on worker ${
781 errorResponse?.workerId as number
782 } with error: '${
783 errorResponse?.workerError?.message as string
784 }'`
e81c38f2
JB
785 )
786 )
787 }
ae036c3e
JB
788 this.deregisterWorkerMessageListener(
789 this.getWorkerNodeKeyByWorkerId(message.workerId),
790 taskFunctionOperationsListener
791 )
e81c38f2 792 }
ae036c3e
JB
793 }
794 }
795 for (const [workerNodeKey] of this.workerNodes.entries()) {
796 this.registerWorkerMessageListener(
797 workerNodeKey,
798 taskFunctionOperationsListener
799 )
72ae84a2 800 this.sendToWorker(workerNodeKey, message)
e81c38f2
JB
801 }
802 })
6703b9f4
JB
803 }
804
805 /** @inheritDoc */
806 public hasTaskFunction (name: string): boolean {
edbc15c6
JB
807 for (const workerNode of this.workerNodes) {
808 if (
809 Array.isArray(workerNode.info.taskFunctionNames) &&
810 workerNode.info.taskFunctionNames.includes(name)
811 ) {
812 return true
813 }
814 }
815 return false
6703b9f4
JB
816 }
817
818 /** @inheritDoc */
e81c38f2
JB
819 public async addTaskFunction (
820 name: string,
3feeab69 821 fn: TaskFunction<Data, Response>
e81c38f2 822 ): Promise<boolean> {
3feeab69
JB
823 if (typeof name !== 'string') {
824 throw new TypeError('name argument must be a string')
825 }
826 if (typeof name === 'string' && name.trim().length === 0) {
827 throw new TypeError('name argument must not be an empty string')
828 }
829 if (typeof fn !== 'function') {
830 throw new TypeError('fn argument must be a function')
831 }
adee6053 832 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
833 taskFunctionOperation: 'add',
834 taskFunctionName: name,
3feeab69 835 taskFunction: fn.toString()
6703b9f4 836 })
adee6053
JB
837 this.taskFunctions.set(name, fn)
838 return opResult
6703b9f4
JB
839 }
840
841 /** @inheritDoc */
e81c38f2 842 public async removeTaskFunction (name: string): Promise<boolean> {
9eae3c69
JB
843 if (!this.taskFunctions.has(name)) {
844 throw new Error(
16248b23 845 'Cannot remove a task function not handled on the pool side'
9eae3c69
JB
846 )
847 }
adee6053 848 const opResult = await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
849 taskFunctionOperation: 'remove',
850 taskFunctionName: name
851 })
adee6053
JB
852 this.deleteTaskFunctionWorkerUsages(name)
853 this.taskFunctions.delete(name)
854 return opResult
6703b9f4
JB
855 }
856
90d7d101 857 /** @inheritDoc */
6703b9f4 858 public listTaskFunctionNames (): string[] {
f2dbbf95
JB
859 for (const workerNode of this.workerNodes) {
860 if (
6703b9f4
JB
861 Array.isArray(workerNode.info.taskFunctionNames) &&
862 workerNode.info.taskFunctionNames.length > 0
f2dbbf95 863 ) {
6703b9f4 864 return workerNode.info.taskFunctionNames
f2dbbf95 865 }
90d7d101 866 }
f2dbbf95 867 return []
90d7d101
JB
868 }
869
6703b9f4 870 /** @inheritDoc */
e81c38f2 871 public async setDefaultTaskFunction (name: string): Promise<boolean> {
72ae84a2 872 return await this.sendTaskFunctionOperationToWorkers({
6703b9f4
JB
873 taskFunctionOperation: 'default',
874 taskFunctionName: name
875 })
6703b9f4
JB
876 }
877
adee6053
JB
878 private deleteTaskFunctionWorkerUsages (name: string): void {
879 for (const workerNode of this.workerNodes) {
880 workerNode.deleteTaskFunctionWorkerUsage(name)
881 }
882 }
883
375f7504
JB
884 private shallExecuteTask (workerNodeKey: number): boolean {
885 return (
886 this.tasksQueueSize(workerNodeKey) === 0 &&
887 this.workerNodes[workerNodeKey].usage.tasks.executing <
888 (this.opts.tasksQueueOptions?.concurrency as number)
889 )
890 }
891
afc003b2 892 /** @inheritDoc */
7d91a8cd
JB
893 public async execute (
894 data?: Data,
895 name?: string,
896 transferList?: TransferListItem[]
897 ): Promise<Response> {
52b71763 898 return await new Promise<Response>((resolve, reject) => {
15b176e0 899 if (!this.started) {
47352846 900 reject(new Error('Cannot execute a task on not started pool'))
9d2d0da1 901 return
15b176e0 902 }
711623b8
JB
903 if (this.destroying) {
904 reject(new Error('Cannot execute a task on destroying pool'))
905 return
906 }
7d91a8cd
JB
907 if (name != null && typeof name !== 'string') {
908 reject(new TypeError('name argument must be a string'))
9d2d0da1 909 return
7d91a8cd 910 }
90d7d101
JB
911 if (
912 name != null &&
913 typeof name === 'string' &&
914 name.trim().length === 0
915 ) {
f58b60b9 916 reject(new TypeError('name argument must not be an empty string'))
9d2d0da1 917 return
90d7d101 918 }
b558f6b5
JB
919 if (transferList != null && !Array.isArray(transferList)) {
920 reject(new TypeError('transferList argument must be an array'))
9d2d0da1 921 return
b558f6b5
JB
922 }
923 const timestamp = performance.now()
924 const workerNodeKey = this.chooseWorkerNode()
501aea93 925 const task: Task<Data> = {
52b71763
JB
926 name: name ?? DEFAULT_TASK_NAME,
927 // eslint-disable-next-line @typescript-eslint/consistent-type-assertions
928 data: data ?? ({} as Data),
7d91a8cd 929 transferList,
52b71763 930 timestamp,
7629bdf1 931 taskId: randomUUID()
52b71763 932 }
7629bdf1 933 this.promiseResponseMap.set(task.taskId as string, {
2e81254d
JB
934 resolve,
935 reject,
501aea93 936 workerNodeKey
2e81254d 937 })
52b71763 938 if (
4e377863
JB
939 this.opts.enableTasksQueue === false ||
940 (this.opts.enableTasksQueue === true &&
375f7504 941 this.shallExecuteTask(workerNodeKey))
52b71763 942 ) {
501aea93 943 this.executeTask(workerNodeKey, task)
4e377863
JB
944 } else {
945 this.enqueueTask(workerNodeKey, task)
52b71763 946 }
2e81254d 947 })
280c2a77 948 }
c97c7edb 949
47352846
JB
950 /** @inheritdoc */
951 public start (): void {
711623b8
JB
952 if (this.started) {
953 throw new Error('Cannot start an already started pool')
954 }
955 if (this.starting) {
956 throw new Error('Cannot start an already starting pool')
957 }
958 if (this.destroying) {
959 throw new Error('Cannot start a destroying pool')
960 }
47352846
JB
961 this.starting = true
962 while (
963 this.workerNodes.reduce(
964 (accumulator, workerNode) =>
965 !workerNode.info.dynamic ? accumulator + 1 : accumulator,
966 0
967 ) < this.numberOfWorkers
968 ) {
969 this.createAndSetupWorkerNode()
970 }
971 this.starting = false
972 this.started = true
973 }
974
afc003b2 975 /** @inheritDoc */
c97c7edb 976 public async destroy (): Promise<void> {
711623b8
JB
977 if (!this.started) {
978 throw new Error('Cannot destroy an already destroyed pool')
979 }
980 if (this.starting) {
981 throw new Error('Cannot destroy an starting pool')
982 }
983 if (this.destroying) {
984 throw new Error('Cannot destroy an already destroying pool')
985 }
986 this.destroying = true
1fbcaa7c 987 await Promise.all(
14c39df1 988 this.workerNodes.map(async (_workerNode, workerNodeKey) => {
aa9eede8 989 await this.destroyWorkerNode(workerNodeKey)
1fbcaa7c
JB
990 })
991 )
33e6bb4c 992 this.emitter?.emit(PoolEvents.destroy, this.info)
f80125ca 993 this.emitter?.emitDestroy()
78f60f82 994 this.emitter?.removeAllListeners()
55082af9 995 this.readyEventEmitted = false
711623b8 996 this.destroying = false
15b176e0 997 this.started = false
c97c7edb
S
998 }
999
1e3214b6 1000 protected async sendKillMessageToWorker (
72ae84a2 1001 workerNodeKey: number
1e3214b6 1002 ): Promise<void> {
9edb9717 1003 await new Promise<void>((resolve, reject) => {
ae036c3e
JB
1004 const killMessageListener = (message: MessageValue<Response>): void => {
1005 this.checkMessageWorkerId(message)
1e3214b6
JB
1006 if (message.kill === 'success') {
1007 resolve()
1008 } else if (message.kill === 'failure') {
72ae84a2
JB
1009 reject(
1010 new Error(
ae036c3e 1011 `Kill message handling failed on worker ${
72ae84a2 1012 message.workerId as number
ae036c3e 1013 }`
72ae84a2
JB
1014 )
1015 )
1e3214b6 1016 }
ae036c3e 1017 }
51d9cfbd 1018 // FIXME: should be registered only once
ae036c3e 1019 this.registerWorkerMessageListener(workerNodeKey, killMessageListener)
72ae84a2 1020 this.sendToWorker(workerNodeKey, { kill: true })
1e3214b6 1021 })
1e3214b6
JB
1022 }
1023
4a6952ff 1024 /**
aa9eede8 1025 * Terminates the worker node given its worker node key.
4a6952ff 1026 *
aa9eede8 1027 * @param workerNodeKey - The worker node key.
4a6952ff 1028 */
81c02522 1029 protected abstract destroyWorkerNode (workerNodeKey: number): Promise<void>
c97c7edb 1030
729c563d 1031 /**
6677a3d3
JB
1032 * Setup hook to execute code before worker nodes are created in the abstract constructor.
1033 * Can be overridden.
afc003b2
JB
1034 *
1035 * @virtual
729c563d 1036 */
280c2a77 1037 protected setupHook (): void {
965df41c 1038 /* Intentionally empty */
280c2a77 1039 }
c97c7edb 1040
729c563d 1041 /**
280c2a77
S
1042 * Should return whether the worker is the main worker or not.
1043 */
1044 protected abstract isMain (): boolean
1045
1046 /**
2e81254d 1047 * Hook executed before the worker task execution.
bf9549ae 1048 * Can be overridden.
729c563d 1049 *
f06e48d8 1050 * @param workerNodeKey - The worker node key.
1c6fe997 1051 * @param task - The task to execute.
729c563d 1052 */
1c6fe997
JB
1053 protected beforeTaskExecutionHook (
1054 workerNodeKey: number,
1055 task: Task<Data>
1056 ): void {
94407def
JB
1057 if (this.workerNodes[workerNodeKey]?.usage != null) {
1058 const workerUsage = this.workerNodes[workerNodeKey].usage
1059 ++workerUsage.tasks.executing
1060 this.updateWaitTimeWorkerUsage(workerUsage, task)
1061 }
1062 if (
1063 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1064 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
1065 task.name as string
1066 ) != null
1067 ) {
db0e38ee 1068 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1069 workerNodeKey
db0e38ee 1070 ].getTaskFunctionWorkerUsage(task.name as string) as WorkerUsage
5623b8d5
JB
1071 ++taskFunctionWorkerUsage.tasks.executing
1072 this.updateWaitTimeWorkerUsage(taskFunctionWorkerUsage, task)
b558f6b5 1073 }
c97c7edb
S
1074 }
1075
c01733f1 1076 /**
2e81254d 1077 * Hook executed after the worker task execution.
bf9549ae 1078 * Can be overridden.
c01733f1 1079 *
501aea93 1080 * @param workerNodeKey - The worker node key.
38e795c1 1081 * @param message - The received message.
c01733f1 1082 */
2e81254d 1083 protected afterTaskExecutionHook (
501aea93 1084 workerNodeKey: number,
2740a743 1085 message: MessageValue<Response>
bf9549ae 1086 ): void {
94407def
JB
1087 if (this.workerNodes[workerNodeKey]?.usage != null) {
1088 const workerUsage = this.workerNodes[workerNodeKey].usage
1089 this.updateTaskStatisticsWorkerUsage(workerUsage, message)
1090 this.updateRunTimeWorkerUsage(workerUsage, message)
1091 this.updateEluWorkerUsage(workerUsage, message)
1092 }
1093 if (
1094 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1095 this.workerNodes[workerNodeKey].getTaskFunctionWorkerUsage(
5623b8d5 1096 message.taskPerformance?.name as string
94407def
JB
1097 ) != null
1098 ) {
db0e38ee 1099 const taskFunctionWorkerUsage = this.workerNodes[
b558f6b5 1100 workerNodeKey
db0e38ee 1101 ].getTaskFunctionWorkerUsage(
0628755c 1102 message.taskPerformance?.name as string
b558f6b5 1103 ) as WorkerUsage
db0e38ee
JB
1104 this.updateTaskStatisticsWorkerUsage(taskFunctionWorkerUsage, message)
1105 this.updateRunTimeWorkerUsage(taskFunctionWorkerUsage, message)
1106 this.updateEluWorkerUsage(taskFunctionWorkerUsage, message)
b558f6b5
JB
1107 }
1108 }
1109
db0e38ee
JB
1110 /**
1111 * Whether the worker node shall update its task function worker usage or not.
1112 *
1113 * @param workerNodeKey - The worker node key.
1114 * @returns `true` if the worker node shall update its task function worker usage, `false` otherwise.
1115 */
1116 private shallUpdateTaskFunctionWorkerUsage (workerNodeKey: number): boolean {
a5d15204 1117 const workerInfo = this.getWorkerInfo(workerNodeKey)
b558f6b5 1118 return (
94407def 1119 workerInfo != null &&
6703b9f4
JB
1120 Array.isArray(workerInfo.taskFunctionNames) &&
1121 workerInfo.taskFunctionNames.length > 2
b558f6b5 1122 )
f1c06930
JB
1123 }
1124
1125 private updateTaskStatisticsWorkerUsage (
1126 workerUsage: WorkerUsage,
1127 message: MessageValue<Response>
1128 ): void {
a4e07f72 1129 const workerTaskStatistics = workerUsage.tasks
5bb5be17
JB
1130 if (
1131 workerTaskStatistics.executing != null &&
1132 workerTaskStatistics.executing > 0
1133 ) {
1134 --workerTaskStatistics.executing
5bb5be17 1135 }
6703b9f4 1136 if (message.workerError == null) {
98e72cda
JB
1137 ++workerTaskStatistics.executed
1138 } else {
a4e07f72 1139 ++workerTaskStatistics.failed
2740a743 1140 }
f8eb0a2a
JB
1141 }
1142
a4e07f72
JB
1143 private updateRunTimeWorkerUsage (
1144 workerUsage: WorkerUsage,
f8eb0a2a
JB
1145 message: MessageValue<Response>
1146 ): void {
6703b9f4 1147 if (message.workerError != null) {
dc021bcc
JB
1148 return
1149 }
e4f20deb
JB
1150 updateMeasurementStatistics(
1151 workerUsage.runTime,
1152 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().runTime,
dc021bcc 1153 message.taskPerformance?.runTime ?? 0
e4f20deb 1154 )
f8eb0a2a
JB
1155 }
1156
a4e07f72
JB
1157 private updateWaitTimeWorkerUsage (
1158 workerUsage: WorkerUsage,
1c6fe997 1159 task: Task<Data>
f8eb0a2a 1160 ): void {
1c6fe997
JB
1161 const timestamp = performance.now()
1162 const taskWaitTime = timestamp - (task.timestamp ?? timestamp)
e4f20deb
JB
1163 updateMeasurementStatistics(
1164 workerUsage.waitTime,
1165 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().waitTime,
dc021bcc 1166 taskWaitTime
e4f20deb 1167 )
c01733f1 1168 }
1169
a4e07f72 1170 private updateEluWorkerUsage (
5df69fab 1171 workerUsage: WorkerUsage,
62c15a68
JB
1172 message: MessageValue<Response>
1173 ): void {
6703b9f4 1174 if (message.workerError != null) {
dc021bcc
JB
1175 return
1176 }
008512c7
JB
1177 const eluTaskStatisticsRequirements: MeasurementStatisticsRequirements =
1178 this.workerChoiceStrategyContext.getTaskStatisticsRequirements().elu
e4f20deb
JB
1179 updateMeasurementStatistics(
1180 workerUsage.elu.active,
008512c7 1181 eluTaskStatisticsRequirements,
dc021bcc 1182 message.taskPerformance?.elu?.active ?? 0
e4f20deb
JB
1183 )
1184 updateMeasurementStatistics(
1185 workerUsage.elu.idle,
008512c7 1186 eluTaskStatisticsRequirements,
dc021bcc 1187 message.taskPerformance?.elu?.idle ?? 0
e4f20deb 1188 )
008512c7 1189 if (eluTaskStatisticsRequirements.aggregate) {
f7510105 1190 if (message.taskPerformance?.elu != null) {
f7510105
JB
1191 if (workerUsage.elu.utilization != null) {
1192 workerUsage.elu.utilization =
1193 (workerUsage.elu.utilization +
1194 message.taskPerformance.elu.utilization) /
1195 2
1196 } else {
1197 workerUsage.elu.utilization = message.taskPerformance.elu.utilization
1198 }
62c15a68
JB
1199 }
1200 }
1201 }
1202
280c2a77 1203 /**
f06e48d8 1204 * Chooses a worker node for the next task.
280c2a77 1205 *
6c6afb84 1206 * The default worker choice strategy uses a round robin algorithm to distribute the tasks.
280c2a77 1207 *
aa9eede8 1208 * @returns The chosen worker node key
280c2a77 1209 */
6c6afb84 1210 private chooseWorkerNode (): number {
930dcf12 1211 if (this.shallCreateDynamicWorker()) {
aa9eede8 1212 const workerNodeKey = this.createAndSetupDynamicWorkerNode()
6c6afb84 1213 if (
b1aae695 1214 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
6c6afb84 1215 ) {
aa9eede8 1216 return workerNodeKey
6c6afb84 1217 }
17393ac8 1218 }
930dcf12
JB
1219 return this.workerChoiceStrategyContext.execute()
1220 }
1221
6c6afb84
JB
1222 /**
1223 * Conditions for dynamic worker creation.
1224 *
1225 * @returns Whether to create a dynamic worker or not.
1226 */
1227 private shallCreateDynamicWorker (): boolean {
930dcf12 1228 return this.type === PoolTypes.dynamic && !this.full && this.internalBusy()
c97c7edb
S
1229 }
1230
280c2a77 1231 /**
aa9eede8 1232 * Sends a message to worker given its worker node key.
280c2a77 1233 *
aa9eede8 1234 * @param workerNodeKey - The worker node key.
38e795c1 1235 * @param message - The message.
7d91a8cd 1236 * @param transferList - The optional array of transferable objects.
280c2a77
S
1237 */
1238 protected abstract sendToWorker (
aa9eede8 1239 workerNodeKey: number,
7d91a8cd
JB
1240 message: MessageValue<Data>,
1241 transferList?: TransferListItem[]
280c2a77
S
1242 ): void
1243
729c563d 1244 /**
41344292 1245 * Creates a new worker.
6c6afb84
JB
1246 *
1247 * @returns Newly created worker.
729c563d 1248 */
280c2a77 1249 protected abstract createWorker (): Worker
c97c7edb 1250
4a6952ff 1251 /**
aa9eede8 1252 * Creates a new, completely set up worker node.
4a6952ff 1253 *
aa9eede8 1254 * @returns New, completely set up worker node key.
4a6952ff 1255 */
aa9eede8 1256 protected createAndSetupWorkerNode (): number {
bdacc2d2 1257 const worker = this.createWorker()
280c2a77 1258
fd04474e 1259 worker.on('online', this.opts.onlineHandler ?? EMPTY_FUNCTION)
35cf1c03 1260 worker.on('message', this.opts.messageHandler ?? EMPTY_FUNCTION)
a35560ba 1261 worker.on('error', this.opts.errorHandler ?? EMPTY_FUNCTION)
041dc05b 1262 worker.on('error', error => {
aad6fb64 1263 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
23b11f60 1264 this.flagWorkerNodeAsNotReady(workerNodeKey)
46b0bb09 1265 const workerInfo = this.getWorkerInfo(workerNodeKey)
2a69b8c5 1266 this.emitter?.emit(PoolEvents.error, error)
cce7d657 1267 this.workerNodes[workerNodeKey].closeChannel()
15b176e0 1268 if (
b6bfca01 1269 this.started &&
9b38ab2d 1270 !this.starting &&
711623b8 1271 !this.destroying &&
9b38ab2d 1272 this.opts.restartWorkerOnError === true
15b176e0 1273 ) {
9b106837 1274 if (workerInfo.dynamic) {
aa9eede8 1275 this.createAndSetupDynamicWorkerNode()
8a1260a3 1276 } else {
aa9eede8 1277 this.createAndSetupWorkerNode()
8a1260a3 1278 }
5baee0d7 1279 }
6d59c3a3 1280 if (this.started && this.opts.enableTasksQueue === true) {
9b106837 1281 this.redistributeQueuedTasks(workerNodeKey)
19dbc45b 1282 }
5baee0d7 1283 })
a35560ba 1284 worker.on('exit', this.opts.exitHandler ?? EMPTY_FUNCTION)
a974afa6 1285 worker.once('exit', () => {
f06e48d8 1286 this.removeWorkerNode(worker)
a974afa6 1287 })
280c2a77 1288
aa9eede8 1289 const workerNodeKey = this.addWorkerNode(worker)
280c2a77 1290
aa9eede8 1291 this.afterWorkerNodeSetup(workerNodeKey)
280c2a77 1292
aa9eede8 1293 return workerNodeKey
c97c7edb 1294 }
be0676b3 1295
930dcf12 1296 /**
aa9eede8 1297 * Creates a new, completely set up dynamic worker node.
930dcf12 1298 *
aa9eede8 1299 * @returns New, completely set up dynamic worker node key.
930dcf12 1300 */
aa9eede8
JB
1301 protected createAndSetupDynamicWorkerNode (): number {
1302 const workerNodeKey = this.createAndSetupWorkerNode()
041dc05b 1303 this.registerWorkerMessageListener(workerNodeKey, message => {
4d159167 1304 this.checkMessageWorkerId(message)
aa9eede8
JB
1305 const localWorkerNodeKey = this.getWorkerNodeKeyByWorkerId(
1306 message.workerId
aad6fb64 1307 )
aa9eede8 1308 const workerUsage = this.workerNodes[localWorkerNodeKey].usage
81c02522 1309 // Kill message received from worker
930dcf12
JB
1310 if (
1311 isKillBehavior(KillBehaviors.HARD, message.kill) ||
1e3214b6 1312 (isKillBehavior(KillBehaviors.SOFT, message.kill) &&
7b56f532 1313 ((this.opts.enableTasksQueue === false &&
aa9eede8 1314 workerUsage.tasks.executing === 0) ||
7b56f532 1315 (this.opts.enableTasksQueue === true &&
aa9eede8
JB
1316 workerUsage.tasks.executing === 0 &&
1317 this.tasksQueueSize(localWorkerNodeKey) === 0)))
930dcf12 1318 ) {
f3827d5d 1319 // Flag the worker node as not ready immediately
ae3ab61d 1320 this.flagWorkerNodeAsNotReady(localWorkerNodeKey)
041dc05b 1321 this.destroyWorkerNode(localWorkerNodeKey).catch(error => {
5270d253
JB
1322 this.emitter?.emit(PoolEvents.error, error)
1323 })
930dcf12
JB
1324 }
1325 })
46b0bb09 1326 const workerInfo = this.getWorkerInfo(workerNodeKey)
aa9eede8 1327 this.sendToWorker(workerNodeKey, {
e9dd5b66 1328 checkActive: true
21f710aa 1329 })
72ae84a2
JB
1330 if (this.taskFunctions.size > 0) {
1331 for (const [taskFunctionName, taskFunction] of this.taskFunctions) {
1332 this.sendTaskFunctionOperationToWorker(workerNodeKey, {
1333 taskFunctionOperation: 'add',
1334 taskFunctionName,
1335 taskFunction: taskFunction.toString()
1336 }).catch(error => {
1337 this.emitter?.emit(PoolEvents.error, error)
1338 })
1339 }
1340 }
b5e113f6 1341 workerInfo.dynamic = true
b1aae695
JB
1342 if (
1343 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerReady ||
1344 this.workerChoiceStrategyContext.getStrategyPolicy().dynamicWorkerUsage
1345 ) {
b5e113f6
JB
1346 workerInfo.ready = true
1347 }
33e6bb4c 1348 this.checkAndEmitDynamicWorkerCreationEvents()
aa9eede8 1349 return workerNodeKey
930dcf12
JB
1350 }
1351
a2ed5053 1352 /**
aa9eede8 1353 * Registers a listener callback on the worker given its worker node key.
a2ed5053 1354 *
aa9eede8 1355 * @param workerNodeKey - The worker node key.
a2ed5053
JB
1356 * @param listener - The message listener callback.
1357 */
85aeb3f3
JB
1358 protected abstract registerWorkerMessageListener<
1359 Message extends Data | Response
aa9eede8
JB
1360 >(
1361 workerNodeKey: number,
1362 listener: (message: MessageValue<Message>) => void
1363 ): void
a2ed5053 1364
ae036c3e
JB
1365 /**
1366 * Registers once a listener callback on the worker given its worker node key.
1367 *
1368 * @param workerNodeKey - The worker node key.
1369 * @param listener - The message listener callback.
1370 */
1371 protected abstract registerOnceWorkerMessageListener<
1372 Message extends Data | Response
1373 >(
1374 workerNodeKey: number,
1375 listener: (message: MessageValue<Message>) => void
1376 ): void
1377
1378 /**
1379 * Deregisters a listener callback on the worker given its worker node key.
1380 *
1381 * @param workerNodeKey - The worker node key.
1382 * @param listener - The message listener callback.
1383 */
1384 protected abstract deregisterWorkerMessageListener<
1385 Message extends Data | Response
1386 >(
1387 workerNodeKey: number,
1388 listener: (message: MessageValue<Message>) => void
1389 ): void
1390
a2ed5053 1391 /**
aa9eede8 1392 * Method hooked up after a worker node has been newly created.
a2ed5053
JB
1393 * Can be overridden.
1394 *
aa9eede8 1395 * @param workerNodeKey - The newly created worker node key.
a2ed5053 1396 */
aa9eede8 1397 protected afterWorkerNodeSetup (workerNodeKey: number): void {
a2ed5053 1398 // Listen to worker messages.
fcd39179
JB
1399 this.registerWorkerMessageListener(
1400 workerNodeKey,
3a776b6d 1401 this.workerMessageListener
fcd39179 1402 )
85aeb3f3 1403 // Send the startup message to worker.
aa9eede8 1404 this.sendStartupMessageToWorker(workerNodeKey)
9edb9717
JB
1405 // Send the statistics message to worker.
1406 this.sendStatisticsMessageToWorker(workerNodeKey)
72695f86 1407 if (this.opts.enableTasksQueue === true) {
dbd73092 1408 if (this.opts.tasksQueueOptions?.taskStealing === true) {
e1c2dba7 1409 this.workerNodes[workerNodeKey].on(
65542a35 1410 'idleWorkerNode',
e1c2dba7 1411 this.handleIdleWorkerNodeEvent
9f95d5eb 1412 )
47352846
JB
1413 }
1414 if (this.opts.tasksQueueOptions?.tasksStealingOnBackPressure === true) {
e1c2dba7 1415 this.workerNodes[workerNodeKey].on(
b5e75be8 1416 'backPressure',
e1c2dba7 1417 this.handleBackPressureEvent
9f95d5eb 1418 )
47352846 1419 }
72695f86 1420 }
d2c73f82
JB
1421 }
1422
85aeb3f3 1423 /**
aa9eede8
JB
1424 * Sends the startup message to worker given its worker node key.
1425 *
1426 * @param workerNodeKey - The worker node key.
1427 */
1428 protected abstract sendStartupMessageToWorker (workerNodeKey: number): void
1429
1430 /**
9edb9717 1431 * Sends the statistics message to worker given its worker node key.
85aeb3f3 1432 *
aa9eede8 1433 * @param workerNodeKey - The worker node key.
85aeb3f3 1434 */
9edb9717 1435 private sendStatisticsMessageToWorker (workerNodeKey: number): void {
aa9eede8
JB
1436 this.sendToWorker(workerNodeKey, {
1437 statistics: {
1438 runTime:
1439 this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1440 .runTime.aggregate,
1441 elu: this.workerChoiceStrategyContext.getTaskStatisticsRequirements()
1442 .elu.aggregate
72ae84a2 1443 }
aa9eede8
JB
1444 })
1445 }
a2ed5053
JB
1446
1447 private redistributeQueuedTasks (workerNodeKey: number): void {
cb71d660
JB
1448 if (this.workerNodes.length <= 1) {
1449 return
1450 }
a2ed5053 1451 while (this.tasksQueueSize(workerNodeKey) > 0) {
f201a0cd
JB
1452 const destinationWorkerNodeKey = this.workerNodes.reduce(
1453 (minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
852ed3e4
JB
1454 return workerNode.info.ready &&
1455 workerNode.usage.tasks.queued <
1456 workerNodes[minWorkerNodeKey].usage.tasks.queued
f201a0cd
JB
1457 ? workerNodeKey
1458 : minWorkerNodeKey
1459 },
1460 0
1461 )
72ae84a2 1462 const task = this.dequeueTask(workerNodeKey) as Task<Data>
3f690f25
JB
1463 if (this.shallExecuteTask(destinationWorkerNodeKey)) {
1464 this.executeTask(destinationWorkerNodeKey, task)
1465 } else {
1466 this.enqueueTask(destinationWorkerNodeKey, task)
dd951876
JB
1467 }
1468 }
1469 }
1470
b1838604
JB
1471 private updateTaskStolenStatisticsWorkerUsage (
1472 workerNodeKey: number,
b1838604
JB
1473 taskName: string
1474 ): void {
1a880eca 1475 const workerNode = this.workerNodes[workerNodeKey]
b1838604
JB
1476 if (workerNode?.usage != null) {
1477 ++workerNode.usage.tasks.stolen
1478 }
1479 if (
1480 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1481 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1482 ) {
1483 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1484 taskName
1485 ) as WorkerUsage
1486 ++taskFunctionWorkerUsage.tasks.stolen
1487 }
1488 }
1489
463226a4 1490 private updateTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1491 workerNodeKey: number
463226a4
JB
1492 ): void {
1493 const workerNode = this.workerNodes[workerNodeKey]
1494 if (workerNode?.usage != null) {
1495 ++workerNode.usage.tasks.sequentiallyStolen
1496 }
f1f77f45
JB
1497 }
1498
1499 private updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1500 workerNodeKey: number,
1501 taskName: string
1502 ): void {
1503 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1504 if (
1505 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1506 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1507 ) {
1508 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1509 taskName
1510 ) as WorkerUsage
1511 ++taskFunctionWorkerUsage.tasks.sequentiallyStolen
1512 }
1513 }
1514
1515 private resetTaskSequentiallyStolenStatisticsWorkerUsage (
f1f77f45 1516 workerNodeKey: number
463226a4
JB
1517 ): void {
1518 const workerNode = this.workerNodes[workerNodeKey]
1519 if (workerNode?.usage != null) {
1520 workerNode.usage.tasks.sequentiallyStolen = 0
1521 }
f1f77f45
JB
1522 }
1523
1524 private resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage (
1525 workerNodeKey: number,
1526 taskName: string
1527 ): void {
1528 const workerNode = this.workerNodes[workerNodeKey]
463226a4
JB
1529 if (
1530 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1531 workerNode.getTaskFunctionWorkerUsage(taskName) != null
1532 ) {
1533 const taskFunctionWorkerUsage = workerNode.getTaskFunctionWorkerUsage(
1534 taskName
1535 ) as WorkerUsage
1536 taskFunctionWorkerUsage.tasks.sequentiallyStolen = 0
1537 }
1538 }
1539
65542a35 1540 private readonly handleIdleWorkerNodeEvent = (
e1c2dba7 1541 eventDetail: WorkerNodeEventDetail,
463226a4 1542 previousStolenTask?: Task<Data>
9f95d5eb 1543 ): void => {
cb71d660
JB
1544 if (this.workerNodes.length <= 1) {
1545 return
1546 }
e1c2dba7 1547 const { workerNodeKey } = eventDetail
463226a4
JB
1548 if (workerNodeKey == null) {
1549 throw new Error(
1550 'WorkerNode event detail workerNodeKey attribute must be defined'
1551 )
1552 }
1553 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1554 if (
1555 previousStolenTask != null &&
1556 workerNodeTasksUsage.sequentiallyStolen > 0 &&
1557 (workerNodeTasksUsage.executing > 0 ||
1558 this.tasksQueueSize(workerNodeKey) > 0)
1559 ) {
f1f77f45
JB
1560 for (const taskName of this.workerNodes[workerNodeKey].info
1561 .taskFunctionNames as string[]) {
1562 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1563 workerNodeKey,
1564 taskName
1565 )
1566 }
1567 this.resetTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
463226a4
JB
1568 return
1569 }
1570 const stolenTask = this.workerNodeStealTask(workerNodeKey)
f1f77f45
JB
1571 if (
1572 this.shallUpdateTaskFunctionWorkerUsage(workerNodeKey) &&
1573 stolenTask != null
1574 ) {
1575 const taskFunctionTasksWorkerUsage = this.workerNodes[
1576 workerNodeKey
1577 ].getTaskFunctionWorkerUsage(stolenTask.name as string)
1578 ?.tasks as TaskStatistics
1579 if (
1580 taskFunctionTasksWorkerUsage.sequentiallyStolen === 0 ||
1581 (previousStolenTask != null &&
1582 previousStolenTask.name === stolenTask.name &&
1583 taskFunctionTasksWorkerUsage.sequentiallyStolen > 0)
1584 ) {
1585 this.updateTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1586 workerNodeKey,
1587 stolenTask.name as string
1588 )
1589 } else {
1590 this.resetTaskSequentiallyStolenStatisticsTaskFunctionWorkerUsage(
1591 workerNodeKey,
1592 stolenTask.name as string
1593 )
1594 }
1595 }
463226a4
JB
1596 sleep(exponentialDelay(workerNodeTasksUsage.sequentiallyStolen))
1597 .then(() => {
e1c2dba7 1598 this.handleIdleWorkerNodeEvent(eventDetail, stolenTask)
463226a4
JB
1599 return undefined
1600 })
1601 .catch(EMPTY_FUNCTION)
1602 }
1603
1604 private readonly workerNodeStealTask = (
1605 workerNodeKey: number
1606 ): Task<Data> | undefined => {
dd951876 1607 const workerNodes = this.workerNodes
a6b3272b 1608 .slice()
dd951876
JB
1609 .sort(
1610 (workerNodeA, workerNodeB) =>
1611 workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued
1612 )
f201a0cd 1613 const sourceWorkerNode = workerNodes.find(
463226a4
JB
1614 (sourceWorkerNode, sourceWorkerNodeKey) =>
1615 sourceWorkerNode.info.ready &&
1616 sourceWorkerNodeKey !== workerNodeKey &&
1617 sourceWorkerNode.usage.tasks.queued > 0
f201a0cd
JB
1618 )
1619 if (sourceWorkerNode != null) {
72ae84a2 1620 const task = sourceWorkerNode.popTask() as Task<Data>
463226a4
JB
1621 if (this.shallExecuteTask(workerNodeKey)) {
1622 this.executeTask(workerNodeKey, task)
f201a0cd 1623 } else {
463226a4 1624 this.enqueueTask(workerNodeKey, task)
72695f86 1625 }
f1f77f45 1626 this.updateTaskSequentiallyStolenStatisticsWorkerUsage(workerNodeKey)
f201a0cd 1627 this.updateTaskStolenStatisticsWorkerUsage(
463226a4 1628 workerNodeKey,
f201a0cd
JB
1629 task.name as string
1630 )
463226a4 1631 return task
72695f86
JB
1632 }
1633 }
1634
9f95d5eb 1635 private readonly handleBackPressureEvent = (
e1c2dba7 1636 eventDetail: WorkerNodeEventDetail
9f95d5eb 1637 ): void => {
cb71d660
JB
1638 if (this.workerNodes.length <= 1) {
1639 return
1640 }
e1c2dba7 1641 const { workerId } = eventDetail
f778c355
JB
1642 const sizeOffset = 1
1643 if ((this.opts.tasksQueueOptions?.size as number) <= sizeOffset) {
68dbcdc0
JB
1644 return
1645 }
72695f86 1646 const sourceWorkerNode =
b5e75be8 1647 this.workerNodes[this.getWorkerNodeKeyByWorkerId(workerId)]
72695f86 1648 const workerNodes = this.workerNodes
a6b3272b 1649 .slice()
72695f86
JB
1650 .sort(
1651 (workerNodeA, workerNodeB) =>
1652 workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued
1653 )
1654 for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
1655 if (
0bc68267 1656 sourceWorkerNode.usage.tasks.queued > 0 &&
a6b3272b 1657 workerNode.info.ready &&
b5e75be8 1658 workerNode.info.id !== workerId &&
0bc68267 1659 workerNode.usage.tasks.queued <
f778c355 1660 (this.opts.tasksQueueOptions?.size as number) - sizeOffset
72695f86 1661 ) {
72ae84a2 1662 const task = sourceWorkerNode.popTask() as Task<Data>
375f7504 1663 if (this.shallExecuteTask(workerNodeKey)) {
dd951876 1664 this.executeTask(workerNodeKey, task)
4de3d785 1665 } else {
dd951876 1666 this.enqueueTask(workerNodeKey, task)
4de3d785 1667 }
b1838604
JB
1668 this.updateTaskStolenStatisticsWorkerUsage(
1669 workerNodeKey,
b1838604
JB
1670 task.name as string
1671 )
10ecf8fd 1672 }
a2ed5053
JB
1673 }
1674 }
1675
be0676b3 1676 /**
fcd39179 1677 * This method is the message listener registered on each worker.
be0676b3 1678 */
3a776b6d
JB
1679 protected readonly workerMessageListener = (
1680 message: MessageValue<Response>
1681 ): void => {
fcd39179 1682 this.checkMessageWorkerId(message)
b641345c
JB
1683 const { workerId, ready, taskId, taskFunctionNames } = message
1684 if (ready != null && taskFunctionNames != null) {
fcd39179
JB
1685 // Worker ready response received from worker
1686 this.handleWorkerReadyResponse(message)
b641345c 1687 } else if (taskId != null) {
fcd39179
JB
1688 // Task execution response received from worker
1689 this.handleTaskExecutionResponse(message)
b641345c 1690 } else if (taskFunctionNames != null) {
fcd39179
JB
1691 // Task function names message received from worker
1692 this.getWorkerInfo(
b641345c
JB
1693 this.getWorkerNodeKeyByWorkerId(workerId)
1694 ).taskFunctionNames = taskFunctionNames
6b272951
JB
1695 }
1696 }
1697
10e2aa7e 1698 private handleWorkerReadyResponse (message: MessageValue<Response>): void {
463226a4
JB
1699 const { workerId, ready, taskFunctionNames } = message
1700 if (ready === false) {
1701 throw new Error(`Worker ${workerId as number} failed to initialize`)
f05ed93c 1702 }
a5d15204 1703 const workerInfo = this.getWorkerInfo(
463226a4 1704 this.getWorkerNodeKeyByWorkerId(workerId)
46b0bb09 1705 )
463226a4
JB
1706 workerInfo.ready = ready as boolean
1707 workerInfo.taskFunctionNames = taskFunctionNames
55082af9
JB
1708 if (!this.readyEventEmitted && this.ready) {
1709 this.readyEventEmitted = true
1710 this.emitter?.emit(PoolEvents.ready, this.info)
2431bdb4 1711 }
6b272951
JB
1712 }
1713
1714 private handleTaskExecutionResponse (message: MessageValue<Response>): void {
463226a4 1715 const { workerId, taskId, workerError, data } = message
5441aea6 1716 const promiseResponse = this.promiseResponseMap.get(taskId as string)
6b272951 1717 if (promiseResponse != null) {
51af50ef 1718 const { resolve, reject, workerNodeKey } = promiseResponse
6703b9f4
JB
1719 if (workerError != null) {
1720 this.emitter?.emit(PoolEvents.taskError, workerError)
51af50ef 1721 reject(workerError.message)
6b272951 1722 } else {
51af50ef 1723 resolve(data as Response)
6b272951 1724 }
501aea93 1725 this.afterTaskExecutionHook(workerNodeKey, message)
f3a91bac 1726 this.workerChoiceStrategyContext.update(workerNodeKey)
5441aea6 1727 this.promiseResponseMap.delete(taskId as string)
463226a4
JB
1728 if (this.opts.enableTasksQueue === true) {
1729 const workerNodeTasksUsage = this.workerNodes[workerNodeKey].usage.tasks
1730 if (
1731 this.tasksQueueSize(workerNodeKey) > 0 &&
1732 workerNodeTasksUsage.executing <
1733 (this.opts.tasksQueueOptions?.concurrency as number)
1734 ) {
1735 this.executeTask(
1736 workerNodeKey,
1737 this.dequeueTask(workerNodeKey) as Task<Data>
1738 )
1739 }
1740 if (
1741 workerNodeTasksUsage.executing === 0 &&
1742 this.tasksQueueSize(workerNodeKey) === 0 &&
1743 workerNodeTasksUsage.sequentiallyStolen === 0
1744 ) {
e1c2dba7
JB
1745 this.workerNodes[workerNodeKey].emit('idleWorkerNode', {
1746 workerId: workerId as number,
1747 workerNodeKey
1748 })
463226a4 1749 }
be0676b3
APA
1750 }
1751 }
be0676b3 1752 }
7c0ba920 1753
a1763c54 1754 private checkAndEmitTaskExecutionEvents (): void {
33e6bb4c
JB
1755 if (this.busy) {
1756 this.emitter?.emit(PoolEvents.busy, this.info)
a1763c54
JB
1757 }
1758 }
1759
1760 private checkAndEmitTaskQueuingEvents (): void {
1761 if (this.hasBackPressure()) {
1762 this.emitter?.emit(PoolEvents.backPressure, this.info)
164d950a
JB
1763 }
1764 }
1765
33e6bb4c
JB
1766 private checkAndEmitDynamicWorkerCreationEvents (): void {
1767 if (this.type === PoolTypes.dynamic) {
1768 if (this.full) {
1769 this.emitter?.emit(PoolEvents.full, this.info)
1770 }
1771 }
1772 }
1773
8a1260a3 1774 /**
aa9eede8 1775 * Gets the worker information given its worker node key.
8a1260a3
JB
1776 *
1777 * @param workerNodeKey - The worker node key.
3f09ed9f 1778 * @returns The worker information.
8a1260a3 1779 */
46b0bb09 1780 protected getWorkerInfo (workerNodeKey: number): WorkerInfo {
dbfa7948 1781 return this.workerNodes[workerNodeKey]?.info
e221309a
JB
1782 }
1783
a05c10de 1784 /**
b0a4db63 1785 * Adds the given worker in the pool worker nodes.
ea7a90d3 1786 *
38e795c1 1787 * @param worker - The worker.
aa9eede8
JB
1788 * @returns The added worker node key.
1789 * @throws {@link https://nodejs.org/api/errors.html#class-error} If the added worker node is not found.
ea7a90d3 1790 */
b0a4db63 1791 private addWorkerNode (worker: Worker): number {
671d5154
JB
1792 const workerNode = new WorkerNode<Worker, Data>(
1793 worker,
ff3f866a 1794 this.opts.tasksQueueOptions?.size ?? Math.pow(this.maxSize, 2)
671d5154 1795 )
b97d82d8 1796 // Flag the worker node as ready at pool startup.
d2c73f82
JB
1797 if (this.starting) {
1798 workerNode.info.ready = true
1799 }
aa9eede8 1800 this.workerNodes.push(workerNode)
aad6fb64 1801 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
aa9eede8 1802 if (workerNodeKey === -1) {
86ed0598 1803 throw new Error('Worker added not found in worker nodes')
aa9eede8
JB
1804 }
1805 return workerNodeKey
ea7a90d3 1806 }
c923ce56 1807
51fe3d3c 1808 /**
f06e48d8 1809 * Removes the given worker from the pool worker nodes.
51fe3d3c 1810 *
f06e48d8 1811 * @param worker - The worker.
51fe3d3c 1812 */
416fd65c 1813 private removeWorkerNode (worker: Worker): void {
aad6fb64 1814 const workerNodeKey = this.getWorkerNodeKeyByWorker(worker)
1f68cede
JB
1815 if (workerNodeKey !== -1) {
1816 this.workerNodes.splice(workerNodeKey, 1)
1817 this.workerChoiceStrategyContext.remove(workerNodeKey)
1818 }
51fe3d3c 1819 }
adc3c320 1820
ae3ab61d
JB
1821 protected flagWorkerNodeAsNotReady (workerNodeKey: number): void {
1822 this.getWorkerInfo(workerNodeKey).ready = false
1823 }
1824
e2b31e32
JB
1825 /** @inheritDoc */
1826 public hasWorkerNodeBackPressure (workerNodeKey: number): boolean {
9e844245 1827 return (
e2b31e32
JB
1828 this.opts.enableTasksQueue === true &&
1829 this.workerNodes[workerNodeKey].hasBackPressure()
9e844245
JB
1830 )
1831 }
1832
1833 private hasBackPressure (): boolean {
1834 return (
1835 this.opts.enableTasksQueue === true &&
1836 this.workerNodes.findIndex(
041dc05b 1837 workerNode => !workerNode.hasBackPressure()
a1763c54 1838 ) === -1
9e844245 1839 )
e2b31e32
JB
1840 }
1841
b0a4db63 1842 /**
aa9eede8 1843 * Executes the given task on the worker given its worker node key.
b0a4db63 1844 *
aa9eede8 1845 * @param workerNodeKey - The worker node key.
b0a4db63
JB
1846 * @param task - The task to execute.
1847 */
2e81254d 1848 private executeTask (workerNodeKey: number, task: Task<Data>): void {
1c6fe997 1849 this.beforeTaskExecutionHook(workerNodeKey, task)
bbfa38a2 1850 this.sendToWorker(workerNodeKey, task, task.transferList)
a1763c54 1851 this.checkAndEmitTaskExecutionEvents()
2e81254d
JB
1852 }
1853
f9f00b5f 1854 private enqueueTask (workerNodeKey: number, task: Task<Data>): number {
a1763c54
JB
1855 const tasksQueueSize = this.workerNodes[workerNodeKey].enqueueTask(task)
1856 this.checkAndEmitTaskQueuingEvents()
1857 return tasksQueueSize
adc3c320
JB
1858 }
1859
416fd65c 1860 private dequeueTask (workerNodeKey: number): Task<Data> | undefined {
4b628b48 1861 return this.workerNodes[workerNodeKey].dequeueTask()
adc3c320
JB
1862 }
1863
416fd65c 1864 private tasksQueueSize (workerNodeKey: number): number {
4b628b48 1865 return this.workerNodes[workerNodeKey].tasksQueueSize()
df593701
JB
1866 }
1867
81c02522 1868 protected flushTasksQueue (workerNodeKey: number): void {
920278a2
JB
1869 while (this.tasksQueueSize(workerNodeKey) > 0) {
1870 this.executeTask(
1871 workerNodeKey,
1872 this.dequeueTask(workerNodeKey) as Task<Data>
1873 )
ff733df7 1874 }
4b628b48 1875 this.workerNodes[workerNodeKey].clearTasksQueue()
ff733df7
JB
1876 }
1877
ef41a6e6
JB
1878 private flushTasksQueues (): void {
1879 for (const [workerNodeKey] of this.workerNodes.entries()) {
1880 this.flushTasksQueue(workerNodeKey)
1881 }
1882 }
c97c7edb 1883}